stream: use BTreeMap in SendBuf instead of VecDeque
diff --git a/quiche/src/stream.rs b/quiche/src/stream.rs
index 9d0a446..189385f 100644
--- a/quiche/src/stream.rs
+++ b/quiche/src/stream.rs
@@ -1108,10 +1108,10 @@
#[derive(Debug, Default)]
pub struct SendBuf {
/// Chunks of data to be sent, ordered by offset.
- data: VecDeque<RangeBuf>,
+ data: BTreeMap<u64, RangeBuf>,
- /// The index of the buffer that needs to be sent next.
+ /// The starting offset (key into `data`) of the buffer that needs to be sent next.
- pos: usize,
+ pos: u64,
/// The maximum offset of data buffered in the stream.
off: u64,
@@ -1207,7 +1207,7 @@
let buf = RangeBuf::from(chunk, self.off, fin);
// The new data can simply be appended at the end of the send buffer.
- self.data.push_back(buf);
+ self.data.insert(self.off, buf);
self.off += chunk.len() as u64;
self.len += chunk.len() as u64;
@@ -1228,14 +1228,14 @@
self.off_front() == next_off &&
self.off_front() < self.max_data
{
- let buf = match self.data.get_mut(self.pos) {
+ let buf = match self.data.get_mut(&self.pos) {
Some(v) => v,
None => break,
};
if buf.is_empty() {
- self.pos += 1;
+ self.pos = buf.max_off();
continue;
}
@@ -1259,7 +1259,7 @@
break;
}
- self.pos += 1;
+ self.pos = next_off;
}
// Override the `fin` flag set for the output buffer by matching the
@@ -1306,10 +1306,10 @@
return;
}
- let mut drop_until = None;
-
// Drop contiguously acked data from the front of the buffer.
- for (i, buf) in self.data.iter_mut().enumerate() {
+ while let Some(entry) = self.data.first_entry() {
+ let buf = entry.get();
+
// Newly acked range is past highest contiguous acked range, so we
// can't drop it.
if buf.off >= ack_off {
@@ -1323,16 +1323,7 @@
}
// Newly acked range can be dropped.
- drop_until = Some(i);
- }
-
- if let Some(drop) = drop_until {
- self.data.drain(..=drop);
-
- // When a buffer is marked for retransmission, but then acked before
- // it could be retransmitted, we might end up decreasing the SendBuf
- // position too much, so make sure that doesn't happen.
- self.pos = self.pos.saturating_sub(drop + 1);
+ entry.remove();
}
}
@@ -1348,41 +1339,37 @@
return;
}
- for i in 0..self.data.len() {
- let buf = &mut self.data[i];
+ let mut pos = off;
- if buf.off >= max_off {
- break;
- }
+ while pos < max_off {
+ if let Some((_, mut buf)) = self.data.range_mut(..=pos).next_back() {
+ // Split the buffer into 2 if the retransmit range ends before the
+ // buffer's final offset.
+ let new_buf = if buf.off < max_off && max_off < buf.max_off() {
+ Some(buf.split_off((max_off - buf.off as u64) as usize))
+ } else {
+ None
+ };
- if off > buf.max_off() {
- continue;
- }
+ let prev_pos = buf.pos;
- // Split the buffer into 2 if the retransmit range ends before the
- // buffer's final offset.
- let new_buf = if buf.off < max_off && max_off < buf.max_off() {
- Some(buf.split_off((max_off - buf.off) as usize))
- } else {
- None
- };
+ // Reduce the buffer's position (expand the buffer) if the
+ // retransmit range is past the buffer's starting offset.
+ buf.pos = if off > buf.off && off <= buf.max_off() {
+ cmp::min(buf.pos, buf.start + (off - buf.off) as usize)
+ } else {
+ buf.start
+ };
- let prev_pos = buf.pos;
+ self.pos = cmp::min(self.pos, buf.off);
- // Reduce the buffer's position (expand the buffer) if the retransmit
- // range is past the buffer's starting offset.
- buf.pos = if off > buf.off && off <= buf.max_off() {
- cmp::min(buf.pos, buf.start + (off - buf.off) as usize)
- } else {
- buf.start
- };
+ self.len += (prev_pos - buf.pos) as u64;
- self.pos = cmp::min(self.pos, i);
+ pos = buf.max_off();
- self.len += (prev_pos - buf.pos) as u64;
-
- if let Some(b) = new_buf {
- self.data.insert(i + 1, b);
+ if let Some(b) = new_buf {
+ self.data.insert(b.off, b);
+ }
}
}
}
@@ -1442,13 +1429,12 @@
pub fn off_front(&self) -> u64 {
let mut pos = self.pos;
- // Skip empty buffers from the start of the queue.
- while let Some(b) = self.data.get(pos) {
+ while let Some(b) = self.data.get(&pos) {
if !b.is_empty() {
return b.off();
}
- pos += 1;
+ pos = b.max_off();
}
self.off