blob: 7b046fb6ead6b0d1ee349f313ddbda901e62b8d5 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::*;
use anyhow;
use fuchsia_async as fasync;
use futures::{
self,
channel::mpsc,
executor::block_on,
prelude::*,
task::{Context, Poll},
};
use futures_test::task::new_count_waker;
use maplit::hashset;
use std::collections::HashSet;
use test_util::assert_matches;
#[fasync::run_until_stalled]
#[test]
async fn ready_items_are_yielded() -> Result<(), anyhow::Error> {
let mut stream_map = StreamMap::new();
let mut expected_output = HashSet::new();
for i in 0..100usize {
let (mut sender, receiver) = mpsc::channel::<usize>(1);
sender.send(i).await?;
stream_map.insert(i, receiver).await;
expected_output.insert(i);
}
for i in 0..100usize {
let next_fut = stream_map.next();
fasync::pin_mut!(next_fut);
assert_eq!(futures::poll!(next_fut), Poll::Ready(Some(i)));
}
Ok(())
}
#[fasync::run(4)]
#[test]
async fn concurrently_ready_items_are_yielded() -> Result<(), anyhow::Error> {
let mut stream_map = StreamMap::new();
let mut expected_output = HashSet::new();
for i in 0..1000usize {
let (mut sender, receiver) = mpsc::channel::<usize>(1);
fasync::spawn(async move {
sender.send(i).await.expect("Sending message");
});
stream_map.insert(i, receiver).await;
expected_output.insert(i);
}
for _ in 0..1000usize {
assert_matches!(stream_map.next().await, Some(_));
}
let next_fut = stream_map.next();
fasync::pin_mut!(next_fut);
assert_eq!(futures::poll!(next_fut), Poll::Pending);
Ok(())
}
#[fasync::run_until_stalled]
#[test]
async fn removed_streams_are_terminated() -> Result<(), anyhow::Error> {
let mut stream_map = StreamMap::new();
let (mut sender1, receiver1) = mpsc::channel(1);
let (mut sender2, receiver2) = mpsc::channel(1);
sender1.send(1u32).await?;
sender2.send(2u32).await?;
let insert1 = stream_map.insert(1u32, receiver1).await;
let insert2 = stream_map.insert(2u32, receiver2).await;
assert_matches!(insert1, None);
assert_matches!(insert2, None);
assert_matches!(stream_map.remove(1u32).await, Some(_));
for expected in vec![Poll::Ready(Some(2)), Poll::Pending] {
let next_fut = stream_map.next();
fasync::pin_mut!(next_fut);
assert_eq!(futures::poll!(next_fut), expected);
}
Ok(())
}
#[fasync::run_until_stalled]
#[test]
async fn with_elem_has_effect() -> Result<(), anyhow::Error> {
let mut stream_map = StreamMap::new();
let (_sender, receiver) = mpsc::channel::<u32>(1);
stream_map.insert(1u32, receiver).await;
let mut run_for_present_element = false;
assert!(
stream_map
.with_elem(1u32, |_| {
run_for_present_element = true;
})
.await
);
assert!(run_for_present_element);
let mut run_for_absent_element = false;
assert!(
!stream_map
.with_elem(2u32, |_| {
run_for_absent_element = true;
})
.await
);
assert!(!run_for_absent_element);
Ok(())
}
#[test]
fn awoken_for_mutex_guard() -> Result<(), anyhow::Error> {
let mut stream_map = StreamMap::new();
let (_sender1, receiver1) = mpsc::channel::<usize>(1);
block_on(stream_map.insert(0usize, receiver1));
let (_sender2, receiver2) = mpsc::channel::<usize>(1);
block_on(stream_map.insert(0usize, receiver2));
let store_handle = stream_map.store();
let guard = block_on(store_handle.lock());
let (waker, wake_count) = new_count_waker();
let mut ctx = Context::from_waker(&waker);
let next_fut = stream_map.next();
fasync::pin_mut!(next_fut);
assert_eq!(next_fut.poll(&mut ctx), Poll::Pending);
assert_eq!(wake_count.get(), 0);
drop(guard);
assert_eq!(wake_count.get(), 1);
Ok(())
}
#[fasync::run_singlethreaded]
#[test]
async fn ended_stream_is_removed_from_map() -> Result<(), anyhow::Error> {
let mut stream_map = StreamMap::new();
let (sender, receiver) = mpsc::channel::<usize>(1);
stream_map.insert(0usize, receiver).await;
let store = stream_map.store();
let stream_count = || async { store.lock().await.len() };
let next_fut = stream_map.next();
fasync::pin_mut!(next_fut);
assert_eq!(futures::poll!(next_fut), Poll::Pending);
assert_eq!(stream_count().await, 1);
drop(sender);
let next_fut = stream_map.next();
fasync::pin_mut!(next_fut);
assert_eq!(futures::poll!(next_fut), Poll::Pending);
assert_eq!(stream_count().await, 0);
Ok(())
}
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq, Hash)]
struct TestStream(Option<usize>);
impl FusedStream for TestStream {
fn is_terminated(&self) -> bool {
self.0.is_none()
}
}
impl Stream for TestStream {
type Item = usize;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.0.take())
}
}
#[fasync::run_singlethreaded(test)]
async fn for_each_stream() {
let mut stream_map: StreamMap<usize, TestStream> = StreamMap::new();
stream_map.insert(0, TestStream(Some(0))).await;
stream_map.insert(1, TestStream(Some(1))).await;
stream_map.insert(2, TestStream(Some(2))).await;
let mut seen = hashset! {};
stream_map
.for_each_stream(|key: usize, stream: &TestStream| {
seen.insert((key, *stream));
})
.await;
assert_eq!(
seen,
hashset! {(0, TestStream(Some(0))), (1, TestStream(Some(1))), (2, TestStream(Some(2)))}
);
}
#[fasync::run_singlethreaded(test)]
async fn for_each_stream_mut() {
let mut stream_map = StreamMap::new();
stream_map.insert(0, TestStream(Some(0))).await;
stream_map.insert(1, TestStream(Some(1))).await;
stream_map.insert(2, TestStream(Some(2))).await;
stream_map
.for_each_stream_mut(|k, v| {
*v = TestStream(Some(k + 1));
})
.await;
let mut seen = hashset! {};
stream_map
.for_each_stream(|key: usize, stream: &TestStream| {
seen.insert((key, *stream));
})
.await;
assert_eq!(
seen,
hashset! {(0, TestStream(Some(1))), (1, TestStream(Some(2))), (2, TestStream(Some(3)))}
);
}
#[fasync::run_singlethreaded(test)]
async fn contains_key() {
let mut stream_map = StreamMap::new();
stream_map.insert(0usize, TestStream(Some(0))).await;
assert!(stream_map.contains_key(0).await);
assert!(!stream_map.contains_key(1).await);
stream_map.remove(0usize).await;
assert!(!stream_map.contains_key(0).await);
}