blob: 4a3d701e1f51b5d85e61dc5973942acb04a52807 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Implementation of the event-emitting client end of the AccountListener FIDL interface,
//! sending events to listeners, optionally configured with filters, about changes in the
//! presence and state of accounts during their lifetime.
use account_common::{AccountAuthState, FidlAccountAuthState, LocalAccountId};
use failure::Error;
use fidl_fuchsia_auth_account::{AccountListenerOptions, AccountListenerProxy};
use fuchsia_inspect::vmo::{Metric, Node};
use futures::future::*;
use futures::lock::Mutex;
use std::pin::Pin;
use crate::inspect;
/// Events emitted on account listeners.
///
/// Each variant carries the payload that is converted and handed to the listener when the
/// event is sent.
pub enum AccountEvent {
/// AccountAdded is emitted after an account has been added.
AccountAdded(LocalAccountId),
/// AccountRemoved is emitted after an account has been removed.
AccountRemoved(LocalAccountId),
/// AuthStateChanged is emitted after an account's auth state has changed.
// NOTE(review): no non-test code visible in this file constructs this variant, which is
// presumably why the dead_code lint is silenced here; confirm before removing the attribute.
#[allow(dead_code)]
AuthStateChanged(AccountAuthState),
}
/// The client end of an account listener.
///
/// Pairs the proxy used to reach a single listener with the options that decide which
/// events that listener receives.
struct Client {
/// Proxy over which events are delivered to the listener.
listener: AccountListenerProxy,
/// Filter options; consulted before each event is sent.
options: AccountListenerOptions,
}
impl Client {
    /// Create a new client, given the listener's client end and options used for filtering.
    fn new(listener: AccountListenerProxy, options: AccountListenerOptions) -> Self {
        Self { listener, options }
    }

    /// Returns true if `event` passes the filter configured in this client's options.
    ///
    /// Account additions and removals are governed by the `add_account` and `remove_account`
    /// options respectively. Auth state changes are currently never sent.
    fn should_send(&self, event: &AccountEvent) -> bool {
        match event {
            AccountEvent::AccountAdded(_) => self.options.add_account,
            AccountEvent::AccountRemoved(_) => self.options.remove_account,
            AccountEvent::AuthStateChanged(_) => false, // TODO: Implement
        }
    }

    /// Sends `event` to the listener without consulting the filter options.
    ///
    /// The event payload is converted to its FIDL representation and passed to the matching
    /// listener method. Returns the future produced by that method, which yields the
    /// `fidl::Error` on failure.
    fn send<'a>(
        &'a self,
        event: &'a AccountEvent,
    ) -> impl Future<Output = Result<(), fidl::Error>> {
        match event {
            AccountEvent::AccountAdded(id) => {
                self.listener.on_account_added(&mut id.clone().into())
            }
            AccountEvent::AccountRemoved(id) => {
                self.listener.on_account_removed(&mut id.clone().into())
            }
            AccountEvent::AuthStateChanged(account_auth_state) => {
                // `account_auth_state` is already a `&AccountAuthState`, so it can be converted
                // directly; cloning it only to borrow the clone again is unnecessary.
                self.listener.on_auth_state_changed(&mut account_auth_state.into())
            }
        }
    }

    /// Emit a given event on the channel if it passes the filter in options. Returns a future
    /// which will return once the listening end has responded.
    ///
    /// When the event is filtered out, the returned future resolves immediately to `Ok(())`.
    fn possibly_send<'a>(
        &'a self,
        event: &'a AccountEvent,
    ) -> impl Future<Output = Result<(), fidl::Error>> {
        // Both branches are boxed into a `FutureObj` so that they share one concrete type.
        if self.should_send(event) {
            FutureObj::new(Box::pin(self.send(event)))
        } else {
            FutureObj::new(Box::pin(ok(())))
        }
    }
}
/// A type to maintain the set of account listener clients and distribute events to them.
pub struct AccountEventEmitter {
/// Collection of account listener clients, guarded by an asynchronous
/// `futures::lock::Mutex` so that it can be locked from async methods taking `&self`.
clients: Mutex<Vec<Client>>,
/// Helper for outputting listener information via fuchsia_inspect.
inspect: inspect::Listeners,
}
impl AccountEventEmitter {
    /// Create a new emitter with no clients.
    ///
    /// `parent` is the inspect node handed to `inspect::Listeners::new`. Returns an error if
    /// that inspect helper cannot be created.
    pub fn new(parent: &Node) -> Result<AccountEventEmitter, Error> {
        let inspect = inspect::Listeners::new(parent)?;
        Ok(Self { clients: Mutex::new(Vec::new()), inspect })
    }

