| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| //! Implementation for the event emitting client end of the AccountListener FIDL interface, |
| //! sending events to listeners, optionally configured with filters, about changes in the accounts |
| //! presence and states during their lifetime. |
| |
| use account_common::{AccountAuthState, FidlAccountAuthState, LocalAccountId}; |
| use failure::Error; |
| use fidl_fuchsia_auth_account::{AccountListenerOptions, AccountListenerProxy}; |
| use fuchsia_inspect::vmo::{Metric, Node}; |
| use futures::future::*; |
| use futures::lock::Mutex; |
| use std::pin::Pin; |
| |
| use crate::inspect; |
| |
| /// Events emitted on account listeners |
| pub enum AccountEvent { |
| /// AccountAdded is emitted after an account has been added. |
| AccountAdded(LocalAccountId), |
| |
| /// AccountRemoved is emitted after an account has been removed. |
| AccountRemoved(LocalAccountId), |
| |
| /// AuthStateChanged is emitted after an account's auth state has changed. |
| #[allow(dead_code)] |
| AuthStateChanged(AccountAuthState), |
| } |
| |
| /// The client end of an account listener. |
| struct Client { |
| listener: AccountListenerProxy, |
| options: AccountListenerOptions, |
| } |
| |
| impl Client { |
| /// Create a new client, given the listener's client end and options used for filtering. |
| fn new(listener: AccountListenerProxy, options: AccountListenerOptions) -> Self { |
| Self { listener, options } |
| } |
| |
| fn should_send(&self, event: &AccountEvent) -> bool { |
| match event { |
| AccountEvent::AccountAdded(_) => self.options.add_account, |
| AccountEvent::AccountRemoved(_) => self.options.remove_account, |
| AccountEvent::AuthStateChanged(_) => false, // TODO: Implement |
| } |
| } |
| |
| fn send<'a>( |
| &'a self, |
| event: &'a AccountEvent, |
| ) -> impl Future<Output = Result<(), fidl::Error>> { |
| match event { |
| AccountEvent::AccountAdded(id) => { |
| self.listener.on_account_added(&mut id.clone().into()) |
| } |
| AccountEvent::AccountRemoved(id) => { |
| self.listener.on_account_removed(&mut id.clone().into()) |
| } |
| AccountEvent::AuthStateChanged(account_auth_state) => { |
| self.listener.on_auth_state_changed(&mut (&account_auth_state.clone()).into()) |
| } |
| } |
| } |
| |
| /// Emit a given event on the channel if it passes the filter in options. Returns a future |
| /// which will return once the listening end has responded. |
| fn possibly_send<'a>( |
| &'a self, |
| event: &'a AccountEvent, |
| ) -> impl Future<Output = Result<(), fidl::Error>> { |
| if self.should_send(&event) { |
| FutureObj::new(Box::pin(self.send(event))) |
| } else { |
| FutureObj::new(Box::pin(ok(()))) |
| } |
| } |
| } |
| |
| /// A type to maintain the set of account listener clients and distribute events to them. |
| pub struct AccountEventEmitter { |
| /// Collection of account listener clients. |
| clients: Mutex<Vec<Client>>, |
| |
| /// Helper for outputting listener information via fuchsia_inspect. |
| inspect: inspect::Listeners, |
| } |
| |
| impl AccountEventEmitter { |
| /// Create a new emitter with no clients. |
| pub fn new(parent: &Node) -> Result<AccountEventEmitter, Error> { |
| let inspect = inspect::Listeners::new(parent)?; |
| Ok(Self { clients: Mutex::new(Vec::new()), inspect }) |
| } |
| |
| /// Send an event to all active listeners filtered by their respective options. Awaits until |
| /// all messages have been confirmed by the servers. |
| pub async fn publish<'a>(&'a self, event: &'a AccountEvent) { |
| let mut clients_lock = await!(self.clients.lock()); |
| clients_lock.retain(|client| !client.listener.is_closed()); |
| let futures = (&*clients_lock) |
| .into_iter() |
| .map(|client| client.possibly_send(event)) |
| .map(|fut| Pin::<Box<_>>::from(Box::new(fut))); |
| let all_futures = join_all(futures); |
| std::mem::drop(clients_lock); |
| await!(all_futures); |
| let _ = self.inspect.events.add(1); |
| } |
| |
| /// Add a new listener to the collection. |
| pub async fn add_listener<'a>( |
| &'a self, |
| listener: AccountListenerProxy, |
| options: AccountListenerOptions, |
| initial_auth_states: &'a Vec<AccountAuthState>, |
| ) -> Result<(), fidl::Error> { |
| let mut clients_lock = await!(self.clients.lock()); |
| let future = if options.initial_state { |
| let mut v: Vec<FidlAccountAuthState> = initial_auth_states |
| .into_iter() |
| .map(|auth_state| FidlAccountAuthState::from(auth_state)) |
| .collect(); |
| FutureObj::new(Box::pin(listener.on_initialize(&mut v.iter_mut()))) |
| } else { |
| FutureObj::new(Box::pin(ok(()))) |
| }; |
| clients_lock.push(Client::new(listener, options)); |
| let _ = self.inspect.active.set(clients_lock.len() as u64); |
| std::mem::drop(clients_lock); |
| await!(future) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use fidl::endpoints::*; |
| use fidl_fuchsia_auth::AuthChangeGranularity; |
| use fidl_fuchsia_auth_account::{AccountListenerMarker, AccountListenerRequest}; |
| use fuchsia_inspect::vmo::Inspector; |
| use futures::prelude::*; |
| use lazy_static::lazy_static; |
| |
| lazy_static! { |
| static ref ACCOUNT_ID_ADD: LocalAccountId = LocalAccountId::new(1); |
| static ref ACCOUNT_ID_REMOVE: LocalAccountId = LocalAccountId::new(2); |
| static ref EVENT_ADDED: AccountEvent = AccountEvent::AccountAdded(ACCOUNT_ID_ADD.clone()); |
| static ref EVENT_REMOVED: AccountEvent = |
| AccountEvent::AccountRemoved(ACCOUNT_ID_REMOVE.clone()); |
| static ref EVENT_STATE_CHANGED: AccountEvent = |
| AccountEvent::AuthStateChanged(AccountAuthState { account_id: LocalAccountId::new(4) }); |
| static ref AUTH_STATE: AccountAuthState = |
| AccountAuthState { account_id: LocalAccountId::new(6) }; |
| static ref AUTH_STATES: Vec<AccountAuthState> = vec![AUTH_STATE.clone()]; |
| } |
| |
| /// Creates a new AccountEventEmitter whose inspect interface is not exported |
| fn create_account_event_emitter() -> AccountEventEmitter { |
| let inspector = Inspector::new().unwrap(); |
| AccountEventEmitter::new(inspector.root()).unwrap() |
| } |
| |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_should_send_all() { |
| let options = AccountListenerOptions { |
| initial_state: true, |
| add_account: true, |
| remove_account: true, |
| granularity: AuthChangeGranularity { summary_changes: true }, |
| }; |
| let (listener, _) = create_proxy::<AccountListenerMarker>().unwrap(); |
| let client = Client::new(listener, options); |
| assert_eq!(client.should_send(&EVENT_ADDED), true); |
| assert_eq!(client.should_send(&EVENT_REMOVED), true); |
| assert_eq!(client.should_send(&EVENT_STATE_CHANGED), false); |
| } |
| |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_should_send_none() { |
| let options = AccountListenerOptions { |
| initial_state: false, |
| add_account: false, |
| remove_account: false, |
| granularity: AuthChangeGranularity { summary_changes: false }, |
| }; |
| let (proxy, _) = create_proxy::<AccountListenerMarker>().unwrap(); |
| let client = Client::new(proxy, options); |
| assert_eq!(client.should_send(&EVENT_ADDED), false); |
| assert_eq!(client.should_send(&EVENT_REMOVED), false); |
| assert_eq!(client.should_send(&EVENT_STATE_CHANGED), false); |
| } |
| |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_should_send_some() { |
| let options = AccountListenerOptions { |
| initial_state: true, |
| add_account: false, |
| remove_account: true, |
| granularity: AuthChangeGranularity { summary_changes: false }, |
| }; |
| let (proxy, _) = create_proxy::<AccountListenerMarker>().unwrap(); |
| let client = Client::new(proxy, options); |
| assert_eq!(client.should_send(&EVENT_ADDED), false); |
| assert_eq!(client.should_send(&EVENT_REMOVED), true); |
| assert_eq!(client.should_send(&EVENT_STATE_CHANGED), false); |
| } |
| |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_single_listener() { |
| let options = AccountListenerOptions { |
| initial_state: false, |
| add_account: true, |
| remove_account: false, |
| granularity: AuthChangeGranularity { summary_changes: false }, |
| }; |
| let (client_end, mut stream) = create_request_stream::<AccountListenerMarker>().unwrap(); |
| let client = Client::new(client_end.into_proxy().unwrap(), options); |
| |
| // Expect only the AccountAdded event, the filter skips the AccountRemoved event |
| let serve_fut = async move { |
| let request = await!(stream.try_next()).unwrap(); |
| if let Some(AccountListenerRequest::OnAccountAdded { id, responder }) = request { |
| assert_eq!(LocalAccountId::from(id), ACCOUNT_ID_ADD.clone()); |
| responder.send().unwrap(); |
| } else { |
| panic!("Unexpected message received"); |
| }; |
| if let Some(_) = await!(stream.try_next()).unwrap() { |
| panic!("Unexpected message, channel should be closed"); |
| } |
| }; |
| let request_fut = async move { |
| assert!(await!(client.possibly_send(&EVENT_ADDED)).is_ok()); |
| assert!(await!(client.possibly_send(&EVENT_REMOVED)).is_ok()); |
| }; |
| await!(join(serve_fut, request_fut)); |
| } |
| |
| /// Given two independent clients with different options/filters, send events and check |
| /// that the clients receive the expected messages. |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_event_emitter() { |
| let options_1 = AccountListenerOptions { |
| initial_state: false, |
| add_account: true, |
| remove_account: false, |
| granularity: AuthChangeGranularity { summary_changes: false }, |
| }; |
| let options_2 = AccountListenerOptions { |
| initial_state: true, |
| add_account: false, |
| remove_account: true, |
| granularity: AuthChangeGranularity { summary_changes: false }, |
| }; |
| let (client_end_1, mut stream_1) = |
| create_request_stream::<AccountListenerMarker>().unwrap(); |
| let (client_end_2, mut stream_2) = |
| create_request_stream::<AccountListenerMarker>().unwrap(); |
| let listener_1 = client_end_1.into_proxy().unwrap(); |
| let listener_2 = client_end_2.into_proxy().unwrap(); |
| let account_event_emitter = create_account_event_emitter(); |
| |
| let serve_fut_1 = async move { |
| let request = await!(stream_1.try_next()).unwrap(); |
| if let Some(AccountListenerRequest::OnAccountAdded { id, responder }) = request { |
| assert_eq!(LocalAccountId::from(id), ACCOUNT_ID_ADD.clone()); |
| responder.send().unwrap(); |
| } else { |
| panic!("Unexpected message received"); |
| }; |
| if let Some(_) = await!(stream_1.try_next()).unwrap() { |
| panic!("Unexpected message, channel should be closed"); |
| } |
| }; |
| |
| let serve_fut_2 = async move { |
| let request = await!(stream_2.try_next()).unwrap(); |
| if let Some(AccountListenerRequest::OnInitialize { account_auth_states, responder }) = |
| request |
| { |
| assert_eq!( |
| account_auth_states, |
| vec![FidlAccountAuthState::from(&AUTH_STATE.clone())] |
| ); |
| responder.send().unwrap(); |
| } else { |
| panic!("Unexpected message received"); |
| }; |
| let request = await!(stream_2.try_next()).unwrap(); |
| if let Some(AccountListenerRequest::OnAccountRemoved { id, responder }) = request { |
| assert_eq!(LocalAccountId::from(id), ACCOUNT_ID_REMOVE.clone()); |
| responder.send().unwrap(); |
| } else { |
| panic!("Unexpected message received"); |
| }; |
| if let Some(_) = await!(stream_2.try_next()).unwrap() { |
| panic!("Unexpected message, channel should be closed"); |
| } |
| }; |
| |
| let request_fut = async move { |
| assert_eq!(account_event_emitter.inspect.active.get().unwrap(), 0); |
| assert!(await!(account_event_emitter.add_listener( |
| listener_1, |
| options_1, |
| &AUTH_STATES |
| )) |
| .is_ok()); |
| assert_eq!(account_event_emitter.inspect.active.get().unwrap(), 1); |
| assert!(await!(account_event_emitter.add_listener( |
| listener_2, |
| options_2, |
| &AUTH_STATES |
| )) |
| .is_ok()); |
| assert_eq!(account_event_emitter.inspect.active.get().unwrap(), 2); |
| |
| assert_eq!(account_event_emitter.inspect.events.get().unwrap(), 0); |
| await!(account_event_emitter.publish(&EVENT_ADDED)); |
| assert_eq!(account_event_emitter.inspect.events.get().unwrap(), 1); |
| await!(account_event_emitter.publish(&EVENT_REMOVED)); |
| assert_eq!(account_event_emitter.inspect.events.get().unwrap(), 2); |
| }; |
| await!(join3(serve_fut_1, serve_fut_2, request_fut)); |
| } |
| |
| /// Check that that stale clients are cleaned up, once the server is closed |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_cleanup_stale_clients() { |
| let options = AccountListenerOptions { |
| initial_state: false, |
| add_account: true, |
| remove_account: true, |
| granularity: AuthChangeGranularity { summary_changes: false }, |
| }; |
| let (client_end, mut stream) = create_request_stream::<AccountListenerMarker>().unwrap(); |
| let listener = client_end.into_proxy().unwrap(); |
| let account_event_emitter = create_account_event_emitter(); |
| assert!(await!(account_event_emitter.add_listener(listener, options, &AUTH_STATES)).is_ok()); |
| |
| let serve_fut = async move { |
| let request = await!(stream.try_next()).unwrap(); |
| if let Some(AccountListenerRequest::OnAccountAdded { id, responder }) = request { |
| assert_eq!(LocalAccountId::from(id), ACCOUNT_ID_ADD.clone()); |
| responder.send().unwrap(); |
| } else { |
| panic!("Unexpected message received"); |
| }; |
| }; |
| |
| let request_fut = async move { |
| await!(account_event_emitter.publish(&EVENT_ADDED)); // Normal event |
| { |
| let clients_lock = await!(account_event_emitter.clients.lock()); |
| assert_eq!(clients_lock.len(), 1); // Listener remains |
| } |
| account_event_emitter |
| }; |
| let (_, account_event_emitter) = await!(join(serve_fut, request_fut)); |
| |
| // Now the server is dropped, so the new publish should trigger a drop of the client |
| await!(account_event_emitter.publish(&EVENT_REMOVED)); |
| let clients_lock = await!(account_event_emitter.clients.lock()); |
| assert!(clients_lock.is_empty()); |
| } |
| } |