| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use crate::agent::authority::Authority; |
| use crate::agent::{ |
| AgentError, BlueprintHandle, Context, Invocation, InvocationResult, Lifespan, Payload, |
| }; |
| use crate::base::SettingType; |
| use crate::handler::device_storage::testing::InMemoryStorageFactory; |
| use crate::service; |
| use crate::service_context::ServiceContext; |
| use crate::tests::scaffold; |
| use crate::EnvironmentBuilder; |
| use core::fmt::{Debug, Formatter}; |
| use fuchsia_async as fasync; |
| use futures::channel::mpsc::UnboundedSender; |
| use futures::lock::Mutex; |
| use futures::StreamExt; |
| use rand::Rng; |
| use std::collections::HashSet; |
| use std::convert::TryFrom; |
| use std::sync::Arc; |
| |
| const ENV_NAME: &str = "settings_service_agent_test_environment"; |
| |
| type CallbackSender = UnboundedSender<(u32, Invocation, AckSender)>; |
| type AckSender = futures::channel::oneshot::Sender<InvocationResult>; |
| |
/// The single lifespan a [`TestAgent`] accepts invocations for. Invocations
/// for the other lifespan are rejected by the agent.
///
/// `Debug` and `Eq` are derived so values can be used with `assert_eq!` and
/// show up in diagnostics.
#[derive(Clone, Debug, Eq, PartialEq)]
enum LifespanTarget {
    /// The agent handles `Lifespan::Initialization` invocations.
    Initialization,
    /// The agent handles `Lifespan::Service` invocations.
    Service,
}
| |
| /// Agent provides a test agent to interact with the authority impl. It is |
| /// instantiated with an id that can be used to identify it when returned by |
| /// other parts of the code. Additionally, the last invocation is stored so that |
| /// it can be inspected in tests. |
| /// |
| /// An asynchronous task is spawned upon creation, which listens to an |
| /// invocations. Whenever an invocation is encountered, a callback provided at |
| /// construction is fired (in this context to inform the test of the change). At |
| /// that point, the agent owner may continue the lifespan execution by calling |
| /// continue_invocation. |
| struct TestAgent { |
| id: u32, |
| lifespan_target: LifespanTarget, |
| last_invocation: Option<Invocation>, |
| callback: CallbackSender, |
| } |
| |
| impl Debug for TestAgent { |
| fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { |
| write!(f, "Agent {{ id: {} }}", self.id) |
| } |
| } |
| |
| impl TestAgent { |
| // Creates an agent and spawns a listener for invocation. The agent will be |
| // registered with the given authority for the lifespan specified. The |
| // callback will be invoked whenever an invocation is encountered, passing a |
| // reference to this agent. |
| pub async fn create_and_register( |
| id: u32, |
| lifespan_target: LifespanTarget, |
| authority: &mut Authority, |
| callback: CallbackSender, |
| ) -> Arc<Mutex<TestAgent>> { |
| let (agent, generate) = Self::create(id, lifespan_target, callback); |
| |
| authority.register(generate).await; |
| |
| agent |
| } |
| |
| pub fn create( |
| id: u32, |
| lifespan_target: LifespanTarget, |
| callback: CallbackSender, |
| ) -> (Arc<Mutex<TestAgent>>, BlueprintHandle) { |
| let agent = Arc::new(Mutex::new(TestAgent { |
| id: id, |
| last_invocation: None, |
| lifespan_target: lifespan_target, |
| callback: callback, |
| })); |
| |
| let agent_clone = agent.clone(); |
| let blueprint = Arc::new(scaffold::agent::Blueprint::new(scaffold::agent::Generate::Sync( |
| Arc::new(move |mut context: Context| { |
| let agent = agent_clone.clone(); |
| fasync::Task::spawn(async move { |
| while let Ok((payload, client)) = context.receptor.next_payload().await { |
| if let Ok(Payload::Invocation(invocation)) = Payload::try_from(payload) { |
| client |
| .reply( |
| Payload::Complete(agent.lock().await.handle(invocation).await) |
| .into(), |
| ) |
| .send() |
| .ack(); |
| } |
| } |
| }) |
| .detach(); |
| }), |
| ))); |
| |
| (agent.clone(), blueprint) |
| } |
| |
| async fn handle(&mut self, invocation: Invocation) -> InvocationResult { |
| match invocation.lifespan.clone() { |
| Lifespan::Initialization => { |
| if self.lifespan_target != LifespanTarget::Initialization { |
| return Err(AgentError::UnhandledLifespan); |
| } |
| } |
| Lifespan::Service => { |
| if self.lifespan_target != LifespanTarget::Service { |
| return Err(AgentError::UnhandledLifespan); |
| } |
| } |
| } |
| |
| self.last_invocation = Some(invocation.clone()); |
| let (tx, rx) = futures::channel::oneshot::channel::<InvocationResult>(); |
| self.callback.unbounded_send((self.id, invocation.clone(), tx)).ok(); |
| if let Ok(result) = rx.await { |
| return result; |
| } else { |
| return Err(AgentError::UnexpectedError); |
| } |
| } |
| |
| /// Returns the id specified at construction time. |
| pub fn id(&self) -> u32 { |
| return self.id; |
| } |
| |
| /// Returns the last encountered, unprocessed invocation. None will be |
| /// returned if such invocation does not exist. |
| pub fn last_invocation(&self) -> Option<Invocation> { |
| if let Some(last_invocation) = &self.last_invocation { |
| return Some(last_invocation.clone()); |
| } |
| |
| return None; |
| } |
| } |
| |
| /// Ensures creating environment properly invokes the right lifespans. |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_environment_startup() { |
| let startup_agent_id = 1; |
| let (startup_tx, mut startup_rx) = |
| futures::channel::mpsc::unbounded::<(u32, Invocation, AckSender)>(); |
| |
| let service_agent_id = 2; |
| let (service_tx, mut service_rx) = |
| futures::channel::mpsc::unbounded::<(u32, Invocation, AckSender)>(); |
| let (service_agent, service_agent_generate) = |
| TestAgent::create(service_agent_id, LifespanTarget::Service, service_tx); |
| |
| { |
| let service_agent = service_agent.clone(); |
| fasync::Task::spawn(async move { |
| // Wait for the initialization agent to receive invocation |
| if let Some((id, _, tx)) = startup_rx.next().await { |
| // Verify the correct agent was invoked. |
| assert_eq!(id, startup_agent_id); |
| assert!(tx.send(Ok(())).is_ok()); |
| // Ensure the service agent hasn't been invoked |
| assert!(service_agent.lock().await.last_invocation.is_none()); |
| } |
| }) |
| .detach(); |
| } |
| |
| fasync::Task::spawn(async move { |
| // Wait for service agent to receive notification |
| if let Some((id, _, tx)) = service_rx.next().await { |
| // Verify the correct agent was invoked |
| assert_eq!(id, service_agent_id); |
| // Ensure acknowledging succeeds |
| assert!(tx.send(Ok(())).is_ok()); |
| } |
| }) |
| .detach(); |
| |
| let (_, agent_generate) = |
| TestAgent::create(startup_agent_id, LifespanTarget::Initialization, startup_tx); |
| |
| assert!(EnvironmentBuilder::new(Arc::new(InMemoryStorageFactory::new())) |
| .agents(&[service_agent_generate, agent_generate,]) |
| .settings(&[SettingType::Display]) |
| .spawn_nested(ENV_NAME) |
| .await |
| .is_ok()); |
| } |
| |
| async fn create_authority() -> Authority { |
| Authority::create(service::message::create_hub(), HashSet::new(), None).await.unwrap() |
| } |
| |
| /// Ensures that agents are executed in sequential order and the |
| /// completion ack only is sent when all agents have completed. |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_sequential() { |
| let (tx, mut rx) = futures::channel::mpsc::unbounded::<(u32, Invocation, AckSender)>(); |
| let mut authority = create_authority().await; |
| let service_context = Arc::new(ServiceContext::new(None, None)); |
| |
| // Create a number of agents. |
| let agent_ids = |
| create_agents(12, LifespanTarget::Initialization, &mut authority, tx.clone()).await; |
| |
| fasync::Task::spawn(async move { |
| // Process the agent callbacks, making sure they are received in the right |
| // order and acknowledging the acks. Note that this is a chain reaction. |
| // Processing the first agent is necessary before the second can receive its |
| // invocation. |
| for agent_id in agent_ids { |
| match rx.next().await { |
| Some((id, _, tx)) => { |
| assert!(rx.try_next().is_err()); |
| |
| if agent_id == id { |
| assert!(tx.send(Ok(())).is_ok()); |
| } |
| } |
| _ => { |
| panic!("couldn't get invocation"); |
| } |
| } |
| } |
| }) |
| .detach(); |
| |
| // Ensure lifespan execution completes. |
| assert!(authority |
| .execute_lifespan(Lifespan::Initialization, service_context, true,) |
| .await |
| .is_ok()); |
| } |
| |
| /// Ensures that in simultaneous execution agents are not blocked on each other |
| /// and the completion ack waits for all to complete. |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_simultaneous() { |
| let (tx, mut rx) = futures::channel::mpsc::unbounded::<(u32, Invocation, AckSender)>(); |
| let mut authority = create_authority().await; |
| let service_context = Arc::new(ServiceContext::new(None, None)); |
| let agent_ids = |
| create_agents(12, LifespanTarget::Initialization, &mut authority, tx.clone()).await; |
| |
| fasync::Task::spawn(async move { |
| // Ensure that each agent has received the invocation. Note that we are not |
| // acknowledging the invocations here. Each agent should be notified |
| // regardless of order. |
| let mut senders = Vec::new(); |
| for agent_id in agent_ids { |
| if let Some((id, _, tx)) = rx.next().await { |
| assert_eq!(id, agent_id); |
| senders.push(tx); |
| } else { |
| panic!("should be able to retrieve agent"); |
| } |
| } |
| |
| // Acknowledge each invocation. |
| for sender in senders { |
| assert!(sender.send(Ok(())).is_ok()); |
| } |
| }) |
| .detach(); |
| |
| // Execute lifespan non-sequentially. |
| assert!(authority |
| .execute_lifespan(Lifespan::Initialization, service_context, false,) |
| .await |
| .is_ok()); |
| } |
| |
| /// Checks that errors returned from an agent stop execution of a lifecycle. |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_err_handling() { |
| let (tx, mut rx) = futures::channel::mpsc::unbounded::<(u32, Invocation, AckSender)>(); |
| let mut authority = create_authority().await; |
| let service_context = Arc::new(ServiceContext::new(None, None)); |
| let mut rng = rand::thread_rng(); |
| |
| let agent_1_id = TestAgent::create_and_register( |
| rng.gen(), |
| LifespanTarget::Initialization, |
| &mut authority, |
| tx.clone(), |
| ) |
| .await |
| .lock() |
| .await |
| .id(); |
| |
| let agent2_lock = TestAgent::create_and_register( |
| rng.gen(), |
| LifespanTarget::Initialization, |
| &mut authority, |
| tx.clone(), |
| ) |
| .await; |
| |
| fasync::Task::spawn(async move { |
| // Ensure the first agent received an invocation, acknowledge with an error. |
| if let Some((id, _, tx)) = rx.next().await { |
| assert_eq!(agent_1_id, id); |
| assert!(tx.send(Err(AgentError::UnexpectedError)).is_ok()); |
| } else { |
| panic!("did not receive expected response from agent"); |
| } |
| }) |
| .detach(); |
| |
| // Execute lifespan sequentially. Should fail since agent 2 returns an error. |
| assert!(authority |
| .execute_lifespan(Lifespan::Initialization, service_context, true,) |
| .await |
| .is_err()); |
| |
| assert!(agent2_lock.lock().await.last_invocation().is_none()); |
| } |
| |
| async fn create_agents( |
| count: u32, |
| lifespan_target: LifespanTarget, |
| authority: &mut Authority, |
| sender: UnboundedSender<(u32, Invocation, AckSender)>, |
| ) -> Vec<u32> { |
| let mut return_agents = Vec::new(); |
| let mut rng = rand::thread_rng(); |
| |
| for _i in 0..count { |
| let id = rng.gen(); |
| return_agents.push(id); |
| TestAgent::create_and_register(id, lifespan_target.clone(), authority, sender.clone()) |
| .await; |
| } |
| |
| return return_agents; |
| } |