| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use std::collections::HashMap; |
| use std::pin::pin; |
| use std::sync::Arc; |
| use std::task::Poll; |
| |
| use diagnostics_assertions::assert_data_tree; |
| use fuchsia_async as fasync; |
| use fuchsia_inspect::Inspector; |
| use fuchsia_zircon::{Duration, DurationNum, Time}; |
| use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; |
| use futures::channel::oneshot; |
| use futures::lock::Mutex; |
| use futures::StreamExt; |
| |
| use async_trait::async_trait; |
| |
| use assert_matches::assert_matches; |
| |
| use crate::base::{SettingType, UnknownInfo}; |
| use crate::handler::base::{ |
| Error as HandlerError, Payload as HandlerPayload, Request, Response, SettingHandlerFactory, |
| SettingHandlerFactoryError, |
| }; |
| use crate::handler::setting_handler::{ |
| self, Command, ControllerError, Event, ExitResult, SettingHandlerResult, State, |
| }; |
| use crate::handler::setting_proxy::{SettingProxy, MAX_NODE_ERRORS}; |
| use crate::inspect::listener_logger::ListenerInspectLogger; |
| use crate::message::base::{Audience, MessageEvent, MessengerType}; |
| use crate::message::receptor::Receptor; |
| use crate::service::{self, message, TryFromWithClient}; |
| use crate::{clock, event, Payload}; |
| |
| const TEARDOWN_TIMEOUT: Duration = Duration::from_seconds(5); |
| const SETTING_PROXY_MAX_ATTEMPTS: u64 = 3; |
| const SETTING_PROXY_TIMEOUT_MS: i64 = 1; |
| |
| type ListenReceptor = Receptor; |
| |
| struct SettingHandler { |
| setting_type: SettingType, |
| messenger: service::message::Messenger, |
| state_tx: UnboundedSender<State>, |
| responses: Vec<(Request, HandlerAction)>, |
| done_tx: Option<oneshot::Sender<()>>, |
| proxy_signature: service::message::Signature, |
| } |
| |
| #[derive(Debug)] |
| enum HandlerAction { |
| Ignore, |
| Exit(ExitResult), |
| Respond(SettingHandlerResult), |
| RespondAfterDelay(SettingHandlerResult, Duration), |
| } |
| |
| impl SettingHandler { |
| #[allow(clippy::result_large_err)] // TODO(https://fxbug.dev/42069089) |
| fn process_state(&mut self, state: State) -> SettingHandlerResult { |
| self.state_tx.unbounded_send(state).unwrap(); |
| Ok(None) |
| } |
| |
| fn queue_action(&mut self, request: Request, action: HandlerAction) { |
| self.responses.push((request, action)) |
| } |
| |
| fn notify(&self) { |
| let _ = self.messenger.message( |
| setting_handler::Payload::Event(Event::Changed(UnknownInfo(true).into())).into(), |
| Audience::Messenger(self.proxy_signature), |
| ); |
| } |
| |
| async fn process_request(&mut self, request: Request) -> Option<SettingHandlerResult> { |
| if let Some((match_request, action)) = self.responses.pop() { |
| if request == match_request { |
| match action { |
| HandlerAction::Respond(result) => { |
| return Some(result); |
| } |
| HandlerAction::RespondAfterDelay(result, duration) => { |
| fasync::Timer::new(duration).await; |
| return Some(result); |
| } |
| HandlerAction::Ignore => { |
| return None; |
| } |
| HandlerAction::Exit(result) => { |
| let _ = self.messenger.message( |
| setting_handler::Payload::Event(Event::Exited(result)).into(), |
| Audience::Messenger(self.proxy_signature), |
| ); |
| return None; |
| } |
| } |
| } |
| } |
| |
| Some(Err(ControllerError::UnimplementedRequest(self.setting_type, request))) |
| } |
| |
| fn create( |
| messenger: service::message::Messenger, |
| mut receptor: service::message::Receptor, |
| proxy_signature: service::message::Signature, |
| setting_type: SettingType, |
| state_tx: UnboundedSender<State>, |
| done_tx: Option<oneshot::Sender<()>>, |
| ) -> Arc<Mutex<Self>> { |
| let handler = Arc::new(Mutex::new(Self { |
| messenger, |
| setting_type, |
| state_tx, |
| responses: vec![], |
| done_tx, |
| proxy_signature, |
| })); |
| |
| let handler_clone = handler.clone(); |
| fasync::Task::spawn(async move { |
| while let Some(event) = receptor.next().await { |
| match event { |
| MessageEvent::Message( |
| service::Payload::Controller(setting_handler::Payload::Command( |
| Command::HandleRequest(request), |
| )), |
| client, |
| ) => { |
| if let Some(response) = |
| handler_clone.lock().await.process_request(request).await |
| { |
| setting_handler::reply(client, response); |
| } |
| } |
| MessageEvent::Message( |
| service::Payload::Controller(setting_handler::Payload::Command( |
| Command::ChangeState(state), |
| )), |
| client, |
| ) => { |
| setting_handler::reply( |
| client, |
| handler_clone.lock().await.process_state(state), |
| ); |
| } |
| _ => {} |
| } |
| } |
| |
| if let Some(done_tx) = handler_clone.lock().await.done_tx.take() { |
| let _ = done_tx.send(()); |
| } |
| }) |
| .detach(); |
| |
| handler |
| } |
| } |
| |
| struct FakeFactory { |
| handlers: HashMap<SettingType, service::message::Signature>, |
| request_counts: HashMap<SettingType, u64>, |
| delegate: service::message::Delegate, |
| } |
| |
| impl FakeFactory { |
| fn new(delegate: service::message::Delegate) -> Self { |
| FakeFactory { handlers: HashMap::new(), request_counts: HashMap::new(), delegate } |
| } |
| |
| async fn create( |
| &mut self, |
| setting_type: SettingType, |
| ) -> (service::message::Messenger, service::message::Receptor) { |
| let (client, receptor) = self.delegate.create(MessengerType::Unbound).await.unwrap(); |
| let _ = self.handlers.insert(setting_type, receptor.get_signature()); |
| |
| (client, receptor) |
| } |
| |
| fn get_request_count(&mut self, setting_type: SettingType) -> u64 { |
| if let Some(count) = self.request_counts.get(&setting_type) { |
| *count |
| } else { |
| 0 |
| } |
| } |
| } |
| |
| #[async_trait] |
| impl SettingHandlerFactory for FakeFactory { |
| async fn generate( |
| &mut self, |
| setting_type: SettingType, |
| _: service::message::Delegate, |
| _: service::message::Signature, |
| ) -> Result<service::message::Signature, SettingHandlerFactoryError> { |
| Ok(self |
| .handlers |
| .get(&setting_type) |
| .copied() |
| .map(|signature| { |
| *self.request_counts.entry(setting_type).or_insert(0) += 1; |
| signature |
| }) |
| .unwrap()) |
| } |
| } |
| |
| struct TestEnvironmentBuilder { |
| setting_type: SettingType, |
| done_tx: Option<oneshot::Sender<()>>, |
| timeout: Option<(Duration, bool)>, |
| } |
| |
| impl TestEnvironmentBuilder { |
| fn new(setting_type: SettingType) -> Self { |
| Self { setting_type, done_tx: None, timeout: None } |
| } |
| |
| fn set_done_tx(mut self, tx: Option<oneshot::Sender<()>>) -> Self { |
| self.done_tx = tx; |
| self |
| } |
| |
| fn set_timeout(mut self, duration: Duration, retry_on_timeout: bool) -> Self { |
| self.timeout = Some((duration, retry_on_timeout)); |
| self |
| } |
| |
| async fn build(self) -> TestEnvironment { |
| let delegate = service::MessageHub::create_hub(); |
| |
| let handler_factory = Arc::new(Mutex::new(FakeFactory::new(delegate.clone()))); |
| |
| let inspector = Inspector::default(); |
| let listener_logger = |
| Arc::new(Mutex::new(ListenerInspectLogger::with_inspector(&inspector))); |
| let proxy_handler_signature = SettingProxy::create( |
| self.setting_type, |
| handler_factory.clone(), |
| delegate.clone(), |
| SETTING_PROXY_MAX_ATTEMPTS, |
| TEARDOWN_TIMEOUT, |
| self.timeout.map(|(duration, _)| duration), |
| self.timeout.map_or(true, |(_, retry)| retry), |
| inspector.root().create_child("test"), |
| listener_logger.clone(), |
| ) |
| .await |
| .expect("proxy creation should succeed"); |
| |
| let (service_client, _) = delegate.create(MessengerType::Unbound).await.unwrap(); |
| |
| let (handler_messenger, handler_receptor) = |
| handler_factory.lock().await.create(self.setting_type).await; |
| let (state_tx, state_rx) = futures::channel::mpsc::unbounded::<State>(); |
| let handler = SettingHandler::create( |
| handler_messenger, |
| handler_receptor, |
| proxy_handler_signature, |
| self.setting_type, |
| state_tx, |
| self.done_tx, |
| ); |
| |
| TestEnvironment { |
| proxy_handler_signature, |
| service_client, |
| handler_factory, |
| setting_handler_rx: state_rx, |
| setting_handler: handler, |
| setting_type: self.setting_type, |
| delegate, |
| inspector, |
| } |
| } |
| } |
| |
| struct TestEnvironment { |
| proxy_handler_signature: service::message::Signature, |
| service_client: service::message::Messenger, |
| handler_factory: Arc<Mutex<FakeFactory>>, |
| setting_handler_rx: UnboundedReceiver<State>, |
| setting_handler: Arc<Mutex<SettingHandler>>, |
| setting_type: SettingType, |
| delegate: service::message::Delegate, |
| inspector: Inspector, |
| } |
| |
| impl TestEnvironment { |
| async fn regenerate_handler(&mut self, done_tx: Option<oneshot::Sender<()>>) { |
| let (handler_messenger, handler_receptor) = |
| self.handler_factory.lock().await.create(self.setting_type).await; |
| let (state_tx, state_rx) = futures::channel::mpsc::unbounded::<State>(); |
| self.setting_handler = SettingHandler::create( |
| handler_messenger, |
| handler_receptor, |
| self.proxy_handler_signature, |
| self.setting_type, |
| state_tx, |
| done_tx, |
| ); |
| |
| self.setting_handler_rx = state_rx; |
| } |
| } |
| |
| // Initializes an environment set up to handle Listen requests. |
| async fn init_listen_env() -> (TestEnvironment, ListenReceptor) { |
| let setting_type = SettingType::Unknown; |
| let environment = TestEnvironmentBuilder::new(setting_type).build().await; |
| |
| // Send a listen state and make sure sink is notified. |
| let mut listen_receptor = environment.service_client.message( |
| service::Payload::Setting(HandlerPayload::Request(Request::Listen)), |
| Audience::Address(service::Address::Handler(setting_type)), |
| ); |
| |
| assert!(listen_receptor.wait_for_acknowledge().await.is_ok(), "ack should be sent"); |
| (environment, listen_receptor) |
| } |
| |
| // Executes a Listen request. |
| async fn run_listen(env: Arc<Mutex<TestEnvironment>>) { |
| let mut environment = env.lock().await; |
| environment.setting_handler.lock().await.notify(); |
| |
| if let Some(state) = environment.setting_handler_rx.next().await { |
| assert_eq!(state, State::Listen); |
| } else { |
| panic!("should have received state update"); |
| } |
| } |
| |
| // Executes an EndListen request. |
| async fn run_end_listen(env: Arc<Mutex<TestEnvironment>>, listen_receptor: ListenReceptor) { |
| let mut environment = env.lock().await; |
| // Drop the listener so the service transitions into teardown. |
| drop(listen_receptor); |
| |
| if let Some(state) = environment.setting_handler_rx.next().await { |
| assert_eq!(state, State::EndListen); |
| } else { |
| panic!("should have received EndListen state update"); |
| } |
| } |
| |
| // Ensures setting proxy registers with the MessageHub. |
| #[fuchsia::test(allow_stalls = false)] |
| async fn test_message_hub_presence() { |
| let setting_type = SettingType::Unknown; |
| let environment = TestEnvironmentBuilder::new(setting_type).build().await; |
| |
| assert!(environment |
| .delegate |
| .contains(service::message::Signature::Address(service::Address::Handler(setting_type))) |
| .await |
| .expect("should have result")); |
| } |
| |
| #[fuchsia::test] |
| fn test_notify() { |
| let mut executor = fasync::TestExecutor::new_with_fake_time(); |
| |
| let environment_fut = init_listen_env(); |
| futures::pin_mut!(environment_fut); |
| let (environment, receptor) = |
| if let Poll::Ready((env, receptor)) = executor.run_until_stalled(&mut environment_fut) { |
| (env, receptor) |
| } else { |
| panic!("environment creation stalled"); |
| }; |
| |
| let env_handle = Arc::new(Mutex::new(environment)); |
| |
| let listen_fut = run_listen(env_handle.clone()); |
| futures::pin_mut!(listen_fut); |
| if let Poll::Ready(res) = executor.run_until_stalled(&mut listen_fut) { |
| res |
| } else { |
| panic!("Listen failed"); |
| }; |
| |
| let end_listen_fut = run_end_listen(env_handle.clone(), receptor); |
| futures::pin_mut!(end_listen_fut); |
| if let Poll::Ready(res) = executor.run_until_stalled(&mut end_listen_fut) { |
| res |
| } else { |
| panic!("EndListen failed"); |
| }; |
| |
| // Validate that the teardown timeout matches the constant. |
| let deadline = crate::clock::now() + TEARDOWN_TIMEOUT; |
| assert_eq!(Some(deadline), executor.wake_next_timer().map(Into::into)); |
| |
| let env_fut = env_handle.lock(); |
| futures::pin_mut!(env_fut); |
| let mut environment_lock = |
| if let Poll::Ready(env_lock) = executor.run_until_stalled(&mut env_fut) { |
| env_lock |
| } else { |
| panic!("Failed to acquire environment lock"); |
| }; |
| |
| let state_fut = environment_lock.setting_handler_rx.next(); |
| futures::pin_mut!(state_fut); |
| let state = if let Poll::Ready(state) = executor.run_until_stalled(&mut state_fut) { |
| state |
| } else { |
| panic!("state retrieval stalled"); |
| }; |
| |
| if let Some(state) = state { |
| assert_eq!(state, State::Teardown); |
| } else { |
| panic!("should have received Teardown state update"); |
| } |
| } |
| |
| #[fuchsia::test(allow_stalls = false)] |
| async fn test_request() { |
| let setting_type = SettingType::Unknown; |
| let environment = TestEnvironmentBuilder::new(setting_type).build().await; |
| |
| environment |
| .setting_handler |
| .lock() |
| .await |
| .queue_action(Request::Get, HandlerAction::Respond(Ok(None))); |
| |
| // Send initial request. |
| let mut receptor = environment.service_client.message( |
| HandlerPayload::Request(Request::Get).into(), |
| Audience::Address(service::Address::Handler(setting_type)), |
| ); |
| |
| if let Ok((HandlerPayload::Response(response), _)) = receptor.next_of::<HandlerPayload>().await |
| { |
| assert!(response.is_ok()); |
| assert_eq!(None, response.unwrap()); |
| } else { |
| panic!("should have received response"); |
| } |
| } |
| |
| #[fuchsia::test(allow_stalls = false)] |
| async fn test_request_order() { |
| let setting_type = SettingType::Unknown; |
| let request_id_1 = 0; |
| let request_id_2 = 1; |
| let environment = TestEnvironmentBuilder::new(setting_type).build().await; |
| |
| environment |
| .setting_handler |
| .lock() |
| .await |
| .queue_action(Request::Get, HandlerAction::Respond(Ok(None))); |
| |
| // Send multiple requests. |
| let receptor_1 = environment.service_client.message( |
| HandlerPayload::Request(Request::Get).into(), |
| Audience::Address(service::Address::Handler(setting_type)), |
| ); |
| let receptor_2 = environment.service_client.message( |
| HandlerPayload::Request(Request::Get).into(), |
| Audience::Address(service::Address::Handler(setting_type)), |
| ); |
| |
| // Wait for both requests to finish and add them to the list as they finish so we can verify the |
| // order. |
| let mut completed_request_ids = Vec::<u64>::new(); |
| let mut receptor_1_fuse = receptor_1.fuse(); |
| let mut receptor_2_fuse = receptor_2.fuse(); |
| loop { |
| environment |
| .setting_handler |
| .lock() |
| .await |
| .queue_action(Request::Get, HandlerAction::Respond(Ok(None))); |
| futures::select! { |
| payload_1 = receptor_1_fuse.next() => { |
| if let Ok((HandlerPayload::Response(_), _)) = |
| payload_1.map_or(Err(String::from("no event")), |
| HandlerPayload::try_from_with_client) { |
| // First request finishes first. |
| assert_eq!(completed_request_ids.len(), 0); |
| completed_request_ids.push(request_id_1); |
| } |
| }, |
| payload_2 = receptor_2_fuse.next() => { |
| if let Ok((HandlerPayload::Response(_), _)) = |
| payload_2.map_or(Err(String::from("no event")), |
| HandlerPayload::try_from_with_client) { |
| assert_eq!(completed_request_ids.len(), 1); |
| completed_request_ids.push(request_id_2); |
| } |
| }, |
| complete => break, |
| } |
| } |
| } |
| |
| struct ErrorFactory; |
| |
| #[async_trait] |
| impl SettingHandlerFactory for ErrorFactory { |
| async fn generate( |
| &mut self, |
| setting_type: SettingType, |
| _delegate: message::Delegate, |
| _notifier_signature: message::Signature, |
| ) -> Result<message::Signature, SettingHandlerFactoryError> { |
| Err(SettingHandlerFactoryError::HandlerStartupError(setting_type, "test error".into())) |
| } |
| } |
| |
| #[fasync::run_until_stalled(test)] |
| async fn inspect_catches_errors() { |
| // Set the clock so that timestamps will always be 0. |
| clock::mock::set(Time::from_nanos(0)); |
| |
| const SETTING_TYPE: SettingType = SettingType::Unknown; |
| |
| let delegate = service::MessageHub::create_hub(); |
| let (service_client, _) = delegate.create(MessengerType::Unbound).await.unwrap(); |
| let handler_factory = Arc::new(Mutex::new(ErrorFactory)); |
| let inspector = Inspector::default(); |
| let _proxy_handler_signature = SettingProxy::create( |
| SETTING_TYPE, |
| handler_factory, |
| delegate, |
| SETTING_PROXY_MAX_ATTEMPTS, |
| TEARDOWN_TIMEOUT, |
| None, |
| false, |
| inspector.root().create_child("test"), |
| Arc::new(Mutex::new(ListenerInspectLogger::with_inspector(&inspector))), |
| ) |
| .await |
| .expect("proxy creation should succeed"); |
| |
| let mut receptor = service_client.message( |
| HandlerPayload::Request(Request::Get).into(), |
| Audience::Address(service::Address::Handler(SETTING_TYPE)), |
| ); |
| |
| let (payload, _) = receptor.next_payload().await.expect("should get payload"); |
| assert_matches::assert_matches!( |
| payload, |
| Payload::Setting(HandlerPayload::Response(Err(HandlerError::UnhandledType( |
| SettingType::Unknown, |
| )))) |
| ); |
| |
| assert_data_tree!(inspector, root: contains { |
| test: { |
| errors: { |
| "00000000000000000000": { |
| timestamp: "0.000000000", |
| value: "HandlerStartupError(Unknown, \"test error\")" |
| } |
| } |
| } |
| }); |
| } |
| |
| #[fasync::run_until_stalled(test)] |
| async fn test_active_listener_inspect() { |
| let (env, receptor) = init_listen_env().await; |
| let env_handle = Arc::new(Mutex::new(env)); |
| run_listen(env_handle.clone()).await; |
| |
| // Logger handle must be locally scoped so the lock doesn't deadlock with the code under test. |
| { |
| let env_handle_clone = env_handle.clone(); |
| let env_lock = env_handle_clone.lock().await; |
| let inspector = &env_lock.inspector; |
| assert_data_tree!(inspector, root: contains { |
| active_listeners: { |
| "Unknown": { |
| count: 1u64, |
| } |
| } |
| }); |
| } |
| |
| run_end_listen(env_handle.clone(), receptor).await; |
| |
| // Logger handle must be locally scoped so the lock doesn't deadlock with the code under test. |
| { |
| let env_handle_clone = env_handle.clone(); |
| let env_lock = env_handle_clone.lock().await; |
| let inspector = &env_lock.inspector; |
| assert_data_tree!(inspector, root: contains { |
| active_listeners: { |
| "Unknown": { |
| count: 0u64, |
| } |
| } |
| }); |
| } |
| } |
| |
| #[fasync::run_until_stalled(test)] |
| async fn inspect_errors_roll_after_limit() { |
| // Set the clock so that timestamps will always be 0. |
| clock::mock::set(Time::from_nanos(0)); |
| const SETTING_TYPE: SettingType = SettingType::Unknown; |
| |
| let delegate = service::MessageHub::create_hub(); |
| let (service_client, _) = delegate.create(MessengerType::Unbound).await.unwrap(); |
| let handler_factory = Arc::new(Mutex::new(ErrorFactory)); |
| let inspector = Inspector::default(); |
| let _proxy_handler_signature = SettingProxy::create( |
| SETTING_TYPE, |
| handler_factory, |
| delegate, |
| SETTING_PROXY_MAX_ATTEMPTS, |
| TEARDOWN_TIMEOUT, |
| None, |
| false, |
| inspector.root().create_child("test"), |
| Arc::new(Mutex::new(ListenerInspectLogger::with_inspector(&inspector))), |
| ) |
| .await |
| .expect("proxy creation should succeed"); |
| |
| for i in 0..(MAX_NODE_ERRORS + 1) { |
| clock::mock::set(Time::from_nanos(i as i64 * 1000)); |
| let mut receptor = service_client.message( |
| HandlerPayload::Request(Request::Get).into(), |
| Audience::Address(service::Address::Handler(SETTING_TYPE)), |
| ); |
| |
| let (payload, _) = receptor.next_payload().await.expect("should get payload"); |
| assert_matches::assert_matches!( |
| payload, |
| Payload::Setting(HandlerPayload::Response(Err(HandlerError::UnhandledType( |
| SettingType::Unknown, |
| )))) |
| ); |
| } |
| |
| assert_data_tree!(inspector, root: contains { |
| test: { |
| errors: { |
| "00000000000000000001": { |
| timestamp: "0.000001000", |
| value: "HandlerStartupError(Unknown, \"test error\")" |
| }, |
| "00000000000000000002": { |
| timestamp: "0.000002000", |
| value: "HandlerStartupError(Unknown, \"test error\")" |
| }, |
| "00000000000000000003": { |
| timestamp: "0.000003000", |
| value: "HandlerStartupError(Unknown, \"test error\")" |
| }, |
| "00000000000000000004": { |
| timestamp: "0.000004000", |
| value: "HandlerStartupError(Unknown, \"test error\")" |
| }, |
| "00000000000000000005": { |
| timestamp: "0.000005000", |
| value: "HandlerStartupError(Unknown, \"test error\")" |
| }, |
| "00000000000000000006": { |
| timestamp: "0.000006000", |
| value: "HandlerStartupError(Unknown, \"test error\")" |
| }, |
| "00000000000000000007": { |
| timestamp: "0.000007000", |
| value: "HandlerStartupError(Unknown, \"test error\")" |
| }, |
| "00000000000000000008": { |
| timestamp: "0.000008000", |
| value: "HandlerStartupError(Unknown, \"test error\")" |
| }, |
| "00000000000000000009": { |
| timestamp: "0.000009000", |
| value: "HandlerStartupError(Unknown, \"test error\")" |
| }, |
| "00000000000000000010": { |
| timestamp: "0.000010000", |
| value: "HandlerStartupError(Unknown, \"test error\")" |
| }, |
| } |
| } |
| }); |
| } |
| |
| #[fuchsia::test] |
| fn test_regeneration() { |
| let setting_type = SettingType::Unknown; |
| |
| let mut executor = fasync::TestExecutor::new_with_fake_time(); |
| |
| async fn run_once(setting_type: SettingType) -> (oneshot::Receiver<()>, TestEnvironment) { |
| let (done_tx, done_rx) = oneshot::channel(); |
| let environment = |
| TestEnvironmentBuilder::new(setting_type).set_done_tx(Some(done_tx)).build().await; |
| |
| // Send initial request. |
| assert!( |
| get_response(environment.service_client.message( |
| HandlerPayload::Request(Request::Get).into(), |
| Audience::Address(service::Address::Handler(setting_type)), |
| )) |
| .await |
| .is_some(), |
| "response should have been received" |
| ); |
| |
| // Ensure the handler was only created once. |
| assert_eq!(1, environment.handler_factory.lock().await.get_request_count(setting_type)); |
| |
| // The subsequent teardown should happen here. |
| (done_rx, environment) |
| } |
| |
| let environment_fut = run_once(setting_type); |
| futures::pin_mut!(environment_fut); |
| let (done_rx, mut environment) = |
| if let Poll::Ready(output) = executor.run_until_stalled(&mut environment_fut) { |
| output |
| } else { |
| panic!("initial call stalled"); |
| }; |
| |
| let _ = executor.wake_next_timer(); |
| |
| futures::pin_mut!(done_rx); |
| assert_matches::assert_matches!(executor.run_until_stalled(&mut done_rx), Poll::Ready(Ok(_))); |
| |
| let mut hit_teardown = false; |
| loop { |
| let state_fut = environment.setting_handler_rx.next(); |
| futures::pin_mut!(state_fut); |
| let state = if let Poll::Ready(state) = executor.run_until_stalled(&mut state_fut) { |
| state |
| } else { |
| panic!("getting next state stalled"); |
| }; |
| match state { |
| Some(State::Teardown) => { |
| hit_teardown = true; |
| break; |
| } |
| None => break, |
| _ => {} |
| } |
| } |
| assert!(hit_teardown, "Handler should have torn down"); |
| |
| async fn complete(mut environment: TestEnvironment, setting_type: SettingType) { |
| drop(environment.setting_handler); |
| |
| // Now that the handler is dropped, the setting_handler_tx should be dropped too and the rx |
| // end will return none. |
| assert!( |
| environment.setting_handler_rx.next().await.is_none(), |
| "There should be no more states after teardown" |
| ); |
| |
| let (handler_messenger, handler_receptor) = |
| environment.handler_factory.lock().await.create(setting_type).await; |
| let (state_tx, _) = futures::channel::mpsc::unbounded::<State>(); |
| let _handler = SettingHandler::create( |
| handler_messenger, |
| handler_receptor, |
| environment.proxy_handler_signature, |
| setting_type, |
| state_tx, |
| None, |
| ); |
| |
| // Send followup request. |
| assert!( |
| get_response(environment.service_client.message( |
| HandlerPayload::Request(Request::Get).into(), |
| Audience::Address(service::Address::Handler(setting_type)), |
| )) |
| .await |
| .is_some(), |
| "response should have been received" |
| ); |
| |
| // Check that the handler was re-generated. |
| assert_eq!(2, environment.handler_factory.lock().await.get_request_count(setting_type)); |
| } |
| |
| let complete_fut = complete(environment, setting_type); |
| futures::pin_mut!(complete_fut); |
| assert_eq!(executor.run_until_stalled(&mut complete_fut), Poll::Ready(())); |
| } |
| |
| // Exercises the retry flow, ensuring the setting proxy goes through the |
| // defined number of tests and correctly reports back activity. |
| #[fuchsia::test] |
| fn test_retry() { |
| let setting_type = SettingType::Unknown; |
| async fn run_retries(setting_type: SettingType) -> (TestEnvironment, message::Receptor) { |
| let environment = TestEnvironmentBuilder::new(setting_type).build().await; |
| |
| let mut event_receptor = service::build_event_listener(&environment.delegate).await; |
| |
| // Queue up external failure responses in the handler. |
| for _ in 0..SETTING_PROXY_MAX_ATTEMPTS { |
| environment.setting_handler.lock().await.queue_action( |
| Request::Get, |
| HandlerAction::Respond(Err(ControllerError::ExternalFailure( |
| setting_type, |
| "test_component".into(), |
| "connect".into(), |
| "Error".into(), |
| ))), |
| ); |
| } |
| |
| let request = Request::Get; |
| |
| // Send request. |
| let handler_result = get_response(environment.service_client.message( |
| HandlerPayload::Request(request.clone()).into(), |
| Audience::Address(service::Address::Handler(setting_type)), |
| )) |
| .await |
| .expect("result should be present"); |
| |
| assert_eq!( |
| handler_result, |
| Err(HandlerError::ExternalFailure( |
| SettingType::Unknown, |
| "test_component".into(), |
| "connect".into(), |
| "Error".into() |
| )), |
| "error should have been encountered", |
| ); |
| |
| // For each failed attempt, make sure a retry event was broadcasted |
| for _ in 0..SETTING_PROXY_MAX_ATTEMPTS { |
| verify_handler_event( |
| setting_type, |
| event_receptor |
| .next_of::<event::Payload>() |
| .await |
| .expect("should be notified of external failure") |
| .0, |
| event::handler::Event::Request(event::handler::Action::Execute, request.clone()), |
| ); |
| verify_handler_event( |
| setting_type, |
| event_receptor |
| .next_of::<event::Payload>() |
| .await |
| .expect("should be notified of external failure") |
| .0, |
| event::handler::Event::Request(event::handler::Action::Retry, request.clone()), |
| ); |
| } |
| |
| // Ensure that the final event reports that attempts were exceeded |
| verify_handler_event( |
| setting_type, |
| event_receptor |
| .next_of::<event::Payload>() |
| .await |
| .expect("should be notified of external failure") |
| .0, |
| event::handler::Event::Request( |
| event::handler::Action::AttemptsExceeded, |
| request.clone(), |
| ), |
| ); |
| |
| (environment, event_receptor) |
| } |
| |
| async fn run_to_response( |
| mut environment: TestEnvironment, |
| mut _event_receptor: message::Receptor, |
| setting_type: SettingType, |
| ) -> (TestEnvironment, message::Receptor) { |
| // Regenerate setting handler |
| environment.regenerate_handler(None).await; |
| |
| // Queue successful response |
| environment |
| .setting_handler |
| .lock() |
| .await |
| .queue_action(Request::Get, HandlerAction::Respond(Ok(None))); |
| |
| let request = Request::Get; |
| // Ensure subsequent request succeeds |
| assert_matches::assert_matches!( |
| get_response(environment.service_client.message( |
| HandlerPayload::Request(request.clone()).into(), |
| Audience::Address(service::Address::Handler(setting_type)), |
| ),) |
| .await, |
| Some(Ok(_)) |
| ); |
| |
| (environment, _event_receptor) |
| } |
| |
| let mut executor = fasync::TestExecutor::new_with_fake_time(); |
| |
| let environment_fut = run_retries(setting_type); |
| futures::pin_mut!(environment_fut); |
| let (environment, event_receptor) = |
| if let Poll::Ready(output) = executor.run_until_stalled(&mut environment_fut) { |
| output |
| } else { |
| panic!("environment creation and retries stalled"); |
| }; |
| |
| let _ = executor.wake_next_timer(); |
| |
| let environment_fut = run_to_response(environment, event_receptor, setting_type); |
| futures::pin_mut!(environment_fut); |
| let (_environment, mut event_receptor) = |
| if let Poll::Ready(output) = executor.run_until_stalled(&mut environment_fut) { |
| output |
| } else { |
| panic!("running final step stalled"); |
| }; |
| |
| let _ = executor.wake_next_timer(); |
| |
| let event_fut = event_receptor.next_of::<event::Payload>(); |
| futures::pin_mut!(event_fut); |
| let state = if let Poll::Ready(Ok((payload, _))) = executor.run_until_stalled(&mut event_fut) { |
| payload |
| } else { |
| panic!("state retrieval stalled or had no result"); |
| }; |
| |
| // Make sure SettingHandler tears down |
| verify_handler_event(setting_type, state, event::handler::Event::Teardown); |
| } |
| |
| // Ensures early exit triggers retry flow. |
| #[fuchsia::test(allow_stalls = false)] |
| async fn test_early_exit() { |
| let exit_result = Ok(()); |
| let setting_type = SettingType::Unknown; |
| let environment = TestEnvironmentBuilder::new(setting_type).build().await; |
| |
| let mut event_receptor = service::build_event_listener(&environment.delegate).await; |
| |
| // Queue up external failure responses in the handler. |
| for _ in 0..SETTING_PROXY_MAX_ATTEMPTS { |
| environment |
| .setting_handler |
| .lock() |
| .await |
| .queue_action(Request::Get, HandlerAction::Exit(exit_result.clone())); |
| } |
| |
| let request = Request::Get; |
| |
| // Send request. |
| let handler_result = get_response(environment.service_client.message( |
| HandlerPayload::Request(request.clone()).into(), |
| Audience::Address(service::Address::Handler(setting_type)), |
| )) |
| .await |
| .expect("result should be present"); |
| |
| // Make sure the result is an `ControllerError::IrrecoverableError` |
| if let Err(error) = handler_result { |
| assert_eq!(error, HandlerError::IrrecoverableError); |
| } else { |
| panic!("error should have been encountered"); |
| } |
| |
| //For each failed attempt, make sure a retry event was broadcasted |
| for _ in 0..SETTING_PROXY_MAX_ATTEMPTS { |
| verify_handler_event( |
| setting_type, |
| event_receptor |
| .next_of::<event::Payload>() |
| .await |
| .expect("should be notified of external failure") |
| .0, |
| event::handler::Event::Request(event::handler::Action::Execute, request.clone()), |
| ); |
| verify_handler_event( |
| setting_type, |
| event_receptor |
| .next_of::<event::Payload>() |
| .await |
| .expect("should be notified of external failure") |
| .0, |
| event::handler::Event::Exit(exit_result.clone()), |
| ); |
| verify_handler_event( |
| setting_type, |
| event_receptor |
| .next_of::<event::Payload>() |
| .await |
| .expect("should be notified of external failure") |
| .0, |
| event::handler::Event::Request(event::handler::Action::Retry, request.clone()), |
| ); |
| } |
| |
| // Ensure that the final event reports that attempts were exceeded |
| verify_handler_event( |
| setting_type, |
| event_receptor |
| .next_of::<event::Payload>() |
| .await |
| .expect("should be notified of external failure") |
| .0, |
| event::handler::Event::Request(event::handler::Action::AttemptsExceeded, request.clone()), |
| ); |
| } |
| |
| // Ensures timeouts trigger retry flow. |
| #[fuchsia::test] |
| fn test_timeout() { |
| let mut executor = fuchsia_async::TestExecutor::new_with_fake_time(); |
| |
| let mut fut = pin!(async move { |
| let setting_type = SettingType::Unknown; |
| let environment = TestEnvironmentBuilder::new(setting_type) |
| .set_timeout(SETTING_PROXY_TIMEOUT_MS.millis(), true) |
| .build() |
| .await; |
| |
| let mut event_receptor = service::build_event_listener(&environment.delegate).await; |
| |
| // Queue up to ignore resquests |
| for _ in 0..SETTING_PROXY_MAX_ATTEMPTS { |
| environment |
| .setting_handler |
| .lock() |
| .await |
| .queue_action(Request::Get, HandlerAction::Ignore); |
| } |
| |
| let request = Request::Get; |
| |
| // Send request. |
| let mut receptor = environment |
| .delegate |
| .create(MessengerType::Unbound) |
| .await |
| .expect("messenger should be created") |
| .0 |
| .message( |
| HandlerPayload::Request(request.clone()).into(), |
| Audience::Address(service::Address::Handler(setting_type)), |
| ); |
| |
| assert_matches!( |
| receptor.next_of::<HandlerPayload>().await.expect("should receive response").0, |
| HandlerPayload::Response(Err(HandlerError::TimeoutError)) |
| ); |
| |
| // For each failed attempt, make sure a retry event was broadcasted |
| for _ in 0..SETTING_PROXY_MAX_ATTEMPTS { |
| verify_handler_event( |
| setting_type, |
| event_receptor |
| .next_of::<event::Payload>() |
| .await |
| .expect("should be notified of execute") |
| .0, |
| event::handler::Event::Request(event::handler::Action::Execute, request.clone()), |
| ); |
| verify_handler_event( |
| setting_type, |
| event_receptor |
| .next_of::<event::Payload>() |
| .await |
| .expect("should be notified of timeout") |
| .0, |
| event::handler::Event::Request(event::handler::Action::Timeout, request.clone()), |
| ); |
| verify_handler_event( |
| setting_type, |
| event_receptor |
| .next_of::<event::Payload>() |
| .await |
| .expect("should be notified of reattempt") |
| .0, |
| event::handler::Event::Request(event::handler::Action::Retry, request.clone()), |
| ); |
| } |
| |
| // Ensure that the final event reports that attempts were exceeded |
| verify_handler_event( |
| setting_type, |
| event_receptor |
| .next_of::<event::Payload>() |
| .await |
| .expect("should be notified of exceeded attempts") |
| .0, |
| event::handler::Event::Request( |
| event::handler::Action::AttemptsExceeded, |
| request.clone(), |
| ), |
| ); |
| }); |
| |
| loop { |
| let new_time = fuchsia_async::Time::from_nanos( |
| executor.now().into_nanos() |
| + fuchsia_zircon::Duration::from_millis(SETTING_PROXY_TIMEOUT_MS).into_nanos(), |
| ); |
| match executor.run_until_stalled(&mut fut) { |
| Poll::Ready(x) => break x, |
| Poll::Pending => executor.set_fake_time(new_time), |
| } |
| } |
| } |
| |
| // Ensures that timeouts cause an error when retry is not enabled for them. |
| #[fuchsia::test] |
| fn test_timeout_no_retry() { |
| let mut executor = fuchsia_async::TestExecutor::new_with_fake_time(); |
| |
| let mut fut = pin!(async move { |
| let setting_type = SettingType::Unknown; |
| let environment = TestEnvironmentBuilder::new(setting_type) |
| .set_timeout(SETTING_PROXY_TIMEOUT_MS.millis(), false) |
| .build() |
| .await; |
| |
| let mut event_receptor = service::build_event_listener(&environment.delegate).await; |
| |
| // Queue up to ignore requests |
| environment.setting_handler.lock().await.queue_action( |
| Request::Get, |
| HandlerAction::RespondAfterDelay( |
| Ok(None), |
| Duration::from_millis(SETTING_PROXY_TIMEOUT_MS * 2), |
| ), |
| ); |
| |
| let request = Request::Get; |
| |
| // Send request. |
| let handler_result = get_response(environment.service_client.message( |
| HandlerPayload::Request(request.clone()).into(), |
| Audience::Address(service::Address::Handler(setting_type)), |
| )) |
| .await |
| .expect("result should be present"); |
| |
| // Make sure the result is an `ControllerError::TimeoutError` |
| assert_matches!(handler_result, Err(HandlerError::TimeoutError)); |
| |
| verify_handler_event( |
| setting_type, |
| event_receptor |
| .next_of::<event::Payload>() |
| .await |
| .expect("should be notified of execution") |
| .0, |
| event::handler::Event::Request(event::handler::Action::Execute, request.clone()), |
| ); |
| verify_handler_event( |
| setting_type, |
| event_receptor |
| .next_of::<event::Payload>() |
| .await |
| .expect("should be notified of timeout") |
| .0, |
| event::handler::Event::Request(event::handler::Action::Timeout, request), |
| ); |
| }); |
| |
| loop { |
| let new_time = fuchsia_async::Time::from_nanos( |
| executor.now().into_nanos() |
| + fuchsia_zircon::Duration::from_millis(SETTING_PROXY_TIMEOUT_MS).into_nanos(), |
| ); |
| match executor.run_until_stalled(&mut fut) { |
| Poll::Ready(x) => break x, |
| Poll::Pending => { |
| executor.set_fake_time(new_time); |
| } |
| } |
| } |
| } |
| |
| /// Checks that the supplied message event specifies the supplied handler event. |
| fn verify_handler_event( |
| setting_type: SettingType, |
| event_payload: event::Payload, |
| event: event::handler::Event, |
| ) { |
| if let event::Payload::Event(event::Event::Handler(captured_type, captured_event)) = |
| event_payload |
| { |
| assert_eq!(captured_type, setting_type); |
| assert_eq!(event, captured_event); |
| return; |
| } |
| |
| panic!("should have matched the provided event"); |
| } |
| |
| async fn get_response(mut receptor: service::message::Receptor) -> Option<Response> { |
| if let Ok((HandlerPayload::Response(response), _)) = receptor.next_of::<HandlerPayload>().await |
| { |
| Some(response) |
| } else { |
| None |
| } |
| } |