update the receiver flow control

This PR is to adopt [Chromium's approach]
(https://docs.google.com/document/d/1F2YfdDXKpy20WVKJueEf4abn_LVZHhMUMS5gX6Pgjl4/edit?usp=sharing) for the receiver flow control.

flowcontrol.rs contains a common framework for flow control updates.

Basically, the concept is to keep a `window` / 2 between the consumed
data offset and `max_data`. When the gap is smaller than window / 2,
we update `MAX_DATA` frame. Same applies to the stream updating
`MAX_STREAM_DATA`.

Auto-tuning of the window is also implemented. When `window`
update keeps happening within 2 x rtt, double `window` value.
By this `window` will be close to the actual BDP. Note that
the window only grows, not shrinks.

Also, make sure the connection window is always 1.5x higher
than any of the stream window. This is for making sure that
the connection flow limit is not a blocker for the stream
to grow.

`RecvBuf.push()` now returns the increased size of the last offset,
which will be used for updating `rx_data`. This is actually a bug fix.
Previously we increased `rx_data` by the size of the data received,
however when the data has an overlap (e.g. duplicate packet) we
should not increase `rx_data`. Also `rx_data_consumed` is added for
tracking `rx_data` offset consumed by the application.
diff --git a/src/flowcontrol.rs b/src/flowcontrol.rs
new file mode 100644
index 0000000..e596891
--- /dev/null
+++ b/src/flowcontrol.rs
@@ -0,0 +1,202 @@
+// Copyright (C) 2020, Cloudflare, Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright notice,
+//       this list of conditions and the following disclaimer.
+//
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+use std::time::Duration;
+use std::time::Instant;
+
+// When autotune the receiver window, how much we increase the window.
+const WINDOW_INCREASE_FACTOR: u64 = 2;
+
+// When autotune the receiver window, check if the last update is within RTT *
+// this constant.
+const WINDOW_TRIGGER_FACTOR: u32 = 2;
+
+#[derive(Default, Debug)]
+pub struct FlowControl {
+    /// Flow control limit.
+    max_data: u64,
+
+    /// The maximum receive window. This value is used for updating
+    /// flow control limit.
+    window: u64,
+
+    /// Last update time of max_data for autotuning the window.
+    last_update: Option<Instant>,
+}
+
+impl FlowControl {
+    pub fn new(max_data: u64, window: u64) -> Self {
+        Self {
+            max_data,
+
+            window,
+
+            last_update: None,
+        }
+    }
+
+    /// Returns the current window.
+    pub fn window(&self) -> u64 {
+        self.window
+    }
+
+    /// Returns the current max_data limit.
+    pub fn max_data(&self) -> u64 {
+        self.max_data
+    }
+
+    /// Returns true if the flow control needs to update max_data.
+    ///
+    /// This happens when the available window is smaller than the half
+    /// of the current window.
+    pub fn should_update_max_data(&self, consumed: u64) -> bool {
+        let available_window = self.max_data - consumed;
+
+        available_window < (self.window / 2)
+    }
+
+    /// Returns the new max_data limit.
+    pub fn max_data_next(&mut self, consumed: u64) -> u64 {
+        let available_window = self.max_data - consumed;
+
+        self.max_data + (self.window - available_window)
+    }
+
+    /// Commits the new max_data limit.
+    pub fn update_max_data(&mut self, consumed: u64, now: Instant) {
+        self.max_data = self.max_data_next(consumed);
+        self.last_update = Some(now);
+    }
+
+    /// Make sure the lower bound of the current window.
+    /// Returns true if the current window changed.
+    pub fn ensure_window_lower_bound(&mut self, min_window: u64) -> bool {
+        if min_window > self.window {
+            self.window = min_window;
+
+            return true;
+        }
+
+        false
+    }
+
+    /// Autotune the window size. When there is an another update
+    /// within RTT x 2, bump connection window x 1.5, capped by
+    /// max(stream window).
+    pub fn autotune_window(
+        &mut self, now: Instant, rtt: Duration, max_window: u64,
+    ) {
+        if let Some(last_update) = self.last_update {
+            if now - last_update < rtt * WINDOW_TRIGGER_FACTOR {
+                self.window = std::cmp::min(
+                    self.window * WINDOW_INCREASE_FACTOR,
+                    max_window,
+                );
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn max_data() {
+        let fc = FlowControl::new(100, 20);
+
+        assert_eq!(fc.max_data(), 100);
+    }
+
+    #[test]
+    fn should_update_max_data() {
+        let fc = FlowControl::new(100, 20);
+
+        assert_eq!(fc.should_update_max_data(85), false);
+        assert_eq!(fc.should_update_max_data(95), true);
+    }
+
+    #[test]
+    fn max_data_next() {
+        let mut fc = FlowControl::new(100, 20);
+        let consumed = 95;
+
+        assert_eq!(fc.should_update_max_data(consumed), true);
+        assert_eq!(fc.max_data_next(consumed), consumed + 20);
+    }
+
+    #[test]
+    fn update_max_data() {
+        let mut fc = FlowControl::new(100, 20);
+        let consumed = 95;
+
+        assert_eq!(fc.should_update_max_data(consumed), true);
+
+        let max_data_next = fc.max_data_next(consumed);
+        assert_eq!(max_data_next, consumed + 20);
+
+        fc.update_max_data(consumed, Instant::now());
+        assert_eq!(fc.max_data(), max_data_next);
+    }
+
+    #[test]
+    fn ensure_window_lower_bound() {
+        let w = 20;
+        let mut fc = FlowControl::new(100, w);
+
+        // Lower than current window x 1.5 (30).
+        assert_eq!(fc.ensure_window_lower_bound(w), false);
+
+        // Higher than current window x 1.5 (30).
+        assert_eq!(fc.ensure_window_lower_bound(w * 2), true);
+    }
+
+    #[test]
+    fn autotune_window() {
+        let w = 20;
+        let mut fc = FlowControl::new(100, w);
+        let consumed = 95;
+
+        assert_eq!(fc.should_update_max_data(consumed), true);
+
+        let max_data_next = fc.max_data_next(consumed);
+        assert_eq!(max_data_next, consumed + w);
+
+        fc.update_max_data(consumed, Instant::now());
+        assert_eq!(fc.max_data(), max_data_next);
+
+        // Window size should be doubled.
+        fc.autotune_window(Instant::now(), Duration::from_millis(100), 100);
+
+        let w = w * 2;
+        let consumed = 110;
+
+        assert_eq!(fc.should_update_max_data(consumed), true);
+
+        let max_data_next = fc.max_data_next(consumed);
+        assert_eq!(max_data_next, consumed + w);
+    }
+}
diff --git a/src/h3/mod.rs b/src/h3/mod.rs
index 6248688..aeb861d 100644
--- a/src/h3/mod.rs
+++ b/src/h3/mod.rs
@@ -822,7 +822,7 @@
             conn.trace_id(),
             stream_id,
             header_block.len(),
-            fin
+            fin,
         );
 
         b.put_varint(frame::HEADERS_FRAME_TYPE_ID)?;
@@ -3098,6 +3098,11 @@
             Header::new(":path", "/test"),
         ];
 
+        let ev_headers = Event::Headers {
+            list: req.clone(),
+            has_body: false,
+        };
+
         assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
 
         assert_eq!(
@@ -3107,6 +3112,11 @@
 
         s.advance().ok();
 
+        // The server should consume the request to update flow control credit.
+        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, ev_headers)));
+
+        s.advance().ok();
+
         // Once the server gives flow control credits back, we can send the
         // request.
         assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
@@ -3144,6 +3154,11 @@
             Header::new(":path", "/test"),
         ];
 
