blob: ea1f82cfbea3ed694df1fd2ef551d7ccf404e3be [file]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::collections::HashMap;
use std::sync::Arc;
use std::task::Poll;
use fuchsia_async as fasync;
use fuchsia_zircon::{Duration, DurationNum};
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use futures::lock::Mutex;
use futures::StreamExt;
use async_trait::async_trait;
use crate::handler::base::{
Command, Event, ExitResult, SettingHandlerFactory, SettingHandlerFactoryError,
SettingHandlerResult, State,
};
use crate::handler::setting_handler::ControllerError;
use crate::handler::setting_proxy::SettingProxy;
use crate::internal::core::message::{create_hub, Messenger, Receptor};
use crate::internal::core::{self, Address, Payload};
use crate::internal::event;
use crate::internal::handler;
use crate::message::base::{Audience, MessageEvent, MessengerType};
use crate::message::receptor::Receptor as BaseReceptor;
use crate::switchboard::base::{
SettingAction, SettingActionData, SettingEvent, SettingRequest, SettingType,
};
const SETTING_PROXY_MAX_ATTEMPTS: u64 = 3;
const SETTING_PROXY_TIMEOUT_MS: i64 = 1;
pub type SwitchboardReceptor = BaseReceptor<Payload, Address>;
struct SettingHandler {
setting_type: SettingType,
messenger: handler::message::Messenger,
state_tx: UnboundedSender<State>,
responses: Vec<(SettingRequest, HandlerAction)>,
done_tx: Option<oneshot::Sender<()>>,
proxy_signature: handler::message::Signature,
}
enum HandlerAction {
Ignore,
Exit(ExitResult),
Respond(SettingHandlerResult),
}
impl SettingHandler {
fn process_state(&mut self, state: State) -> SettingHandlerResult {
self.state_tx.unbounded_send(state).ok();
Ok(None)
}
pub fn queue_action(&mut self, request: SettingRequest, action: HandlerAction) {
self.responses.push((request, action))
}
pub fn notify(&self) {
self.messenger
.message(
handler::Payload::Event(Event::Changed),
Audience::Messenger(self.proxy_signature),
)
.send()
.ack();
}
fn process_request(&mut self, request: SettingRequest) -> Option<SettingHandlerResult> {
if let Some((match_request, action)) = self.responses.pop() {
if request == match_request {
match action {
HandlerAction::Respond(result) => {
return Some(result);
}
HandlerAction::Ignore => {
return None;
}
HandlerAction::Exit(result) => {
self.messenger
.message(
handler::Payload::Event(Event::Exited(result)),
Audience::Messenger(self.proxy_signature),
)
.send()
.ack();
return None;
}
}
}
}
Some(Err(ControllerError::UnimplementedRequest(self.setting_type, request)))
}
fn create(
messenger: handler::message::Messenger,
mut receptor: handler::message::Receptor,
proxy_signature: handler::message::Signature,
setting_type: SettingType,
state_tx: UnboundedSender<State>,
done_tx: Option<oneshot::Sender<()>>,
) -> Arc<Mutex<Self>> {
let handler = Arc::new(Mutex::new(Self {
messenger,
setting_type,
state_tx,
responses: vec![],
done_tx,
proxy_signature,
}));
let handler_clone = handler.clone();
fasync::Task::spawn(async move {
while let Some(event) = receptor.next().await {
match event {
MessageEvent::Message(
handler::Payload::Command(Command::HandleRequest(request)),
client,
) => {
if let Some(response) = handler_clone.lock().await.process_request(request)
{
handler::reply(client, response);
}
}
MessageEvent::Message(
handler::Payload::Command(Command::ChangeState(state)),
client,
) => {
handler::reply(client, handler_clone.lock().await.process_state(state));
}
_ => {}
}
}
if let Some(done_tx) = handler_clone.lock().await.done_tx.take() {
done_tx.send(()).ok();
}
})
.detach();
handler
}
}
struct FakeFactory {
handlers: HashMap<SettingType, handler::message::Signature>,
request_counts: HashMap<SettingType, u64>,
messenger_factory: handler::message::Factory,
}
impl FakeFactory {
pub fn new(messenger_factory: handler::message::Factory) -> Self {
FakeFactory {
handlers: HashMap::new(),
request_counts: HashMap::new(),
messenger_factory: messenger_factory,
}
}
pub async fn create(
&mut self,
setting_type: SettingType,
) -> (handler::message::Messenger, handler::message::Receptor) {
let (client, receptor) =
self.messenger_factory.create(MessengerType::Unbound).await.unwrap();
self.handlers.insert(setting_type, client.get_signature());
(client, receptor)
}
pub fn get_request_count(&mut self, setting_type: SettingType) -> u64 {
if let Some(count) = self.request_counts.get(&setting_type) {
*count
} else {
0
}
}
}
#[async_trait]
impl SettingHandlerFactory for FakeFactory {
async fn generate(
&mut self,
setting_type: SettingType,
_: handler::message::Factory,
_: handler::message::Signature,
) -> Result<handler::message::Signature, SettingHandlerFactoryError> {
let existing_count = self.get_request_count(setting_type);
Ok(self
.handlers
.get(&setting_type)
.copied()
.map(|signature| {
self.request_counts.insert(setting_type, existing_count + 1);
signature
})
.unwrap())
}
}
struct TestEnvironmentBuilder {
setting_type: SettingType,
done_tx: Option<oneshot::Sender<()>>,
timeout: Option<(Duration, bool)>,
}
impl TestEnvironmentBuilder {
pub fn new(setting_type: SettingType) -> Self {
Self { setting_type, done_tx: None, timeout: None }
}
pub fn set_done_tx(mut self, tx: Option<oneshot::Sender<()>>) -> Self {
self.done_tx = tx;
self
}
pub fn set_timeout(mut self, duration: Duration, retry_on_timeout: bool) -> Self {
self.timeout = Some((duration, retry_on_timeout));
self
}
pub async fn build(self) -> TestEnvironment {
let messenger_factory = create_hub();
let handler_messenger_factory = handler::message::create_hub();
let event_messenger_factory = event::message::create_hub();
let handler_factory =
Arc::new(Mutex::new(FakeFactory::new(handler_messenger_factory.clone())));
let (proxy_signature, proxy_handler_signature) = SettingProxy::create(
self.setting_type,
handler_factory.clone(),
messenger_factory.clone(),
handler_messenger_factory,
event_messenger_factory.clone(),
SETTING_PROXY_MAX_ATTEMPTS,
self.timeout.map_or(None, |(duration, _)| Some(duration)),
self.timeout.map_or(true, |(_, retry)| retry),
)
.await
.expect("proxy creation should succeed");
let (messenger_client, receptor) = messenger_factory
.create(MessengerType::Addressable(Address::Switchboard))
.await
.unwrap();
let (handler_messenger, handler_receptor) =
handler_factory.lock().await.create(self.setting_type).await;
let (state_tx, state_rx) = futures::channel::mpsc::unbounded::<State>();
let handler = SettingHandler::create(
handler_messenger,
handler_receptor,
proxy_handler_signature,
self.setting_type,
state_tx,
self.done_tx,
);
TestEnvironment {
proxy_signature,
proxy_handler_signature,
messenger_client,
messenger_receptor: receptor,
handler_factory,
setting_handler_rx: state_rx,
setting_handler: handler,
event_factory: event_messenger_factory,
setting_type: self.setting_type,
}
}
}
pub struct TestEnvironment {
proxy_signature: core::message::Signature,
proxy_handler_signature: handler::message::Signature,
messenger_client: Messenger,
messenger_receptor: Receptor,
handler_factory: Arc<Mutex<FakeFactory>>,
setting_handler_rx: UnboundedReceiver<State>,
setting_handler: Arc<Mutex<SettingHandler>>,
setting_type: SettingType,
event_factory: event::message::Factory,
}
impl TestEnvironment {
async fn regenerate_handler(&mut self, done_tx: Option<oneshot::Sender<()>>) {
let (handler_messenger, handler_receptor) =
self.handler_factory.lock().await.create(self.setting_type).await;
let (state_tx, state_rx) = futures::channel::mpsc::unbounded::<State>();
self.setting_handler = SettingHandler::create(
handler_messenger,
handler_receptor,
self.proxy_handler_signature,
self.setting_type,
state_tx,
done_tx,
);
self.setting_handler_rx = state_rx;
}
}
#[fuchsia_async::run_until_stalled(test)]
async fn test_notify() {
let setting_type = SettingType::Unknown;
let mut environment = TestEnvironmentBuilder::new(setting_type).build().await;
// Send a listen state and make sure sink is notified.
{
assert!(environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: 1,
setting_type,
data: SettingActionData::Listen(1),
}),
Audience::Messenger(environment.proxy_signature),
)
.send()
.wait_for_acknowledge()
.await
.is_ok());
environment.setting_handler.lock().await.notify();
while let Some(event) = environment.messenger_receptor.next().await {
if let MessageEvent::Message(Payload::Event(SettingEvent::Changed(changed_type)), _) =
event
{
assert_eq!(changed_type, setting_type);
break;
}
}
}
if let Some(state) = environment.setting_handler_rx.next().await {
assert_eq!(state, State::Listen);
} else {
panic!("should have received state update");
}
// Send an end listen state and make sure sink is notified.
{
environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: 1,
setting_type,
data: SettingActionData::Listen(0),
}),
Audience::Messenger(environment.proxy_signature),
)
.send()
.ack();
}
if let Some(state) = environment.setting_handler_rx.next().await {
assert_eq!(state, State::EndListen);
} else {
panic!("should have received EndListen state update");
}
if let Some(state) = environment.setting_handler_rx.next().await {
assert_eq!(state, State::Teardown);
} else {
panic!("should have received Teardown state update");
}
}
#[fuchsia_async::run_until_stalled(test)]
async fn test_request() {
let setting_type = SettingType::Unknown;
let request_id = 42;
let environment = TestEnvironmentBuilder::new(setting_type).build().await;
environment
.setting_handler
.lock()
.await
.queue_action(SettingRequest::Get, HandlerAction::Respond(Ok(None)));
// Send initial request.
let mut receptor = environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: request_id,
setting_type,
data: SettingActionData::Request(SettingRequest::Get),
}),
Audience::Messenger(environment.proxy_signature),
)
.send();
while let Some(event) = receptor.next().await {
if let MessageEvent::Message(
Payload::Event(SettingEvent::Response(response_id, response)),
_,
) = event
{
assert_eq!(request_id, response_id);
assert!(response.is_ok());
assert_eq!(None, response.unwrap());
return;
}
}
}
#[fuchsia_async::run_until_stalled(test)]
async fn test_request_order() {
let setting_type = SettingType::Unknown;
let request_id_1 = 42;
let request_id_2 = 43;
let environment = TestEnvironmentBuilder::new(setting_type).build().await;
environment
.setting_handler
.lock()
.await
.queue_action(SettingRequest::Get, HandlerAction::Respond(Ok(None)));
// Send multiple requests.
let receptor_1 = environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: request_id_1,
setting_type,
data: SettingActionData::Request(SettingRequest::Get),
}),
Audience::Messenger(environment.proxy_signature),
)
.send();
let receptor_2 = environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: request_id_2,
setting_type,
data: SettingActionData::Request(SettingRequest::Get),
}),
Audience::Messenger(environment.proxy_signature),
)
.send();
// Wait for both requests to finish and add them to the list as they finish so we can verify the
// order.
let mut completed_request_ids = Vec::<u64>::new();
let mut receptor_1_fuse = receptor_1.fuse();
let mut receptor_2_fuse = receptor_2.fuse();
loop {
environment
.setting_handler
.lock()
.await
.queue_action(SettingRequest::Get, HandlerAction::Respond(Ok(None)));
futures::select! {
payload_1 = receptor_1_fuse.next() => {
if let Some(MessageEvent::Message(
Payload::Event(SettingEvent::Response(response_id, response)),
_,
)) = payload_1
{
assert_eq!(request_id_1, response_id);
// First request finishes first.
assert_eq!(completed_request_ids.len(), 0);
completed_request_ids.push(response_id);
}
},
payload_2 = receptor_2_fuse.next() => {
if let Some(MessageEvent::Message(
Payload::Event(SettingEvent::Response(response_id, response)),
_,
)) = payload_2
{
assert_eq!(request_id_2, response_id);
// Second request finishes second.
assert_eq!(completed_request_ids.len(), 1);
completed_request_ids.push(response_id);
}
},
complete => break,
}
}
assert_eq!(completed_request_ids.len(), 2);
assert_eq!(completed_request_ids[0], request_id_1);
assert_eq!(completed_request_ids[1], request_id_2);
}
/// Ensures setting handler is only generated once if never torn down.
#[fuchsia_async::run_until_stalled(test)]
async fn test_generation() {
let setting_type = SettingType::Unknown;
let request_id = 42;
let environment = TestEnvironmentBuilder::new(setting_type).build().await;
// Send initial request.
let _ = get_response(
environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: request_id,
setting_type,
data: SettingActionData::Listen(1),
}),
Audience::Messenger(environment.proxy_signature),
)
.send(),
)
.await;
// Ensure the handler was only created once.
assert_eq!(1, environment.handler_factory.lock().await.get_request_count(setting_type));
// Send followup request.
let _ = get_response(
environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: request_id,
setting_type,
data: SettingActionData::Request(SettingRequest::Get),
}),
Audience::Messenger(environment.proxy_signature),
)
.send(),
)
.await;
// Make sure no followup generation was invoked.
assert_eq!(1, environment.handler_factory.lock().await.get_request_count(setting_type));
}
/// Ensures setting handler is generated multiple times successfully if torn down.
#[fuchsia_async::run_until_stalled(test)]
async fn test_regeneration() {
let setting_type = SettingType::Unknown;
let request_id = 42;
let (done_tx, done_rx) = oneshot::channel();
let mut environment =
TestEnvironmentBuilder::new(setting_type).set_done_tx(Some(done_tx)).build().await;
// Send initial request.
assert!(
get_response(
environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: request_id,
setting_type,
data: SettingActionData::Request(SettingRequest::Get),
}),
Audience::Messenger(environment.proxy_signature),
)
.send(),
)
.await
.is_some(),
"A response was expected"
);
// Ensure the handler was only created once.
assert_eq!(1, environment.handler_factory.lock().await.get_request_count(setting_type));
// The subsequent teardown should happen here.
done_rx.await.ok();
let mut hit_teardown = false;
loop {
let state = environment.setting_handler_rx.next().await;
match state {
Some(State::Teardown) => {
hit_teardown = true;
break;
}
None => break,
_ => {}
}
}
assert!(hit_teardown, "Handler should have torn down");
drop(environment.setting_handler);
// Now that the handler is dropped, the setting_handler_tx should be dropped too and the rx end
// will return none.
assert!(
environment.setting_handler_rx.next().await.is_none(),
"There should be no more states after teardown"
);
let (handler_messenger, handler_receptor) =
environment.handler_factory.lock().await.create(setting_type).await;
let (state_tx, _) = futures::channel::mpsc::unbounded::<State>();
let _handler = SettingHandler::create(
handler_messenger,
handler_receptor,
environment.proxy_handler_signature,
setting_type,
state_tx,
None,
);
// Send followup request.
assert!(
get_response(
environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: request_id,
setting_type,
data: SettingActionData::Request(SettingRequest::Get),
}),
Audience::Messenger(environment.proxy_signature),
)
.send(),
)
.await
.is_some(),
"A response was expected"
);
// Check that the handler was re-generated.
assert_eq!(2, environment.handler_factory.lock().await.get_request_count(setting_type));
}
/// Exercises the retry flow, ensuring the setting proxy goes through the
/// defined number of tests and correctly reports back activity.
#[fuchsia_async::run_until_stalled(test)]
async fn test_retry() {
let setting_type = SettingType::Unknown;
let mut environment = TestEnvironmentBuilder::new(setting_type).build().await;
let (_, mut event_receptor) = environment
.event_factory
.create(MessengerType::Unbound)
.await
.expect("Should be able to retrieve receptor");
// Queue up external failure responses in the handler.
for _ in 0..SETTING_PROXY_MAX_ATTEMPTS {
environment.setting_handler.lock().await.queue_action(
SettingRequest::Get,
HandlerAction::Respond(Err(ControllerError::ExternalFailure(
setting_type,
"test_component".into(),
"connect".into(),
))),
);
}
let request_id = 2;
let request = SettingRequest::Get;
// Send request.
let (returned_request_id, handler_result) = get_response(
environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: request_id,
setting_type,
data: SettingActionData::Request(request.clone()),
}),
Audience::Messenger(environment.proxy_signature),
)
.send(),
)
.await
.expect("result should be present");
// Ensure returned request id matches the outgoing id
assert_eq!(request_id, returned_request_id);
// Make sure the result is an `ControllerError::IrrecoverableError`
if let Err(error) = handler_result {
assert_eq!(error, ControllerError::IrrecoverableError);
} else {
panic!("error should have been encountered");
}
// For each failed attempt, make sure a retry event was broadcasted
for _ in 0..SETTING_PROXY_MAX_ATTEMPTS {
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of external failure"),
event::handler::Event::Execute(request_id),
);
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of external failure"),
event::handler::Event::Retry(request.clone()),
);
}
// Ensure that the final event reports that attempts were exceeded
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of external failure"),
event::handler::Event::AttemptsExceeded(request.clone()),
);
// Regenerate setting handler
environment.regenerate_handler(None).await;
// Queue successful response
environment
.setting_handler
.lock()
.await
.queue_action(SettingRequest::Get, HandlerAction::Respond(Ok(None)));
// Ensure subsequent request succeeds
assert!(get_response(
environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: request_id,
setting_type,
data: SettingActionData::Request(request.clone()),
}),
Audience::Messenger(environment.proxy_signature),
)
.send(),
)
.await
.expect("result should be present")
.1
.is_ok());
// Make sure SettingHandler tears down
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of teardown"),
event::handler::Event::Teardown,
);
}
/// Ensures early exit triggers retry flow.
#[fuchsia_async::run_until_stalled(test)]
async fn test_early_exit() {
let exit_result = Ok(());
let setting_type = SettingType::Unknown;
let environment = TestEnvironmentBuilder::new(setting_type).build().await;
let (_, mut event_receptor) = environment
.event_factory
.create(MessengerType::Unbound)
.await
.expect("Should be able to retrieve receptor");
// Queue up external failure responses in the handler.
for _ in 0..SETTING_PROXY_MAX_ATTEMPTS {
environment
.setting_handler
.lock()
.await
.queue_action(SettingRequest::Get, HandlerAction::Exit(exit_result.clone()));
}
let request_id = 2;
let request = SettingRequest::Get;
// Send request.
let (returned_request_id, handler_result) = get_response(
environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: request_id,
setting_type,
data: SettingActionData::Request(request.clone()),
}),
Audience::Messenger(environment.proxy_signature),
)
.send(),
)
.await
.expect("result should be present");
// Ensure returned request id matches the outgoing id
assert_eq!(request_id, returned_request_id);
// Make sure the result is an `ControllerError::IrrecoverableError`
if let Err(error) = handler_result {
assert_eq!(error, ControllerError::IrrecoverableError);
} else {
panic!("error should have been encountered");
}
//For each failed attempt, make sure a retry event was broadcasted
for _ in 0..SETTING_PROXY_MAX_ATTEMPTS {
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of external failure"),
event::handler::Event::Execute(request_id),
);
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of external failure"),
event::handler::Event::Exit(exit_result.clone()),
);
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of external failure"),
event::handler::Event::Retry(request.clone()),
);
}
// Ensure that the final event reports that attempts were exceeded
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of external failure"),
event::handler::Event::AttemptsExceeded(request.clone()),
);
}
/// Ensures timeouts trigger retry flow.
#[test]
fn test_timeout() {
let mut executor =
fuchsia_async::Executor::new_with_fake_time().expect("Failed to create executor");
let fut = async move {
let setting_type = SettingType::Unknown;
let environment = TestEnvironmentBuilder::new(setting_type)
.set_timeout(SETTING_PROXY_TIMEOUT_MS.millis(), true)
.build()
.await;
let (_, mut event_receptor) = environment
.event_factory
.create(MessengerType::Unbound)
.await
.expect("Should be able to retrieve receptor");
// Queue up to ignore resquests
for _ in 0..SETTING_PROXY_MAX_ATTEMPTS {
environment
.setting_handler
.lock()
.await
.queue_action(SettingRequest::Get, HandlerAction::Ignore);
}
let request_id = 2;
let request = SettingRequest::Get;
// Send request.
let (returned_request_id, handler_result) = get_response(
environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: request_id,
setting_type,
data: SettingActionData::Request(request.clone()),
}),
Audience::Messenger(environment.proxy_signature),
)
.send(),
)
.await
.expect("result should be present");
// Ensure returned request id matches the outgoing id
assert_eq!(request_id, returned_request_id);
// Make sure the result is an `ControllerError::TimeoutError`
assert!(
matches!(handler_result, Err(ControllerError::TimeoutError)),
"error should have been encountered"
);
// For each failed attempt, make sure a retry event was broadcasted
for _ in 0..SETTING_PROXY_MAX_ATTEMPTS {
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of execute"),
event::handler::Event::Execute(request_id),
);
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of timeout"),
event::handler::Event::Timeout(request.clone()),
);
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of reattempt"),
event::handler::Event::Retry(request.clone()),
);
}
// Ensure that the final event reports that attempts were exceeded
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of exceeded attempts"),
event::handler::Event::AttemptsExceeded(request.clone()),
);
};
pin_utils::pin_mut!(fut);
let _result = loop {
executor.wake_main_future();
let new_time = fuchsia_async::Time::from_nanos(
executor.now().into_nanos()
+ fuchsia_zircon::Duration::from_millis(SETTING_PROXY_TIMEOUT_MS).into_nanos(),
);
match executor.run_one_step(&mut fut) {
Some(Poll::Ready(x)) => break x,
None => panic!("Executor stalled"),
Some(Poll::Pending) => {
executor.set_fake_time(new_time);
}
}
};
}
/// Ensures that timeouts cause an error when retry is not enabled for them.
#[test]
fn test_timeout_no_retry() {
let mut executor =
fuchsia_async::Executor::new_with_fake_time().expect("Failed to create executor");
let fut = async move {
let setting_type = SettingType::Unknown;
let environment = TestEnvironmentBuilder::new(setting_type)
.set_timeout(SETTING_PROXY_TIMEOUT_MS.millis(), false)
.build()
.await;
let (_, mut event_receptor) = environment
.event_factory
.create(MessengerType::Unbound)
.await
.expect("Should be able to retrieve receptor");
// Queue up to ignore resquests
environment
.setting_handler
.lock()
.await
.queue_action(SettingRequest::Get, HandlerAction::Respond(Ok(None)));
let request_id = 2;
let request = SettingRequest::Get;
// Send request.
let (returned_request_id, handler_result) = get_response(
environment
.messenger_client
.message(
Payload::Action(SettingAction {
id: request_id,
setting_type,
data: SettingActionData::Request(request.clone()),
}),
Audience::Messenger(environment.proxy_signature),
)
.send(),
)
.await
.expect("result should be present");
// Ensure returned request id matches the outgoing id
assert_eq!(request_id, returned_request_id);
// Make sure the result is an `ControllerError::TimeoutError`
assert!(
matches!(handler_result, Err(ControllerError::TimeoutError)),
"error should have been encountered"
);
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of execution"),
event::handler::Event::Execute(request_id),
);
verify_handler_event(
setting_type,
event_receptor.next().await.expect("should be notified of timeout"),
event::handler::Event::Timeout(request),
);
};
pin_utils::pin_mut!(fut);
let _result = loop {
executor.wake_main_future();
let new_time = fuchsia_async::Time::from_nanos(
executor.now().into_nanos()
+ fuchsia_zircon::Duration::from_millis(SETTING_PROXY_TIMEOUT_MS).into_nanos(),
);
match executor.run_one_step(&mut fut) {
Some(Poll::Ready(x)) => break x,
None => panic!("Executor stalled"),
Some(Poll::Pending) => {
executor.set_fake_time(new_time);
}
}
};
}
/// Checks that the supplied message event specifies the supplied handler event.
fn verify_handler_event(
setting_type: SettingType,
message_event: MessageEvent<event::Payload, event::Address>,
event: event::handler::Event,
) {
if let MessageEvent::Message(
event::Payload::Event(event::Event::Handler(captured_type, captured_event)),
_,
) = message_event
{
assert_eq!(captured_type, setting_type);
assert_eq!(event, captured_event);
return;
}
panic!("should have matched the provided event");
}
async fn get_response(mut receptor: SwitchboardReceptor) -> Option<(u64, SettingHandlerResult)> {
while let Some(event) = receptor.next().await {
if let MessageEvent::Message(
Payload::Event(SettingEvent::Response(response_id, response)),
_,
) = event
{
return Some((response_id, response));
}
}
return None;
}