blob: be32b07971e71ea9a904de0a8a6f8179bfd959c0 [file] [log] [blame]
// Copyright (C) 2018, Cloudflare, Inc.
// Copyright (C) 2018, Alessandro Ghedini
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
use std::cmp;
use std::collections::hash_map;
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use crate::Error;
use crate::Result;
use crate::ranges;
const MAX_WRITE_SIZE: usize = 1000;
/// Keeps track of QUIC streams and enforces stream limits.
#[derive(Default)]
pub struct StreamMap {
/// Map of streams indexed by stream ID.
streams: HashMap<u64, Stream>,
/// Peer's maximum bidirectional stream count limit.
peer_max_streams_bidi: u64,
/// Peer's maximum unidirectional stream count limit.
peer_max_streams_uni: u64,
/// The total number of bidirectional streams opened by the peer.
peer_opened_streams_bidi: u64,
/// The total number of unidirectional streams opened by the peer.
peer_opened_streams_uni: u64,
/// Local maximum bidirectional stream count limit.
local_max_streams_bidi: u64,
local_max_streams_bidi_next: u64,
/// Local maximum unidirectional stream count limit.
local_max_streams_uni: u64,
local_max_streams_uni_next: u64,
/// The total number of bidirectional streams opened by the local endpoint.
local_opened_streams_bidi: u64,
/// The total number of unidirectional streams opened by the local endpoint.
local_opened_streams_uni: u64,
/// Queue of stream IDs corresponding to streams that have buffered data
/// ready to be sent to the peer. This also implies that the stream has
/// enough flow control credits to send at least some of that data.
///
/// Streams are added to the back of the list, and removed from the front.
flushable: VecDeque<u64>,
/// Set of stream IDs corresponding to streams that have outstanding data
/// to read. This is used to generate a `StreamIter` of streams without
/// having to iterate over the full list of streams.
readable: HashSet<u64>,
/// Set of stream IDs corresponding to streams that have enough flow control
/// capacity to be written to, and is not finished. This is used to generate
/// a `StreamIter` of streams without having to iterate over the full list
/// of streams.
writable: HashSet<u64>,
/// Set of stream IDs corresponding to streams that are almost out of flow
/// control credit and need to send MAX_STREAM_DATA. This is used to
/// generate a `StreamIter` of streams without having to iterate over the
/// full list of streams.
almost_full: HashSet<u64>,
}
impl StreamMap {
pub fn new(max_streams_bidi: u64, max_streams_uni: u64) -> StreamMap {
StreamMap {
local_max_streams_bidi: max_streams_bidi,
local_max_streams_bidi_next: max_streams_bidi,
local_max_streams_uni: max_streams_uni,
local_max_streams_uni_next: max_streams_uni,
..StreamMap::default()
}
}
/// Returns the stream with the given ID if it exists.
pub fn get(&self, id: u64) -> Option<&Stream> {
self.streams.get(&id)
}
/// Returns the mutable stream with the given ID if it exists.
pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream> {
self.streams.get_mut(&id)
}
/// Returns the mutable stream with the given ID if it exists, or creates
/// a new one otherwise.
///
/// The `local` parameter indicates whether the stream's creation was
/// requested by the local application rather than the peer, and is
/// used to validate the requested stream ID, and to select the initial
/// flow control values from the local and remote transport parameters
/// (also passed as arguments).
///
/// This also takes care of enforcing both local and the peer's stream
/// count limits. If one of these limits is violated, the `StreamLimit`
/// error is returned.
pub(crate) fn get_or_create(
&mut self, id: u64, local_params: &crate::TransportParams,
peer_params: &crate::TransportParams, local: bool, is_server: bool,
) -> Result<&mut Stream> {
let stream = match self.streams.entry(id) {
hash_map::Entry::Vacant(v) => {
if local != is_local(id, is_server) {
return Err(Error::InvalidStreamState);
}
let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
// Locally-initiated bidirectional stream.
(true, true) => (
local_params.initial_max_stream_data_bidi_local,
peer_params.initial_max_stream_data_bidi_remote,
),
// Locally-initiated unidirectional stream.
(true, false) => (0, peer_params.initial_max_stream_data_uni),
// Remotely-initiated bidirectional stream.
(false, true) => (
local_params.initial_max_stream_data_bidi_remote,
peer_params.initial_max_stream_data_bidi_local,
),
// Remotely-initiated unidirectional stream.
(false, false) =>
(local_params.initial_max_stream_data_uni, 0),
};
// Enforce stream count limits.
match (is_local(id, is_server), is_bidi(id)) {
(true, true) => {
if self.local_opened_streams_bidi >=
self.peer_max_streams_bidi
{
return Err(Error::StreamLimit);
}
self.local_opened_streams_bidi += 1;
},
(true, false) => {
if self.local_opened_streams_uni >=
self.peer_max_streams_uni
{
return Err(Error::StreamLimit);
}
self.local_opened_streams_uni += 1;
},
(false, true) => {
if self.peer_opened_streams_bidi >=
self.local_max_streams_bidi
{
return Err(Error::StreamLimit);
}
self.peer_opened_streams_bidi += 1;
},
(false, false) => {
if self.peer_opened_streams_uni >=
self.local_max_streams_uni
{
return Err(Error::StreamLimit);
}
self.peer_opened_streams_uni += 1;
},
};
let s = Stream::new(max_rx_data, max_tx_data, is_bidi(id), local);
v.insert(s)
},
hash_map::Entry::Occupied(v) => v.into_mut(),
};
// Stream might already be writable due to initial flow control limits.
if stream.is_writable() {
self.writable.insert(id);
}
Ok(stream)
}
/// Pushes the stream ID to the back of the flushable streams queue.
///
/// Note that the caller is responsible for checking that the specified
/// stream ID was not in the queue already before calling this.
///
/// Queueing a stream multiple times simultaneously means that it might be
/// unfairly scheduled more often than other streams, and might also cause
/// spurious cycles through the queue, so it should be avoided.
pub fn push_flushable(&mut self, stream_id: u64) {
self.flushable.push_back(stream_id);
}
/// Removes and returns the first stream ID from the flushable streams
/// queue.
///
/// Note that if the stream is still flushable after sending some of its
/// outstanding data, it needs to be added back to the queu.
pub fn pop_flushable(&mut self) -> Option<u64> {
self.flushable.pop_front()
}
/// Adds or removes the stream ID to/from the readable streams set.
///
/// If the stream was already in the list, this does nothing.
pub fn mark_readable(&mut self, stream_id: u64, readable: bool) {
if readable {
self.readable.insert(stream_id);
} else {
self.readable.remove(&stream_id);
}
}
/// Adds or removes the stream ID to/from the writable streams set.
///
/// This should also be called anytime a new stream is created, in addition
/// to when an existing stream becomes writable (or stops being writable).
///
/// If the stream was already in the list, this does nothing.
pub fn mark_writable(&mut self, stream_id: u64, writable: bool) {
if writable {
self.writable.insert(stream_id);
} else {
self.writable.remove(&stream_id);
}
}
/// Adds or removes the stream ID to/from the almost full streams set.
///
/// If the stream was already in the list, this does nothing.
pub fn mark_almost_full(&mut self, stream_id: u64, almost_full: bool) {
if almost_full {
self.almost_full.insert(stream_id);
} else {
self.almost_full.remove(&stream_id);
}
}
/// Updates the peer's maximum bidirectional stream count limit.
pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
}
/// Updates the peer's maximum unidirectional stream count limit.
pub fn update_peer_max_streams_uni(&mut self, v: u64) {
self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
}
/// Commits the new max_streams_bidi limit and returns it.
pub fn update_max_streams_bidi(&mut self) -> u64 {
self.local_max_streams_bidi = self.local_max_streams_bidi_next;
self.local_max_streams_bidi_next
}
/// Commits the new max_streams_uni limit and returns it.
pub fn update_max_streams_uni(&mut self) -> u64 {
self.local_max_streams_uni = self.local_max_streams_uni_next;
self.local_max_streams_uni_next
}
/// Drops completed stream.
///
/// This should only be called when Stream::is_complete() returns true for
/// the given stream.
pub fn collect(&mut self, stream_id: u64, local: bool) {
if !local {
// If the stream was created by the peer, give back a max streams
// credit.
if is_bidi(stream_id) {
self.local_max_streams_bidi_next =
self.local_max_streams_bidi_next.saturating_add(1);
} else {
self.local_max_streams_uni_next =
self.local_max_streams_uni_next.saturating_add(1);
}
}
}
/// Creates an iterator over streams that have outstanding data to read.
pub fn readable(&self) -> StreamIter {
StreamIter::from(&self.readable)
}
/// Creates an iterator over streams that can be written to.
pub fn writable(&self) -> StreamIter {
StreamIter::from(&self.writable)
}
/// Creates an iterator over streams that need to send MAX_STREAM_DATA.
pub fn almost_full(&self) -> StreamIter {
StreamIter::from(&self.almost_full)
}
/// Returns true if there are any streams that have data to write.
pub fn has_flushable(&self) -> bool {
!self.flushable.is_empty()
}
/// Returns true if there are any streams that need to update the local
/// flow control limit.
pub fn has_almost_full(&self) -> bool {
!self.almost_full.is_empty()
}
/// Returns true if the max bidirectional streams count needs to be updated
/// by sending a MAX_STREAMS frame to the peer.
pub fn should_update_max_streams_bidi(&self) -> bool {
self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
self.local_max_streams_bidi_next / 2 >
self.local_max_streams_bidi - self.peer_opened_streams_bidi
}
/// Returns true if the max unidirectional streams count needs to be updated
/// by sending a MAX_STREAMS frame to the peer.
pub fn should_update_max_streams_uni(&self) -> bool {
self.local_max_streams_uni_next != self.local_max_streams_uni &&
self.local_max_streams_uni_next / 2 >
self.local_max_streams_uni - self.peer_opened_streams_uni
}
/// Creates an iterator over all streams.
#[cfg(test)]
pub fn iter_mut(&mut self) -> hash_map::IterMut<u64, Stream> {
self.streams.iter_mut()
}
}
/// A QUIC stream.
#[derive(Default)]
pub struct Stream {
/// Receive-side stream buffer.
pub recv: RecvBuf,
/// Send-side stream buffer.
pub send: SendBuf,
/// Whether the stream is bidirectional.
pub bidi: bool,
/// Whether the stream was created by the local endpoint.
pub local: bool,
}
impl Stream {
/// Creates a new stream with the given flow control limits.
pub fn new(
max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
) -> Stream {
Stream {
recv: RecvBuf::new(max_rx_data),
send: SendBuf::new(max_tx_data),
bidi,
local,
}
}
/// Returns true if the stream has data to read.
pub fn is_readable(&self) -> bool {
self.recv.ready()
}
/// Returns true if the stream has enough flow control capacity to be
/// written to, and is not finished.
pub fn is_writable(&self) -> bool {
!self.send.shutdown &&
!self.send.is_fin() &&
self.send.off < self.send.max_data
}
/// Returns true if the stream has data to send and is allowed to send at
/// least some of it.
pub fn is_flushable(&self) -> bool {
self.send.ready() && self.send.off() < self.send.max_data
}
/// Returns true if the stream is complete.
///
/// For bidirectional streams this happens when both the receive and send
/// sides are complete. That is when all incoming data has been read by the
/// application, and when all outgoing data has been ACKed by the peer.
///
/// For unidirectional streams this happens when either the receive or send
/// side is complete, depending on whether the stream was created locally
/// or not.
pub fn is_complete(&self) -> bool {
match (self.bidi, self.local) {
// For bidirectional streams we need to check both receive and send
// sides for completion.
(true, _) => self.recv.is_fin() && self.send.is_complete(),
// For unidirectional streams generated locally, we only need to
// check the send side for completion.
(false, true) => self.send.is_complete(),
// For unidirectional streams generated by the peer, we only need
// to check the receive side for completion.
(false, false) => self.recv.is_fin(),
}
}
}
/// Returns true if the stream was created locally.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
(stream_id & 0x1) == (is_server as u64)
}
/// Returns true if the stream is bidirectional.
pub fn is_bidi(stream_id: u64) -> bool {
(stream_id & 0x2) == 0
}
/// An iterator over QUIC streams.
#[derive(Default)]
pub struct StreamIter {
streams: Vec<u64>,
}
impl StreamIter {
fn from(streams: &HashSet<u64>) -> Self {
StreamIter {
streams: streams.iter().copied().collect(),
}
}
}
impl Iterator for StreamIter {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
self.streams.pop()
}
}
impl ExactSizeIterator for StreamIter {
fn len(&self) -> usize {
self.streams.len()
}
}
/// Receive-side stream buffer.
///
/// Stream data received by the peer is buffered in a list of data chunks
/// ordered by offset in ascending order. Contiguous data can then be read
/// into a slice.
#[derive(Default)]
pub struct RecvBuf {
/// Chunks of data received from the peer that have not yet been read by
/// the application, ordered by offset.
data: BinaryHeap<RangeBuf>,
/// The lowest data offset that has yet to be read by the application.
off: u64,
/// The total length of data received on this stream.
len: u64,
/// The maximum offset the peer is allowed to send us.
max_data: u64,
/// The updated maximum offset the peer is allowed to send us.
max_data_next: u64,
/// The final stream offset received from the peer, if any.
fin_off: Option<u64>,
/// Whether incoming data is validated but not buffered.
drain: bool,
}
impl RecvBuf {
/// Creates a new receive buffer.
fn new(max_data: u64) -> RecvBuf {
RecvBuf {
max_data,
max_data_next: max_data,
..RecvBuf::default()
}
}
/// Inserts the given chunk of data in the buffer.
///
/// This also takes care of enforcing stream flow control limits, as well
/// as handling incoming data that overlaps data that is already in the
/// buffer.
pub fn push(&mut self, buf: RangeBuf) -> Result<()> {
if buf.max_off() > self.max_data {
return Err(Error::FlowControl);
}
if let Some(fin_off) = self.fin_off {
// Stream's size is known, forbid data beyond that point.
if buf.max_off() > fin_off {
return Err(Error::FinalSize);
}
// Stream's size is already known, forbid changing it.
if buf.fin() && fin_off != buf.max_off() {
return Err(Error::FinalSize);
}
}
// Stream's known size is lower than data already received.
if buf.fin() && buf.max_off() < self.len {
return Err(Error::FinalSize);
}
// We already saved the final offset, so there's nothing else we
// need to keep from the RangeBuf if it's empty.
if self.fin_off.is_some() && buf.is_empty() {
return Ok(());
}
if buf.fin() {
self.fin_off = Some(buf.max_off());
}
// No need to store empty buffer that doesn't carry the fin flag.
if !buf.fin() && buf.is_empty() {
return Ok(());
}
// Check if data is fully duplicate, that is the buffer's max offset is
// lower or equal to the offset already stored in the recv buffer.
if self.off >= buf.max_off() {
// An exception is applied to empty range buffers, because an empty
// buffer's max offset matches the max offset of the recv buffer.
//
// By this point all spurious empty buffers should have already been
// discarded, so allowing empty buffers here should be safe.
if !buf.is_empty() {
return Ok(());
}
}
if self.drain {
return Ok(());
}
let mut tmp_buf = Some(buf);
while let Some(mut buf) = tmp_buf {
tmp_buf = None;
// Discard incoming data below current stream offset. Bytes up to
// `self.off` have already been received so we should not buffer
// them again. This is also important to make sure `ready()` doesn't
// get stuck when a buffer with lower offset than the stream's is
// buffered.
if self.off > buf.off() {
buf = buf.split_off((self.off - buf.off()) as usize);
}
for b in &self.data {
// New buffer is fully contained in existing buffer.
if buf.off() >= b.off() && buf.max_off() <= b.max_off() {
return Ok(());
}
// New buffer's start overlaps existing buffer.
if buf.off() >= b.off() && buf.off() < b.max_off() {
buf = buf.split_off((b.max_off() - buf.off()) as usize);
}
// New buffer's end overlaps existing buffer.
if buf.off() < b.off() && buf.max_off() > b.off() {
tmp_buf = Some(buf.split_off((b.off() - buf.off()) as usize));
}
}
self.len = cmp::max(self.len, buf.max_off());
self.data.push(buf);
}
Ok(())
}
/// Writes data from the receive buffer into the given output buffer.
///
/// Only contiguous data is written to the output buffer, starting from
/// offset 0. The offset is incremented as data is read out of the receive
/// buffer into the application buffer. If there is no data at the expected
/// read offset, the `Done` error is returned.
///
/// On success the amount of data read, and a flag indicating if there is
/// no more data in the buffer, are returned as a tuple.
pub fn pop(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
let mut len = 0;
let mut cap = out.len();
if !self.ready() {
return Err(Error::Done);
}
while cap > 0 && self.ready() {
let mut buf = match self.data.pop() {
Some(v) => v,
None => break,
};
if buf.len() > cap {
let new_buf = RangeBuf {
data: buf.data.split_off(cap),
off: buf.off + cap as u64,
fin: buf.fin,
};
buf.fin = false;
self.data.push(new_buf);
}
out[len..len + buf.len()].copy_from_slice(&buf.data);
self.off += buf.len() as u64;
len += buf.len();
cap -= buf.len();
}
self.max_data_next = self.max_data_next.saturating_add(len as u64);
Ok((len, self.is_fin()))
}
/// Resets the stream at the given offset.
pub fn reset(&mut self, final_size: u64) -> Result<usize> {
// Stream's size is already known, forbid changing it.
if let Some(fin_off) = self.fin_off {
if fin_off != final_size {
return Err(Error::FinalSize);
}
}
// Stream's known size is lower than data already received.
if final_size < self.len {
return Err(Error::FinalSize);
}
self.fin_off = Some(final_size);
// Return how many bytes need to be removed from the connection flow
// control.
Ok((final_size - self.len) as usize)
}
/// Commits the new max_data limit and returns it.
pub fn update_max_data(&mut self) -> u64 {
self.max_data = self.max_data_next;
self.max_data
}
/// Shuts down receiving data.
pub fn shutdown(&mut self) -> Result<()> {
if self.drain {
return Err(Error::Done);
}
self.drain = true;
self.data.clear();
Ok(())
}
/// Returns true if we need to update the local flow control limit.
pub fn almost_full(&self) -> bool {
// Send MAX_STREAM_DATA when the new limit is at least double the
// amount of data that can be received before blocking.
self.fin_off.is_none() &&
self.max_data_next != self.max_data &&
self.max_data_next / 2 > self.max_data - self.len
}
/// Returns true if the receive-side of the stream is complete.
///
/// This happens when the stream's receive final size is known, and the
/// application has read all data from the stream.
pub fn is_fin(&self) -> bool {
if self.fin_off == Some(self.off) {
return true;
}
false
}
/// Returns true if the stream has data to be read.
fn ready(&self) -> bool {
let buf = match self.data.peek() {
Some(v) => v,
None => return false,
};
buf.off == self.off
}
}
/// Send-side stream buffer.
///
/// Stream data scheduled to be sent to the peer is buffered in a list of data
/// chunks ordered by offset in ascending order. Contiguous data can then be
/// read into a slice.
///
/// By default, new data is appended at the end of the stream, but data can be
/// inserted at the start of the buffer (this is to allow data that needs to be
/// retransmitted to be re-buffered).
#[derive(Default)]
pub struct SendBuf {
/// Chunks of data to be sent, ordered by offset.
data: BinaryHeap<RangeBuf>,
/// The maximum offset of data buffered in the stream.
off: u64,
/// The amount of data that was ever written to this stream.
len: u64,
/// The maximum offset we are allowed to send to the peer.
max_data: u64,
/// The final stream offset written to the stream, if any.
fin_off: Option<u64>,
/// Whether the stream's send-side has been shut down.
shutdown: bool,
/// Ranges of data offsets that have been ACKed.
acked: ranges::RangeSet,
}
impl SendBuf {
/// Creates a new send buffer.
fn new(max_data: u64) -> SendBuf {
SendBuf {
max_data,
..SendBuf::default()
}
}
/// Inserts the given slice of data at the end of the buffer.
///
/// The number of bytes that were actually stored in the buffer is returned
/// (this may be lower than the size of the input buffer, in case of partial
/// writes).
pub fn push_slice(
&mut self, mut data: &[u8], mut fin: bool,
) -> Result<usize> {
let mut len = 0;
if self.shutdown {
// Since we won't write any more data anyway, pretend that we sent
// all data that was passed in.
return Ok(data.len());
}
if data.is_empty() {
// Create a dummy range buffer, in order to propagate the `fin` flag
// into `RangeBuf::push()`. This will be discarded later on.
let buf = RangeBuf::from(&[], self.off, fin);
return self.push(buf).map(|_| 0);
}
if data.len() > self.cap() {
// Truncate the input buffer according to the stream's capacity.
let len = self.cap();
data = &data[..len];
// We are not buffering the full input, so clear the fin flag.
fin = false;
}
// Split the input buffer into multiple RangeBufs. Otherwise a big
// buffer would need to be split later on when popping data, which
// would cause a partial copy of the buffer.
for chunk in data.chunks(MAX_WRITE_SIZE) {
len += chunk.len();
let fin = len == data.len() && fin;
let buf = RangeBuf::from(chunk, self.off, fin);
self.push(buf)?;
self.off += chunk.len() as u64;
}
Ok(data.len())
}
/// Inserts the given chunk of data in the buffer.
pub fn push(&mut self, buf: RangeBuf) -> Result<()> {
if let Some(fin_off) = self.fin_off {
// Can't write past final offset.
if buf.max_off() > fin_off {
return Err(Error::FinalSize);
}
// Can't "undo" final offset.
if buf.max_off() == fin_off && !buf.fin() {
return Err(Error::FinalSize);
}
}
if self.shutdown {
return Ok(());
}
// Don't queue data that was already fully ACK'd.
if self.ack_off() >= buf.max_off() {
return Ok(());
}
self.len += buf.len() as u64;
if buf.fin() {
self.fin_off = Some(buf.max_off());
}
// We already recorded the final offset, so we can just discard the
// empty buffer now.
if buf.is_empty() {
return Ok(());
}
self.data.push(buf);
Ok(())
}
/// Returns contiguous data from the send buffer as a single `RangeBuf`.
pub fn pop(&mut self, max_data: usize) -> Result<RangeBuf> {
let mut out = RangeBuf::default();
out.data =
Vec::with_capacity(cmp::min(max_data as u64, self.len) as usize);
let mut out_len = max_data;
let mut out_off = self.data.peek().map_or_else(|| 0, RangeBuf::off);
while out_len > 0 &&
self.ready() &&
self.off() == out_off &&
self.off() < self.max_data
{
let mut buf = match self.data.pop() {
Some(v) => v,
None => break,
};
if buf.len() > out_len || buf.max_off() > self.max_data {
let new_len =
cmp::min(out_len, (self.max_data - buf.off()) as usize);
let new_buf = buf.split_off(new_len);
self.data.push(new_buf);
}
if out.is_empty() {
out.off = buf.off;
}
self.len -= buf.len() as u64;
out_len -= buf.len();
out_off = buf.max_off();
out.data.extend_from_slice(&buf.data);
}
// Override the `fin` flag set for the output buffer by matching the
// buffer's maximum offset against the stream's final offset (if known).
//
// This is more efficient than tracking `fin` using the range buffers
// themselves, and lets us avoid queueing empty buffers just so we can
// propagate the final size.
out.fin = self.fin_off == Some(out.max_off());
Ok(out)
}
/// Updates the max_data limit to the given value.
pub fn update_max_data(&mut self, max_data: u64) {
self.max_data = cmp::max(self.max_data, max_data);
}
/// Increments the ACK'd data offset.
pub fn ack(&mut self, off: u64, len: usize) {
self.acked.insert(off..off + len as u64);
}
/// Shuts down sending data.
pub fn shutdown(&mut self) -> Result<()> {
if self.shutdown {
return Err(Error::Done);
}
self.shutdown = true;
self.data.clear();
Ok(())
}
/// Returns true if all data in the stream has been sent.
///
/// This happens when the stream's send final size is knwon, and the
/// application has already written data up to that point.
pub fn is_fin(&self) -> bool {
if self.fin_off == Some(self.off) {
return true;
}
false
}
/// Returns true if the send-side of the stream is complete.
///
/// This happens when the stream's send final size is known, and the peer
/// has already ACKed all stream data up to that point.
pub fn is_complete(&self) -> bool {
if let Some(fin_off) = self.fin_off {
if self.acked == (0..fin_off) {
return true;
}
}
false
}
/// Returns true if there is data to be written.
fn ready(&self) -> bool {
!self.data.is_empty()
}
/// Returns the lowest offset of data buffered.
fn off(&self) -> u64 {
match self.data.peek() {
Some(v) => v.off(),
None => self.off,
}
}
/// Returns the highest contiguously ACKed offset.
fn ack_off(&self) -> u64 {
match self.acked.iter().next() {
// Only consider the initial range if it contiguously covers the
// start of the stream (i.e. from offset 0).
Some(std::ops::Range { start: 0, end }) => end,
Some(_) | None => 0,
}
}
/// Returns the outgoing flow control capacity.
pub fn cap(&self) -> usize {
(self.max_data - self.off) as usize
}
}
/// Buffer holding data at a specific offset.
#[derive(Clone, Debug, Default, Eq)]
pub struct RangeBuf {
data: Vec<u8>,
off: u64,
fin: bool,
}
impl RangeBuf {
/// Creates a new `RangeBuf` from the given slice.
pub(crate) fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf {
RangeBuf {
data: Vec::from(buf),
off,
fin,
}
}
/// Returns whether `self` holds the final offset in the stream.
pub fn fin(&self) -> bool {
self.fin
}
/// Returns the starting offset of `self`.
pub fn off(&self) -> u64 {
self.off
}
/// Returns the final offset of `self`.
pub fn max_off(&self) -> u64 {
self.off() + self.len() as u64
}
/// Returns the length of `self`.
pub fn len(&self) -> usize {
self.data.len()
}
/// Returns true if `self` has a length of zero bytes.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Splits the buffer into two at the given index.
pub fn split_off(&mut self, at: usize) -> RangeBuf {
let buf = RangeBuf {
data: self.data.split_off(at),
off: self.off + at as u64,
fin: self.fin,
};
self.fin = false;
buf
}
}
impl std::ops::Deref for RangeBuf {
type Target = [u8];
fn deref(&self) -> &[u8] {
&self.data
}
}
impl std::ops::DerefMut for RangeBuf {
fn deref_mut(&mut self) -> &mut [u8] {
&mut self.data
}
}
impl Ord for RangeBuf {
fn cmp(&self, other: &RangeBuf) -> cmp::Ordering {
// Invert ordering to implement min-heap.
self.off.cmp(&other.off).reverse()
}
}
impl PartialOrd for RangeBuf {
fn partial_cmp(&self, other: &RangeBuf) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for RangeBuf {
fn eq(&self, other: &RangeBuf) -> bool {
self.off == other.off
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn empty_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn empty_stream_frame() {
let mut recv = RecvBuf::new(15);
assert_eq!(recv.len, 0);
let buf = RangeBuf::from(b"hello", 0, false);
assert!(recv.push(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
let mut buf = [0; 32];
assert_eq!(recv.pop(&mut buf), Ok((5, false)));
// Don't store non-fin empty buffer.
let buf = RangeBuf::from(b"", 10, false);
assert!(recv.push(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 0);
// Check flow control for empty buffer.
let buf = RangeBuf::from(b"", 16, false);
assert_eq!(recv.push(buf), Err(Error::FlowControl));
// Store fin empty buffer.
let buf = RangeBuf::from(b"", 5, true);
assert!(recv.push(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 1);
// Don't store additional fin empty buffers.
let buf = RangeBuf::from(b"", 5, true);
assert!(recv.push(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 1);
// Don't store additional fin non-empty buffers.
let buf = RangeBuf::from(b"aa", 3, true);
assert!(recv.push(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 1);
// Validate final size with fin empty buffers.
let buf = RangeBuf::from(b"", 6, true);
assert_eq!(recv.push(buf), Err(Error::FinalSize));
let buf = RangeBuf::from(b"", 4, true);
assert_eq!(recv.push(buf), Err(Error::FinalSize));
let mut buf = [0; 32];
assert_eq!(recv.pop(&mut buf), Ok((0, true)));
}
#[test]
fn ordered_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, false);
let third = RangeBuf::from(b"something", 10, true);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
assert!(recv.push(third).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 19);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"helloworldsomething");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn split_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"helloworld", 9, true);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
let (len, fin) = recv.pop(&mut buf[..10]).unwrap();
assert_eq!(len, 10);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"somethingh");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 10);
let (len, fin) = recv.pop(&mut buf[..5]).unwrap();
assert_eq!(len, 5);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"ellow");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 15);
let (len, fin) = recv.pop(&mut buf[..10]).unwrap();
assert_eq!(len, 4);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"orld");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
}
#[test]
fn incomplete_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"helloworld", 9, true);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 19);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"somethinghelloworld");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
}
#[test]
fn zero_len_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"", 9, true);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"something");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
}
#[test]
fn past_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 3, false);
let third = RangeBuf::from(b"ello", 4, true);
let fourth = RangeBuf::from(b"ello", 5, true);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"something");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.push(third), Err(Error::FinalSize));
assert!(recv.push(fourth).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn fully_overlapping_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 4, false);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"something");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn fully_overlapping_read2() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 4, false);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"somehello");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn fully_overlapping_read3() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 3, false);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 8);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"somhellog");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn fully_overlapping_read_multi() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"somethingsomething", 0, false);
let second = RangeBuf::from(b"hello", 3, false);
let third = RangeBuf::from(b"hello", 12, false);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 8);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(third).is_ok());
assert_eq!(recv.len, 17);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 18);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 5);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 18);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"somhellogsomhellog");
assert_eq!(recv.len, 18);
assert_eq!(recv.off, 18);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn overlapping_start_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 8, true);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 13);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"somethingello");
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 13);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn overlapping_end_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"something", 3, true);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 12);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 12);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 12);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"helsomething");
assert_eq!(recv.len, 12);
assert_eq!(recv.off, 12);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn partially_multi_overlapping_reordered_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 8, false);
let second = RangeBuf::from(b"something", 0, false);
let third = RangeBuf::from(b"moar", 11, true);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.push(third).is_ok());
assert_eq!(recv.len, 15);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 15);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"somethinhelloar");
assert_eq!(recv.len, 15);
assert_eq!(recv.off, 15);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn partially_multi_overlapping_reordered_read2() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"aaa", 0, false);
let second = RangeBuf::from(b"bbb", 2, false);
let third = RangeBuf::from(b"ccc", 4, false);
let fourth = RangeBuf::from(b"ddd", 6, false);
let fifth = RangeBuf::from(b"eee", 9, false);
let sixth = RangeBuf::from(b"fff", 11, false);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(fourth).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.push(third).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 4);
assert!(recv.push(sixth).is_ok());
assert_eq!(recv.len, 14);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 5);
assert!(recv.push(fifth).is_ok());
assert_eq!(recv.len, 14);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 6);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 14);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"aabbbcdddeefff");
assert_eq!(recv.len, 14);
assert_eq!(recv.off, 14);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn empty_write() {
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
let write = send.pop(std::usize::MAX).unwrap();
assert_eq!(write.len(), 0);
assert_eq!(write.fin(), false);
}
#[test]
fn multi_write() {
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert!(send.push_slice(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.push_slice(second, true).is_ok());
assert_eq!(send.len, 19);
let write = send.pop(128).unwrap();
assert_eq!(write.len(), 19);
assert_eq!(write.fin(), true);
assert_eq!(&write[..], b"somethinghelloworld");
assert_eq!(send.len, 0);
}
#[test]
fn split_write() {
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert!(send.push_slice(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.push_slice(second, true).is_ok());
assert_eq!(send.len, 19);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 10);
assert_eq!(write.fin(), false);
assert_eq!(&write[..], b"somethingh");
assert_eq!(send.len, 9);
let write = send.pop(5).unwrap();
assert_eq!(write.off(), 10);
assert_eq!(write.len(), 5);
assert_eq!(write.fin(), false);
assert_eq!(&write[..], b"ellow");
assert_eq!(send.len, 4);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 15);
assert_eq!(write.len(), 4);
assert_eq!(write.fin(), true);
assert_eq!(&write[..], b"orld");
assert_eq!(send.len, 0);
}
#[test]
fn resend() {
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
assert_eq!(send.off(), 0);
let first = b"something";
let second = b"helloworld";
assert!(send.push_slice(first, false).is_ok());
assert_eq!(send.off(), 0);
assert!(send.push_slice(second, true).is_ok());
assert_eq!(send.off(), 0);
let write1 = send.pop(4).unwrap();
assert_eq!(write1.off(), 0);
assert_eq!(write1.len(), 4);
assert_eq!(write1.fin(), false);
assert_eq!(&write1[..], b"some");
assert_eq!(send.len, 15);
assert_eq!(send.off(), 4);
let write2 = send.pop(5).unwrap();
assert_eq!(write2.off(), 4);
assert_eq!(write2.len(), 5);
assert_eq!(write2.fin(), false);
assert_eq!(&write2[..], b"thing");
assert_eq!(send.len, 10);
assert_eq!(send.off(), 9);
let write3 = send.pop(5).unwrap();
assert_eq!(write3.off(), 9);
assert_eq!(write3.len(), 5);
assert_eq!(write3.fin(), false);
assert_eq!(&write3[..], b"hello");
assert_eq!(send.len, 5);
assert_eq!(send.off(), 14);
send.push(write2).unwrap();
assert_eq!(send.len, 10);
assert_eq!(send.off(), 4);
send.push(write1).unwrap();
assert_eq!(send.len, 14);
assert_eq!(send.off(), 0);
let write4 = send.pop(11).unwrap();
assert_eq!(write4.off(), 0);
assert_eq!(write4.len(), 9);
assert_eq!(write4.fin(), false);
assert_eq!(&write4[..], b"something");
assert_eq!(send.len, 5);
assert_eq!(send.off(), 14);
let write5 = send.pop(11).unwrap();
assert_eq!(write5.off(), 14);
assert_eq!(write5.len(), 5);
assert_eq!(write5.fin(), true);
assert_eq!(&write5[..], b"world");
assert_eq!(send.len, 0);
assert_eq!(send.off(), 19);
}
#[test]
fn write_blocked_by_off() {
let mut send = SendBuf::default();
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert_eq!(send.push_slice(first, false), Ok(0));
assert_eq!(send.len, 0);
assert_eq!(send.push_slice(second, true), Ok(0));
assert_eq!(send.len, 0);
send.update_max_data(5);
assert_eq!(send.push_slice(first, false), Ok(5));
assert_eq!(send.len, 5);
assert_eq!(send.push_slice(second, true), Ok(0));
assert_eq!(send.len, 5);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 5);
assert_eq!(write.fin(), false);
assert_eq!(&write[..], b"somet");
assert_eq!(send.len, 0);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 0);
assert_eq!(write.fin(), false);
assert_eq!(&write[..], b"");
assert_eq!(send.len, 0);
send.update_max_data(15);
assert_eq!(send.push_slice(&first[5..], false), Ok(4));
assert_eq!(send.len, 4);
assert_eq!(send.push_slice(second, true), Ok(6));
assert_eq!(send.len, 10);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 5);
assert_eq!(write.len(), 10);
assert_eq!(write.fin(), false);
assert_eq!(&write[..], b"hinghellow");
assert_eq!(send.len, 0);
send.update_max_data(25);
assert_eq!(send.push_slice(&second[6..], true), Ok(4));
assert_eq!(send.len, 4);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 15);
assert_eq!(write.len(), 4);
assert_eq!(write.fin(), true);
assert_eq!(&write[..], b"orld");
assert_eq!(send.len, 0);
}
#[test]
fn zero_len_write() {
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
assert!(send.push_slice(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.push_slice(&[], true).is_ok());
assert_eq!(send.len, 9);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 9);
assert_eq!(write.fin(), true);
assert_eq!(&write[..], b"something");
assert_eq!(send.len, 0);
}
#[test]
fn recv_flow_control() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, false);
let third = RangeBuf::from(b"something", 10, false);
assert_eq!(stream.recv.push(second), Ok(()));
assert_eq!(stream.recv.push(first), Ok(()));
assert!(!stream.recv.almost_full());
assert_eq!(stream.recv.push(third), Err(Error::FlowControl));
let (len, fin) = stream.recv.pop(&mut buf).unwrap();
assert_eq!(&buf[..len], b"helloworld");
assert_eq!(fin, false);
assert!(stream.recv.almost_full());
assert_eq!(stream.recv.update_max_data(), 25);
assert!(!stream.recv.almost_full());
let third = RangeBuf::from(b"something", 10, false);
assert_eq!(stream.recv.push(third), Ok(()));
}
#[test]
fn recv_past_fin() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"world", 5, false);
assert_eq!(stream.recv.push(first), Ok(()));
assert_eq!(stream.recv.push(second), Err(Error::FinalSize));
}
#[test]
fn recv_fin_dup() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"hello", 0, true);
assert_eq!(stream.recv.push(first), Ok(()));
assert_eq!(stream.recv.push(second), Ok(()));
let mut buf = [0; 32];
let (len, fin) = stream.recv.pop(&mut buf).unwrap();
assert_eq!(&buf[..len], b"hello");
assert_eq!(fin, true);
}
#[test]
fn recv_fin_change() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"world", 5, true);
assert_eq!(stream.recv.push(second), Ok(()));
assert_eq!(stream.recv.push(first), Err(Error::FinalSize));
}
#[test]
fn recv_fin_lower_than_received() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"world", 5, false);
assert_eq!(stream.recv.push(second), Ok(()));
assert_eq!(stream.recv.push(first), Err(Error::FinalSize));
}
#[test]
fn recv_fin_flow_control() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, true);
assert_eq!(stream.recv.push(first), Ok(()));
assert_eq!(stream.recv.push(second), Ok(()));
let (len, fin) = stream.recv.pop(&mut buf).unwrap();
assert_eq!(&buf[..len], b"helloworld");
assert_eq!(fin, true);
assert!(!stream.recv.almost_full());
}
#[test]
fn recv_fin_reset_mismatch() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
assert_eq!(stream.recv.push(first), Ok(()));
assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
}
#[test]
fn recv_reset_dup() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, false);
assert_eq!(stream.recv.push(first), Ok(()));
assert_eq!(stream.recv.reset(5), Ok(0));
assert_eq!(stream.recv.reset(5), Ok(0));
}
#[test]
fn recv_reset_change() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, false);
assert_eq!(stream.recv.push(first), Ok(()));
assert_eq!(stream.recv.reset(5), Ok(0));
assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
}
#[test]
fn recv_reset_lower_than_received() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, false);
assert_eq!(stream.recv.push(first), Ok(()));
assert_eq!(stream.recv.reset(4), Err(Error::FinalSize));
}
#[test]
fn send_flow_control() {
let mut stream = Stream::new(0, 15, true, true);
let first = b"hello";
let second = b"world";
let third = b"something";
assert!(stream.send.push_slice(first, false).is_ok());
assert!(stream.send.push_slice(second, false).is_ok());
assert!(stream.send.push_slice(third, false).is_ok());
let write = stream.send.pop(25).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 15);
assert_eq!(write.fin(), false);
assert_eq!(write.data, b"helloworldsomet");
let write = stream.send.pop(25).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 0);
assert_eq!(write.fin(), false);
assert_eq!(write.data, b"");
let first = RangeBuf::from(b"helloworldsomet", 0, false);
assert_eq!(stream.send.push(first), Ok(()));
let write = stream.send.pop(10).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 10);
assert_eq!(write.fin(), false);
assert_eq!(write.data, b"helloworld");
let write = stream.send.pop(10).unwrap();
assert_eq!(write.off(), 10);
assert_eq!(write.len(), 5);
assert_eq!(write.fin(), false);
assert_eq!(write.data, b"somet");
}
#[test]
fn send_past_fin() {
let mut stream = Stream::new(0, 15, true, true);
let first = b"hello";
let second = b"world";
let third = b"third";
assert_eq!(stream.send.push_slice(first, false), Ok(5));
assert_eq!(stream.send.push_slice(second, true), Ok(5));
assert!(stream.send.is_fin());
assert_eq!(stream.send.push_slice(third, false), Err(Error::FinalSize));
}
#[test]
fn send_fin_dup() {
let mut stream = Stream::new(0, 15, true, true);
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"hello", 0, true);
assert_eq!(stream.send.push(first), Ok(()));
assert_eq!(stream.send.push(second), Ok(()));
}
#[test]
fn send_undo_fin() {
let mut stream = Stream::new(0, 15, true, true);
let first = b"hello";
let second = RangeBuf::from(b"hello", 0, false);
assert_eq!(stream.send.push_slice(first, true), Ok(5));
assert!(stream.send.is_fin());
assert_eq!(stream.send.push(second), Err(Error::FinalSize));
}
#[test]
fn send_fin_max_data_match() {
let mut stream = Stream::new(0, 15, true, true);
let slice = b"hellohellohello";
assert!(stream.send.push_slice(slice, true).is_ok());
let write = stream.send.pop(15).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 15);
assert_eq!(write.fin(), true);
assert_eq!(write.data, slice);
}
#[test]
fn send_fin_zero_length() {
let mut stream = Stream::new(0, 15, true, true);
assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
assert_eq!(stream.send.push_slice(b"", true), Ok(0));
assert!(stream.send.is_fin());
let write = stream.send.pop(5).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 5);
assert_eq!(write.fin(), true);
assert_eq!(write.data, b"hello");
}
#[test]
fn send_ack() {
let mut stream = Stream::new(0, 15, true, true);
assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
assert_eq!(stream.send.push_slice(b"world", false), Ok(5));
assert_eq!(stream.send.push_slice(b"", true), Ok(0));
assert!(stream.send.is_fin());
let write = stream.send.pop(5).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 5);
assert_eq!(write.fin(), false);
assert_eq!(write.data, b"hello");
stream.send.ack(write.off(), write.len());
assert_eq!(stream.send.push(write), Ok(()));
let write = stream.send.pop(5).unwrap();
assert_eq!(write.off(), 5);
assert_eq!(write.len(), 5);
assert_eq!(write.fin(), true);
assert_eq!(write.data, b"world");
}
#[test]
fn send_ack_reordering() {
let mut stream = Stream::new(0, 15, true, true);
assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
assert_eq!(stream.send.push_slice(b"world", false), Ok(5));
assert_eq!(stream.send.push_slice(b"", true), Ok(0));
assert!(stream.send.is_fin());
let write1 = stream.send.pop(5).unwrap();
assert_eq!(write1.off(), 0);
assert_eq!(write1.len(), 5);
assert_eq!(write1.fin(), false);
assert_eq!(write1.data, b"hello");
let write2 = stream.send.pop(1).unwrap();
assert_eq!(write2.off(), 5);
assert_eq!(write2.len(), 1);
assert_eq!(write2.fin(), false);
assert_eq!(write2.data, b"w");
stream.send.ack(write2.off(), write2.len());
stream.send.ack(write1.off(), write1.len());
assert_eq!(stream.send.push(write1), Ok(()));
assert_eq!(stream.send.push(write2), Ok(()));
let write = stream.send.pop(5).unwrap();
assert_eq!(write.off(), 6);
assert_eq!(write.len(), 4);
assert_eq!(write.fin(), true);
assert_eq!(write.data, b"orld");
}
#[test]
fn recv_data_below_off() {
let mut stream = Stream::new(15, 0, true, true);
let first = RangeBuf::from(b"hello", 0, false);
assert_eq!(stream.recv.push(first), Ok(()));
let mut buf = [0; 10];
let (len, fin) = stream.recv.pop(&mut buf).unwrap();
assert_eq!(&buf[..len], b"hello");
assert_eq!(fin, false);
let first = RangeBuf::from(b"elloworld", 1, true);
assert_eq!(stream.recv.push(first), Ok(()));
let (len, fin) = stream.recv.pop(&mut buf).unwrap();
assert_eq!(&buf[..len], b"world");
assert_eq!(fin, true);
}
#[test]
fn stream_complete() {
let mut stream = Stream::new(30, 30, true, true);
assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
assert_eq!(stream.send.push_slice(b"world", false), Ok(5));
assert!(!stream.send.is_complete());
assert!(!stream.send.is_fin());
assert_eq!(stream.send.push_slice(b"", true), Ok(0));
assert!(!stream.send.is_complete());
assert!(stream.send.is_fin());
let buf = RangeBuf::from(b"hello", 0, true);
assert!(stream.recv.push(buf).is_ok());
assert!(!stream.recv.is_fin());
stream.send.ack(6, 4);
assert!(!stream.send.is_complete());
let mut buf = [0; 2];
assert_eq!(stream.recv.pop(&mut buf), Ok((2, false)));
assert!(!stream.recv.is_fin());
stream.send.ack(1, 5);
assert!(!stream.send.is_complete());
stream.send.ack(0, 1);
assert!(stream.send.is_complete());
assert!(!stream.is_complete());
let mut buf = [0; 3];
assert_eq!(stream.recv.pop(&mut buf), Ok((3, true)));
assert!(stream.recv.is_fin());
assert!(stream.is_complete());
}
}