+        let ev_headers = Event::Headers {
+            list: req.clone(),
+            has_body: true,
+        };
+
         assert_eq!(
             s.client.send_request(&mut s.pipe.client, &req, false),
             Ok(0)
@@ -3156,6 +3171,11 @@
 
         s.advance().ok();
 
+        // The server should consume the request to update flow control credit.
+        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, ev_headers)));
+
+        s.advance().ok();
+
         // Once the server gives flow control credits back, we can send the body.
         assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
     }
diff --git a/src/lib.rs b/src/lib.rs
index 352f1e9..0c95e4f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -313,6 +313,16 @@
 // frames size. We enforce the recommendation for forward compatibility.
 const MAX_DGRAM_FRAME_SIZE: u64 = 65536;
 
+// The default size of the receiver connection flow control window.
+const DEFAULT_CONNECTION_WINDOW: u64 = 48 * 1024;
+
+// The maximum size of the receiver connection flow control window.
+const MAX_CONNECTION_WINDOW: u64 = 24 * 1024 * 1024;
+
+// How much larger the connection flow control window need to be larger than
+// the stream flow control window.
+const CONNECTION_WINDOW_FACTOR: f64 = 1.5;
+
 /// A specialized [`Result`] type for quiche operations.
 ///
 /// This type is used throughout quiche's public API for any operation that
