h3: avoid marking a stream as finished multiple times

If an application keeps calling `recv_body()` on a stream that is
already finished, we end up adding it to the finished streams queue
multiple times.

The application shouldn't really do this, but do the sane thing if it
does happen.
diff --git a/src/h3/mod.rs b/src/h3/mod.rs
index bd50549..59813c8 100644
--- a/src/h3/mod.rs
+++ b/src/h3/mod.rs
@@ -1174,7 +1174,7 @@
         // While body is being received, the stream is marked as finished only
         // when all data is read by the application.
         if conn.stream_finished(stream_id) {
-            self.finished_streams.push_back(stream_id);
+            self.process_finished_stream(stream_id);
         }
 
         if total == 0 {
@@ -1799,6 +1799,8 @@
 
                     break;
                 },
+
+                stream::State::Finished => break,
             }
         }
 
@@ -1806,15 +1808,22 @@
     }
 
     fn process_finished_stream(&mut self, stream_id: u64) {
-        let stream = match self.streams.get(&stream_id) {
+        let stream = match self.streams.get_mut(&stream_id) {
             Some(v) => v,
 
             None => return,
         };
 
+        if stream.state() == stream::State::Finished {
+            return;
+        }
+
         match stream.ty() {
-            Some(stream::Type::Request) | Some(stream::Type::Push) =>
-                self.finished_streams.push_back(stream_id),
+            Some(stream::Type::Request) | Some(stream::Type::Push) => {
+                stream.finished();
+
+                self.finished_streams.push_back(stream_id);
+            },
 
             _ => (),
         };
@@ -4121,6 +4130,32 @@
     }
 
     #[test]
+    /// Tests that streams are marked as finished only once.
+    fn finished_once() {
+        let mut s = Session::default().unwrap();
+        s.handshake().unwrap();
+
+        let (stream, req) = s.send_request(false).unwrap();
+        let body = s.send_body_client(stream, true).unwrap();
+
+        let mut recv_buf = vec![0; body.len()];
+
+        let ev_headers = Event::Headers {
+            list: req,
+            has_body: true,
+        };
+
+        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
+        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+
+        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
+        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
+
+        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
+        assert_eq!(s.poll_server(), Err(Error::Done));
+    }
+
+    #[test]
     /// Tests that the Data event is properly re-armed.
     fn data_event_rearm() {
         let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
diff --git a/src/h3/stream.rs b/src/h3/stream.rs
index b8439ed..58e2873 100644
--- a/src/h3/stream.rs
+++ b/src/h3/stream.rs
@@ -73,6 +73,9 @@
 
     /// Reading and discarding data.
     Drain,
+
+    /// All data has been read.
+    Finished,
 }
 
 impl Type {
@@ -487,6 +490,11 @@
         Ok((len, fin))
     }
 
+    /// Marks the stream as finished.
+    pub fn finished(&mut self) {
+        let _ = self.state_transition(State::Finished, 0, false);
+    }
+
     /// Tries to read DATA payload from the given cursor.
     ///
     /// This is intended to replace `try_consume_data()` in tests, in order to