h3: add body_capacity() helper
Previously, quiche kept all details about the relationship between
stream capacity and writable DATA internal. This could lead to awkward
situations where quiche would report a stream as writable and
`quiche::conn::stream_capacity()` would report a non-zero value, yet
`send_body()` might not send anything due to DATA frame overhead.
This change introduces the `quiche::h3::conn::body_capacity()` helper
function to help applications determine the amount of usable capacity.
diff --git a/src/h3/mod.rs b/src/h3/mod.rs
index efee14d..5bf2880 100644
--- a/src/h3/mod.rs
+++ b/src/h3/mod.rs
@@ -1010,7 +1010,7 @@
return Err(Error::Done);
}
- let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
+ let overhead = 1 + // DATA frame type is always 1 byte long
octets::varint_len(body.len() as u64);
let stream_cap = match conn.stream_capacity(stream_id) {
@@ -1036,8 +1036,15 @@
return Err(Error::Done);
}
+ let data_frame_capacity = stream_cap - overhead;
+
+ // Mark stream blocked if we can't send anything in the DATA frame.
+ if !body.is_empty() && data_frame_capacity == 0 {
+ let _ = conn.stream_writable(stream_id, overhead + body.len());
+ }
+
// Cap the frame payload length to the stream's capacity.
- let body_len = std::cmp::min(body.len(), stream_cap - overhead);
+ let body_len = std::cmp::min(body.len(), data_frame_capacity);
// If we can't send the entire body, set the fin flag to false so the
// application can try again later.
@@ -1072,6 +1079,55 @@
Ok(written)
}
+ /// Returns the stream's body capacity considering framing overhead.
+ ///
+ /// If the specified stream doesn't exist (including when it has already
+ /// been completed and closed), the [`FrameUnexpected`] or
+ /// [`InvalidStreamState`] error will be returned.
+ ///
+ /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
+ /// [`InvalidStreamState`]: enum.Error.html#variant.InvalidStreamState
+ pub fn body_capacity(
+ &mut self, conn: &super::Connection, stream_id: u64,
+ ) -> Result<usize> {
+ // Validate that it is sane to send data on the stream.
+ if stream_id % 4 != 0 {
+ return Err(Error::FrameUnexpected);
+ }
+
+ match self.streams.get(&stream_id) {
+ Some(s) =>
+ if !s.local_initialized() {
+ return Err(Error::FrameUnexpected);
+ },
+
+ None => {
+ return Err(Error::FrameUnexpected);
+ },
+ };
+
+ let stream_cap = match conn.stream_capacity(stream_id) {
+ Ok(v) => v,
+
+ Err(e) => {
+ if conn.stream_finished(stream_id) {
+ self.streams.remove(&stream_id);
+ }
+
+ return Err(e.into());
+ },
+ };
+
+ let overhead = 1 + // DATA frame type is always 1 byte
+ octets::varint_len(stream_cap as u64);
+
+ if stream_cap < overhead {
+ return Err(Error::StreamBlocked);
+ }
+
+ Ok(stream_cap - overhead)
+ }
+
/// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
///
/// Support is signalled by the peer's SETTINGS, so this method always
@@ -3697,6 +3753,208 @@
}
#[test]
+ /// Tests that stream and body capacity are reported correctly.
+ fn body_capacity() {
+ let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
+ config
+ .load_cert_chain_from_pem_file("examples/cert.crt")
+ .unwrap();
+ config
+ .load_priv_key_from_pem_file("examples/cert.key")
+ .unwrap();
+ config.set_application_protos(b"\x02h3").unwrap();
+ config.set_initial_max_data(1000);
+ config.set_initial_max_stream_data_bidi_local(150);
+ config.set_initial_max_stream_data_bidi_remote(150);
+ config.set_initial_max_stream_data_uni(150);
+ config.set_initial_max_streams_bidi(100);
+ config.set_initial_max_streams_uni(5);
+ config.verify_peer(false);
+
+ let mut h3_config = Config::new().unwrap();
+
+ let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
+
+ s.handshake().unwrap();
+
+ // Can't query stream or body capacity before sending request.
+ assert_eq!(
+ s.pipe.client.stream_capacity(0),
+ Err(crate::Error::InvalidStreamState(0))
+ );
+ assert_eq!(
+ s.client.body_capacity(&s.pipe.client, 0),
+ Err(Error::FrameUnexpected)
+ );
+
+ let req = vec![
+ Header::new(b":method", b"GET"),
+ Header::new(b":scheme", b"https"),
+ Header::new(b":authority", b"quic.tech"),
+ Header::new(b":path", b"/test"),
+ ];
+
+ assert_eq!(
+ s.client.send_request(&mut s.pipe.client, &req, false),
+ Ok(0)
+ );
+
+ assert_eq!(s.pipe.client.stream_capacity(0), Ok(93));
+ assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(90));
+ assert_eq!(
+ s.client
+ .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+ Ok(10)
+ );
+ assert_eq!(s.pipe.client.stream_capacity(0), Ok(81));
+ assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(78));
+ assert_eq!(
+ s.client
+ .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+ Ok(10)
+ );
+ assert_eq!(s.pipe.client.stream_capacity(0), Ok(69));
+ assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(66));
+ assert_eq!(
+ s.client
+ .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+ Ok(10)
+ );
+ assert_eq!(s.pipe.client.stream_capacity(0), Ok(57));
+ assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(55));
+ assert_eq!(
+ s.client
+ .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+ Ok(10)
+ );
+ assert_eq!(s.pipe.client.stream_capacity(0), Ok(45));
+ assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(43));
+ assert_eq!(
+ s.client
+ .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+ Ok(10)
+ );
+ assert_eq!(s.pipe.client.stream_capacity(0), Ok(33));
+ assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(31));
+ assert_eq!(
+ s.client
+ .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+ Ok(10)
+ );
+ assert_eq!(s.pipe.client.stream_capacity(0), Ok(21));
+ assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(19));
+ assert_eq!(
+ s.client
+ .send_body(&mut s.pipe.client, 0, b"hellohello", false),
+ Ok(10)
+ );
+ assert_eq!(s.pipe.client.stream_capacity(0), Ok(9));
+ assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(7));
+ assert_eq!(
+ s.client
+ .send_body(&mut s.pipe.client, 0, b"waytoomuchdata", false),
+ Ok(7)
+ );
+
+ // No remaining body capacity on client.
+ assert_eq!(s.pipe.client.stream_capacity(0), Ok(0));
+ assert_eq!(
+ s.client.body_capacity(&s.pipe.client, 0),
+ Err(Error::StreamBlocked)
+ );
+ assert_eq!(
+ s.client.send_body(&mut s.pipe.client, 0, b"", true),
+ Err(Error::Done)
+ );
+
+ // On the server, can't query stream or body capacity until they're open.
+ assert_eq!(
+ s.pipe.server.stream_capacity(0),
+ Err(crate::Error::InvalidStreamState(0))
+ );
+ assert_eq!(
+ s.server.body_capacity(&s.pipe.server, 0),
+ Err(Error::FrameUnexpected)
+ );
+
+ s.advance().ok();
+
+ // Server hasn't consumed stream data, so still no capacity at client.
+ assert_eq!(s.pipe.client.stream_capacity(0), Ok(0));
+ assert_eq!(
+ s.client.body_capacity(&s.pipe.client, 0),
+ Err(Error::StreamBlocked)
+ );
+
+ // Server has stream capacity but body capacity is unavailable until it
+ // responds.
+ assert_eq!(s.pipe.server.stream_capacity(0), Ok(150));
+ assert_eq!(
+ s.server.body_capacity(&s.pipe.server, 0),
+ Err(Error::FrameUnexpected)
+ );
+
+ // Server processes the request to free capacity.
+ let ev_headers = Event::Headers {
+ list: req,
+ has_body: true,
+ };
+ let mut recv_buf = vec![0; 100];
+
+ assert_eq!(s.poll_server(), Ok((0, ev_headers)));
+ assert_eq!(s.poll_server(), Ok((0, Event::Data)));
+ assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(77));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ // Server sends a response sized to exactly available capacity.
+ let resp = s.send_response(0, false).unwrap();
+ assert_eq!(s.pipe.server.stream_capacity(0), Ok(98));
+ assert_eq!(s.server.body_capacity(&s.pipe.server, 0), Ok(95));
+
+ assert_eq!(
+ s.server
+ .send_body(&mut s.pipe.server, 0, &recv_buf[..95], true),
+ Ok(95)
+ );
+ assert_eq!(s.pipe.server.stream_capacity(0), Ok(0));
+ assert_eq!(
+ s.server.body_capacity(&s.pipe.server, 0),
+ Err(Error::StreamBlocked)
+ );
+
+ s.advance().ok();
+
+ // Once the server gives stream flow control credits back, client can send
+ // the body fin.
+ assert_eq!(s.pipe.client.stream_capacity(0), Ok(150));
+ assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(147));
+ assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
+
+ // Client processes response, no credits are given back to the fin'd
+ // stream.
+ let ev_headers = Event::Headers {
+ list: resp,
+ has_body: true,
+ };
+
+ assert_eq!(s.poll_client(), Ok((0, ev_headers)));
+
+ assert_eq!(s.poll_client(), Ok((0, Event::Data)));
+ assert_eq!(s.recv_body_client(0, &mut recv_buf), Ok(95));
+
+ assert_eq!(s.poll_client(), Ok((0, Event::Finished)));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+
+ s.advance().ok();
+
+ assert_eq!(s.pipe.server.stream_capacity(0), Ok(0));
+ assert_eq!(
+ s.server.body_capacity(&s.pipe.server, 0),
+ Err(Error::StreamBlocked)
+ );
+ }
+
+ #[test]
/// Tests that receiving a H3_DATAGRAM setting is ok.
fn dgram_setting() {
let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();