@@ -812,12 +822,11 @@
     /// Total number of bytes received from the peer.
     rx_data: u64,
 
-    /// Local flow control limit for the connection.
-    max_rx_data: u64,
+    /// Total number of bytes read from the application.
+    rx_data_consumed: u64,
 
-    /// Updated local flow control limit for the connection. This is used to
-    /// trigger sending MAX_DATA frames after a certain threshold.
-    max_rx_data_next: u64,
+    /// Receiver flow controller.
+    flow_control: flowcontrol::FlowControl,
 
     /// Whether we send MAX_DATA frame.
     almost_full: bool,
@@ -1160,8 +1169,11 @@
             sent_count: 0,
 
             rx_data: 0,
-            max_rx_data,
-            max_rx_data_next: max_rx_data,
+            rx_data_consumed: 0,
+            flow_control: flowcontrol::FlowControl::new(
+                max_rx_data,
+                cmp::min(max_rx_data, DEFAULT_CONNECTION_WINDOW),
+            ),
             almost_full: false,
 
             tx_data: 0,
@@ -2174,36 +2186,9 @@
                 }
             }
 
-            // Create MAX_DATA frame as needed.
-            if self.almost_full {
-                let frame = frame::Frame::MaxData {
-                    max: self.max_rx_data_next,
-                };
-
-                if push_frame_to_pkt!(frames, frame, payload_len, left) {
-                    self.almost_full = false;
-
-                    // Commits the new max_rx_data limit.
-                    self.max_rx_data = self.max_rx_data_next;
-
-                    ack_eliciting = true;
-                    in_flight = true;
-                }
-            }
-
-            // Create DATA_BLOCKED frame.
-            if let Some(limit) = self.blocked_limit {
-                let frame = frame::Frame::DataBlocked { limit };
-
-                if push_frame_to_pkt!(frames, frame, payload_len, left) {
-                    self.blocked_limit = None;
-
-                    ack_eliciting = true;
-                    in_flight = true;
-                }
-            }
-
             // Create MAX_STREAM_DATA frames as needed.
+            // MAX_DATA may need to be sent when updating MAX_STREAM_DATA,
+            // so we send update MAX_STREAM_DATA before MAX_DATA.
             for stream_id in self.streams.almost_full() {
                 let stream = match self.streams.get_mut(stream_id) {
                     Some(v) => v,
@@ -2216,6 +2201,9 @@
                     },
                 };
 
+                // Autotune the stream window size.
+                stream.recv.autotune_window(now, self.recovery.rtt());
+
                 let frame = frame::Frame::MaxStreamData {
                     stream_id,
                     max: stream.recv.max_data_next(),
@@ -2224,6 +2212,19 @@
                 if push_frame_to_pkt!(frames, frame, payload_len, left) {
                     stream.recv.update_max_data();
 
+                    // Make sure the connection window always has some
+                    // room compared to the stream window. If MAX_DATA
+                    // update is necessary, send a new one.
+                    if self.flow_control.ensure_window_lower_bound(
+                        (stream.recv.window() as f64 * CONNECTION_WINDOW_FACTOR)
+                            as u64,
+                    ) && self
+                        .flow_control
+                        .should_update_max_data(self.rx_data_consumed)
+                    {
+                        self.almost_full = true;
+                    }
+
                     self.streams.mark_almost_full(stream_id, false);
 
                     ack_eliciting = true;
@@ -2231,6 +2232,29 @@
                 }
             }
 
