h3: add body_capacity() helper

Previously, quiche kept all details about the relationship between
stream capacity and writable DATA internal. This could lead to awkward
situations where quiche would report a stream as writable,
`quiche::conn::stream_capacity()` could report a non-zero value, but
`send_body()` might not send anything due to DATA frame overheads.

This change introduces the `quiche::h3::conn::body_capacity()` helper
function to help applications determine the amount of usable capacity.
diff --git a/src/h3/mod.rs b/src/h3/mod.rs
index efee14d..5bf2880 100644
--- a/src/h3/mod.rs
+++ b/src/h3/mod.rs
@@ -1010,7 +1010,7 @@
             return Err(Error::Done);
         }
 
-        let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
+        let overhead = 1 + // DATA frame type is always 1 byte long
             octets::varint_len(body.len() as u64);
 
         let stream_cap = match conn.stream_capacity(stream_id) {
@@ -1036,8 +1036,15 @@
             return Err(Error::Done);
         }
 
+        let data_frame_capacity = stream_cap - overhead;
+
+        // Mark stream blocked if we can't send anything  in the DATA frame.
+        if !body.is_empty() && data_frame_capacity == 0 {
+            let _ = conn.stream_writable(stream_id, overhead + body.len());
+        }
+
         // Cap the frame payload length to the stream's capacity.
-        let body_len = std::cmp::min(body.len(), stream_cap - overhead);
+        let body_len = std::cmp::min(body.len(), data_frame_capacity);
 
         // If we can't send the entire body, set the fin flag to false so the
         // application can try again later.
@@ -1072,6 +1079,55 @@
         Ok(written)
     }
 
+    /// Returns the stream's body capacity considering framing overhead.
+    ///
+    /// If the specified stream doesn't exist (including when it has already
+    /// been completed and closed), the FrameUnexpected or InvalidStreamState
+    /// error will be returned.
+    ///
+    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
+    /// [`InvalidStreamState`]: enum.Error.html#variant.InvalidStreamState
+    pub fn body_capacity(
+        &mut self, conn: &super::Connection, stream_id: u64,
+    ) -> Result<usize> {
+        // Validate that it is sane to send data on the stream.
+        if stream_id % 4 != 0 {
+            return Err(Error::FrameUnexpected);
+        }
+
+        match self.streams.get(&stream_id) {
+            Some(s) =>
+                if !s.local_initialized() {
+                    return Err(Error::FrameUnexpected);
+                },
+
+            None => {
+                return Err(Error::FrameUnexpected);
+            },
+        };
+
+        let stream_cap = match conn.stream_capacity(stream_id) {
+            Ok(v) => v,
+
+            Err(e) => {
+                if conn.stream_finished(stream_id) {
+                    self.streams.remove(&stream_id);
+                }
+
+                return Err(e.into());
+            },
+        };
+
+        let overhead = 1 + // DATA frame type is always 1 byte
+                             octets::varint_len(stream_cap as u64);
+
+        if stream_cap < overhead {
+            return Err(Error::StreamBlocked);
+        }
+
+        Ok(stream_cap - overhead)
+    }
+
     /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
     ///
     /// Support is signalled by the peer's SETTINGS, so this method always
@@ -3697,6 +3753,208 @@
     }
 
     #[test]
