blob: 3fec8c8d15e36f0647d9ed98d8f7986bc8cd619a [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#![deny(missing_docs)]
//! An unordered map of streams.
#[macro_use]
extern crate rental;
#[cfg(test)]
mod test;
use futures::{
lock::{Mutex, MutexLockFuture},
stream::{FusedStream, SelectAll},
Stream,
};
use rental::*;
use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
// Self-referential holder declared through the `rental` crate's macro: a single value that owns
// the shared, mutex-protected store and, optionally, a lock future borrowing from that same
// store. Bundling the two lets an unfinished lock attempt be kept around between polls.
rental! {
mod rentals {
use super::*;
#[rental]
pub struct ArcMutexWithLockFuture<T: 'static> {
// Shared handle to the mutex-protected value; this is the owner that `lock` borrows from.
store: Arc<Mutex<T>>,
// A lock attempt on `store` that has not completed yet, or `None` when nothing is parked.
lock: Option<MutexLockFuture<'store, T>>,
}
}
}
/// One member of the `SelectAll` inside a `StreamMap`: it stands in for the stream stored under
/// `key` in the shared map and polls that stream through the map's mutex.
struct StreamEntry<K: 'static, St: 'static> {
/// Key under which the underlying stream is looked up in the shared map.
key: K,
/// Handle to the shared map together with any lock attempt left pending by an earlier poll.
lock: rentals::ArcMutexWithLockFuture<HashMap<K, St>>,
}
// Declared `Unpin` unconditionally so that `poll_next` can reach `self.lock` mutably through
// `Pin<&mut Self>`.
impl<K: 'static, St: 'static> Unpin for StreamEntry<K, St> {}
impl<K: Copy + Eq + Hash + 'static, St: Stream + FusedStream + Unpin + 'static> Stream
    for StreamEntry<K, St>
{
    type Item = St::Item;

    /// Polls the stream stored under this entry's key in the shared map.
    ///
    /// The shared map must be locked first. If the lock is not immediately available the
    /// in-flight lock future is parked inside `self.lock` and resumed on the next poll.
    ///
    /// Returns `Poll::Ready(None)` when the key is no longer in the map, when the stream reports
    /// itself terminated, or when the stream yields `None`; in the latter two cases the stream is
    /// also removed from the map. Any other poll result is forwarded unchanged.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let key = self.key;
        self.lock.rent_all_mut(|rented| {
            // Resume a parked lock attempt if there is one; otherwise start a fresh attempt.
            let mut pending_lock = match rented.lock.take() {
                Some(parked) => parked,
                None => rented.store.lock(),
            };

            let mut streams = match Pin::new(&mut pending_lock).poll(cx) {
                Poll::Ready(guard) => guard,
                Poll::Pending => {
                    // Keep the attempt so the next poll continues it instead of starting over.
                    *rented.lock = Some(pending_lock);
                    return Poll::Pending;
                }
            };

            let polled = match streams.get_mut(&key) {
                // Nothing is stored under the key any more: this entry is finished.
                None => return Poll::Ready(None),
                Some(stream) => {
                    if stream.is_terminated() {
                        // Never poll a terminated stream; treat it as exhausted.
                        Poll::Ready(None)
                    } else {
                        Pin::new(stream).poll_next(cx)
                    }
                }
            };

            // An exhausted stream is dropped from the shared map.
            if let Poll::Ready(None) = polled {
                streams.remove(&key);
            }
            polled
        })
    }
}
/// A map of streams, keyed by `K`.
///
/// When polled as a stream, this yields the element of whichever contained stream has an element
/// ready first, similar to `SelectAll`.
///
/// The map never terminates as a stream. When it contains no streams, polling it returns
/// `Poll::Pending`.
///
/// Streams added to the map may be removed or modified using the key with which they were
/// inserted.
pub struct StreamMap<K: 'static, St: 'static> {
/// The streams themselves, shared with every `StreamEntry` in `streams`.
store: Arc<Mutex<HashMap<K, St>>>,
/// The set that is actually polled; each entry looks up its stream in `store` by key.
streams: SelectAll<StreamEntry<K, St>>,
}
impl<K: Copy + Eq + Hash + 'static, St: Stream + FusedStream + Unpin + 'static> Default
for StreamMap<K, St>
{
fn default() -> Self {
StreamMap { store: Arc::default(), streams: SelectAll::new() }
}
}
impl<K: Copy + Eq + Hash + 'static, St: Stream + FusedStream + Unpin + 'static> StreamMap<K, St> {
    /// Creates a new stream map with no streams; polling it pends until a stream is added.
    pub fn new() -> Self {
        Default::default()
    }