+            // Create MAX_DATA frame as needed.
+            if self.almost_full {
+                // Autotune the maximum window size.
+                self.flow_control.autotune_window(
+                    now,
+                    self.recovery.rtt(),
+                    MAX_CONNECTION_WINDOW,
+                );
+
+                let frame = frame::Frame::MaxData {
+                    max: self.flow_control.max_data_next(self.rx_data_consumed),
+                };
+
+                if push_frame_to_pkt!(frames, frame, payload_len, left) {
+                    self.flow_control
+                        .update_max_data(self.rx_data_consumed, now);
+                    self.almost_full = false;
+
+                    ack_eliciting = true;
+                    in_flight = true;
+                }
+            }
+
             // Create STREAM_DATA_BLOCKED frames as needed.
             for (stream_id, limit) in self
                 .streams
@@ -2247,6 +2271,18 @@
                     in_flight = true;
                 }
             }
+
+            // Create DATA_BLOCKED frame as needed.
+            if let Some(limit) = self.blocked_limit {
+                let frame = frame::Frame::DataBlocked { limit };
+
+                if push_frame_to_pkt!(frames, frame, payload_len, left) {
+                    self.blocked_limit = None;
+
+                    ack_eliciting = true;
+                    in_flight = true;
+                }
+            }
         }
 
         // Create CONNECTION_CLOSE frame.
@@ -2667,7 +2703,7 @@
 
         let (read, fin) = stream.recv.pop(out)?;
 
-        self.max_rx_data_next = self.max_rx_data_next.saturating_add(read as u64);
+        self.rx_data_consumed = self.rx_data_consumed.saturating_add(read as u64);
 
         let readable = stream.is_readable();
 
@@ -2687,6 +2723,13 @@
             self.streams.collect(stream_id, local);
         }
 
+        if self
+            .flow_control
+            .should_update_max_data(self.rx_data_consumed)
+        {
+            self.almost_full = true;
+        }
+
         qlog_with!(self.qlog_streamer, q, {
             let ev = qlog::event::Event::h3_data_moved(
                 stream_id.to_string(),
@@ -2699,10 +2742,6 @@
             q.add_event(ev).ok();
         });
 
-        if self.should_update_max_data() {
-            self.almost_full = true;
-        }
-
         Ok((read, fin))
     }
 
@@ -3622,7 +3661,7 @@
 
                 self.rx_data += stream.recv.reset(final_size)? as u64;
 
-                if self.rx_data > self.max_rx_data {
+                if self.rx_data > self.flow_control.max_data() {
                     return Err(Error::FlowControl);
                 }
             },
@@ -3752,13 +3791,6 @@
                     return Err(Error::InvalidStreamState);
                 }
 
-                // Check for flow control limits.
-                let data_len = data.len() as u64;
-
-                if self.rx_data + data_len > self.max_rx_data {
-                    return Err(Error::FlowControl);
-                }
-
                 // Get existing stream or create a new one, but if the stream
                 // has already been closed and collected, ignore the frame.
                 //
@@ -3777,13 +3809,18 @@
                     Err(e) => return Err(e),
                 };
 
-                stream.recv.push(data)?;
+                let rx_inc = stream.recv.push(data)?;
 
                 if stream.is_readable() {
                     self.streams.mark_readable(stream_id, true);
                 }
 
-                self.rx_data += data_len;
+                // Check for the connection flow control limits.
+                if self.rx_data + rx_inc > self.flow_control.max_data() {
+                    return Err(Error::FlowControl);
+                }
+
+                self.rx_data = self.rx_data.saturating_add(rx_inc);
             },
 
             frame::Frame::MaxData { max } => {
@@ -3931,15 +3968,6 @@
         trace!("{} dropped epoch {} state", self.trace_id, epoch);
     }
 
-    /// Returns true if the connection-level flow control needs to be updated.
-    ///
-    /// This happens when the new max data limit is at least double the amount
-    /// of data that can be received before blocking.
-    fn should_update_max_data(&self) -> bool {
-        self.max_rx_data_next != self.max_rx_data &&
-            self.max_rx_data_next / 2 > self.max_rx_data - self.rx_data
-    }
-
     /// Returns the idle timeout value.
     ///
     /// `None` is returned if both end-points disabled the idle timeout.
@@ -5207,6 +5235,14 @@
         // Ignore ACK.
         iter.next().unwrap();
 
