blob: 0e7633f65bc2a779f8fdf66f9fd54052e387cb44 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
async_utils::stream::{StreamMap, Tagged, WithTag},
fuchsia_bluetooth::types::{HostId, PeerId},
futures::{
future::BoxFuture,
stream::{FusedStream, FuturesUnordered, Stream},
StreamExt,
},
std::{
collections::HashMap,
pin::Pin,
task::{Context, Poll},
},
};
/// A Stream of outstanding Pairing Requests, indexed by Host. When polled, the Stream impl will
/// yield the next available completed request, when ready. `T` is the response type returned on
/// completion of a request.
pub struct PairingRequests<T> {
inner: StreamMap<HostId, FuturesUnordered<Tagged<PeerId, BoxFuture<'static, T>>>>,
}
impl<T> PairingRequests<T> {
/// Create a new empty PairingRequests<T>
pub fn empty() -> PairingRequests<T> {
PairingRequests { inner: StreamMap::empty() }
}
/// Insert a new pending request future, identified by HostId and PeerId
pub fn insert(&mut self, host: HostId, peer: PeerId, request: BoxFuture<'static, T>) {
self.inner
.inner_mut()
.entry(host)
.or_insert(Box::pin(FuturesUnordered::new()))
.push(request.tagged(peer))
}
/// Remove all pending requests for a given host, returning the PeerIds of those requests
pub fn remove_host_requests(&mut self, host: HostId) -> Option<Vec<PeerId>> {
self.inner.remove(&host).map(|mut futs| futs.iter_mut().map(|f| f.tag()).collect())
}
/// Remove all pending requests, returning the PeerIds of those requests
pub fn take_all_requests(&mut self) -> HashMap<HostId, Vec<PeerId>> {
self.inner
.inner_mut()
.drain()
.map(|(host, mut futs)| (host, futs.iter_mut().map(|f| f.tag()).collect()))
.collect()
}
}
impl<T> FusedStream for PairingRequests<T> {
// PairingRequests is never terminated - a new pending request be added at any time
fn is_terminated(&self) -> bool {
false
}
}
impl<T> Stream for PairingRequests<T> {
type Item = (PeerId, T);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}