stream: allow RangeBuf to be partially consumed without splitting (#643)

When "popping" a RangeBuf that exceeded the maximum capacity of the
output buffer, we would simply split-off the internal buffer up to the
maximum cap, and then push a new RangeBuf back into the send or recv
buffer.

This is inefficient for two reasons:

1. Splitting off a Vec means duplicating the remainder of the buffer
   into a new Vec. Depending on the size of the original buffer this
   could be very expensive, which is why we had a hack to split-off
   buffers received by the application before creating the original
   RangeBuf.

2. Pushing back the new RangeBuf into the send or recv buffer is
   potentially expensive as a binary heap is used.

The new popping implementations allow for a RangeBuf to be partially
consumed at its start, without the need for splitting it off.

The downside is that the "consumed" bytes will only be freed when the
whole RangeBuf is consumed, but that should be fine.
diff --git a/src/stream.rs b/src/stream.rs
index 53a7142..f5e87c6 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -40,8 +40,6 @@
 
 const DEFAULT_URGENCY: u8 = 127;
 
-const MAX_WRITE_SIZE: usize = 1000;
-
 /// Keeps track of QUIC streams and enforces stream limits.
 #[derive(Default)]
 pub struct StreamMap {
@@ -596,7 +594,7 @@
 /// Stream data received by the peer is buffered in a list of data chunks
 /// ordered by offset in ascending order. Contiguous data can then be read
 /// into a slice.
-#[derive(Default)]
+#[derive(Debug, Default)]
 pub struct RecvBuf {
     /// Chunks of data received from the peer that have not yet been read by
     /// the application, ordered by offset.
@@ -753,30 +751,29 @@
         }
 
         while cap > 0 && self.ready() {
-            let mut buf = match self.data.pop() {
+            let mut buf = match self.data.peek_mut() {
                 Some(v) => v,
 
                 None => break,
             };
 
-            if buf.len() > cap {
-                let new_buf = RangeBuf {
-                    data: buf.data.split_off(cap),
-                    off: buf.off + cap as u64,
-                    fin: buf.fin,
-                };
+            let buf_len = cmp::min(buf.len(), cap);
 
-                buf.fin = false;
+            out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
 
-                self.data.push(new_buf);
+            self.off += buf_len as u64;
+
+            len += buf_len;
+            cap -= buf_len;
+
+            if buf_len < buf.len() {
+                buf.consume(buf_len);
+
+                // We reached the maximum capacity, so end here.
+                break;
             }
 
-            out[len..len + buf.len()].copy_from_slice(&buf.data);
-
-            self.off += buf.len() as u64;
-
-            len += buf.len();
-            cap -= buf.len();
+            std::collections::binary_heap::PeekMut::pop(buf);
         }
 
         self.max_data_next = self.max_data_next.saturating_add(len as u64);
@@ -863,7 +860,7 @@
             None => return false,
         };
 
-        buf.off == self.off
+        buf.off() == self.off
     }
 }
 
@@ -876,7 +873,7 @@
 /// By default, new data is appended at the end of the stream, but data can be
 /// inserted at the start of the buffer (this is to allow data that needs to be
 /// retransmitted to be re-buffered).
-#[derive(Default)]
+#[derive(Debug, Default)]
 pub struct SendBuf {
     /// Chunks of data to be sent, ordered by offset.
     data: BinaryHeap<RangeBuf>,
@@ -917,8 +914,6 @@
     pub fn push_slice(
         &mut self, mut data: &[u8], mut fin: bool,
     ) -> Result<usize> {
-        let mut len = 0;
-
         if self.shutdown {
             // Since we won't write any more data anyway, pretend that we sent
             // all data that was passed in.
@@ -942,19 +937,10 @@
             fin = false;
         }
 
-        // Split the input buffer into multiple RangeBufs. Otherwise a big
-        // buffer would need to be split later on when popping data, which
-        // would cause a partial copy of the buffer.
-        for chunk in data.chunks(MAX_WRITE_SIZE) {
-            len += chunk.len();
+        let buf = RangeBuf::from(data, self.off, fin);
+        self.push(buf)?;
 
-            let fin = len == data.len() && fin;
-
-            let buf = RangeBuf::from(chunk, self.off, fin);
-            self.push(buf)?;
-
-            self.off += chunk.len() as u64;
-        }
+        self.off += data.len() as u64;
 
         Ok(data.len())
     }
@@ -1014,30 +1000,33 @@
             self.off_front() == out_off &&
             self.off_front() < self.max_data
         {
-            let mut buf = match self.data.pop() {
+            let mut buf = match self.data.peek_mut() {
                 Some(v) => v,
 
                 None => break,
             };
 
-            if buf.len() > out_len || buf.max_off() > self.max_data {
-                let new_len =
-                    cmp::min(out_len, (self.max_data - buf.off()) as usize);
-                let new_buf = buf.split_off(new_len);
-
-                self.data.push(new_buf);
-            }
+            let buf_len = cmp::min(buf.len(), out_len);
 
             if out.is_empty() {
-                out.off = buf.off;
+                out.off = buf.off();
             }
 
-            self.len -= buf.len() as u64;
+            self.len -= buf_len as u64;
 
-            out_len -= buf.len();
-            out_off = buf.max_off();
+            out_len -= buf_len;
+            out_off = buf.off() + buf_len as u64;
 
-            out.data.extend_from_slice(&buf.data);
+            out.data.extend_from_slice(&buf[..buf_len]);
+
+            if buf_len < buf.len() {
+                buf.consume(buf_len);
+
+                // We reached the maximum capacity, so end here.
+                break;
+            }
+
+            std::collections::binary_heap::PeekMut::pop(buf);
         }
 
         // Override the `fin` flag set for the output buffer by matching the
@@ -1145,8 +1134,17 @@
 /// Buffer holding data at a specific offset.
 #[derive(Clone, Debug, Default, Eq)]
 pub struct RangeBuf {
+    /// The internal buffer holding the data.
     data: Vec<u8>,
+
+    /// The starting offset within `data`. This allows partially consuming a
+    /// buffer without duplicating the data.
+    pos: usize,
+
+    /// The starting offset within a stream.
     off: u64,
+
+    /// Whether this contains the final byte in the stream.
     fin: bool,
 }
 
@@ -1155,6 +1153,7 @@
     pub(crate) fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf {
         RangeBuf {
             data: Vec::from(buf),
+            pos: 0,
             off,
             fin,
         }
@@ -1167,7 +1166,7 @@
 
     /// Returns the starting offset of `self`.
     pub fn off(&self) -> u64 {
-        self.off
+        self.off + self.pos as u64
     }
 
     /// Returns the final offset of `self`.
@@ -1177,7 +1176,7 @@
 
     /// Returns the length of `self`.
     pub fn len(&self) -> usize {
-        self.data.len()
+        self.data.len() - self.pos
     }
 
     /// Returns true if `self` has a length of zero bytes.
@@ -1185,10 +1184,16 @@
         self.len() == 0
     }
 
+    /// Consumes the starting `count` bytes of `self`.
+    pub fn consume(&mut self, count: usize) {
+        self.pos += count;
+    }
+
     /// Splits the buffer into two at the given index.
     pub fn split_off(&mut self, at: usize) -> RangeBuf {
         let buf = RangeBuf {
             data: self.data.split_off(at),
+            pos: 0,
             off: self.off + at as u64,
             fin: self.fin,
         };
@@ -1203,13 +1208,13 @@
     type Target = [u8];
 
     fn deref(&self) -> &[u8] {
-        &self.data
+        &self.data[self.pos..]
     }
 }
 
 impl std::ops::DerefMut for RangeBuf {
     fn deref_mut(&mut self) -> &mut [u8] {
-        &mut self.data
+        &mut self.data[self.pos..]
     }
 }