+        // MAX_STREAM_DATA comes first and MAX_DATA comes later.
+        assert_eq!(
+            iter.next(),
+            Some(&frame::Frame::MaxStreamData {
+                stream_id: 4,
+                max: 30
+            })
+        );
         assert_eq!(iter.next(), Some(&frame::Frame::MaxData { max: 46 }));
     }
 
@@ -5260,7 +5296,7 @@
 
         let frames = [frame::Frame::Stream {
             stream_id: 4,
-            data: stream::RangeBuf::from(b"aaaaaaa", 0, false),
+            data: stream::RangeBuf::from(b"aaaaaaaaa", 0, false),
         }];
 
         let pkt_type = packet::Type::Short;
@@ -5271,7 +5307,7 @@
 
         let frames = [frame::Frame::Stream {
             stream_id: 4,
-            data: stream::RangeBuf::from(b"a", 7, false),
+            data: stream::RangeBuf::from(b"a", 9, false),
         }];
 
         let len = pipe
@@ -5291,7 +5327,7 @@
             iter.next(),
             Some(&frame::Frame::MaxStreamData {
                 stream_id: 4,
-                max: 22,
+                max: 24,
             })
         );
     }
@@ -7724,6 +7760,7 @@
 mod crypto;
 mod dgram;
 mod ffi;
+mod flowcontrol;
 mod frame;
 pub mod h3;
 mod minmax;
diff --git a/src/stream.rs b/src/stream.rs
index f5e87c6..074b3d1 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -32,14 +32,23 @@
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::collections::VecDeque;
+use std::time::Duration;
+use std::time::Instant;
 
 use crate::Error;
 use crate::Result;
 
+use crate::flowcontrol;
 use crate::ranges;
 
 const DEFAULT_URGENCY: u8 = 127;
 
+/// The default size of the receiver stream flow control window.
+const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;
+
+/// The maximum size of the receiver stream flow control window.
+const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
+
 /// Keeps track of QUIC streams and enforces stream limits.
 #[derive(Default)]
 pub struct StreamMap {
@@ -606,11 +615,8 @@
     /// The total length of data received on this stream.
     len: u64,
 
-    /// The maximum offset the peer is allowed to send us.
-    max_data: u64,
-
-    /// The updated maximum offset the peer is allowed to send us.
-    max_data_next: u64,
+    /// Receiver flow controller.
+    flow_control: flowcontrol::FlowControl,
 
     /// The final stream offset received from the peer, if any.
     fin_off: Option<u64>,
@@ -623,8 +629,10 @@
     /// Creates a new receive buffer.
     fn new(max_data: u64) -> RecvBuf {
         RecvBuf {
-            max_data,
-            max_data_next: max_data,
+            flow_control: flowcontrol::FlowControl::new(
+                max_data,
+                cmp::min(max_data, DEFAULT_STREAM_WINDOW),
+            ),
             ..RecvBuf::default()
         }
     }
@@ -634,8 +642,12 @@
     /// This also takes care of enforcing stream flow control limits, as well
     /// as handling incoming data that overlaps data that is already in the
     /// buffer.
-    pub fn push(&mut self, buf: RangeBuf) -> Result<()> {
-        if buf.max_off() > self.max_data {
+    ///
+    /// Returns how many bytes increased in the largest offset.
+    /// It will return 0 when the inserted data is duplicated or overlapped and
+    /// doesn't move the largest offset.
+    pub fn push(&mut self, buf: RangeBuf) -> Result<u64> {
+        if buf.max_off() > self.max_data() {
             return Err(Error::FlowControl);
         }
 
@@ -659,13 +671,13 @@
         // We already saved the final offset, so there's nothing else we
         // need to keep from the RangeBuf if it's empty.
         if self.fin_off.is_some() && buf.is_empty() {
-            return Ok(());
+            return Ok(0);
         }
 
         // No need to process an empty buffer with the fin flag, if we already
         // know the final size.
         if buf.fin() && buf.is_empty() && self.fin_off.is_some() {
-            return Ok(());
+            return Ok(0);
         }
 
         if buf.fin() {
@@ -674,7 +686,7 @@
 
         // No need to store empty buffer that doesn't carry the fin flag.
         if !buf.fin() && buf.is_empty() {
-            return Ok(());
+            return Ok(0);
         }
 
         // Check if data is fully duplicate, that is the buffer's max offset is
@@ -686,16 +698,19 @@
             // By this point all spurious empty buffers should have already been
             // discarded, so allowing empty buffers here should be safe.
             if !buf.is_empty() {
-                return Ok(());
+                return Ok(0);
             }
         }
 
         if self.drain {
-            return Ok(());
+            return Ok(0);
         }
 
         let mut tmp_buf = Some(buf);
 
+        // How many bytes the receive buffer right edge (max_off()) increased.
+        let mut len_inc = 0;
+
         while let Some(mut buf) = tmp_buf {
             tmp_buf = None;
 
@@ -711,7 +726,7 @@
             for b in &self.data {
                 // New buffer is fully contained in existing buffer.
                 if buf.off() >= b.off() && buf.max_off() <= b.max_off() {
-                    return Ok(());
+                    return Ok(0);
                 }
 
                 // New buffer's start overlaps existing buffer.
@@ -725,12 +740,17 @@
                 }
             }
 
-            self.len = cmp::max(self.len, buf.max_off());
+            self.len = if buf.max_off() > self.len {
+                len_inc += buf.max_off() - self.len;
+                buf.max_off()
+            } else {
+                self.len
+            };
 
             self.data.push(buf);
         }
 
-        Ok(())
+        Ok(len_inc)
     }
 
     /// Writes data from the receive buffer into the given output buffer.
@@ -776,8 +796,6 @@
             std::collections::binary_heap::PeekMut::pop(buf);
         }
 
-        self.max_data_next = self.max_data_next.saturating_add(len as u64);
-
         Ok((len, self.is_fin()))
     }
 
