blob: 87555b7d8d7d997ae1dd211c73bf33d1759cb737 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::agent::storage::storage_factory::testing::InMemoryStorageFactory;
use crate::agent::{AgentError, Context, Payload};
use crate::event;
use crate::service;
use crate::tests::scaffold;
use crate::EnvironmentBuilder;
use assert_matches::assert_matches;
use fuchsia_async as fasync;
use futures::future::BoxFuture;
use futures::lock::Mutex;
use std::sync::Arc;
const ENV_NAME: &str = "settings_service_event_test_environment";
// Exercises the event publishing path from agents.
#[fuchsia_async::run_until_stalled(test)]
async fn test_agent_event_propagation() {
let agent_publisher: Arc<Mutex<Option<event::Publisher>>> = Arc::new(Mutex::new(None));
let delegate: Arc<Mutex<Option<service::message::Delegate>>> = Arc::new(Mutex::new(None));
// Capturing the context allows retrieving the publisher meant for the
// agent.
let publisher_capture = agent_publisher.clone();
// Capturing the delegate allows registering a listener to published events.
let cloned_delegate = delegate.clone();
// Upon instantiation, the subscriber will capture the event message
// delegate.
let create_subscriber =
Arc::new(move |captured_delegate: service::message::Delegate| -> BoxFuture<'static, ()> {
let delegate = cloned_delegate.clone();
Box::pin(async move {
*delegate.lock().await = Some(captured_delegate);
})
});
// This agent simply captures the context and returns unhandled for all
// subsequent invocations (allowing the authority to progress).
let create_agent = Arc::new(move |mut context: Context| -> BoxFuture<'static, ()> {
let publisher_capture = publisher_capture.clone();
Box::pin(async move {
*publisher_capture.lock().await = Some(context.get_publisher());
fasync::Task::spawn(async move {
let _ = &context;
while let Ok((Payload::Invocation(_), client)) =
context.receptor.next_of::<Payload>().await
{
client
.reply(Payload::Complete(Err(AgentError::UnhandledLifespan)).into())
.send()
.ack();
}
})
.detach();
})
});
let _ = EnvironmentBuilder::new(Arc::new(InMemoryStorageFactory::new()))
.event_subscribers(&[scaffold::event::subscriber::Blueprint::create(create_subscriber)])
.agents(&[Arc::new(scaffold::agent::Blueprint::new(scaffold::agent::Generate::Async(
create_agent,
)))])
.spawn_and_get_nested_environment(ENV_NAME)
.await
.unwrap();
let service_delegate =
delegate.clone().lock().await.take().expect("Should have captured event factory");
let mut receptor = service::build_event_listener(&service_delegate).await;
let sent_event = event::Event::Custom("test");
let publisher = agent_publisher.lock().await.take().expect("Should have captured publisher");
publisher.send_event(sent_event.clone());
assert_matches!(
receptor.next_of::<event::Payload>().await.expect("Should have received broadcast").0,
event::Payload::Event(broadcasted_event) if broadcasted_event == sent_event);
}