blob: c06c6be8b41e5e0dbbf1e0ad4bbd7d253e57a1ad [file] [log] [blame]
// Copyright (C) 2018-2019, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
use std::cmp;
use std::sync::Arc;
use std::sync::RwLock;
use std::collections::hash_map;
use std::collections::BTreeMap;
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use crate::Error;
use crate::Result;
use crate::ranges;
const DEFAULT_URGENCY: u8 = 127;
#[cfg(test)]
const SEND_BUFFER_SIZE: usize = 5;
#[cfg(not(test))]
const SEND_BUFFER_SIZE: usize = 4096;
/// Keeps track of QUIC streams and enforces stream limits.
#[derive(Default)]
pub struct StreamMap {
/// Map of streams indexed by stream ID.
streams: HashMap<u64, Stream>,
/// Set of streams that were completed and garbage collected.
///
/// Instead of keeping the full stream state forever, we collect completed
/// streams to save memory, but we still need to keep track of previously
/// created streams, to prevent peers from re-creating them.
collected: HashSet<u64>,
/// Peer's maximum bidirectional stream count limit.
peer_max_streams_bidi: u64,
/// Peer's maximum unidirectional stream count limit.
peer_max_streams_uni: u64,
/// The total number of bidirectional streams opened by the peer.
peer_opened_streams_bidi: u64,
/// The total number of unidirectional streams opened by the peer.
peer_opened_streams_uni: u64,
/// Local maximum bidirectional stream count limit.
local_max_streams_bidi: u64,
local_max_streams_bidi_next: u64,
/// Local maximum unidirectional stream count limit.
local_max_streams_uni: u64,
local_max_streams_uni_next: u64,
/// The total number of bidirectional streams opened by the local endpoint.
local_opened_streams_bidi: u64,
/// The total number of unidirectional streams opened by the local endpoint.
local_opened_streams_uni: u64,
/// Queue of stream IDs corresponding to streams that have buffered data
/// ready to be sent to the peer. This also implies that the stream has
/// enough flow control credits to send at least some of that data.
///
/// Streams are grouped by their priority, where each urgency level has two
/// queues, one for non-incremental streams and one for incremental ones.
///
/// Streams with lower urgency level are scheduled first, and within the
/// same urgency level Non-incremental streams are scheduled first, in the
/// order of their stream IDs, and incremental streams are scheduled in a
/// round-robin fashion after all non-incremental streams have been flushed.
flushable: BTreeMap<u8, (BinaryHeap<std::cmp::Reverse<u64>>, VecDeque<u64>)>,
/// Set of stream IDs corresponding to streams that have outstanding data
/// to read. This is used to generate a `StreamIter` of streams without
/// having to iterate over the full list of streams.
readable: HashSet<u64>,
/// Set of stream IDs corresponding to streams that have enough flow control
/// capacity to be written to, and is not finished. This is used to generate
/// a `StreamIter` of streams without having to iterate over the full list
/// of streams.
writable: HashSet<u64>,
/// Set of stream IDs corresponding to streams that are almost out of flow
/// control credit and need to send MAX_STREAM_DATA. This is used to
/// generate a `StreamIter` of streams without having to iterate over the
/// full list of streams.
almost_full: HashSet<u64>,
/// Set of stream IDs corresponding to streams that are blocked. The value
/// of the map elements represents the offset of the stream at which the
/// blocking occurred.
blocked: HashMap<u64, u64>,
/// Set of stream IDs corresponding to streams that are reset. The value
/// of the map elements is a tuple of the error code and final size values
/// to include in the RESET_STREAM frame.
reset: HashMap<u64, (u64, u64)>,
/// Set of stream IDs corresponding to streams that are shutdown on the
/// receive side, and need to send a STOP_SENDING frame. The value of the
/// map elements is the error code to include in the STOP_SENDING frame.
stopped: HashMap<u64, u64>,
}
impl StreamMap {
pub fn new(max_streams_bidi: u64, max_streams_uni: u64) -> StreamMap {
StreamMap {
local_max_streams_bidi: max_streams_bidi,
local_max_streams_bidi_next: max_streams_bidi,
local_max_streams_uni: max_streams_uni,
local_max_streams_uni_next: max_streams_uni,
..StreamMap::default()
}
}
/// Returns the stream with the given ID if it exists.
pub fn get(&self, id: u64) -> Option<&Stream> {
self.streams.get(&id)
}
/// Returns the mutable stream with the given ID if it exists.
pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream> {
self.streams.get_mut(&id)
}
/// Returns the mutable stream with the given ID if it exists, or creates
/// a new one otherwise.
///
/// The `local` parameter indicates whether the stream's creation was
/// requested by the local application rather than the peer, and is
/// used to validate the requested stream ID, and to select the initial
/// flow control values from the local and remote transport parameters
/// (also passed as arguments).
///
/// This also takes care of enforcing both local and the peer's stream
/// count limits. If one of these limits is violated, the `StreamLimit`
/// error is returned.
pub(crate) fn get_or_create(
&mut self, id: u64, local_params: &crate::TransportParams,
peer_params: &crate::TransportParams, local: bool, is_server: bool,
) -> Result<&mut Stream> {
let stream = match self.streams.entry(id) {
hash_map::Entry::Vacant(v) => {
// Stream has already been closed and garbage collected.
if self.collected.contains(&id) {
return Err(Error::Done);
}
if local != is_local(id, is_server) {
return Err(Error::InvalidStreamState);
}
let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
// Locally-initiated bidirectional stream.
(true, true) => (
local_params.initial_max_stream_data_bidi_local,
peer_params.initial_max_stream_data_bidi_remote,
),
// Locally-initiated unidirectional stream.
(true, false) => (0, peer_params.initial_max_stream_data_uni),
// Remotely-initiated bidirectional stream.
(false, true) => (
local_params.initial_max_stream_data_bidi_remote,
peer_params.initial_max_stream_data_bidi_local,
),
// Remotely-initiated unidirectional stream.
(false, false) =>
(local_params.initial_max_stream_data_uni, 0),
};
// Enforce stream count limits.
match (is_local(id, is_server), is_bidi(id)) {
(true, true) => {
if self.local_opened_streams_bidi >=
self.peer_max_streams_bidi
{
return Err(Error::StreamLimit);
}
self.local_opened_streams_bidi += 1;
},
(true, false) => {
if self.local_opened_streams_uni >=
self.peer_max_streams_uni
{
return Err(Error::StreamLimit);
}
self.local_opened_streams_uni += 1;
},
(false, true) => {
if self.peer_opened_streams_bidi >=
self.local_max_streams_bidi
{
return Err(Error::StreamLimit);
}
self.peer_opened_streams_bidi += 1;
},
(false, false) => {
if self.peer_opened_streams_uni >=
self.local_max_streams_uni
{
return Err(Error::StreamLimit);
}
self.peer_opened_streams_uni += 1;
},
};
let s = Stream::new(max_rx_data, max_tx_data, is_bidi(id), local);
v.insert(s)
},
hash_map::Entry::Occupied(v) => v.into_mut(),
};
// Stream might already be writable due to initial flow control limits.
if stream.is_writable() {
self.writable.insert(id);
}
Ok(stream)
}
/// Pushes the stream ID to the back of the flushable streams queue with
/// the specified urgency.
///
/// Note that the caller is responsible for checking that the specified
/// stream ID was not in the queue already before calling this.
///
/// Queueing a stream multiple times simultaneously means that it might be
/// unfairly scheduled more often than other streams, and might also cause
/// spurious cycles through the queue, so it should be avoided.
pub fn push_flushable(&mut self, stream_id: u64, urgency: u8, incr: bool) {
// Push the element to the back of the queue corresponding to the given
// urgency. If the queue doesn't exist yet, create it first.
let queues = self
.flushable
.entry(urgency)
.or_insert_with(|| (BinaryHeap::new(), VecDeque::new()));
if !incr {
// Non-incremental streams are scheduled in order of their stream ID.
queues.0.push(std::cmp::Reverse(stream_id))
} else {
// Incremental streams are scheduled in a round-robin fashion.
queues.1.push_back(stream_id)
};
}
/// Removes and returns the first stream ID from the flushable streams
/// queue with the specified urgency.
///
/// Note that if the stream is still flushable after sending some of its
/// outstanding data, it needs to be added back to the queue.
pub fn pop_flushable(&mut self) -> Option<u64> {
// Remove the first element from the queue corresponding to the lowest
// urgency that has elements.
let (node, clear) =
if let Some((urgency, queues)) = self.flushable.iter_mut().next() {
let node = if !queues.0.is_empty() {
queues.0.pop().map(|x| x.0)
} else {
queues.1.pop_front()
};
let clear = if queues.0.is_empty() && queues.1.is_empty() {
Some(*urgency)
} else {
None
};
(node, clear)
} else {
(None, None)
};
// Remove the queue from the list of queues if it is now empty, so that
// the next time `pop_flushable()` is called the next queue with elements
// is used.
if let Some(urgency) = &clear {
self.flushable.remove(urgency);
}
node
}
/// Adds or removes the stream ID to/from the readable streams set.
///
/// If the stream was already in the list, this does nothing.
pub fn mark_readable(&mut self, stream_id: u64, readable: bool) {
if readable {
self.readable.insert(stream_id);
} else {
self.readable.remove(&stream_id);
}
}
/// Adds or removes the stream ID to/from the writable streams set.
///
/// This should also be called anytime a new stream is created, in addition
/// to when an existing stream becomes writable (or stops being writable).
///
/// If the stream was already in the list, this does nothing.
pub fn mark_writable(&mut self, stream_id: u64, writable: bool) {
if writable {
self.writable.insert(stream_id);
} else {
self.writable.remove(&stream_id);
}
}
/// Adds or removes the stream ID to/from the almost full streams set.
///
/// If the stream was already in the list, this does nothing.
pub fn mark_almost_full(&mut self, stream_id: u64, almost_full: bool) {
if almost_full {
self.almost_full.insert(stream_id);
} else {
self.almost_full.remove(&stream_id);
}
}
/// Adds or removes the stream ID to/from the blocked streams set with the
/// given offset value.
///
/// If the stream was already in the list, this does nothing.
pub fn mark_blocked(&mut self, stream_id: u64, blocked: bool, off: u64) {
if blocked {
self.blocked.insert(stream_id, off);
} else {
self.blocked.remove(&stream_id);
}
}
/// Adds or removes the stream ID to/from the reset streams set with the
/// given error code and final size values.
///
/// If the stream was already in the list, this does nothing.
pub fn mark_reset(
&mut self, stream_id: u64, reset: bool, error_code: u64, final_size: u64,
) {
if reset {
self.reset.insert(stream_id, (error_code, final_size));
} else {
self.reset.remove(&stream_id);
}
}
/// Adds or removes the stream ID to/from the stopped streams set with the
/// given error code.
///
/// If the stream was already in the list, this does nothing.
pub fn mark_stopped(
&mut self, stream_id: u64, stopped: bool, error_code: u64,
) {
if stopped {
self.stopped.insert(stream_id, error_code);
} else {
self.stopped.remove(&stream_id);
}
}
/// Updates the peer's maximum bidirectional stream count limit.
pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
}
/// Updates the peer's maximum unidirectional stream count limit.
pub fn update_peer_max_streams_uni(&mut self, v: u64) {
self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
}
/// Commits the new max_streams_bidi limit.
pub fn update_max_streams_bidi(&mut self) {
self.local_max_streams_bidi = self.local_max_streams_bidi_next;
}
/// Returns the new max_streams_bidi limit.
pub fn max_streams_bidi_next(&mut self) -> u64 {
self.local_max_streams_bidi_next
}
/// Commits the new max_streams_uni limit.
pub fn update_max_streams_uni(&mut self) {
self.local_max_streams_uni = self.local_max_streams_uni_next;
}
/// Returns the new max_streams_uni limit.
pub fn max_streams_uni_next(&mut self) -> u64 {
self.local_max_streams_uni_next
}
/// Returns the number of bidirectional streams that can be created
/// before the peer's stream count limit is reached.
pub fn peer_streams_left_bidi(&self) -> u64 {
self.peer_max_streams_bidi - self.local_opened_streams_bidi
}
/// Returns the number of unidirectional streams that can be created
/// before the peer's stream count limit is reached.
pub fn peer_streams_left_uni(&self) -> u64 {
self.peer_max_streams_uni - self.local_opened_streams_uni
}
/// Drops completed stream.
///
/// This should only be called when Stream::is_complete() returns true for
/// the given stream.
pub fn collect(&mut self, stream_id: u64, local: bool) {
if !local {
// If the stream was created by the peer, give back a max streams
// credit.
if is_bidi(stream_id) {
self.local_max_streams_bidi_next =
self.local_max_streams_bidi_next.saturating_add(1);
} else {
self.local_max_streams_uni_next =
self.local_max_streams_uni_next.saturating_add(1);
}
}
self.streams.remove(&stream_id);
self.collected.insert(stream_id);
}
/// Creates an iterator over streams that have outstanding data to read.
pub fn readable(&self) -> StreamIter {
StreamIter::from(&self.readable)
}
/// Creates an iterator over streams that can be written to.
pub fn writable(&self) -> StreamIter {
StreamIter::from(&self.writable)
}
/// Creates an iterator over streams that need to send MAX_STREAM_DATA.
pub fn almost_full(&self) -> StreamIter {
StreamIter::from(&self.almost_full)
}
/// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
pub fn blocked(&self) -> hash_map::Iter<u64, u64> {
self.blocked.iter()
}
/// Creates an iterator over streams that need to send RESET_STREAM.
pub fn reset(&self) -> hash_map::Iter<u64, (u64, u64)> {
self.reset.iter()
}
/// Creates an iterator over streams that need to send STOP_SENDING.
pub fn stopped(&self) -> hash_map::Iter<u64, u64> {
self.stopped.iter()
}
/// Returns true if there are any streams that have data to write.
pub fn has_flushable(&self) -> bool {
!self.flushable.is_empty()
}
/// Returns true if there are any streams that have data to read.
pub fn has_readable(&self) -> bool {
!self.readable.is_empty()
}
/// Returns true if there are any streams that need to update the local
/// flow control limit.
pub fn has_almost_full(&self) -> bool {
!self.almost_full.is_empty()
}
/// Returns true if there are any streams that are blocked.
pub fn has_blocked(&self) -> bool {
!self.blocked.is_empty()
}
/// Returns true if there are any streams that are reset.
pub fn has_reset(&self) -> bool {
!self.reset.is_empty()
}
/// Returns true if there are any streams that need to send STOP_SENDING.
pub fn has_stopped(&self) -> bool {
!self.stopped.is_empty()
}
/// Returns true if the max bidirectional streams count needs to be updated
/// by sending a MAX_STREAMS frame to the peer.
pub fn should_update_max_streams_bidi(&self) -> bool {
self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
self.local_max_streams_bidi_next / 2 >
self.local_max_streams_bidi - self.peer_opened_streams_bidi
}
/// Returns true if the max unidirectional streams count needs to be updated
/// by sending a MAX_STREAMS frame to the peer.
pub fn should_update_max_streams_uni(&self) -> bool {
self.local_max_streams_uni_next != self.local_max_streams_uni &&
self.local_max_streams_uni_next / 2 >
self.local_max_streams_uni - self.peer_opened_streams_uni
}
/// Returns the number of active streams in the map.
#[cfg(test)]
pub fn len(&self) -> usize {
self.streams.len()
}
}
/// A QUIC stream.
#[derive(Default)]
pub struct Stream {
/// Receive-side stream buffer.
pub recv: RecvBuf,
/// Send-side stream buffer.
pub send: SendBuf,
/// Whether the stream is bidirectional.
pub bidi: bool,
/// Whether the stream was created by the local endpoint.
pub local: bool,
/// Application data.
pub data: Option<Box<dyn std::any::Any + Send + Sync>>,
/// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
pub urgency: u8,
/// Whether the stream can be flushed incrementally. Default is `true`.
pub incremental: bool,
}
impl Stream {
/// Creates a new stream with the given flow control limits.
pub fn new(
max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
) -> Stream {
Stream {
recv: RecvBuf::new(max_rx_data),
send: SendBuf::new(max_tx_data),
bidi,
local,
data: None,
urgency: DEFAULT_URGENCY,
incremental: true,
}
}
/// Returns true if the stream has data to read.
pub fn is_readable(&self) -> bool {
self.recv.ready()
}
/// Returns true if the stream has enough flow control capacity to be
/// written to, and is not finished.
pub fn is_writable(&self) -> bool {
!self.send.shutdown &&
!self.send.is_fin() &&
self.send.off < self.send.max_data
}
/// Returns true if the stream has data to send and is allowed to send at
/// least some of it.
pub fn is_flushable(&self) -> bool {
self.send.ready() && self.send.off_front() < self.send.max_data
}
/// Returns true if the stream is complete.
///
/// For bidirectional streams this happens when both the receive and send
/// sides are complete. That is when all incoming data has been read by the
/// application, and when all outgoing data has been acked by the peer.
///
/// For unidirectional streams this happens when either the receive or send
/// side is complete, depending on whether the stream was created locally
/// or not.
pub fn is_complete(&self) -> bool {
match (self.bidi, self.local) {
// For bidirectional streams we need to check both receive and send
// sides for completion.
(true, _) => self.recv.is_fin() && self.send.is_complete(),
// For unidirectional streams generated locally, we only need to
// check the send side for completion.
(false, true) => self.send.is_complete(),
// For unidirectional streams generated by the peer, we only need
// to check the receive side for completion.
(false, false) => self.recv.is_fin(),
}
}
}
/// Returns true if the stream was created locally.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
(stream_id & 0x1) == (is_server as u64)
}
/// Returns true if the stream is bidirectional.
pub fn is_bidi(stream_id: u64) -> bool {
(stream_id & 0x2) == 0
}
/// An iterator over QUIC streams.
#[derive(Default)]
pub struct StreamIter {
streams: Vec<u64>,
}
impl StreamIter {
#[inline]
fn from(streams: &HashSet<u64>) -> Self {
StreamIter {
streams: streams.iter().copied().collect(),
}
}
}
impl Iterator for StreamIter {
type Item = u64;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.streams.pop()
}
}
impl ExactSizeIterator for StreamIter {
#[inline]
fn len(&self) -> usize {
self.streams.len()
}
}
/// Receive-side stream buffer.
///
/// Stream data received by the peer is buffered in a list of data chunks
/// ordered by offset in ascending order. Contiguous data can then be read
/// into a slice.
#[derive(Debug, Default)]
pub struct RecvBuf {
/// Chunks of data received from the peer that have not yet been read by
/// the application, ordered by offset.
data: BinaryHeap<RangeBuf>,
/// The lowest data offset that has yet to be read by the application.
off: u64,
/// The total length of data received on this stream.
len: u64,
/// The maximum offset the peer is allowed to send us.
max_data: u64,
/// The updated maximum offset the peer is allowed to send us.
max_data_next: u64,
/// The final stream offset received from the peer, if any.
fin_off: Option<u64>,
/// Whether incoming data is validated but not buffered.
drain: bool,
}
impl RecvBuf {
/// Creates a new receive buffer.
fn new(max_data: u64) -> RecvBuf {
RecvBuf {
max_data,
max_data_next: max_data,
..RecvBuf::default()
}
}
/// Inserts the given chunk of data in the buffer.
///
/// This also takes care of enforcing stream flow control limits, as well
/// as handling incoming data that overlaps data that is already in the
/// buffer.
pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
if buf.max_off() > self.max_data {
return Err(Error::FlowControl);
}
if let Some(fin_off) = self.fin_off {
// Stream's size is known, forbid data beyond that point.
if buf.max_off() > fin_off {
return Err(Error::FinalSize);
}
// Stream's size is already known, forbid changing it.
if buf.fin() && fin_off != buf.max_off() {
return Err(Error::FinalSize);
}
}
// Stream's known size is lower than data already received.
if buf.fin() && buf.max_off() < self.len {
return Err(Error::FinalSize);
}
// We already saved the final offset, so there's nothing else we
// need to keep from the RangeBuf if it's empty.
if self.fin_off.is_some() && buf.is_empty() {
return Ok(());
}
// No need to process an empty buffer with the fin flag, if we already
// know the final size.
if buf.fin() && buf.is_empty() && self.fin_off.is_some() {
return Ok(());
}
if buf.fin() {
self.fin_off = Some(buf.max_off());
}
// No need to store empty buffer that doesn't carry the fin flag.
if !buf.fin() && buf.is_empty() {
return Ok(());
}
// Check if data is fully duplicate, that is the buffer's max offset is
// lower or equal to the offset already stored in the recv buffer.
if self.off >= buf.max_off() {
// An exception is applied to empty range buffers, because an empty
// buffer's max offset matches the max offset of the recv buffer.
//
// By this point all spurious empty buffers should have already been
// discarded, so allowing empty buffers here should be safe.
if !buf.is_empty() {
return Ok(());
}
}
if self.drain {
return Ok(());
}
let mut tmp_buf = Some(buf);
while let Some(mut buf) = tmp_buf {
tmp_buf = None;
// Discard incoming data below current stream offset. Bytes up to
// `self.off` have already been received so we should not buffer
// them again. This is also important to make sure `ready()` doesn't
// get stuck when a buffer with lower offset than the stream's is
// buffered.
if self.off > buf.off() {
buf = buf.split_off((self.off - buf.off()) as usize);
}
for b in &self.data {
// New buffer is fully contained in existing buffer.
if buf.off() >= b.off() && buf.max_off() <= b.max_off() {
return Ok(());
}
// New buffer's start overlaps existing buffer.
if buf.off() >= b.off() && buf.off() < b.max_off() {
buf = buf.split_off((b.max_off() - buf.off()) as usize);
}
// New buffer's end overlaps existing buffer.
if buf.off() < b.off() && buf.max_off() > b.off() {
tmp_buf = Some(buf.split_off((b.off() - buf.off()) as usize));
}
}
self.len = cmp::max(self.len, buf.max_off());
self.data.push(buf);
}
Ok(())
}
/// Writes data from the receive buffer into the given output buffer.
///
/// Only contiguous data is written to the output buffer, starting from
/// offset 0. The offset is incremented as data is read out of the receive
/// buffer into the application buffer. If there is no data at the expected
/// read offset, the `Done` error is returned.
///
/// On success the amount of data read, and a flag indicating if there is
/// no more data in the buffer, are returned as a tuple.
pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
let mut len = 0;
let mut cap = out.len();
if !self.ready() {
return Err(Error::Done);
}
while cap > 0 && self.ready() {
let mut buf = match self.data.peek_mut() {
Some(v) => v,
None => break,
};
let buf_len = cmp::min(buf.len(), cap);
buf.with(|s| out[len..len + buf_len].copy_from_slice(&s[..buf_len]));
self.off += buf_len as u64;
len += buf_len;
cap -= buf_len;
if buf_len < buf.len() {
buf.consume(buf_len);
// We reached the maximum capacity, so end here.
break;
}
std::collections::binary_heap::PeekMut::pop(buf);
}
self.max_data_next = self.max_data_next.saturating_add(len as u64);
Ok((len, self.is_fin()))
}
/// Resets the stream at the given offset.
pub fn reset(&mut self, final_size: u64) -> Result<usize> {
// Stream's size is already known, forbid changing it.
if let Some(fin_off) = self.fin_off {
if fin_off != final_size {
return Err(Error::FinalSize);
}
}
// Stream's known size is lower than data already received.
if final_size < self.len {
return Err(Error::FinalSize);
}
self.fin_off = Some(final_size);
// Return how many bytes need to be removed from the connection flow
// control.
Ok((final_size - self.len) as usize)
}
/// Commits the new max_data limit.
pub fn update_max_data(&mut self) {
self.max_data = self.max_data_next;
}
/// Return the new max_data limit.
pub fn max_data_next(&mut self) -> u64 {
self.max_data_next
}
/// Shuts down receiving data.
pub fn shutdown(&mut self) -> Result<()> {
if self.drain {
return Err(Error::Done);
}
self.drain = true;
self.data.clear();
self.off = self.max_off();
Ok(())
}
/// Returns the lowest offset of data buffered.
#[allow(dead_code)]
pub fn off_front(&self) -> u64 {
self.off
}
/// Returns true if we need to update the local flow control limit.
pub fn almost_full(&self) -> bool {
// Send MAX_STREAM_DATA when the new limit is at least double the
// amount of data that can be received before blocking.
self.fin_off.is_none() &&
self.max_data_next != self.max_data &&
self.max_data_next / 2 > self.max_data - self.len
}
/// Returns the largest offset ever received.
pub fn max_off(&self) -> u64 {
self.len
}
/// Returns true if the receive-side of the stream is complete.
///
/// This happens when the stream's receive final size is known, and the
/// application has read all data from the stream.
pub fn is_fin(&self) -> bool {
if self.fin_off == Some(self.off) {
return true;
}
false
}
/// Returns true if the stream has data to be read.
fn ready(&self) -> bool {
let buf = match self.data.peek() {
Some(v) => v,
None => return false,
};
buf.off() == self.off
}
}
/// Send-side stream buffer.
///
/// Stream data scheduled to be sent to the peer is buffered in a list of data
/// chunks ordered by offset in ascending order. Contiguous data can then be
/// read into a slice.
///
/// By default, new data is appended at the end of the stream, but data can be
/// inserted at the start of the buffer (this is to allow data that needs to be
/// retransmitted to be re-buffered).
#[derive(Debug, Default)]
pub struct SendBuf {
/// Chunks of data to be sent, ordered by offset.
data: VecDeque<RangeBuf>,
/// The index of the buffer that needs to be sent next.
pos: usize,
/// The maximum offset of data buffered in the stream.
off: u64,
/// The amount of data currently buffered.
len: u64,
/// The maximum offset we are allowed to send to the peer.
max_data: u64,
/// The final stream offset written to the stream, if any.
fin_off: Option<u64>,
/// Whether the stream's send-side has been shut down.
shutdown: bool,
/// Ranges of data offsets that have been acked.
acked: ranges::RangeSet,
/// The error code received via STOP_SENDING.
error: Option<u64>,
}
impl SendBuf {
/// Creates a new send buffer.
fn new(max_data: u64) -> SendBuf {
SendBuf {
max_data,
..SendBuf::default()
}
}
/// Inserts the given slice of data at the end of the buffer.
///
/// The number of bytes that were actually stored in the buffer is returned
/// (this may be lower than the size of the input buffer, in case of partial
/// writes).
pub fn write(&mut self, mut data: &[u8], mut fin: bool) -> Result<usize> {
let max_off = self.off + data.len() as u64;
// Get the stream send capacity. This will return an error if the stream
// was stopped.
let capacity = self.cap()?;
if data.len() > capacity {
// Truncate the input buffer according to the stream's capacity.
let len = capacity;
data = &data[..len];
// We are not buffering the full input, so clear the fin flag.
fin = false;
}
if let Some(fin_off) = self.fin_off {
// Can't write past final offset.
if max_off > fin_off {
return Err(Error::FinalSize);
}
// Can't "undo" final offset.
if max_off == fin_off && !fin {
return Err(Error::FinalSize);
}
}
if fin {
self.fin_off = Some(max_off);
}
// Don't queue data that was already fully acked.
if self.ack_off() >= max_off {
return Ok(data.len());
}
// We already recorded the final offset, so we can just discard the
// empty buffer now.
if data.is_empty() {
return Ok(data.len());
}
let mut len = 0;
// Try to fill the last buffer in the queue first, if it has capacity.
if let Some(back) = self.data.back_mut() {
let spare = back.spare_capacity();
if spare > 0 {
len = cmp::min(spare, data.len());
back.extend_from_slice(&data[..len])?;
back.fin = len == data.len() && fin;
self.off += len as u64;
self.len += len as u64;
data = &data[len..];
if !back.is_empty() {
self.pos = cmp::min(self.pos, self.data.len() - 1);
}
}
}
// Split the remaining input data into consistently-sized buffers to
// avoid fragmentation.
for chunk in data.chunks(SEND_BUFFER_SIZE) {
len += chunk.len();
let fin = len == data.len() && fin;
let buf = RangeBuf::from_with_capacity(
chunk,
self.off,
fin,
SEND_BUFFER_SIZE,
)?;
// The new data can simply be appended at the end of the send buffer.
self.data.push_back(buf);
self.off += chunk.len() as u64;
self.len += chunk.len() as u64;
}
Ok(len)
}
/// Writes data from the send buffer into the given output buffer.
pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
let mut out_len = out.len();
let out_off = self.off_front();
let mut next_off = out_off;
while out_len > 0 &&
self.ready() &&
self.off_front() == next_off &&
self.off_front() < self.max_data
{
let buf = match self.data.get_mut(self.pos) {
Some(v) => v,
None => break,
};
if buf.is_empty() {
self.pos += 1;
continue;
}
let buf_len = cmp::min(buf.len(), out_len);
// Copy data to the output buffer.
let out_pos = (next_off - out_off) as usize;
buf.with(|s| {
(&mut out[out_pos..out_pos + buf_len])
.copy_from_slice(&s[..buf_len])
});
self.len -= buf_len as u64;
out_len -= buf_len;
next_off = buf.off() + buf_len as u64;
if !buf.is_empty() && buf_len < buf.len() {
buf.consume(buf_len);
// We reached the maximum capacity, so end here.
break;
}
buf.consume(buf_len);
self.pos += 1;
}
// Override the `fin` flag set for the output buffer by matching the
// buffer's maximum offset against the stream's final offset (if known).
//
// This is more efficient than tracking `fin` using the range buffers
// themselves, and lets us avoid queueing empty buffers just so we can
// propagate the final size.
let fin = self.fin_off == Some(next_off);
Ok((out.len() - out_len, fin))
}
/// Updates the max_data limit to the given value.
pub fn update_max_data(&mut self, max_data: u64) {
self.max_data = cmp::max(self.max_data, max_data);
}
/// Increments the acked data offset.
pub fn ack(&mut self, off: u64, len: usize) {
self.acked.insert(off..off + len as u64);
}
pub fn ack_and_drop(&mut self, off: u64, len: usize) {
self.ack(off, len);
let ack_off = self.ack_off();
if self.data.is_empty() {
return;
}
if off > ack_off {
return;
}
let mut drop_until = None;
// Drop contiguously acked data from the front of the buffer.
for (i, buf) in self.data.iter_mut().enumerate() {
// Newly acked range is past highest contiguous acked range, so we
// can't drop it.
if buf.off >= ack_off {
break;
}
// Highest contiguous acked range falls within newly acked range,
// so we can't drop it.
if buf.off < ack_off && ack_off < buf.max_off() {
break;
}
// Newly acked range can be dropped.
drop_until = Some(i);
}
if let Some(drop) = drop_until {
self.data.drain(..=drop);
// When a buffer is marked for retransmission, but then acked before
// it could be retransmitted, we might end up decreasing the SendBuf
// position too much, so make sure that doesn't happen.
self.pos = self.pos.saturating_sub(drop + 1);
}
}
pub fn retransmit(&mut self, off: u64, len: usize) {
let max_off = off + len as u64;
let ack_off = self.ack_off();
if self.data.is_empty() {
return;
}
if max_off <= ack_off {
return;
}
for i in 0..self.data.len() {
let buf = &mut self.data[i];
if buf.off >= max_off {
break;
}
if off > buf.max_off() {
continue;
}
// Split the buffer into 2 if the retransmit range ends before the
// buffer's final offset.
let new_buf = if buf.off < max_off && max_off < buf.max_off() {
Some(buf.split_off((max_off - buf.off as u64) as usize))
} else {
None
};
// Advance the buffer's position if the retransmit range is past
// the buffer's starting offset.
buf.pos = if off > buf.off && off <= buf.max_off() {
cmp::min(buf.pos, buf.start + (off - buf.off) as usize)
} else {
buf.start
};
self.pos = cmp::min(self.pos, i);
self.len += buf.len() as u64;
if let Some(b) = new_buf {
self.data.insert(i + 1, b);
}
}
}
/// Resets the stream at the current offset and clears all buffered data.
pub fn reset(&mut self) -> Result<u64> {
self.write(b"", true)?;
// Drop all buffered data.
self.data.clear();
// Mark all data as acked.
self.ack(0, self.off as usize);
self.pos = 0;
self.len = 0;
Ok(self.fin_off.unwrap())
}
/// Resets the streams and records the received error code.
///
/// Calling this again after the first time has no effect.
pub fn stop(&mut self, error_code: u64) -> Result<u64> {
if self.error.is_some() {
return Err(Error::Done);
}
let fin_off = self.reset()?;
self.error = Some(error_code);
Ok(fin_off)
}
/// Shuts down sending data.
pub fn shutdown(&mut self) -> Result<u64> {
if self.shutdown {
return Err(Error::Done);
}
self.shutdown = true;
self.reset()
}
/// Returns the largest offset of data buffered.
#[allow(dead_code)]
pub fn off_back(&self) -> u64 {
self.off
}
/// Returns the lowest offset of data buffered.
pub fn off_front(&self) -> u64 {
let mut pos = self.pos;
// Skip empty buffers from the start of the queue.
while let Some(b) = self.data.get(pos) {
if !b.is_empty() {
return b.off();
}
pos += 1;
}
self.off
}
/// The maximum offset we are allowed to send to the peer.
pub fn max_off(&self) -> u64 {
self.max_data
}
/// Returns true if all data in the stream has been sent.
///
/// This happens when the stream's send final size is knwon, and the
/// application has already written data up to that point.
pub fn is_fin(&self) -> bool {
if self.fin_off == Some(self.off) {
return true;
}
false
}
/// Returns true if the send-side of the stream is complete.
///
/// This happens when the stream's send final size is known, and the peer
/// has already acked all stream data up to that point.
pub fn is_complete(&self) -> bool {
if let Some(fin_off) = self.fin_off {
if self.acked == (0..fin_off) {
return true;
}
}
false
}
/// Returns true if the stream was stopped before completion.
pub fn is_stopped(&self) -> bool {
self.error.is_some()
}
/// Returns true if there is data to be written.
fn ready(&self) -> bool {
!self.data.is_empty() && self.off_front() < self.off
}
/// Returns the highest contiguously acked offset.
fn ack_off(&self) -> u64 {
match self.acked.iter().next() {
// Only consider the initial range if it contiguously covers the
// start of the stream (i.e. from offset 0).
Some(std::ops::Range { start: 0, end }) => end,
Some(_) | None => 0,
}
}
/// Returns the outgoing flow control capacity.
pub fn cap(&self) -> Result<usize> {
// The stream was stopped, so return the error code instead.
if let Some(e) = self.error {
return Err(Error::StreamStopped(e));
}
Ok((self.max_data - self.off) as usize)
}
}
/// Buffer holding data at a specific offset.
///
/// The data is stored in a `Vec<u8>` in such a way that it can be shared
/// between multiple `RangeBuf` objects.
///
/// Each `RangeBuf` will have its own view of that buffer, where the `start`
/// value indicates the initial offset within the `Vec`, and `len` indicates the
/// number of bytes, starting from `start` that are included.
///
/// In addition, `pos` indicates the current offset within the `Vec`, starting
/// from the very beginning of the `Vec`.
///
/// Finally, `off` is the starting offset for the specific `RangeBuf` within the
/// stream the buffer belongs to.
#[derive(Clone, Debug, Default)]
pub struct RangeBuf {
/// The internal buffer holding the data.
///
/// To avoid neeless allocations when a RangeBuf is split, this field is
/// reference-counted and can be shared between multiple RangeBuf objects,
/// and sliced using the `start` and `len` values.
///
/// In order to allow the inner `Vec` to be mutated, an `RwLock` is used to
/// achieve interior mutability (`RefCell` can't be used here because it
/// does not implement `Sync`, which is required by `Arc` to make the type
/// `Send`-able).
data: Arc<RwLock<Vec<u8>>>,
/// The initial offset within the internal buffer.
start: usize,
/// The current offset within the internal buffer.
pos: usize,
/// The number of bytes in the buffer, from the initial offset.
len: usize,
/// The offset of the buffer within a stream.
off: u64,
/// Whether this contains the final byte in the stream.
fin: bool,
}
impl RangeBuf {
/// Creates a new `RangeBuf` from the given slice.
pub fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf {
RangeBuf {
data: Arc::new(RwLock::new(Vec::from(buf))),
start: 0,
pos: 0,
len: buf.len(),
off,
fin,
}
}
/// Creates a new `RangeBuf` from the given slice and capacity.
///
/// `cap` bytes are allocated for the underlying data store, regardless of
/// the size of the input data. When the data is smaller than the capacity,
/// the remaining space can be filled by calling `extend_from_slice()`.
pub fn from_with_capacity(
buf: &[u8], off: u64, fin: bool, cap: usize,
) -> Result<RangeBuf> {
if cap < buf.len() {
return Err(Error::BufferTooShort);
}
let mut vec = Vec::with_capacity(SEND_BUFFER_SIZE);
vec.extend_from_slice(buf);
Ok(RangeBuf {
data: Arc::new(RwLock::new(vec)),
start: 0,
pos: 0,
len: buf.len(),
off,
fin,
})
}
/// Returns whether `self` holds the final offset in the stream.
pub fn fin(&self) -> bool {
self.fin
}
/// Returns the starting offset of `self`.
pub fn off(&self) -> u64 {
(self.off - self.start as u64) + self.pos as u64
}
/// Returns the final offset of `self`.
pub fn max_off(&self) -> u64 {
self.off() + self.len() as u64
}
/// Returns the length of `self`.
pub fn len(&self) -> usize {
self.len - (self.pos - self.start)
}
/// Returns the amount of bytes that can be written to the internal buffer.
pub fn spare_capacity(&self) -> usize {
let r = self.data.read().unwrap();
r.capacity() - r.len()
}
/// Returns true if `self` has a length of zero bytes.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Consumes the starting `count` bytes of `self`.
pub fn consume(&mut self, count: usize) {
self.pos += count;
}
/// Applies the given function to the internal buffer.
pub fn with<F, T>(&self, f: F) -> T
where
F: FnOnce(&[u8]) -> T,
{
let r = self.data.read().unwrap();
f(&r[self.pos..self.start + self.len])
}
/// Extends the internal buffer with the given slice.
pub fn extend_from_slice(&mut self, other: &[u8]) -> Result<()> {
if self.spare_capacity() < other.len() {
return Err(Error::BufferTooShort);
}
let mut w = self.data.write().unwrap();
w.extend_from_slice(other);
self.len += other.len();
Ok(())
}
/// Splits the buffer into two at the given index.
pub fn split_off(&mut self, at: usize) -> RangeBuf {
if at > self.len {
panic!(
"`at` split index (is {}) should be <= len (is {})",
at, self.len
);
}
let buf = RangeBuf {
data: self.data.clone(),
start: self.start + at,
pos: cmp::max(self.pos, self.start + at),
len: self.len - at,
off: self.off + at as u64,
fin: self.fin,
};
self.pos = cmp::min(self.pos, self.start + at);
self.len = at;
self.fin = false;
buf
}
}
impl Ord for RangeBuf {
fn cmp(&self, other: &RangeBuf) -> cmp::Ordering {
// Invert ordering to implement min-heap.
self.off.cmp(&other.off).reverse()
}
}
impl PartialOrd for RangeBuf {
fn partial_cmp(&self, other: &RangeBuf) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for RangeBuf {
fn eq(&self, other: &RangeBuf) -> bool {
self.off == other.off
}
}
// Implement Eq explicitly because RwLock prevents it from being derived.
impl Eq for RangeBuf {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn empty_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
}
#[test]
fn empty_stream_frame() {
let mut recv = RecvBuf::new(15);
assert_eq!(recv.len, 0);
let buf = RangeBuf::from(b"hello", 0, false);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
let mut buf = [0; 32];
assert_eq!(recv.emit(&mut buf), Ok((5, false)));
// Don't store non-fin empty buffer.
let buf = RangeBuf::from(b"", 10, false);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 0);
// Check flow control for empty buffer.
let buf = RangeBuf::from(b"", 16, false);
assert_eq!(recv.write(buf), Err(Error::FlowControl));
// Store fin empty buffer.
let buf = RangeBuf::from(b"", 5, true);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 1);
// Don't store additional fin empty buffers.
let buf = RangeBuf::from(b"", 5, true);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 1);
// Don't store additional fin non-empty buffers.
let buf = RangeBuf::from(b"aa", 3, true);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 1);
// Validate final size with fin empty buffers.
let buf = RangeBuf::from(b"", 6, true);
assert_eq!(recv.write(buf), Err(Error::FinalSize));
let buf = RangeBuf::from(b"", 4, true);
assert_eq!(recv.write(buf), Err(Error::FinalSize));
let mut buf = [0; 32];
assert_eq!(recv.emit(&mut buf), Ok((0, true)));
}
#[test]
fn ordered_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, false);
let third = RangeBuf::from(b"something", 10, true);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 0);
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
assert!(recv.write(third).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
let (len, fin) = recv.emit(&mut buf).unwrap();
assert_eq!(len, 19);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"helloworldsomething");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
}
#[test]
fn split_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"helloworld", 9, true);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
assert_eq!(len, 10);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"somethingh");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 10);
let (len, fin) = recv.emit(&mut buf[..5]).unwrap();
assert_eq!(len, 5);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"ellow");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 15);
let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
assert_eq!(len, 4);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"orld");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
}
#[test]
fn incomplete_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"helloworld", 9, true);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
let (len, fin) = recv.emit(&mut buf).unwrap();
assert_eq!(len, 19);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"somethinghelloworld");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
}
#[test]
fn zero_len_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"", 9, true);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
let (len, fin) = recv.emit(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"something");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
}
#[test]
fn past_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 3, false);
let third = RangeBuf::from(b"ello", 4, true);
let fourth = RangeBuf::from(b"ello", 5, true);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
let (len, fin) = recv.emit(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"something");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.write(third), Err(Error::FinalSize));
assert!(recv.write(fourth).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
}
#[test]
fn fully_overlapping_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 4, false);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
let (len, fin) = recv.emit(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"something");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
}
#[test]
fn fully_overlapping_read2() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 4, false);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
let (len, fin) = recv.emit(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"somehello");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
}
#[test]
fn fully_overlapping_read3() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 3, false);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 8);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
let (len, fin) = recv.emit(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"somhellog");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
}
#[test]
fn fully_overlapping_read_multi() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"somethingsomething", 0, false);
let second = RangeBuf::from(b"hello", 3, false);
let third = RangeBuf::from(b"hello", 12, false);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 8);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(third).is_ok());
assert_eq!(recv.len, 17);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 18);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 5);
let (len, fin) = recv.emit(&mut buf).unwrap();
assert_eq!(len, 18);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"somhellogsomhellog");
assert_eq!(recv.len, 18);
assert_eq!(recv.off, 18);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
}
#[test]
fn overlapping_start_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 8, true);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
let (len, fin) = recv.emit(&mut buf).unwrap();
assert_eq!(len, 13);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"somethingello");
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 13);
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
}
#[test]
fn overlapping_end_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"something", 3, true);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 12);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 12);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
let (len, fin) = recv.emit(&mut buf).unwrap();
assert_eq!(len, 12);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"helsomething");
assert_eq!(recv.len, 12);
assert_eq!(recv.off, 12);
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
}
#[test]
fn partially_multi_overlapping_reordered_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 8, false);
let second = RangeBuf::from(b"something", 0, false);
let third = RangeBuf::from(b"moar", 11, true);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.write(third).is_ok());
assert_eq!(recv.len, 15);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
let (len, fin) = recv.emit(&mut buf).unwrap();
assert_eq!(len, 15);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"somethinhelloar");
assert_eq!(recv.len, 15);
assert_eq!(recv.off, 15);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
}
#[test]
fn partially_multi_overlapping_reordered_read2() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"aaa", 0, false);
let second = RangeBuf::from(b"bbb", 2, false);
let third = RangeBuf::from(b"ccc", 4, false);
let fourth = RangeBuf::from(b"ddd", 6, false);
let fifth = RangeBuf::from(b"eee", 9, false);
let sixth = RangeBuf::from(b"fff", 11, false);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(fourth).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.write(third).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 4);
assert!(recv.write(sixth).is_ok());
assert_eq!(recv.len, 14);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 5);
assert!(recv.write(fifth).is_ok());
assert_eq!(recv.len, 14);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 6);
let (len, fin) = recv.emit(&mut buf).unwrap();
assert_eq!(len, 14);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"aabbbcdddeefff");
assert_eq!(recv.len, 14);
assert_eq!(recv.off, 14);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.emit(&mut buf), Err(Error::Done));
}
#[test]
fn empty_write() {
let mut buf = [0; 5];
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
let (written, fin) = send.emit(&mut buf).unwrap();
assert_eq!(written, 0);
assert_eq!(fin, false);
}
#[test]
fn multi_write() {
let mut buf = [0; 128];
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert!(send.write(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.write(second, true).is_ok());
assert_eq!(send.len, 19);
let (written, fin) = send.emit(&mut buf[..128]).unwrap();
assert_eq!(written, 19);
assert_eq!(fin, true);
assert_eq!(&buf[..written], b"somethinghelloworld");
assert_eq!(send.len, 0);
}
#[test]
fn split_write() {
let mut buf = [0; 10];
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert!(send.write(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.write(second, true).is_ok());
assert_eq!(send.len, 19);
assert_eq!(send.off_front(), 0);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 10);
assert_eq!(fin, false);
assert_eq!(&buf[..written], b"somethingh");
assert_eq!(send.len, 9);
assert_eq!(send.off_front(), 10);
let (written, fin) = send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 5);
assert_eq!(fin, false);
assert_eq!(&buf[..written], b"ellow");
assert_eq!(send.len, 4);
assert_eq!(send.off_front(), 15);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 4);
assert_eq!(fin, true);
assert_eq!(&buf[..written], b"orld");
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 19);
}
#[test]
fn resend() {
let mut buf = [0; 15];
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 0);
let first = b"something";
let second = b"helloworld";
assert!(send.write(first, false).is_ok());
assert_eq!(send.off_front(), 0);
assert!(send.write(second, true).is_ok());
assert_eq!(send.off_front(), 0);
assert_eq!(send.len, 19);
let (written, fin) = send.emit(&mut buf[..4]).unwrap();
assert_eq!(written, 4);
assert_eq!(fin, false);
assert_eq!(&buf[..written], b"some");
assert_eq!(send.len, 15);
assert_eq!(send.off_front(), 4);
let (written, fin) = send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 5);
assert_eq!(fin, false);
assert_eq!(&buf[..written], b"thing");
assert_eq!(send.len, 10);
assert_eq!(send.off_front(), 9);
let (written, fin) = send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 5);
assert_eq!(fin, false);
assert_eq!(&buf[..written], b"hello");
assert_eq!(send.len, 5);
assert_eq!(send.off_front(), 14);
send.retransmit(4, 5);
assert_eq!(send.len, 10);
assert_eq!(send.off_front(), 4);
send.retransmit(0, 4);
assert_eq!(send.len, 14);
assert_eq!(send.off_front(), 0);
let (written, fin) = send.emit(&mut buf[..11]).unwrap();
assert_eq!(written, 9);
assert_eq!(fin, false);
assert_eq!(&buf[..written], b"something");
assert_eq!(send.len, 5);
assert_eq!(send.off_front(), 14);
let (written, fin) = send.emit(&mut buf[..11]).unwrap();
assert_eq!(written, 5);
assert_eq!(fin, true);
assert_eq!(&buf[..written], b"world");
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 19);
}
#[test]
fn write_blocked_by_off() {
let mut buf = [0; 10];
let mut send = SendBuf::default();
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert_eq!(send.write(first, false), Ok(0));
assert_eq!(send.len, 0);
assert_eq!(send.write(second, true), Ok(0));
assert_eq!(send.len, 0);
send.update_max_data(5);
assert_eq!(send.write(first, false), Ok(5));
assert_eq!(send.len, 5);
assert_eq!(send.write(second, true), Ok(0));
assert_eq!(send.len, 5);
assert_eq!(send.off_front(), 0);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 5);
assert_eq!(fin, false);
assert_eq!(&buf[..written], b"somet");
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 5);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 0);
assert_eq!(fin, false);
assert_eq!(&buf[..written], b"");
assert_eq!(send.len, 0);
send.update_max_data(15);
assert_eq!(send.write(&first[5..], false), Ok(4));
assert_eq!(send.len, 4);
assert_eq!(send.write(second, true), Ok(6));
assert_eq!(send.len, 10);
assert_eq!(send.off_front(), 5);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 10);
assert_eq!(fin, false);
assert_eq!(&buf[..10], b"hinghellow");
assert_eq!(send.len, 0);
send.update_max_data(25);
assert_eq!(send.write(&second[6..], true), Ok(4));
assert_eq!(send.len, 4);
assert_eq!(send.off_front(), 15);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 4);
assert_eq!(fin, true);
assert_eq!(&buf[..written], b"orld");
assert_eq!(send.len, 0);
}
#[test]
fn zero_len_write() {
let mut buf = [0; 10];
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
assert!(send.write(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.write(&[], true).is_ok());
assert_eq!(send.len, 9);
assert_eq!(send.off_front(), 0);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 9);
assert_eq!(fin, true);
assert_eq!(&buf[..written], b"something");
assert_eq!(send.len, 0);
}
#[test]
fn recv_flow_control() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, false);
let third = RangeBuf::from(b"something", 10, false);
assert_eq!(stream.recv.write(second), Ok(()));
assert_eq!(stream.recv.write(first), Ok(()));
assert!(!stream.recv.almost_full());
assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
let (len, fin) = stream.recv.emit(&mut buf).unwrap();
assert_eq!(&buf[..len], b"helloworld");
assert_eq!(fin, false);
assert!(stream.recv.almost_full());
stream.recv.update_max_data();
assert_eq!(stream.recv.max_data_next(), 25);
assert!(!stream.recv.almost_full());
let third = RangeBuf::from(b"something", 10, false);
assert_eq!(stream.recv.write(third), Ok(()));
}
#[test]
fn recv_past_fin() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"world", 5, false);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
}
#[test]
fn recv_fin_dup() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"hello", 0, true);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.write(second), Ok(()));
let mut buf = [0; 32];
let (len, fin) = stream.recv.emit(&mut buf).unwrap();
assert_eq!(&buf[..len], b"hello");
assert_eq!(fin, true);
}
#[test]
fn recv_fin_change() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"world", 5, true);
assert_eq!(stream.recv.write(second), Ok(()));
assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
}
#[test]
fn recv_fin_lower_than_received() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"world", 5, false);
assert_eq!(stream.recv.write(second), Ok(()));
assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
}
#[test]
fn recv_fin_flow_control() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, true);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.write(second), Ok(()));
let (len, fin) = stream.recv.emit(&mut buf).unwrap();
assert_eq!(&buf[..len], b"helloworld");
assert_eq!(fin, true);
assert!(!stream.recv.almost_full());
}
#[test]
fn recv_fin_reset_mismatch() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
}
#[test]
fn recv_reset_dup() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, false);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.reset(5), Ok(0));
assert_eq!(stream.recv.reset(5), Ok(0));
}
#[test]
fn recv_reset_change() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, false);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.reset(5), Ok(0));
assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
}
#[test]
fn recv_reset_lower_than_received() {
let mut stream = Stream::new(15, 0, true, true);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, false);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.reset(4), Err(Error::FinalSize));
}
#[test]
fn send_flow_control() {
let mut buf = [0; 25];
let mut stream = Stream::new(0, 15, true, true);
let first = b"hello";
let second = b"world";
let third = b"something";
assert!(stream.send.write(first, false).is_ok());
assert!(stream.send.write(second, false).is_ok());
assert!(stream.send.write(third, false).is_ok());
assert_eq!(stream.send.off_front(), 0);
let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
assert_eq!(written, 15);
assert_eq!(fin, false);
assert_eq!(&buf[..written], b"helloworldsomet");
assert_eq!(stream.send.off_front(), 15);
let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
assert_eq!(written, 0);
assert_eq!(fin, false);
assert_eq!(&buf[..written], b"");
stream.send.retransmit(0, 15);
assert_eq!(stream.send.off_front(), 0);
let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 10);
assert_eq!(fin, false);
assert_eq!(&buf[..written], b"helloworld");
assert_eq!(stream.send.off_front(), 10);
let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 5);
assert_eq!(fin, false);
assert_eq!(&buf[..written], b"somet");
}
#[test]
fn send_past_fin() {
let mut stream = Stream::new(0, 15, true, true);
let first = b"hello";
let second = b"world";
let third = b"third";
assert_eq!(stream.send.write(first, false), Ok(5));
assert_eq!(stream.send.write(second, true), Ok(5));
assert!(stream.send.is_fin());
assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
}