@@ -802,14 +820,30 @@
         Ok((final_size - self.len) as usize)
     }
 
+    /// Return the current window.
+    pub fn window(&mut self) -> u64 {
+        self.flow_control.window()
+    }
+
+    /// Return the current max_data limit.
+    pub fn max_data(&mut self) -> u64 {
+        self.flow_control.max_data()
+    }
+
     /// Commits the new max_data limit.
     pub fn update_max_data(&mut self) {
-        self.max_data = self.max_data_next;
+        self.flow_control.update_max_data(self.off, Instant::now());
     }
 
     /// Return the new max_data limit.
     pub fn max_data_next(&mut self) -> u64 {
-        self.max_data_next
+        self.flow_control.max_data_next(self.off)
+    }
+
+    /// Autotune the window size.
+    pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
+        self.flow_control
+            .autotune_window(now, rtt, MAX_STREAM_WINDOW);
     }
 
     /// Shuts down receiving data.
@@ -831,13 +865,10 @@
         self.off
     }
 
-    /// Returns true if we need to update the local flow control limit.
+    /// Returns true if we need to update the stream flow control limit.
     pub fn almost_full(&self) -> bool {
-        // Send MAX_STREAM_DATA when the new limit is at least double the
-        // amount of data that can be received before blocking.
         self.fin_off.is_none() &&
-            self.max_data_next != self.max_data &&
-            self.max_data_next / 2 > self.max_data - self.len
+            self.flow_control.should_update_max_data(self.off)
     }
 
     /// Returns true if the receive-side of the stream is complete.
@@ -1988,8 +2019,8 @@
         let second = RangeBuf::from(b"world", 5, false);
         let third = RangeBuf::from(b"something", 10, false);
 
-        assert_eq!(stream.recv.push(second), Ok(()));
-        assert_eq!(stream.recv.push(first), Ok(()));
+        assert_eq!(stream.recv.push(second), Ok(10));
+        assert_eq!(stream.recv.push(first), Ok(0));
         assert!(!stream.recv.almost_full());
 
         assert_eq!(stream.recv.push(third), Err(Error::FlowControl));
@@ -2005,7 +2036,7 @@
         assert!(!stream.recv.almost_full());
 
         let third = RangeBuf::from(b"something", 10, false);
-        assert_eq!(stream.recv.push(third), Ok(()));
+        assert_eq!(stream.recv.push(third), Ok(9));
     }
 
     #[test]
