add edge-triggered variant of StreamIter APIs

Instead of simply cloning the list of streams, the `*_drain()` APIs will
also drain the internal lists, so that streams are only reported once
each time they become flushable/writable/readable/...
diff --git a/nginx/nginx-1.16.patch b/nginx/nginx-1.16.patch
index ac559c1..852dac1 100644
--- a/nginx/nginx-1.16.patch
+++ b/nginx/nginx-1.16.patch
@@ -1935,7 +1935,7 @@
 +    ngx_http_v3_stream_t      *stream;
 +    uint64_t                   stream_id;
 +
-+    writable = quiche_conn_writable(h3c->connection->quic->conn);
++    writable = quiche_conn_writable_drain(h3c->connection->quic->conn);
 +
 +    while (quiche_stream_iter_next(writable, &stream_id)) {
 +        stream = ngx_http_v3_stream_lookup(h3c, stream_id);
diff --git a/quiche/include/quiche.h b/quiche/include/quiche.h
index 74e0263..3fec4cd 100644
--- a/quiche/include/quiche.h
+++ b/quiche/include/quiche.h
@@ -374,9 +374,15 @@
 // Returns an iterator over streams that have outstanding data to read.
 quiche_stream_iter *quiche_conn_readable(const quiche_conn *conn);
 
+// Returns an iterator over streams that have outstanding data to read.
+quiche_stream_iter *quiche_conn_readable_drain(quiche_conn *conn);
+
 // Returns an iterator over streams that can be written to.
 quiche_stream_iter *quiche_conn_writable(const quiche_conn *conn);
 
+// Returns an iterator over streams that can be written to.
+quiche_stream_iter *quiche_conn_writable_drain(quiche_conn *conn);
+
 // Returns the maximum possible size of egress UDP payloads.
 size_t quiche_conn_max_send_udp_payload_size(const quiche_conn *conn);
 
diff --git a/quiche/src/ffi.rs b/quiche/src/ffi.rs
index 8261aab..ebef8e2 100644
--- a/quiche/src/ffi.rs
+++ b/quiche/src/ffi.rs
@@ -858,11 +858,25 @@
 }
 
 #[no_mangle]
+pub extern fn quiche_conn_readable_drain(
+    conn: &mut Connection,
+) -> *mut StreamIter {
+    Box::into_raw(Box::new(conn.readable_drain()))
+}
+
+#[no_mangle]
 pub extern fn quiche_conn_writable(conn: &Connection) -> *mut StreamIter {
     Box::into_raw(Box::new(conn.writable()))
 }
 
 #[no_mangle]
+pub extern fn quiche_conn_writable_drain(
+    conn: &mut Connection,
+) -> *mut StreamIter {
+    Box::into_raw(Box::new(conn.writable_drain()))
+}
+
+#[no_mangle]
 pub extern fn quiche_conn_max_send_udp_payload_size(conn: &Connection) -> usize {
     conn.max_send_udp_payload_size()
 }
diff --git a/quiche/src/lib.rs b/quiche/src/lib.rs
index dec4904..9a4c64d 100644
--- a/quiche/src/lib.rs
+++ b/quiche/src/lib.rs
@@ -4775,6 +4775,21 @@
         self.streams.readable()
     }
 
+    /// Returns an iterator over streams that have outstanding data to read.
+    ///
+    /// This is an edge-triggered variant of [`readable()`]: streams reported
+    /// by this method will be reported again only after being "re-armed".
+    ///
+    /// For example, while calling [`readable()`] multiple times in succession
+    /// will return the same list of readable streams, `readable_drain()` will
+    /// return streams only on the first call. The application will need to
+    /// read all of the pending data on the stream, and new data has to be
+    /// received before the stream is reported again.
+    #[inline]
+    pub fn readable_drain(&mut self) -> StreamIter {
+        self.streams.readable_drain()
+    }
+
     /// Returns an iterator over streams that can be written to.
     ///
     /// A "writable" stream is a stream that has enough flow control capacity to
@@ -4817,6 +4832,27 @@
         self.streams.writable()
     }
 
+    /// Returns an iterator over streams that can be written to.
+    ///
+    /// This is an edge-triggered variant of [`writable()`]: streams reported
+    /// by this method will be reported again only after being "re-armed".
+    ///
+    /// For example, while calling [`writable()`] multiple times in succession
+    /// will return the same list of writable streams, `writable_drain()` will
+    /// return streams only on the first call. The application will need to
+    /// fill the stream's buffer and flush the data on the wire before the
+    /// stream is reported again.
+    #[inline]
+    pub fn writable_drain(&mut self) -> StreamIter {
+        // If there is not enough connection-level send capacity, none of the
+        // streams are writable, so return an empty iterator.
+        if self.tx_cap == 0 {
+            return StreamIter::default();
+        }
+
+        self.streams.writable_drain()
+    }
+
     /// Returns the maximum possible size of egress UDP payloads.
     ///
     /// This is the maximum size of UDP payloads that can be sent, and depends
diff --git a/quiche/src/stream.rs b/quiche/src/stream.rs
index 27186cb..bd028a1 100644
--- a/quiche/src/stream.rs
+++ b/quiche/src/stream.rs
@@ -549,11 +549,23 @@
         StreamIter::from(&self.readable)
     }
 
+    /// Creates an iterator over streams that have outstanding data to read, and
+    /// clears the readable streams set.
+    pub fn readable_drain(&mut self) -> StreamIter {
+        StreamIter::drain(&mut self.readable)
+    }
+
     /// Creates an iterator over streams that can be written to.
     pub fn writable(&self) -> StreamIter {
         StreamIter::from(&self.writable)
     }
 
+    /// Creates an iterator over streams that can be written to, and clears the
+    /// writable streams set.
+    pub fn writable_drain(&mut self) -> StreamIter {
+        StreamIter::drain(&mut self.writable)
+    }
+
     /// Creates an iterator over streams that need to send MAX_STREAM_DATA.
     pub fn almost_full(&self) -> StreamIter {
         StreamIter::from(&self.almost_full)
@@ -748,6 +760,13 @@
             streams: streams.iter().copied().collect(),
         }
     }
+
+    #[inline]
+    fn drain(streams: &mut StreamIdHashSet) -> Self {
+        StreamIter {
+            streams: streams.drain().collect(),
+        }
+    }
 }
 
 impl Iterator for StreamIter {