blob: d8fea0c4a0077e913780e6c9f414cfb03c4c3f75 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use fidl_fuchsia_diagnostics::StreamMode;
use futures::prelude::*;
// we use parking_lot here instead of futures::lock because Cursor::get_next is recursive
use parking_lot::Mutex;
use std::{
collections::VecDeque,
default::Default,
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker},
};
use tracing::{trace, warn};
/// A list that can be iterated despite concurrent insertions and deletions.
pub struct ArcList<T> {
inner: Arc<Mutex<InnerArcList<T>>>,
}
impl<T> Default for ArcList<T> {
fn default() -> Self {
Self { inner: Arc::new(Mutex::new(Default::default())) }
}
}
impl<T> ArcList<T> {
pub fn is_empty(&self) -> bool {
self.inner.lock().items.is_empty()
}
pub fn push_back(&self, item: T) {
self.inner.lock().push_back(item);
}
pub fn pop_front(&self) -> Option<Arc<T>> {
self.inner.lock().pop_front()
}
pub fn peek_front(&self) -> Option<Arc<T>> {
self.inner.lock().peek_front().map(|item| item.value)
}
pub fn cursor(&self, mode: StreamMode) -> Cursor<T> {
let id = self.inner.lock().new_cursor_id();
Cursor::new(id, self.clone(), mode)
}
/// End the stream, ignoring new values and causing Cursors to return None after the current ID.
pub fn terminate(&self) {
self.inner.lock().terminate();
}
#[cfg(test)]
pub fn final_entry(&self) -> u64 {
self.inner.lock().final_entry
}
}
impl<T> Clone for ArcList<T> {
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}
struct ArcListItem<T> {
id: u64,
value: Arc<T>,
}
impl<T> Clone for ArcListItem<T> {
fn clone(&self) -> Self {
Self { id: self.id, value: self.value.clone() }
}
}
struct InnerArcList<T> {
/// Map from entries_seen at the point that the entry is added to the list, to the value with
/// that id.
items: VecDeque<ArcListItem<T>>,
/// The number of entries ever inserted into the list.
entries_seen: u64,
/// The number of entries ever removed from the list.
entries_popped: u64,
/// The last entry this list will yield.
final_entry: u64,
/// The next cursor this list will yield.
next_cursor_id: CursorId,
/// Wakers from subscribed cursors blocked on their next message.
pending_cursors: Vec<(CursorId, Waker)>,
}
impl<T> Default for InnerArcList<T> {
fn default() -> Self {
Self {
items: VecDeque::new(),
entries_seen: 0,
entries_popped: 0,
final_entry: std::u64::MAX,
next_cursor_id: CursorId::default(),
pending_cursors: Vec::new(),
}
}
}
impl<T> InnerArcList<T> {
fn push_back(&mut self, item: T) {
self.entries_seen += 1;
if self.entries_seen > self.final_entry {
warn!("push_back() must not be called after terminate()");
return;
}
let id = self.entries_seen;
self.items.push_back(ArcListItem { id, value: Arc::new(item) });
self.wake_pending();
}
fn pop_front(&mut self) -> Option<Arc<T>> {
self.items.pop_front().map(|item| {
self.entries_popped += 1;
item.value
})
}
fn peek_front(&self) -> Option<ArcListItem<T>> {
self.items.front().cloned()
}
fn first_starting_at(&self, id: u64) -> Option<ArcListItem<T>> {
self.items.front().and_then(|front_item| {
if front_item.id < id {
let index = id - front_item.id;
self.items.get(index as usize).cloned()
} else {
Some(front_item.clone())
}
})
}
fn new_cursor_id(&mut self) -> CursorId {
let new_next = CursorId(self.next_cursor_id.0 + 1);
std::mem::replace(&mut self.next_cursor_id, new_next)
}
fn wake_pending(&mut self) {
for (id, waker) in self.pending_cursors.drain(..) {
trace!("Waking {:?} for entry {}.", id, self.entries_seen);
waker.wake();
}
}
fn terminate(&mut self) {
tracing::debug!("terminating buffer");
self.final_entry = self.entries_seen;
self.wake_pending();
}
}
/// A unique identifier for each cursor used to manage wakers.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq, PartialOrd, Ord)]
struct CursorId(u64);
/// A weak pointer into the buffer which is being concurrently iterated and modified.
///
/// A cursor iterates over nodes in the list, holding the id of the last node seen.
/// If that id is less than the first id found in the deque, the cursor starts at the beginning of
/// the list and first returns a count of items rolled out.
///
/// The count is maintained by giving each successive item in a list a monotonically increasing ID
/// and tracking the "high-water mark" of the largest/last ID seen.
///
/// # Modes
///
/// These IDs are also how we express snapshotting vs. subscribing. The mode determines the minimum
/// and maximum IDs the cursor will yield.
///
/// # Wraparound
///
/// IDs are stored as `u64` and the list would need to receive a new entry once every nanosecond
/// for ~580 consecutive years before ID allocations will wrap around. This eventuality is *not*
/// accounted for in the implementation. If you're reading this after observing a bug due to that,
/// congratulations.
pub struct Cursor<T> {
id: CursorId,
last_id_seen: Option<u64>,
until_id: u64,
list: ArcList<T>,
mode: StreamMode,
}
impl<T> Cursor<T> {
/// Construct a new cursor into the logs buffer. The `mode` passed determines the range over
/// which the cursor operates:
///
/// | mode | first ID yielded | last ID yielded |
/// |-----------|-------------------------|-------------------------|
/// | snapshot | 0 | max at time of snapshot |
/// | subscribe | max at time of snapshot | max ID possible |
/// | both | 0 | max ID possible |
fn new(id: CursorId, list: ArcList<T>, mode: StreamMode) -> Self {
let from = match mode {
StreamMode::Snapshot | StreamMode::SnapshotThenSubscribe => None,
StreamMode::Subscribe => {
let inner = list.inner.lock();
Some(inner.entries_seen)
}
};
let to = match mode {
StreamMode::Snapshot => list.inner.lock().entries_seen,
StreamMode::SnapshotThenSubscribe | StreamMode::Subscribe => std::u64::MAX,
};
Self { id, list, last_id_seen: from, until_id: to, mode }
}
fn maybe_register_for_wakeup(&mut self, cx: &mut Context<'_>) -> Poll<Option<LazyItem<T>>> {
let mut root = self.list.inner.lock();
let cursor_at_end = self.last_id_seen.unwrap_or(0) == root.final_entry;
let list_fully_drained = root.final_entry == root.entries_popped;
if root.entries_popped > self.last_id_seen.unwrap_or(0) {
// This happens when entries were popped before they could be returned,
// but there is not currently anything left to return. We need
// to update our position and return the number of entries that
// were popped.
let entries_missing = root.entries_popped - self.last_id_seen.unwrap_or(0);
self.last_id_seen = Some(root.entries_popped);
Poll::Ready(Some(LazyItem::ItemsRolledOut(entries_missing)))
} else if cursor_at_end || list_fully_drained {
trace!("{:?} has reached the end of the terminated stream.", self.id);
Poll::Ready(None)
} else if self.mode == StreamMode::Snapshot {
// There are no further entries to return, and we are in snapshot mode. Report that we
// are at the end of the stream rather than registering for a wakeup.
Poll::Ready(None)
} else {
trace!("Registering {:?} for wakeup.", self.id);
root.pending_cursors.push((self.id, cx.waker().clone()));
root.pending_cursors.sort_by_key(|&(id, _)| id);
root.pending_cursors.dedup_by_key(|&mut (id, _)| id);
Poll::Pending
}
}
}
impl<T> Stream for Cursor<T> {
type Item = LazyItem<T>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
trace!("{:?} polled.", self.id);
if self.last_id_seen.unwrap_or(0) >= self.until_id {
return Poll::Ready(None);
}
let next = match self.last_id_seen {
Some(id) => self.list.inner.lock().first_starting_at(id + 1),
None => {
// otherwise start again at the head
trace!("{:?} starting from the head of the list.", self.id);
self.list.inner.lock().peek_front()
}
};
if let Some(to_return) = next {
if to_return.id > self.until_id {
self.last_id_seen = Some(to_return.id);
// we're past the end of this cursor's valid range
trace!("{:?} is done.", self.id);
return Poll::Ready(None);
}
// the number we missed is equal to the difference between
// the last ID we saw and the ID *just before* the current value
let num_missed = (to_return.id - 1) - self.last_id_seen.unwrap_or(0);
let item = if num_missed > 0 {
// advance the cursor's high-water mark by the number we missed
// so we only report each rolled out item once
trace!("{:?} reporting {} missed items.", self.id, num_missed);
self.last_id_seen = Some(self.last_id_seen.unwrap_or(0) + num_missed);
LazyItem::ItemsRolledOut(num_missed)
} else {
// we haven't missed anything, proceed normally
trace!("{:?} yielding item {}.", self.id, to_return.id);
self.last_id_seen = Some(to_return.id);
LazyItem::Next(to_return.value)
};
Poll::Ready(Some(item))
} else {
// No further data is waiting in the stream. Depending on the stream mode, we may
// register for a wakeup.
self.maybe_register_for_wakeup(cx)
}
}
}
impl<T> std::fmt::Debug for Cursor<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Cursor")
.field("id", &self.id)
.field("last_id_seen", &self.last_id_seen)
.field("until_id", &self.until_id)
.finish()
}
}
/// The next element in the stream or a marker of the number of items rolled out since last polled.
#[derive(Debug, PartialEq)]
pub enum LazyItem<T> {
/// The next item in the stream.
Next(Arc<T>),
/// A count of the items dropped between the last call to poll_next and this one.
ItemsRolledOut(u64),
}
#[cfg(test)]
mod tests {
use super::*;
use futures::poll;
use std::{fmt::Debug, task::Poll};
impl<T: Debug> LazyItem<T> {
#[track_caller]
fn unwrap(self) -> Arc<T> {
match self {
LazyItem::Next(i) => i,
LazyItem::ItemsRolledOut(n) => panic!("{} unexpected rolled out items in test", n),
}
}
#[track_caller]
fn expect_rolled_out(self, expected: u64) {
match self {
LazyItem::Next(i) => {
panic!("expected {} rolled out items, found Next({:#?})", expected, i)
}
LazyItem::ItemsRolledOut(n) => {
assert_eq!(n, expected, "wrong number of rolled out items")
}
}
}
}
#[fuchsia::test]
fn list_is_empty() {
let list = ArcList::default();
assert!(list.is_empty());
list.push_back(1);
assert!(!list.is_empty());
list.pop_front();
assert!(list.is_empty());
}
#[fuchsia::test]
fn list_peek_front() {
let list = ArcList::default();
assert!(list.peek_front().is_none());
list.push_back(1);
list.push_back(2);
assert_eq!(*list.peek_front().unwrap(), 1);
list.pop_front();
assert_eq!(*list.peek_front().unwrap(), 2);
list.pop_front();
assert!(list.peek_front().is_none());
}
#[fuchsia::test]
async fn subscription_delivered_without_drops() {
let list = ArcList::default();
let mut early_sub = list.cursor(StreamMode::Subscribe);
let mut first_from_early_sub = early_sub.next();
assert_eq!(poll!(&mut first_from_early_sub), Poll::Pending);
list.push_back(1);
assert_eq!(*first_from_early_sub.await.unwrap().unwrap(), 1);
// this subscription starts after the 1 we just pushed
let mut middle_sub = list.cursor(StreamMode::Subscribe);
let (mut second_from_early_sub, mut second_from_middle_sub) =
(early_sub.next(), middle_sub.next());
assert_eq!(poll!(&mut second_from_early_sub), Poll::Pending);
assert_eq!(poll!(&mut second_from_middle_sub), Poll::Pending);
list.push_back(2);
assert_eq!(*second_from_early_sub.await.unwrap().unwrap(), 2);
assert_eq!(*second_from_middle_sub.await.unwrap().unwrap(), 2);
let mut late_sub = list.cursor(StreamMode::Subscribe);
let (mut third_from_early_sub, mut third_from_middle_sub, mut third_from_late_sub) =
(early_sub.next(), middle_sub.next(), late_sub.next());
assert_eq!(poll!(&mut third_from_early_sub), Poll::Pending);
assert_eq!(poll!(&mut third_from_middle_sub), Poll::Pending);
assert_eq!(poll!(&mut third_from_late_sub), Poll::Pending);
list.push_back(3);
assert_eq!(*third_from_early_sub.await.unwrap().unwrap(), 3);
assert_eq!(*third_from_middle_sub.await.unwrap().unwrap(), 3);
assert_eq!(*third_from_late_sub.await.unwrap().unwrap(), 3);
let mut nop_sub = list.cursor(StreamMode::Subscribe);
let (
mut fourth_from_early_sub,
mut fourth_from_middle_sub,
mut fourth_from_late_sub,
mut fourth_from_nop_sub,
) = (early_sub.next(), middle_sub.next(), late_sub.next(), nop_sub.next());
assert_eq!(poll!(&mut fourth_from_early_sub), Poll::Pending);
assert_eq!(poll!(&mut fourth_from_middle_sub), Poll::Pending);
assert_eq!(poll!(&mut fourth_from_late_sub), Poll::Pending);
assert_eq!(poll!(&mut fourth_from_nop_sub), Poll::Pending);
list.terminate();
assert_eq!(fourth_from_early_sub.await, None);
assert_eq!(fourth_from_middle_sub.await, None);
assert_eq!(fourth_from_late_sub.await, None);
assert_eq!(fourth_from_nop_sub.await, None);
}
#[fuchsia::test]
async fn snapshot_delivered_without_drops() {
let list = ArcList::default();
let mut dead_cursor = list.cursor(StreamMode::Snapshot);
assert_eq!(dead_cursor.next().await, None, "no items in the list");
list.push_back(1);
list.push_back(2);
list.push_back(3);
let mut middle_cursor = list.cursor(StreamMode::Snapshot);
assert_eq!(*middle_cursor.next().await.unwrap().unwrap(), 1);
assert_eq!(*middle_cursor.next().await.unwrap().unwrap(), 2);
assert_eq!(*middle_cursor.next().await.unwrap().unwrap(), 3);
assert_eq!(dead_cursor.next().await, None, "no items in the list at snapshot");
assert_eq!(dead_cursor.next().await, None, "no items in list at snapshot");
assert_eq!(middle_cursor.next().await, None, "no items left in list");
list.push_back(4);
list.push_back(5);
assert_eq!(dead_cursor.next().await, None, "no items in list at snapshot");
assert_eq!(middle_cursor.next().await, None, "no items left in list at snapshot");
let mut full_cursor = list.cursor(StreamMode::Snapshot);
assert_eq!(*full_cursor.next().await.unwrap().unwrap(), 1);
assert_eq!(*full_cursor.next().await.unwrap().unwrap(), 2);
assert_eq!(*full_cursor.next().await.unwrap().unwrap(), 3);
assert_eq!(*full_cursor.next().await.unwrap().unwrap(), 4);
assert_eq!(*full_cursor.next().await.unwrap().unwrap(), 5);
assert_eq!(full_cursor.next().await, None, "no items left");
assert_eq!(dead_cursor.next().await, None, "no items in list at snapshot");
assert_eq!(middle_cursor.next().await, None, "no items left in list at snapshot");
}
#[fuchsia::test]
async fn snapshot_then_subscribe_gets_before_and_after() {
let list: ArcList<i32> = ArcList::default();
list.push_back(1);
list.push_back(2);
list.push_back(3);
let mut middle_cursor = list.cursor(StreamMode::SnapshotThenSubscribe);
assert_eq!(*middle_cursor.next().await.unwrap().unwrap(), 1);
assert_eq!(*middle_cursor.next().await.unwrap().unwrap(), 2);
assert_eq!(*middle_cursor.next().await.unwrap().unwrap(), 3);
list.push_back(4);
assert_eq!(*middle_cursor.next().await.unwrap().unwrap(), 4);
list.push_back(5);
assert_eq!(*middle_cursor.next().await.unwrap().unwrap(), 5);
list.push_back(6);
assert_eq!(*middle_cursor.next().await.unwrap().unwrap(), 6);
list.terminate();
assert_eq!(middle_cursor.next().await, None);
}
#[fuchsia::test]
async fn subscription_drops_are_counted() {
let list: ArcList<i32> = ArcList::default();
let mut early_cursor = list.cursor(StreamMode::Subscribe);
list.push_back(1);
list.push_back(2);
list.push_back(3);
assert_eq!(*list.pop_front().unwrap(), 1);
early_cursor.next().await.unwrap().expect_rolled_out(1);
assert_eq!(*early_cursor.next().await.unwrap().unwrap(), 2);
let mut middle_cursor = list.cursor(StreamMode::Subscribe);
assert_eq!(*list.pop_front().unwrap(), 2);
assert_eq!(*list.pop_front().unwrap(), 3);
list.push_back(4);
list.push_back(5);
early_cursor.next().await.unwrap().expect_rolled_out(1);
assert_eq!(*early_cursor.next().await.unwrap().unwrap(), 4);
assert_eq!(*early_cursor.next().await.unwrap().unwrap(), 5);
assert_eq!(*list.pop_front().unwrap(), 4);
middle_cursor.next().await.unwrap().expect_rolled_out(1);
assert_eq!(*middle_cursor.next().await.unwrap().unwrap(), 5);
list.terminate();
assert_eq!(middle_cursor.next().await, None);
}
#[fuchsia::test]
async fn snapshot_drops_are_counted() {
let list: ArcList<i32> = ArcList::default();
let mut dead_cursor = list.cursor(StreamMode::Snapshot);
assert!(dead_cursor.next().await.is_none(), "no items in the list");
list.push_back(1);
list.push_back(2);
list.push_back(3);
list.push_back(4);
list.push_back(5);
let mut middle_cursor = list.cursor(StreamMode::Snapshot);
list.pop_front();
list.pop_front();
middle_cursor.next().await.unwrap().expect_rolled_out(2);
assert_eq!(*middle_cursor.next().await.unwrap().unwrap(), 3);
assert_eq!(*middle_cursor.next().await.unwrap().unwrap(), 4);
assert_eq!(*middle_cursor.next().await.unwrap().unwrap(), 5);
assert!(dead_cursor.next().await.is_none(), "no items in list at snapshot");
assert!(middle_cursor.next().await.is_none(), "no items left in list");
let mut full_cursor = list.cursor(StreamMode::Snapshot);
full_cursor.next().await.unwrap().expect_rolled_out(2);
assert_eq!(*full_cursor.next().await.unwrap().unwrap(), 3);
assert_eq!(*full_cursor.next().await.unwrap().unwrap(), 4);
assert_eq!(*full_cursor.next().await.unwrap().unwrap(), 5);
assert!(full_cursor.next().await.is_none(), "no items left");
assert!(dead_cursor.next().await.is_none(), "no items in list at snapshot");
assert!(middle_cursor.next().await.is_none(), "no items left in list at snapshot");
}
#[fuchsia::test]
async fn post_termination_cursors() {
let list: ArcList<i32> = ArcList::default();
list.push_back(1);
list.push_back(2);
list.push_back(3);
list.push_back(4);
list.push_back(5);
list.terminate();
let snapshot: Vec<_> =
list.cursor(StreamMode::Snapshot).map(|i| *i.unwrap()).collect().await;
let subscribe: Vec<_> =
list.cursor(StreamMode::Subscribe).map(|i| *i.unwrap()).collect().await;
let both: Vec<_> =
list.cursor(StreamMode::SnapshotThenSubscribe).map(|i| *i.unwrap()).collect().await;
assert_eq!(snapshot, vec![1, 2, 3, 4, 5]);
assert!(subscribe.is_empty());
assert_eq!(both, vec![1, 2, 3, 4, 5]);
}
#[fuchsia::test]
async fn empty_post_termination_cursors() {
let list: ArcList<i32> = ArcList::default();
list.terminate();
let snapshot: Vec<_> =
list.cursor(StreamMode::Snapshot).map(|i| *i.unwrap()).collect().await;
let subscribe: Vec<_> =
list.cursor(StreamMode::Subscribe).map(|i| *i.unwrap()).collect().await;
let both: Vec<_> =
list.cursor(StreamMode::SnapshotThenSubscribe).map(|i| *i.unwrap()).collect().await;
assert!(snapshot.is_empty());
assert!(subscribe.is_empty());
assert!(both.is_empty());
}
#[fuchsia::test]
async fn drained_post_termination_cursors() {
let list: ArcList<i32> = ArcList::default();
list.push_back(1);
list.push_back(2);
list.push_back(3);
list.push_back(4);
list.push_back(5);
list.terminate();
list.pop_front();
list.pop_front();
list.pop_front();
list.pop_front();
list.pop_front();
let mut snapshot_cursor = list.cursor(StreamMode::Snapshot);
snapshot_cursor.next().await.unwrap().expect_rolled_out(5);
let snapshot: Vec<_> = snapshot_cursor.map(|i| *i.unwrap()).collect().await;
let subscribe: Vec<_> =
list.cursor(StreamMode::Subscribe).map(|i| *i.unwrap()).collect().await;
let mut both_cursor = list.cursor(StreamMode::Snapshot);
both_cursor.next().await.unwrap().expect_rolled_out(5);
let both: Vec<_> = both_cursor.map(|i| *i.unwrap()).collect().await;
assert!(snapshot.is_empty());
assert!(subscribe.is_empty());
assert!(both.is_empty());
}
#[fuchsia::test]
async fn snapshot_does_not_hang_when_nothing_is_left() {
let list: ArcList<i32> = ArcList::default();
list.push_back(1);
list.push_back(2);
list.push_back(3);
list.push_back(4);
list.push_back(5);
list.pop_front();
list.pop_front();
list.pop_front();
list.pop_front();
list.pop_front();
let mut cursor = list.cursor(StreamMode::Snapshot);
cursor.next().await.unwrap().expect_rolled_out(5);
let snapshot: Vec<_> = cursor.map(|i| *i.unwrap()).collect().await;
assert!(snapshot.is_empty());
}
#[fuchsia::test]
async fn snapshot_does_not_hang_when_values_are_popped_before_start() {
let list: ArcList<i32> = ArcList::default();
list.push_back(1);
list.push_back(2);
list.push_back(3);
list.push_back(4);
list.push_back(5);
list.pop_front();
list.pop_front();
list.pop_front();
let mut cursor = list.cursor(StreamMode::Snapshot);
cursor.next().await.expect("initial value exists").expect_rolled_out(3);
let snapshot: Vec<_> = cursor.map(|i| *i.unwrap()).collect().await;
assert_eq!(2, snapshot.len());
}
#[fuchsia::test]
async fn popping_more_elements_than_exist_does_not_break_readers() {
let list: ArcList<i32> = ArcList::default();
list.push_back(1);
list.push_back(2);
list.push_back(3);
let mut cursor = list.cursor(StreamMode::SnapshotThenSubscribe);
assert_eq!(*cursor.next().await.unwrap().unwrap(), 1);
assert_eq!(*cursor.next().await.unwrap().unwrap(), 2);
assert_eq!(*cursor.next().await.unwrap().unwrap(), 3);
let mut next = cursor.next();
assert_eq!(poll!(&mut next), Poll::Pending);
list.pop_front();
list.pop_front();
list.pop_front();
list.pop_front();
list.pop_front();
let mut next = cursor.next();
assert_eq!(poll!(&mut next), Poll::Pending);
list.push_back(4);
assert_eq!(*cursor.next().await.unwrap().unwrap(), 4);
}
#[fuchsia::test]
async fn snapshot_then_subscribe_works_when_only_dropped_notifications_are_returned() {
let list: ArcList<i32> = ArcList::default();
list.push_back(1);
list.push_back(2);
list.push_back(3);
list.pop_front();
list.pop_front();
list.pop_front();
let mut cursor = list.cursor(StreamMode::SnapshotThenSubscribe);
cursor.next().await.unwrap().expect_rolled_out(3);
let mut next = cursor.next();
assert_eq!(poll!(&mut next), Poll::Pending);
list.push_back(4);
list.pop_front();
cursor.next().await.unwrap().expect_rolled_out(1);
let mut next = cursor.next();
assert_eq!(poll!(&mut next), Poll::Pending);
list.terminate();
let snapshot: Vec<_> = cursor.map(|i| *i.unwrap()).collect().await;
assert!(snapshot.is_empty());
}
}