    /// Adds `stream` to the map under `key`. It will be polled the next time the map is polled,
    /// and may be removed or modified later using the same key.
    ///
    /// Returns the stream previously stored under `key`, if any.
    ///
    /// A new poll entry is registered on every call, including when `key` was already present.
    pub async fn insert(&mut self, key: K, stream: St) -> Option<St> {
        let shared = self.store.clone();
        let entry = StreamEntry { key, lock: rentals::ArcMutexWithLockFuture::new(shared, |_| None) };
        self.streams.push(entry);

        let mut streams = self.store.lock().await;
        streams.insert(key, stream)
    }

    /// Returns true if the map contains an entry with the given key.
    pub async fn contains_key(&self, key: K) -> bool {
        let streams = self.store.lock().await;
        streams.contains_key(&key)
    }

    /// Stops polling the stream stored under `key` and hands it back to the caller, if there was
    /// one.
    pub async fn remove(&self, key: K) -> Option<St> {
        let mut streams = self.store.lock().await;
        streams.remove(&key)
    }

    /// Runs `f` on a mutable reference to the stream stored under `key`, if it exists.
    ///
    /// Returns whether `f` was executed.
    pub async fn with_elem(&self, key: K, f: impl FnOnce(&mut St)) -> bool {
        let mut streams = self.store.lock().await;
        if let Some(stream) = streams.get_mut(&key) {
            f(stream);
            true
        } else {
            false
        }
    }

    /// Runs `f` on each key together with an immutable reference to its stream.
    pub async fn for_each_stream(&self, mut f: impl FnMut(K, &St)) {
        let streams = self.store.lock().await;
        for (key, stream) in streams.iter() {
            f(*key, stream);
        }
    }

    /// Runs `f` on each key together with a mutable reference to its stream.
    pub async fn for_each_stream_mut(&mut self, mut f: impl FnMut(K, &mut St)) {
        let mut streams = self.store.lock().await;
        for (key, stream) in streams.iter_mut() {
            f(*key, stream);
        }
    }

    /// Test-only access to the shared store backing this map.
    #[cfg(test)]
    pub fn store(&self) -> Arc<Mutex<HashMap<K, St>>> {
        Arc::clone(&self.store)
    }
}
impl<K: Copy + Eq + Hash, St: Stream + FusedStream + Unpin> Stream for StreamMap<K, St> {
    type Item = St::Item;

    /// Polls the contained streams and forwards the first ready element.
    ///
    /// Exhaustion of the inner set is reported as `Poll::Pending` rather than `Ready(None)`, so
    /// the map itself never ends as a stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let inner = Pin::new(&mut self.streams);
        match inner.poll_next(cx) {
            Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
            // An empty or exhausted set is not the end of the map: more streams may be inserted.
            Poll::Ready(None) | Poll::Pending => Poll::Pending,
        }
    }
}
impl<K: Copy + Eq + Hash, St: Stream + FusedStream + Unpin> FusedStream for StreamMap<K, St> {
/// Always `false`: the `Stream` impl for `StreamMap` never yields `Poll::Ready(None)`, so the
/// map is never considered terminated.
fn is_terminated(&self) -> bool {
false
}
}