| // Copyright 2016 The Netstack Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package pipe |
| |
| // Rx is the receive side of the shared memory ring buffer. |
| type Rx struct { |
| p pipe |
| |
| tail uint64 |
| head uint64 |
| } |
| |
| // Init initializes the receive end of the pipe. In the initial state, the next |
| // slot to be inspected is the very first one. |
| func (r *Rx) Init(b []byte) { |
| r.p.init(b) |
| r.tail = 0xfffffffe * jump |
| r.head = r.tail |
| } |
| |
| // Pull reads the next buffer from the pipe, returning nil if there isn't one |
| // currently available. |
| // |
| // The returned slice is available until Flush() is next called. After that, it |
| // must not be touched. |
| func (r *Rx) Pull() []byte { |
| if r.head == r.tail+jump { |
| // We've already pulled the whole pipe. |
| return nil |
| } |
| |
| header := r.p.readAtomic(r.head) |
| if header&slotFree != 0 { |
| // The next slot is free, we can't pull it yet. |
| return nil |
| } |
| |
| payloadSize := header & slotSizeMask |
| newHead := r.head + payloadToSlotSize(payloadSize) |
| headWrap := (r.head & revolutionMask) | uint64(len(r.p.buffer)) |
| |
| // Check if this is a wrapping slot. If that's the case, it carries no |
| // data, so we just skip it and try again from the first slot. |
| if int64(newHead-headWrap) >= 0 { |
| if int64(newHead-headWrap) > int64(jump) || newHead&offsetMask != 0 { |
| return nil |
| } |
| |
| if r.tail == r.head { |
| // If this is the first pull since the last Flush() |
| // call, we flush the state so that the sender can use |
| // this space if it needs to. |
| r.p.writeAtomic(r.head, slotFree|slotToPayloadSize(newHead-r.head)) |
| r.tail = newHead |
| } |
| |
| r.head = newHead |
| return r.Pull() |
| } |
| |
| // Grab the buffer before updating r.head. |
| b := r.p.data(r.head, payloadSize) |
| r.head = newHead |
| return b |
| } |
| |
| // Flush tells the transmitter that all buffers pulled since the last Flush() |
| // have been used, so the transmitter is free to used their slots for further |
| // transmission. |
| func (r *Rx) Flush() { |
| if r.head == r.tail { |
| return |
| } |
| r.p.writeAtomic(r.tail, slotFree|slotToPayloadSize(r.head-r.tail)) |
| r.tail = r.head |
| } |
| |
| // Bytes returns the byte slice on which the pipe operates. |
| func (r *Rx) Bytes() []byte { |
| return r.p.buffer |
| } |