blob: 261e21f9ea7f2ba13bb5d97dac877f403b2dc451 [file] [log] [blame]
// Copyright 2016 The Netstack Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipe
// Rx is the receive side of the shared memory ring buffer.
//
// It tracks two positions in the pipe: head, the slot that the next Pull()
// will inspect, and tail, the start of the region that the next Flush() will
// hand back to the transmitter. Whenever head == tail there is nothing
// pending to flush.
type Rx struct {
	// p is the underlying pipe that slot headers and payloads are read from.
	p pipe

	// tail marks where the not-yet-flushed region begins; head marks the
	// next slot to be pulled.
	tail, head uint64
}
// Init prepares the receive end of the pipe to operate on b. Once it returns,
// head and tail are equal, so nothing is pending to flush and the next slot
// to be inspected is the very first one.
func (r *Rx) Init(b []byte) {
	r.p.init(b)

	// NOTE(review): the 0xfffffffe multiplier presumably starts the cursors
	// just short of counter wrap-around so that overflow handling is
	// exercised early — confirm against the transmit side.
	r.head = 0xfffffffe * jump
	r.tail = r.head
}
// Pull returns the next buffer queued in the pipe, or nil when no buffer can
// be pulled right now.
//
// The returned slice stays valid only until the next call to Flush(); it must
// not be touched after that.
func (r *Rx) Pull() []byte {
	for {
		if r.head == r.tail+jump {
			// Everything in the pipe has been pulled already.
			return nil
		}

		hdr := r.p.readAtomic(r.head)
		if hdr&slotFree != 0 {
			// The slot at head is still marked free, so there is
			// nothing to pull yet.
			return nil
		}

		size := hdr & slotSizeMask
		next := r.head + payloadToSlotSize(size)

		// wrapAt keeps the revolution bits of head and replaces the rest
		// with the buffer length; overshoot is how far the end of this
		// slot lies beyond that point (negative when it ends before it).
		wrapAt := (r.head & revolutionMask) | uint64(len(r.p.buffer))
		overshoot := int64(next - wrapAt)

		if overshoot < 0 {
			// Ordinary slot: take the payload slice before moving
			// head past it.
			buf := r.p.data(r.head, size)
			r.head = next
			return buf
		}

		// Otherwise this is a wrapping slot. It carries no data, so it
		// is skipped and the loop retries from the first slot. It is
		// only accepted when it ends at most jump past the wrap point
		// and on a zero offset; any other header yields nil.
		if overshoot > int64(jump) || next&offsetMask != 0 {
			return nil
		}

		if r.head == r.tail {
			// Nothing has been pulled since the last Flush(), so
			// release the wrapping slot right away and let the
			// sender reuse the space if it needs to.
			r.p.writeAtomic(r.head, slotFree|slotToPayloadSize(next-r.head))
			r.tail = next
		}

		r.head = next
	}
}
// Flush tells the transmitter that every buffer pulled since the previous
// Flush() has been consumed, so the transmitter is free to use those slots
// for further transmission. It does nothing when no buffer has been pulled
// in the meantime.
func (r *Rx) Flush() {
	if pulled := r.head - r.tail; pulled != 0 {
		// Mark the whole pulled region as a single free slot starting at
		// tail, then move tail up to head.
		r.p.writeAtomic(r.tail, slotFree|slotToPayloadSize(pulled))
		r.tail = r.head
	}
}
// Bytes returns the byte slice on which the pipe operates.
//
// The pipe's buffer field is returned directly, without copying.
// NOTE(review): presumably this is the slice that was passed to Init —
// confirm against pipe.init.
func (r *Rx) Bytes() []byte {
return r.p.buffer
}