@@ -2016,7 +2047,7 @@
         let first = RangeBuf::from(b"hello", 0, true);
         let second = RangeBuf::from(b"world", 5, false);
 
-        assert_eq!(stream.recv.push(first), Ok(()));
+        assert_eq!(stream.recv.push(first), Ok(5));
         assert_eq!(stream.recv.push(second), Err(Error::FinalSize));
     }
 
@@ -2028,8 +2059,8 @@
         let first = RangeBuf::from(b"hello", 0, true);
         let second = RangeBuf::from(b"hello", 0, true);
 
-        assert_eq!(stream.recv.push(first), Ok(()));
-        assert_eq!(stream.recv.push(second), Ok(()));
+        assert_eq!(stream.recv.push(first), Ok(5));
+        assert_eq!(stream.recv.push(second), Ok(0));
 
         let mut buf = [0; 32];
 
@@ -2046,7 +2077,7 @@
         let first = RangeBuf::from(b"hello", 0, true);
         let second = RangeBuf::from(b"world", 5, true);
 
-        assert_eq!(stream.recv.push(second), Ok(()));
+        assert_eq!(stream.recv.push(second), Ok(10));
         assert_eq!(stream.recv.push(first), Err(Error::FinalSize));
     }
 
@@ -2058,7 +2089,7 @@
         let first = RangeBuf::from(b"hello", 0, true);
         let second = RangeBuf::from(b"world", 5, false);
 
-        assert_eq!(stream.recv.push(second), Ok(()));
+        assert_eq!(stream.recv.push(second), Ok(10));
         assert_eq!(stream.recv.push(first), Err(Error::FinalSize));
     }
 
@@ -2072,8 +2103,8 @@
         let first = RangeBuf::from(b"hello", 0, false);
         let second = RangeBuf::from(b"world", 5, true);
 
-        assert_eq!(stream.recv.push(first), Ok(()));
-        assert_eq!(stream.recv.push(second), Ok(()));
+        assert_eq!(stream.recv.push(first), Ok(5));
+        assert_eq!(stream.recv.push(second), Ok(5));
 
         let (len, fin) = stream.recv.pop(&mut buf).unwrap();
         assert_eq!(&buf[..len], b"helloworld");
@@ -2089,7 +2120,7 @@
 
         let first = RangeBuf::from(b"hello", 0, true);
 
-        assert_eq!(stream.recv.push(first), Ok(()));
+        assert_eq!(stream.recv.push(first), Ok(5));
         assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
     }
 
@@ -2100,7 +2131,7 @@
 
         let first = RangeBuf::from(b"hello", 0, false);
 
-        assert_eq!(stream.recv.push(first), Ok(()));
+        assert_eq!(stream.recv.push(first), Ok(5));
         assert_eq!(stream.recv.reset(5), Ok(0));
         assert_eq!(stream.recv.reset(5), Ok(0));
     }
@@ -2112,7 +2143,7 @@
 
         let first = RangeBuf::from(b"hello", 0, false);
 
-        assert_eq!(stream.recv.push(first), Ok(()));
+        assert_eq!(stream.recv.push(first), Ok(5));
         assert_eq!(stream.recv.reset(5), Ok(0));
         assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
     }
@@ -2124,7 +2155,7 @@
 
         let first = RangeBuf::from(b"hello", 0, false);
 
-        assert_eq!(stream.recv.push(first), Ok(()));
+        assert_eq!(stream.recv.push(first), Ok(5));
         assert_eq!(stream.recv.reset(4), Err(Error::FinalSize));
     }
 
@@ -2304,7 +2335,7 @@
 
         let first = RangeBuf::from(b"hello", 0, false);
 
-        assert_eq!(stream.recv.push(first), Ok(()));
+        assert_eq!(stream.recv.push(first), Ok(5));
 
         let mut buf = [0; 10];
 
@@ -2313,7 +2344,7 @@
         assert_eq!(fin, false);
 
         let first = RangeBuf::from(b"elloworld", 1, true);
-        assert_eq!(stream.recv.push(first), Ok(()));
+        assert_eq!(stream.recv.push(first), Ok(5));
 
         let (len, fin) = stream.recv.pop(&mut buf).unwrap();
         assert_eq!(&buf[..len], b"world");