blob: c4310c4e626c21df3545b1f02e89aaa06c687d9c [file]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::handler::base::{
Command, Event, ExitResult, SettingHandlerFactory, SettingHandlerResult, State,
};
use crate::handler::setting_handler::ControllerError;
use std::collections::VecDeque;
use std::sync::Arc;
use anyhow::Error;
use fuchsia_async as fasync;
use fuchsia_syslog::fx_log_err;
use futures::channel::mpsc::UnboundedSender;
use futures::lock::Mutex;
use futures::{FutureExt, StreamExt};
use crate::internal::core;
use crate::internal::event;
use crate::internal::handler;
use crate::message::base::{Audience, MessageEvent, MessengerType, Status};
use crate::switchboard::base::{
SettingAction, SettingActionData, SettingEvent, SettingRequest, SettingType,
};
use fuchsia_zircon::Duration;
#[derive(Clone, Debug)]
struct ActiveRequest {
id: u64,
request: SettingRequest,
client: core::message::Client,
attempts: u64,
last_result: Option<SettingHandlerResult>,
}
#[derive(Clone, Debug)]
enum ActiveControllerRequest {
/// Request to add an active request from a ControllerState.
AddActive(ActiveRequest),
/// Executes the next active request, recreating the handler if the
/// argument is set to true.
Execute(bool),
/// Processes the result to a request.
HandleResult(u64, SettingHandlerResult),
/// Request to remove an active request from a ControllerState.
RemoveActive(u64),
/// Requests resources be torn down. Called when there are no more requests
/// to process.
Teardown,
/// Request to retry the current request.
Retry(u64),
/// Requests listen
Listen(ListenEvent),
}
#[derive(Clone, Debug, PartialEq)]
enum ListenEvent {
Restart,
ListenerCount(u64),
}
pub struct SettingProxy {
setting_type: SettingType,
messenger_client: core::message::Messenger,
client_signature: Option<handler::message::Signature>,
active_requests: VecDeque<ActiveRequest>,
has_active_listener: bool,
/// Handler factory.
handler_factory: Arc<Mutex<dyn SettingHandlerFactory + Send + Sync>>,
/// Factory for creating messengers to communicate with handlers.
controller_messenger_factory: handler::message::Factory,
/// Client for communicating with handlers.
controller_messenger_client: handler::message::Messenger,
/// Client for communicating events.
event_publisher: event::Publisher,
/// Sender for passing messages about the active requests and controllers.
active_controller_sender: UnboundedSender<ActiveControllerRequest>,
max_attempts: u64,
request_timeout: Option<Duration>,
retry_on_timeout: bool,
}
/// Publishes an event to the event_publisher.
macro_rules! publish {
($self:ident, $event:expr) => {
$self.event_publisher.send_event(event::Event::Handler($self.setting_type, $event));
};
}
impl SettingProxy {
/// Creates a SettingProxy that is listening to SettingAction from the
/// provided receiver and will send responses/updates on the given sender.
pub async fn create(
setting_type: SettingType,
handler_factory: Arc<Mutex<dyn SettingHandlerFactory + Send + Sync>>,
messenger_factory: core::message::Factory,
controller_messenger_factory: handler::message::Factory,
event_messenger_factory: event::message::Factory,
max_attempts: u64,
request_timeout: Option<Duration>,
retry_on_timeout: bool,
) -> Result<(core::message::Signature, handler::message::Signature), Error> {
let messenger_result = messenger_factory.create(MessengerType::Unbound).await;
if let Err(error) = messenger_result {
return Err(Error::new(error));
}
let (core_client, mut core_receptor) = messenger_result.unwrap();
let signature = core_client.get_signature();
let controller_messenger_result =
controller_messenger_factory.create(MessengerType::Unbound).await;
if let Err(error) = controller_messenger_result {
return Err(Error::new(error));
}
let (controller_messenger_client, mut controller_receptor) =
controller_messenger_result.unwrap();
let event_publisher = event::Publisher::create(
&event_messenger_factory,
MessengerType::Addressable(event::Address::SettingProxy(setting_type)),
)
.await;
let handler_signature = controller_messenger_client.get_signature();
let (active_controller_sender, mut active_controller_receiver) =
futures::channel::mpsc::unbounded::<ActiveControllerRequest>();
// We must create handle here rather than return back the value as we
// reference the proxy in the async tasks below.
let mut proxy = Self {
setting_type,
handler_factory,
client_signature: None,
active_requests: VecDeque::new(),
has_active_listener: false,
messenger_client: core_client,
controller_messenger_client,
controller_messenger_factory,
event_publisher: event_publisher,
active_controller_sender,
max_attempts,
request_timeout: request_timeout,
retry_on_timeout,
};
// Main task loop for receiving and processing incoming messages.
fasync::Task::spawn(async move {
loop {
let controller_fuse = controller_receptor.next().fuse();
let core_fuse = core_receptor.next().fuse();
futures::pin_mut!(controller_fuse, core_fuse);
futures::select! {
// Handle top level message from controllers.
controller_event = controller_fuse => {
if let Some(
MessageEvent::Message(handler::Payload::Event(event), client)
) = controller_event {
// Messages received after the client signature
// has been changed will be ignored.
if Some(client.get_author()) != proxy.client_signature {
continue;
}
match event {
Event::Changed => {
proxy.notify();
}
Event::Exited(result) => {
proxy.process_exit(result);
}
}
}
}
// Handle messages from the core messenger.
core_event = core_fuse => {
if let Some(
MessageEvent::Message(core::Payload::Action(action), message_client)
) = core_event {
proxy.process_action(action, message_client).await;
}
}
// Handle messages for dealing with the active_controllers.
request = active_controller_receiver.next() => {
if let Some(request) = request {
match request {
ActiveControllerRequest::AddActive(active_request) => {
proxy.add_active_request(active_request);
}
ActiveControllerRequest::Execute(recreate_handler) => {
proxy.execute_next_request(recreate_handler).await;
}
ActiveControllerRequest::RemoveActive(id) => {
proxy.remove_active_request(id);
}
ActiveControllerRequest::Teardown => {
proxy.teardown_if_needed().await
}
ActiveControllerRequest::Retry(id) => {
proxy.retry(id);
}
ActiveControllerRequest::HandleResult(id, result) => {
proxy.handle_result(id, result);
}
ActiveControllerRequest::Listen(event) => {
proxy.handle_listen(event).await;
}
}
}
}
}
}
})
.detach();
Ok((signature, handler_signature))
}
/// Interpret action from switchboard into proxy actions.
async fn process_action(
&mut self,
action: SettingAction,
mut message_client: core::message::Client,
) {
if self.setting_type != action.setting_type {
message_client
.reply(core::Payload::Event(SettingEvent::Response(
action.id,
Err(ControllerError::DeliveryError(action.setting_type, self.setting_type)),
)))
.send();
return;
}
match action.data {
SettingActionData::Request(request) => {
self.process_request(action.id, request, message_client).await;
}
SettingActionData::Listen(size) => {
self.request(ActiveControllerRequest::Listen(ListenEvent::ListenerCount(size)));
// Inform client that the request has been processed, regardless
// of whether a result was produced.
message_client.acknowledge().await;
}
}
}
async fn get_handler_signature(
&mut self,
force_create: bool,
) -> Option<handler::message::Signature> {
if force_create || self.client_signature.is_none() {
self.client_signature = self
.handler_factory
.lock()
.await
.generate(
self.setting_type,
self.controller_messenger_factory.clone(),
self.controller_messenger_client.get_signature(),
)
.await
.map_or(None, Some);
}
self.client_signature
}
/// Called by the receiver task when a sink has reported a change to its
/// setting type.
fn notify(&self) {
if !self.has_active_listener {
return;
}
self.messenger_client
.message(
core::Payload::Event(SettingEvent::Changed(self.setting_type)),
Audience::Address(core::Address::Switchboard),
)
.send();
}
fn process_exit(&mut self, result: ExitResult) {
// Log the exit
self.event_publisher.send_event(event::Event::Handler(
self.setting_type,
event::handler::Event::Exit(result),
));
// Clear the setting handler client signature
self.client_signature = None;
// If there is an active request, process the error
if !self.active_requests.is_empty() {
self.active_controller_sender
.unbounded_send(ActiveControllerRequest::HandleResult(
self.active_requests.front().expect("active request should be present").id,
Err(ControllerError::ExitError),
))
.ok();
}
// If there is an active listener, forefully refetch
if self.has_active_listener {
self.request(ActiveControllerRequest::Listen(ListenEvent::Restart));
}
}
/// Forwards request to proper sink. A new task is spawned in order to receive
/// the response. If no sink is available, an error is immediately reported
/// back.
async fn process_request(
&mut self,
id: u64,
request: SettingRequest,
client: core::message::Client,
) {
match self.get_handler_signature(false).await {
None => {
client
.reply(core::Payload::Event(SettingEvent::Response(
id,
Err(ControllerError::UnhandledType(self.setting_type)),
)))
.send();
}
Some(_) => {
// Add the request to the queue of requests to process.
let active_request = ActiveRequest {
id,
request: request.clone(),
client: client.clone(),
attempts: 0,
last_result: None,
};
self.request(ActiveControllerRequest::AddActive(active_request));
}
}
}
/// Adds an active request to the request queue for this setting.
///
/// If this is the first request in the queue, processing will begin immediately.
///
/// Should only be called on the main task spawned in [SettingProxy::create](#method.create).
fn add_active_request(&mut self, active_request: ActiveRequest) {
self.active_requests.push_back(active_request);
if self.active_requests.len() == 1 {
self.request(ActiveControllerRequest::Execute(false));
}
}
fn request(&mut self, request: ActiveControllerRequest) {
self.active_controller_sender.unbounded_send(request).ok();
}
/// Notifies handler in the case the notification listener count is
/// non-zero and we aren't already listening for changes or there
/// are no more listeners and we are actively listening.
async fn handle_listen(&mut self, event: ListenEvent) {
if let ListenEvent::ListenerCount(size) = event {
let no_more_listeners = size == 0;
if no_more_listeners ^ self.has_active_listener {
return;
}
self.has_active_listener = size > 0;
}
let optional_handler_signature =
self.get_handler_signature(ListenEvent::Restart == event).await;
if optional_handler_signature.is_none() {
return;
}
let handler_signature =
optional_handler_signature.expect("handler signature should be present");
self.controller_messenger_client
.message(
handler::Payload::Command(Command::ChangeState(if self.has_active_listener {
State::Listen
} else {
State::EndListen
})),
Audience::Messenger(handler_signature),
)
.send()
.ack();
self.request(ActiveControllerRequest::Teardown);
}
fn handle_result(&mut self, id: u64, mut result: SettingHandlerResult) {
let request_index = self.active_requests.iter().position(|r| r.id == id);
let request = self
.active_requests
.get_mut(request_index.expect("request ID not found"))
.expect("request should be present");
let mut retry = false;
if matches!(result, Err(ControllerError::ExternalFailure(..)))
|| matches!(result, Err(ControllerError::ExitError))
{
result = Err(ControllerError::IrrecoverableError);
retry = true;
} else if matches!(result, Err(ControllerError::TimeoutError)) {
publish!(self, event::handler::Event::Timeout(request.request.clone()));
retry = self.retry_on_timeout;
}
request.last_result = Some(result);
if retry {
self.request(ActiveControllerRequest::Retry(id));
} else {
self.request(ActiveControllerRequest::RemoveActive(id));
}
}
/// Removes an active request from the request queue for this setting.
///
/// Should only be called once a request is finished processing.
///
/// Should only be called on the main task spawned in [SettingProxy::create](#method.create).
fn remove_active_request(&mut self, id: u64) {
let request_index = self.active_requests.iter().position(|r| r.id == id);
let mut removed_request = self
.active_requests
.remove(request_index.expect("request ID not found"))
.expect("request should be present");
// Send result back to original caller if present.
if let Some(result) = removed_request.last_result.take() {
removed_request
.client
.reply(core::Payload::Event(SettingEvent::Response(removed_request.id, result)))
.send();
}
// If there are still requests to process, then request for the next to
// be processed. Otherwise request teardown.
if !self.active_requests.is_empty() {
self.request(ActiveControllerRequest::Execute(false));
} else {
self.request(ActiveControllerRequest::Teardown);
}
}
/// Processes the next request in the queue of active requests.
///
/// If the queue is empty, nothing happens.
///
/// Should only be called on the main task spawned in [SettingProxy::create](#method.create).
async fn execute_next_request(&mut self, recreate_handler: bool) {
// Recreating signature is always honored, even if the request is not.
let signature = self
.get_handler_signature(recreate_handler)
.await
.expect("failed to generate handler signature");
let mut active_request = self
.active_requests
.front_mut()
.expect("execute should only be called with present requests");
active_request.attempts += 1;
// Note that we must copy these values as we are borrowing self for
// active_requests and self is needed to remove active below.
let id = active_request.id;
let current_attempts = active_request.attempts;
let request = active_request.request.clone();
// If we have exceeded the maximum number of attempts, remove this
// request from the queue.
if current_attempts > self.max_attempts {
publish!(self, event::handler::Event::AttemptsExceeded(request));
self.request(ActiveControllerRequest::RemoveActive(id));
return;
}
publish!(self, event::handler::Event::Execute(id));
let mut receptor = self
.controller_messenger_client
.message(
handler::Payload::Command(Command::HandleRequest(request.clone())),
Audience::Messenger(signature),
)
.set_timeout(self.request_timeout)
.send();
let active_controller_sender_clone = self.active_controller_sender.clone();
fasync::Task::spawn(async move {
while let Some(message_event) = receptor.next().await {
let handler_result = match message_event {
MessageEvent::Message(handler::Payload::Result(result), _) => Some(result),
MessageEvent::Status(Status::Undeliverable) => {
Some(Err(ControllerError::IrrecoverableError))
}
MessageEvent::Status(Status::Timeout) => {
Some(Err(ControllerError::TimeoutError))
}
_ => None,
};
if let Some(result) = handler_result {
// Mark the request as having been handled after retries have been
// attempted and the client has been notified.
active_controller_sender_clone
.unbounded_send(ActiveControllerRequest::HandleResult(id, result))
.ok();
return;
}
}
})
.detach();
}
/// Requests the first request in the queue be tried again, forcefully
/// recreating the handler.
fn retry(&mut self, id: u64) {
// The supplied id should always match the first request in the queue.
debug_assert!(
id == self.active_requests.front().expect("active request should be present").id
);
publish!(
self,
event::handler::Event::Retry(
self.active_requests
.front()
.expect("active request should be present")
.request
.clone(),
)
);
self.request(ActiveControllerRequest::Execute(true));
}
/// Transitions the controller for the [setting_type] to the Teardown phase
/// and removes it from the active_controllers.
async fn teardown_if_needed(&mut self) {
if !self.active_requests.is_empty()
|| self.has_active_listener
|| self.client_signature.is_none()
{
return;
}
let signature = self.client_signature.take().expect("signature should be set");
let mut controller_receptor = self
.controller_messenger_client
.message(
handler::Payload::Command(Command::ChangeState(State::Teardown)),
Audience::Messenger(signature),
)
.send();
// Wait for the teardown phase to be over before continuing.
if controller_receptor.next().await != Some(MessageEvent::Status(Status::Received)) {
fx_log_err!("Failed to tear down {:?} controller", self.setting_type);
}
// This ensures that the client event loop for the corresponding controller is
// properly stopped. Without this, the client event loop will run forever.
self.controller_messenger_factory.delete(signature);
publish!(self, event::handler::Event::Teardown);
}
}