| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use super::filter::*; |
| use crate::{proxies::player::PlayerProxyEvent, Result, SessionId, MAX_EVENTS_SENT_WITHOUT_ACK}; |
| use fidl::client::QueryResponseFut; |
| use fidl_fuchsia_media_sessions2::*; |
| use futures::{ |
| future::{self, Ready}, |
| stream::FuturesOrdered, |
| task::{Context, Poll}, |
| Sink, Stream, |
| }; |
| use std::{collections::HashSet, pin::Pin}; |
| |
| /// Implements a sink to a client implementation of `fuchsia.media.sessions2.SessionsWatcher`. |
| /// |
| /// Vends events to clients and provides back pressure when they have not ACKd already sent events. |
| pub struct FlowControlledProxySink { |
| proxy: SessionsWatcherProxy, |
| acks: FuturesOrdered<QueryResponseFut<()>>, |
| } |
| |
| impl From<SessionsWatcherProxy> for FlowControlledProxySink { |
| fn from(proxy: SessionsWatcherProxy) -> FlowControlledProxySink { |
| FlowControlledProxySink { proxy, acks: FuturesOrdered::new() } |
| } |
| } |
| |
| impl Sink<(SessionId, PlayerProxyEvent)> for FlowControlledProxySink { |
| type Error = anyhow::Error; |
| fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
| if self.acks.len() < MAX_EVENTS_SENT_WITHOUT_ACK { |
| return Poll::Ready(Ok(())); |
| } |
| |
| match Pin::new(&mut self.acks).poll_next(cx) { |
| Poll::Pending => Poll::Pending, |
| Poll::Ready(None) | Poll::Ready(Some(Ok(_))) => { |
| // We are now below the ACK limit and can send another event. |
| Poll::Ready(Ok(())) |
| } |
| Poll::Ready(Some(Err(e))) => Poll::Ready(Err(e.into())), |
| } |
| } |
| |
| fn start_send( |
| mut self: Pin<&mut Self>, |
| (id, event): (SessionId, PlayerProxyEvent), |
| ) -> Result<()> { |
| let ack_fut = match event { |
| PlayerProxyEvent::Updated(delta) => self.proxy.session_updated(id, &delta()), |
| PlayerProxyEvent::Removed => self.proxy.session_removed(id), |
| }; |
| self.acks.push(ack_fut); |
| |
| Ok(()) |
| } |
| |
| fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
| while let Poll::Ready(Some(r)) = Pin::new(&mut self.acks).poll_next(cx) { |
| if let Err(e) = r { |
| return Poll::Ready(Err(e.into())); |
| } |
| } |
| |
| if self.acks.is_empty() { |
| Poll::Ready(Ok(())) |
| } else { |
| Poll::Pending |
| } |
| } |
| |
| fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
| self.poll_flush(cx) |
| } |
| } |
| |
| pub fn watcher_filter( |
| filter: Filter, |
| ) -> impl FnMut( |
| FilterApplicant<(SessionId, PlayerProxyEvent)>, |
| ) -> Ready<Option<(SessionId, PlayerProxyEvent)>> { |
| let mut allow_list = HashSet::new(); |
| |
| move |event| { |
| let allowed_now = filter.filter(&event); |
| |
| let (id, event) = event.applicant; |
| let allowed_before = allow_list.contains(&id); |
| |
| future::ready(if allowed_now { |
| allow_list.insert(id); |
| Some((id, event)) |
| } else if allowed_before { |
| allow_list.remove(&id); |
| |
| // The client was watching this player, so we notify them it is |
| // removed from their watch set. |
| Some((id, PlayerProxyEvent::Removed)) |
| } else { |
| None |
| }) |
| } |
| } |
| |
| #[cfg(test)] |
| mod test { |
| use super::*; |
| use assert_matches::assert_matches; |
| use fidl::endpoints::create_endpoints; |
| use futures::{stream, Future, SinkExt, StreamExt}; |
| use futures_test::task::*; |
| use std::sync::Arc; |
| |
| #[fuchsia::test] |
| async fn back_pressure_when_acks_behind() -> Result<()> { |
| let (watcher_client, watcher_server) = create_endpoints::<SessionsWatcherMarker>(); |
| let mut under_test: FlowControlledProxySink = watcher_client.into_proxy()?.into(); |
| let mut watcher_requests = watcher_server.into_stream()?; |
| |
| let mut ctx = noop_context(); |
| let ready_when_empty = Pin::new(&mut under_test).poll_ready(&mut ctx); |
| assert_matches!(ready_when_empty, Poll::Ready(Ok(()))); |
| |
| let mut dummy_stream = stream::iter( |
| (0..MAX_EVENTS_SENT_WITHOUT_ACK).map(|_| Ok((064, PlayerProxyEvent::Removed))), |
| ); |
| let mut send_all_fut = SinkExt::send_all(&mut under_test, &mut dummy_stream); |
| let mut ack_responders = vec![]; |
| while ack_responders.len() < MAX_EVENTS_SENT_WITHOUT_ACK { |
| let _ = Pin::new(&mut send_all_fut).poll(&mut ctx); |
| match Pin::new(&mut watcher_requests).poll_next(&mut ctx) { |
| Poll::Ready(Some(Ok(responder))) => ack_responders.push( |
| responder.into_session_removed().expect("Taking out removal event we sent").1, |
| ), |
| Poll::Ready(e) => panic!("Expected request stream to continue; got {:?}", e), |
| _ => {} |
| }; |
| } |
| |
| let ready_when_full_of_acks = Pin::new(&mut under_test).poll_ready(&mut ctx); |
| assert_matches!(ready_when_full_of_acks, Poll::Pending); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test] |
| async fn player_filter() -> Result<()> { |
| let make_event = |player_id: SessionId, is_active| { |
| FilterApplicant::new( |
| WatchOptions { only_active: Some(is_active), ..Default::default() }, |
| (player_id, PlayerProxyEvent::Updated(Arc::new(|| Default::default()))), |
| ) |
| }; |
| |
| let mut dummy_stream = |
| stream::iter((0u64..4u64).map(|i| make_event(i, false))).filter_map(watcher_filter( |
| Filter::new(WatchOptions { only_active: Some(true), ..Default::default() }), |
| )); |
| |
| assert_matches!(dummy_stream.next().await, None); |
| |
| let mut dummy_stream = |
| stream::iter((0u64..4u64).map(|i| make_event(i, true))).filter_map(watcher_filter( |
| Filter::new(WatchOptions { only_active: Some(true), ..Default::default() }), |
| )); |
| |
| assert_matches!(dummy_stream.next().await, Some(_)); |
| |
| Ok(()) |
| } |
| } |