+    /// Tests that stream and body capacity is reported correctly.
+    fn body_capacity() {
+        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
+        config
+            .load_cert_chain_from_pem_file("examples/cert.crt")
+            .unwrap();
+        config
+            .load_priv_key_from_pem_file("examples/cert.key")
+            .unwrap();
+        config.set_application_protos(b"\x02h3").unwrap();
+        config.set_initial_max_data(1000);
+        config.set_initial_max_stream_data_bidi_local(150);
+        config.set_initial_max_stream_data_bidi_remote(150);
+        config.set_initial_max_stream_data_uni(150);
+        config.set_initial_max_streams_bidi(100);
+        config.set_initial_max_streams_uni(5);
+        config.verify_peer(false);
+
+        let mut h3_config = Config::new().unwrap();
+
+        let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
+
+        s.handshake().unwrap();
+
+        // Can't query stream or body capacity before sending request.
+        assert_eq!(
+            s.pipe.client.stream_capacity(0),
+            Err(crate::Error::InvalidStreamState(0))
+        );
+        assert_eq!(
+            s.client.body_capacity(&s.pipe.client, 0),
+            Err(Error::FrameUnexpected)
+        );
+
+        let req = vec![
+            Header::new(b":method", b"GET"),
+            Header::new(b":scheme", b"https"),
+            Header::new(b":authority", b"quic.tech"),
+            Header::new(b":path", b"/test"),
+        ];
+
+        assert_eq!(
+            s.client.send_request(&mut s.pipe.client, &req, false),
+            Ok(0)
+        );
+
+        assert_eq!(s.pipe.client.stream_capacity(0), Ok(93));
+        assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(90));
+        assert_eq!(
+            s.client
+                .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+            Ok(10)
+        );
+        assert_eq!(s.pipe.client.stream_capacity(0), Ok(81));
+        assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(78));
+        assert_eq!(
+            s.client
+                .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+            Ok(10)
+        );
+        assert_eq!(s.pipe.client.stream_capacity(0), Ok(69));
+        assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(66));
+        assert_eq!(
+            s.client
+                .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+            Ok(10)
+        );
+        assert_eq!(s.pipe.client.stream_capacity(0), Ok(57));
+        assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(55));
+        assert_eq!(
+            s.client
+                .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+            Ok(10)
+        );
+        assert_eq!(s.pipe.client.stream_capacity(0), Ok(45));
+        assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(43));
+        assert_eq!(
+            s.client
+                .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+            Ok(10)
+        );
+        assert_eq!(s.pipe.client.stream_capacity(0), Ok(33));
+        assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(31));
+        assert_eq!(
+            s.client
+                .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+            Ok(10)
+        );
+        assert_eq!(s.pipe.client.stream_capacity(0), Ok(21));
+        assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(19));
+        assert_eq!(
+            s.client
+                .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+            Ok(10)
+        );
+        assert_eq!(s.pipe.client.stream_capacity(0), Ok(9));
+        assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(7));
+        assert_eq!(
+            s.client
+                .send_body(&mut s.pipe.client, 0, b"waytoomuchdata", false),
+            Ok(7)
+        );
+
+        // No remaining body capacity on client.
+        assert_eq!(s.pipe.client.stream_capacity(0), Ok(0));
+        assert_eq!(
+            s.client.body_capacity(&s.pipe.client, 0),
+            Err(Error::StreamBlocked)
+        );
+        assert_eq!(
+            s.client.send_body(&mut s.pipe.client, 0, b"", true),
+            Err(Error::Done)
+        );
+
+        // On the server, can't query stream or body capacity until their open.
+        assert_eq!(
+            s.pipe.server.stream_capacity(0),
+            Err(crate::Error::InvalidStreamState(0))
+        );
+        assert_eq!(
+            s.server.body_capacity(&s.pipe.server, 0),
+            Err(Error::FrameUnexpected)
+        );
+
+        s.advance().ok();
+
+        // Server hasn't consumed stream data, so still no capacity at client.
+        assert_eq!(s.pipe.client.stream_capacity(0), Ok(0));
+        assert_eq!(
+            s.client.body_capacity(&s.pipe.client, 0),
+            Err(Error::StreamBlocked)
+        );
+
+        // Server has stream capacity but body capacity is unavailable until it
+        // responds.
+        assert_eq!(s.pipe.server.stream_capacity(0), Ok(150));
+        assert_eq!(
+            s.server.body_capacity(&s.pipe.server, 0),
+            Err(Error::FrameUnexpected)
+        );
+
+        // Server process request to free capacity.
+        let ev_headers = Event::Headers {
+            list: req,
+            has_body: true,
+        };
+        let mut recv_buf = vec![0; 100];
+
+        assert_eq!(s.poll_server(), Ok((0, ev_headers)));
+        assert_eq!(s.poll_server(), Ok((0, Event::Data)));
+        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(77));
+        assert_eq!(s.poll_server(), Err(Error::Done));
+
+        // Server sends a response sized to exactly available capacity.
+        let resp = s.send_response(0, false).unwrap();
+        assert_eq!(s.pipe.server.stream_capacity(0), Ok(98));
+        assert_eq!(s.server.body_capacity(&s.pipe.server, 0), Ok(95));
+
+        assert_eq!(
+            s.server
+                .send_body(&mut s.pipe.server, 0, &recv_buf[..95], true),
+            Ok(95)
+        );
+        assert_eq!(s.pipe.server.stream_capacity(0), Ok(0));
+        assert_eq!(
+            s.server.body_capacity(&s.pipe.server, 0),
+            Err(Error::StreamBlocked)
+        );
+
+        s.advance().ok();
+
+        // Once the server gives stream flow control credits back, client can send
+        // the body fin.
+        assert_eq!(s.pipe.client.stream_capacity(0), Ok(150));
+        assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(147));
+        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
+
+        // Client processes response, no credits are given back to the fin'd
+        // stream.
+        let ev_headers = Event::Headers {
+            list: resp,
+            has_body: true,
+        };
+
+        assert_eq!(s.poll_client(), Ok((0, ev_headers)));
+
+        assert_eq!(s.poll_client(), Ok((0, Event::Data)));
+        assert_eq!(s.recv_body_client(0, &mut recv_buf), Ok(95));
+
+        assert_eq!(s.poll_client(), Ok((0, Event::Finished)));
+        assert_eq!(s.poll_client(), Err(Error::Done));
+
+        s.advance().ok();
+
+        assert_eq!(s.pipe.server.stream_capacity(0), Ok(0));
+        assert_eq!(
+            s.server.body_capacity(&s.pipe.server, 0),
+            Err(Error::StreamBlocked)
+        );
+    }
+
+    #[test]
     /// Tests that receiving a H3_DATAGRAM setting is ok.
     fn dgram_setting() {
         let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();