    /// Send an event to all active listeners filtered by their respective options. Awaits until
    /// all messages have been confirmed by the servers.
    ///
    /// Listeners whose channel has been closed are removed before the event is sent, and the
    /// inspect `active` count is updated to match. Errors reported by individual listeners are
    /// ignored. The inspect `events` counter is incremented once per call.
    pub async fn publish<'a>(&'a self, event: &'a AccountEvent) {
        let mut clients_lock = await!(self.clients.lock());
        clients_lock.retain(|client| !client.listener.is_closed());
        // Keep the reported number of active listeners in sync with the pruned collection;
        // otherwise it would stay stale until the next call to `add_listener`.
        let _ = self.inspect.active.set(clients_lock.len() as u64);
        let futures = clients_lock
            .iter()
            .map(|client| client.possibly_send(event))
            .map(|fut| Pin::<Box<_>>::from(Box::new(fut)));
        let all_futures = join_all(futures);
        // Release the lock before awaiting the responses, so the client collection is not
        // locked while waiting on listeners.
        std::mem::drop(clients_lock);
        await!(all_futures);
        let _ = self.inspect.events.add(1);
    }

    /// Add a new listener to the collection.
    ///
    /// If `options.initial_state` is set, `initial_auth_states` is converted to its FIDL
    /// representation and passed to the listener's `on_initialize` method; the returned result
    /// is the outcome of that call. Otherwise `Ok(())` is returned. In both cases the listener
    /// is added to the collection and the inspect `active` count is updated before the
    /// response is awaited, so the listener stays registered even if initialization fails.
    pub async fn add_listener<'a>(
        &'a self,
        listener: AccountListenerProxy,
        options: AccountListenerOptions,
        initial_auth_states: &'a Vec<AccountAuthState>,
    ) -> Result<(), fidl::Error> {
        // NOTE(review): the lock is taken before `on_initialize` is invoked, presumably so that
        // a concurrent `publish` cannot reach this listener ahead of its initial state; confirm.
        let mut clients_lock = await!(self.clients.lock());
        let future = if options.initial_state {
            let mut v: Vec<FidlAccountAuthState> = initial_auth_states
                .iter()
                .map(|auth_state| FidlAccountAuthState::from(auth_state))
                .collect();
            FutureObj::new(Box::pin(listener.on_initialize(&mut v.iter_mut())))
        } else {
            FutureObj::new(Box::pin(ok(())))
        };
        clients_lock.push(Client::new(listener, options));
        let _ = self.inspect.active.set(clients_lock.len() as u64);
        // Release the lock before awaiting the listener's response.
        std::mem::drop(clients_lock);
        await!(future)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use fidl::endpoints::*;
    use fidl_fuchsia_auth::AuthChangeGranularity;
    use fidl_fuchsia_auth_account::{AccountListenerMarker, AccountListenerRequest};
    use fuchsia_inspect::vmo::Inspector;
    use futures::prelude::*;
    use lazy_static::lazy_static;

    // Shared fixtures: account ids, one event of each kind, and an initial auth state list.
    lazy_static! {
        static ref ACCOUNT_ID_ADD: LocalAccountId = LocalAccountId::new(1);
        static ref ACCOUNT_ID_REMOVE: LocalAccountId = LocalAccountId::new(2);
        static ref EVENT_ADDED: AccountEvent = AccountEvent::AccountAdded(ACCOUNT_ID_ADD.clone());
        static ref EVENT_REMOVED: AccountEvent =
            AccountEvent::AccountRemoved(ACCOUNT_ID_REMOVE.clone());
        static ref EVENT_STATE_CHANGED: AccountEvent =
            AccountEvent::AuthStateChanged(AccountAuthState { account_id: LocalAccountId::new(4) });
        static ref AUTH_STATE: AccountAuthState =
            AccountAuthState { account_id: LocalAccountId::new(6) };
        static ref AUTH_STATES: Vec<AccountAuthState> = vec![AUTH_STATE.clone()];
    }

    /// Creates a new AccountEventEmitter whose inspect interface is not exported
    fn create_account_event_emitter() -> AccountEventEmitter {
        let inspector = Inspector::new().unwrap();
        AccountEventEmitter::new(inspector.root()).unwrap()
    }

    /// With every option enabled, add and remove events pass the filter; auth state changes
    /// are still rejected.
    #[fuchsia_async::run_until_stalled(test)]
    async fn test_should_send_all() {
        let options = AccountListenerOptions {
            initial_state: true,
            add_account: true,
            remove_account: true,
            granularity: AuthChangeGranularity { summary_changes: true },
        };
        let (listener, _) = create_proxy::<AccountListenerMarker>().unwrap();
        let client = Client::new(listener, options);
        assert!(client.should_send(&EVENT_ADDED));
        assert!(client.should_send(&EVENT_REMOVED));
        assert!(!client.should_send(&EVENT_STATE_CHANGED));
    }

    /// With every option disabled, no event passes the filter.
    #[fuchsia_async::run_until_stalled(test)]
    async fn test_should_send_none() {
        let options = AccountListenerOptions {
            initial_state: false,
            add_account: false,
            remove_account: false,
            granularity: AuthChangeGranularity { summary_changes: false },
        };
        let (proxy, _) = create_proxy::<AccountListenerMarker>().unwrap();
        let client = Client::new(proxy, options);
        assert!(!client.should_send(&EVENT_ADDED));
        assert!(!client.should_send(&EVENT_REMOVED));
        assert!(!client.should_send(&EVENT_STATE_CHANGED));
    }

    /// With only removal enabled, only the removal event passes the filter.
    #[fuchsia_async::run_until_stalled(test)]
    async fn test_should_send_some() {
        let options = AccountListenerOptions {
            initial_state: true,
            add_account: false,
            remove_account: true,
            granularity: AuthChangeGranularity { summary_changes: false },
        };
        let (proxy, _) = create_proxy::<AccountListenerMarker>().unwrap();
        let client = Client::new(proxy, options);
        assert!(!client.should_send(&EVENT_ADDED));
        assert!(client.should_send(&EVENT_REMOVED));
        assert!(!client.should_send(&EVENT_STATE_CHANGED));
    }

    /// A single client only forwards the events that its options allow.
    #[fuchsia_async::run_until_stalled(test)]
    async fn test_single_listener() {
        let options = AccountListenerOptions {
            initial_state: false,
            add_account: true,
            remove_account: false,
            granularity: AuthChangeGranularity { summary_changes: false },
        };
        let (client_end, mut stream) = create_request_stream::<AccountListenerMarker>().unwrap();
        let client = Client::new(client_end.into_proxy().unwrap(), options);

        // Expect only the AccountAdded event, the filter skips the AccountRemoved event
        let serve_fut = async move {
            let request = await!(stream.try_next()).unwrap();
            match request {
                Some(AccountListenerRequest::OnAccountAdded { id, responder }) => {
                    assert_eq!(LocalAccountId::from(id), *ACCOUNT_ID_ADD);
                    responder.send().unwrap();
                }
                _ => panic!("Unexpected message received"),
            }
            assert!(
                await!(stream.try_next()).unwrap().is_none(),
                "Unexpected message, channel should be closed"
            );
        };
        // The client is moved into this future, so it is dropped once both sends complete.
        let request_fut = async move {
            assert!(await!(client.possibly_send(&EVENT_ADDED)).is_ok());
            assert!(await!(client.possibly_send(&EVENT_REMOVED)).is_ok());
        };
        await!(join(serve_fut, request_fut));
    }

    /// Given two independent clients with different options/filters, send events and check
    /// that the clients receive the expected messages.
    #[fuchsia_async::run_until_stalled(test)]
    async fn test_event_emitter() {
        let options_1 = AccountListenerOptions {
            initial_state: false,
            add_account: true,
            remove_account: false,
            granularity: AuthChangeGranularity { summary_changes: false },
        };
        let options_2 = AccountListenerOptions {
            initial_state: true,
            add_account: false,
            remove_account: true,
            granularity: AuthChangeGranularity { summary_changes: false },
        };
        let (client_end_1, mut stream_1) =
            create_request_stream::<AccountListenerMarker>().unwrap();
        let (client_end_2, mut stream_2) =
            create_request_stream::<AccountListenerMarker>().unwrap();
        let listener_1 = client_end_1.into_proxy().unwrap();
        let listener_2 = client_end_2.into_proxy().unwrap();
        let account_event_emitter = create_account_event_emitter();

        // Listener 1 expects only the AccountAdded event.
        let serve_fut_1 = async move {
            let request = await!(stream_1.try_next()).unwrap();
            match request {
                Some(AccountListenerRequest::OnAccountAdded { id, responder }) => {
                    assert_eq!(LocalAccountId::from(id), *ACCOUNT_ID_ADD);
                    responder.send().unwrap();
                }
                _ => panic!("Unexpected message received"),
            }
            assert!(
                await!(stream_1.try_next()).unwrap().is_none(),
                "Unexpected message, channel should be closed"
            );
        };

        // Listener 2 expects the initial state followed by the AccountRemoved event.
        let serve_fut_2 = async move {
            let request = await!(stream_2.try_next()).unwrap();
            match request {
                Some(AccountListenerRequest::OnInitialize { account_auth_states, responder }) => {
                    assert_eq!(
                        account_auth_states,
                        vec![FidlAccountAuthState::from(&*AUTH_STATE)]
                    );
                    responder.send().unwrap();
                }
                _ => panic!("Unexpected message received"),
            }
            let request = await!(stream_2.try_next()).unwrap();
            match request {
                Some(AccountListenerRequest::OnAccountRemoved { id, responder }) => {
                    assert_eq!(LocalAccountId::from(id), *ACCOUNT_ID_REMOVE);
                    responder.send().unwrap();
                }
                _ => panic!("Unexpected message received"),
            }
            assert!(
                await!(stream_2.try_next()).unwrap().is_none(),
                "Unexpected message, channel should be closed"
            );
        };

        // The emitter is moved into this future, so both listeners are dropped when it ends.
        let request_fut = async move {
            assert_eq!(account_event_emitter.inspect.active.get().unwrap(), 0);
            assert!(await!(account_event_emitter.add_listener(
                listener_1,
                options_1,
                &AUTH_STATES
            ))
            .is_ok());
            assert_eq!(account_event_emitter.inspect.active.get().unwrap(), 1);
            assert!(await!(account_event_emitter.add_listener(
                listener_2,
                options_2,
                &AUTH_STATES
            ))
            .is_ok());
            assert_eq!(account_event_emitter.inspect.active.get().unwrap(), 2);
            assert_eq!(account_event_emitter.inspect.events.get().unwrap(), 0);
            await!(account_event_emitter.publish(&EVENT_ADDED));
            assert_eq!(account_event_emitter.inspect.events.get().unwrap(), 1);
            await!(account_event_emitter.publish(&EVENT_REMOVED));
            assert_eq!(account_event_emitter.inspect.events.get().unwrap(), 2);
        };
        await!(join3(serve_fut_1, serve_fut_2, request_fut));
    }

    /// Check that stale clients are cleaned up once the server is closed
    #[fuchsia_async::run_until_stalled(test)]
    async fn test_cleanup_stale_clients() {
        let options = AccountListenerOptions {
            initial_state: false,
            add_account: true,
            remove_account: true,
            granularity: AuthChangeGranularity { summary_changes: false },
        };
        let (client_end, mut stream) = create_request_stream::<AccountListenerMarker>().unwrap();
        let listener = client_end.into_proxy().unwrap();
        let account_event_emitter = create_account_event_emitter();
        assert!(await!(account_event_emitter.add_listener(listener, options, &AUTH_STATES)).is_ok());

        // The server handles a single AccountAdded event and then drops its stream.
        let serve_fut = async move {
            let request = await!(stream.try_next()).unwrap();
            match request {
                Some(AccountListenerRequest::OnAccountAdded { id, responder }) => {
                    assert_eq!(LocalAccountId::from(id), *ACCOUNT_ID_ADD);
                    responder.send().unwrap();
                }
                _ => panic!("Unexpected message received"),
            }
        };
        // The emitter is returned from the future so it can be used after the server is gone.
        let request_fut = async move {
            await!(account_event_emitter.publish(&EVENT_ADDED)); // Normal event
            {
                let clients_lock = await!(account_event_emitter.clients.lock());
                assert_eq!(clients_lock.len(), 1); // Listener remains
            }
            account_event_emitter
        };
        let (_, account_event_emitter) = await!(join(serve_fut, request_fut));

        // Now the server is dropped, so the new publish should trigger a drop of the client
        await!(account_event_emitter.publish(&EVENT_REMOVED));
        let clients_lock = await!(account_event_emitter.clients.lock());
        assert!(clients_lock.is_empty());
    }
}