h3: poll with closure API
diff --git a/examples/http3-client.rs b/examples/http3-client.rs
index b4a912e..56ab6e3 100644
--- a/examples/http3-client.rs
+++ b/examples/http3-client.rs
@@ -31,6 +31,8 @@
 
 use ring::rand::*;
 
+use quiche::h3::Connection;
+
 const MAX_DATAGRAM_SIZE: usize = 1350;
 
 const USAGE: &str = "Usage:
@@ -297,58 +299,61 @@
             http3_conn = Some(h3_conn);
         }
 
+        let mut p = |h3: &mut Connection, c: &mut quiche::Connection, s, ev| {
+            let mut buf = [0; 4096];
+
+            match ev {
+                quiche::h3::Event::Headers(headers) => {
+                    info!(
+                        "got response headers {:?} on stream id {}",
+                        headers, s
+                    );
+                },
+
+                quiche::h3::Event::Data => {
+                    if let Ok(read) = h3.recv_body(c, s, &mut buf) {
+                        debug!(
+                            "got {} bytes of response data on stream {}",
+                            read, s
+                        );
+
+                        print!("{}", unsafe {
+                            std::str::from_utf8_unchecked(&buf[..read])
+                        });
+                    }
+                },
+
+                quiche::h3::Event::Finished => {
+                    reqs_complete += 1;
+
+                    debug!("{}/{} responses received", reqs_complete, reqs_count);
+
+                    if reqs_complete == reqs_count {
+                        info!(
+                            "{}/{} response(s) received in {:?}, closing...",
+                            reqs_complete,
+                            reqs_count,
+                            req_start.elapsed()
+                        );
+
+                        match c.close(true, 0x00, b"kthxbye") {
+                            // Already closed.
+                            Ok(_) | Err(quiche::Error::Done) => (),
+
+                            Err(e) => panic!("error closing conn: {:?}", e),
+                        }
+                    }
+                },
+            }
+
+            Ok(())
+        };
+
         if let Some(http3_conn) = &mut http3_conn {
             // Process HTTP/3 events.
             loop {
-                match http3_conn.poll(&mut conn) {
-                    Ok((stream_id, quiche::h3::Event::Headers(headers))) => {
-                        info!(
-                            "got response headers {:?} on stream id {}",
-                            headers, stream_id
-                        );
-                    },
-
-                    Ok((stream_id, quiche::h3::Event::Data)) => {
-                        if let Ok(read) =
-                            http3_conn.recv_body(&mut conn, stream_id, &mut buf)
-                        {
-                            debug!(
-                                "got {} bytes of response data on stream {}",
-                                read, stream_id
-                            );
-
-                            print!("{}", unsafe {
-                                std::str::from_utf8_unchecked(&buf[..read])
-                            });
-                        }
-                    },
-
-                    Ok((_stream_id, quiche::h3::Event::Finished)) => {
-                        reqs_complete += 1;
-
-                        debug!(
-                            "{}/{} responses received",
-                            reqs_complete, reqs_count
-                        );
-
-                        if reqs_complete == reqs_count {
-                            info!(
-                                "{}/{} response(s) received in {:?}, closing...",
-                                reqs_complete,
-                                reqs_count,
-                                req_start.elapsed()
-                            );
-
-                            match conn.close(true, 0x00, b"kthxbye") {
-                                // Already closed.
-                                Ok(_) | Err(quiche::Error::Done) => (),
-
-                                Err(e) => panic!("error closing conn: {:?}", e),
-                            }
-
-                            break;
-                        }
-                    },
+                match http3_conn.poll2(&mut conn, &mut p) {
+                    Ok(()) => (),
 
                     Err(quiche::h3::Error::Done) => {
                         break;
diff --git a/examples/http3-server.rs b/examples/http3-server.rs
index a352290..e588201 100644
--- a/examples/http3-server.rs
+++ b/examples/http3-server.rs
@@ -29,10 +29,16 @@
 
 use std::net;
 
+use std::cell::RefCell;
+use std::rc::Rc;
+
 use std::collections::HashMap;
 
 use ring::rand::*;
 
+use quiche::h3::Connection;
+use quiche::h3::Event;
+
 const MAX_DATAGRAM_SIZE: usize = 1350;
 
 const USAGE: &str = "Usage:
@@ -59,13 +65,18 @@
 struct Client {
     conn: Box<quiche::Connection>,
 
-    http3_conn: Option<quiche::h3::Connection>,
+    http3_conn: Option<Connection>,
 
-    partial_responses: HashMap<u64, PartialResponse>,
+    // TODO: closures in Rust can't capture disjoint struct fields, so to avoid
+    // capturing the whole struct we need "interior mutability"
+    // (see https://github.com/rust-lang/rust/issues/53488)
+    partial_responses: Rc<RefCell<PartialMap>>,
 }
 
 type ClientMap = HashMap<Vec<u8>, (net::SocketAddr, Client)>;
 
+type PartialMap = HashMap<u64, PartialResponse>;
+
 fn main() {
     let mut buf = [0; 65535];
     let mut out = [0; MAX_DATAGRAM_SIZE];
@@ -283,7 +294,7 @@
                 let client = Client {
                     conn,
                     http3_conn: None,
-                    partial_responses: HashMap::new(),
+                    partial_responses: Rc::new(RefCell::new(PartialMap::new())),
                 };
 
                 clients.insert(scid.to_vec(), (src, client));
@@ -318,7 +329,7 @@
                     client.conn.trace_id()
                 );
 
-                let h3_conn = match quiche::h3::Connection::with_transport(
+                let h3_conn = match Connection::with_transport(
                     &mut client.conn,
                     &h3_config,
                 ) {
@@ -334,6 +345,27 @@
                 client.http3_conn = Some(h3_conn);
             }
 
+            let partial = client.partial_responses.clone();
+
+            let p = |h3: &mut Connection, c: &mut quiche::Connection, s, ev| {
+                let root = args.get_str("--root");
+
+                let mut partial = partial.borrow_mut();
+
+                match ev {
+                    Event::Headers(headers) =>
+                        handle_request(h3, c, s, &headers, root, &mut partial),
+
+                    Event::Data => {
+                        info!("{} got data on stream id {}", c.trace_id(), s);
+                    },
+
+                    Event::Finished => (),
+                }
+
+                Ok(())
+            };
+
             if client.http3_conn.is_some() {
                 // Handle writable streams.
                 for stream_id in client.conn.writable() {
@@ -344,25 +376,8 @@
                 loop {
                     let http3_conn = client.http3_conn.as_mut().unwrap();
 
-                    match http3_conn.poll(client.conn.as_mut()) {
-                        Ok((stream_id, quiche::h3::Event::Headers(headers))) => {
-                            handle_request(
-                                client,
-                                stream_id,
-                                &headers,
-                                args.get_str("--root"),
-                            );
-                        },
-
-                        Ok((stream_id, quiche::h3::Event::Data)) => {
-                            info!(
-                                "{} got data on stream id {}",
-                                client.conn.trace_id(),
-                                stream_id
-                            );
-                        },
-
-                        Ok((_stream_id, quiche::h3::Event::Finished)) => (),
+                    match http3_conn.poll2(&mut client.conn, p) {
+                        Ok(()) => (),
 
                         Err(quiche::h3::Error::Done) => {
                             break;
@@ -494,12 +509,9 @@
 
 /// Handles incoming HTTP/3 requests.
 fn handle_request(
-    client: &mut Client, stream_id: u64, headers: &[quiche::h3::Header],
-    root: &str,
+    http3_conn: &mut Connection, conn: &mut quiche::Connection, stream_id: u64,
+    headers: &[quiche::h3::Header], root: &str, partial: &mut PartialMap,
 ) {
-    let conn = &mut client.conn;
-    let http3_conn = &mut client.http3_conn.as_mut().unwrap();
-
     info!(
         "{} got request {:?} on stream id {}",
         conn.trace_id(),
@@ -532,7 +544,7 @@
 
     if written < body.len() {
         let response = PartialResponse { body, written };
-        client.partial_responses.insert(stream_id, response);
+        partial.insert(stream_id, response);
     }
 }
 
@@ -593,11 +605,13 @@
 
     debug!("{} stream {} is writable", conn.trace_id(), stream_id);
 
-    if !client.partial_responses.contains_key(&stream_id) {
+    if !client.partial_responses.borrow_mut().contains_key(&stream_id) {
         return;
     }
 
-    let resp = client.partial_responses.get_mut(&stream_id).unwrap();
+    let mut partial = client.partial_responses.borrow_mut();
+
+    let resp = partial.get_mut(&stream_id).unwrap();
     let body = &resp.body[resp.written..];
 
     let written = match http3_conn.send_body(conn, stream_id, body, true) {
@@ -614,7 +628,7 @@
     resp.written += written;
 
     if resp.written == resp.body.len() {
-        client.partial_responses.remove(&stream_id);
+        client.partial_responses.borrow_mut().remove(&stream_id);
     }
 }
 
diff --git a/src/h3/mod.rs b/src/h3/mod.rs
index 975d36b..6dc01d8 100644
--- a/src/h3/mod.rs
+++ b/src/h3/mod.rs
@@ -826,6 +826,59 @@
         Err(Error::Done)
     }
 
+    pub fn poll2<F>(
+        &mut self, conn: &mut super::Connection, mut f: F,
+    ) -> Result<()>
+    where
+        F: FnMut(
+            &mut Connection,
+            &mut super::Connection,
+            u64,
+            Event,
+        ) -> Result<()>,
+    {
+        // Process control streams first.
+        if let Some(stream_id) = self.peer_control_stream_id {
+            self.process_control_stream(conn, stream_id)?;
+        }
+
+        if let Some(stream_id) = self.peer_qpack_streams.encoder_stream_id {
+            self.process_control_stream(conn, stream_id)?;
+        }
+
+        if let Some(stream_id) = self.peer_qpack_streams.decoder_stream_id {
+            self.process_control_stream(conn, stream_id)?;
+        }
+
+        // Process finished streams list.
+        if let Some(finished) = self.finished_streams.pop_front() {
+            f(self, conn, finished, Event::Finished)?;
+        }
+
+        // Process HTTP/3 data from readable streams.
+        for s in conn.readable() {
+            trace!("{} stream id {} is readable", conn.trace_id(), s);
+
+            let ev = match self.process_readable_stream(conn, s) {
+                Ok(v) => Some(v),
+
+                Err(Error::Done) => None,
+
+                Err(e) => return Err(e),
+            };
+
+            if conn.stream_finished(s) {
+                self.finished_streams.push_back(s);
+            }
+
+            if let Some((stream_id, ev)) = ev {
+                f(self, conn, stream_id, ev)?;
+            }
+        }
+
+        Err(Error::Done)
+    }
+
     /// Allocates a new request stream ID for the local endpoint to use.
     fn get_available_request_stream(&mut self) -> Result<u64> {
         if self.highest_request_stream_id < std::u64::MAX {