blob: 2bc0fdc06a0f2b1b0298b916fcf3a41fc8c61f58 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// TODO(fxb/68069): Add module documentation describing Setting Proxy's role in
// setting handling.
use crate::handler::base::{
Error as HandlerError, Payload as HandlerPayload, Request as HandlerRequest,
SettingHandlerFactory,
};
use crate::handler::setting_handler::Command;
use crate::handler::setting_handler::{
ControllerError, Event, ExitResult, SettingHandlerResult, State,
};
use crate::message::action_fuse::ActionFuseBuilder;
use std::collections::VecDeque;
use std::sync::Arc;
use anyhow::Error;
use fuchsia_async as fasync;
use fuchsia_syslog::fx_log_err;
use futures::channel::mpsc::UnboundedSender;
use futures::lock::Mutex;
use futures::{FutureExt, StreamExt};
use crate::base::{SettingInfo, SettingType};
use crate::event;
use crate::handler::base::{Payload, Request};
use crate::handler::setting_handler;
use crate::message::base::{Audience, MessageEvent, MessengerType, Status};
use crate::service;
use fuchsia_zircon::Duration;
/// An enumeration of the different client types that provide requests to
/// setting handlers.
#[derive(Clone, Debug)]
enum Client {
/// A client from the Unified (service) MessageHub
Service(service::message::MessageClient),
}
/// A container for associating a Handler Request with a given [`Client`].
#[derive(Clone, Debug)]
struct RequestInfo {
setting_request: Request,
client: Client,
// This identifier is unique within each setting proxy to identify a
// request. This can be used for removing a particular RequestInfo within a
// set, such as the active change listeners.
id: usize,
}
impl PartialEq for RequestInfo {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl RequestInfo {
/// Sends the supplied result as a reply with the associated [`Client`].
pub fn reply(&self, result: SettingHandlerResult) {
match &self.client {
Client::Service(client) => {
// TODO(fxbug.dev/70985): return HandlerErrors directly
client
.reply(HandlerPayload::Response(result.map_err(HandlerError::from)).into())
.send();
}
}
}
/// Sends an acknowledge message back through the reply client. This used in
/// long running requests (such a listen) where acknowledge message ensures
/// the client the request was processed.
async fn acknowledge(&mut self) {
match &mut self.client {
Client::Service(client) => {
client.acknowledge().await;
}
}
}
/// Adds a closure that will be triggered when the recipient for a response
/// to the request goes out of scope. This allows for the message handler to
/// know when the recipient is no longer valid.
async fn bind_to_scope(&mut self, trigger_fn: Box<dyn FnOnce(RequestInfo) + Sync + Send>) {
let request = self.clone();
let fuse = ActionFuseBuilder::new()
.add_action(Box::new(move || {
(trigger_fn)(request);
}))
.build();
match &mut self.client {
Client::Service(client) => {
client.bind_to_recipient(fuse).await;
}
}
}
}
#[derive(Clone, Debug)]
struct ActiveRequest {
request: RequestInfo,
// The number of attempts that have been made on this request.
attempts: u64,
last_result: Option<SettingHandlerResult>,
}
impl ActiveRequest {
pub fn get_request(&self) -> Request {
self.request.setting_request.clone()
}
pub fn get_info(&mut self) -> &mut RequestInfo {
&mut self.request
}
}
#[derive(Clone, Debug)]
enum ProxyRequest {
/// Adds a request to the pending request queue.
Add(RequestInfo),
/// Executes the next pending request, recreating the handler if the
/// argument is set to true.
Execute(bool),
/// Evaluates supplied the result for the active request.
HandleResult(SettingHandlerResult),
/// Request to remove the active request.
RemoveActive,
/// Request to remove listen request.
EndListen(RequestInfo),
/// Starts a timeout for resources be torn down. Called when there are no
/// more requests to process.
TeardownTimeout,
/// Request for resources to be torn down.
Teardown,
/// Request to retry the active request.
Retry,
/// Requests listen
Listen(ListenEvent),
}
#[derive(Clone, Debug, PartialEq)]
enum ListenEvent {
Restart,
}
pub struct SettingProxy {
setting_type: SettingType,
client_signature: Option<service::message::Signature>,
active_request: Option<ActiveRequest>,
pending_requests: VecDeque<RequestInfo>,
listen_requests: Vec<RequestInfo>,
next_request_id: usize,
/// Factory for generating a new controller to service requests.
handler_factory: Arc<Mutex<dyn SettingHandlerFactory + Send + Sync>>,
/// Messenger factory for communication with service components.
messenger_factory: service::message::Factory,
/// Messenger to send messages to controllers.
messenger: service::message::Messenger,
/// Signature for messages from controllers to be direct towards.
signature: service::message::Signature,
/// Client for communicating events.
event_publisher: event::Publisher,
/// Sender for passing messages about the active requests and controllers.
proxy_request_sender: UnboundedSender<ProxyRequest>,
max_attempts: u64,
teardown_timeout: Duration,
request_timeout: Option<Duration>,
retry_on_timeout: bool,
teardown_cancellation: Option<futures::channel::oneshot::Sender<()>>,
}
/// Publishes an event to the event_publisher.
macro_rules! publish {
($self:ident, $event:expr) => {
$self.event_publisher.send_event(event::Event::Handler($self.setting_type, $event));
};
}
impl SettingProxy {
/// Creates a SettingProxy that is listening to requests from the
/// provided receiver and will send responses/updates on the given sender.
pub async fn create(
setting_type: SettingType,
handler_factory: Arc<Mutex<dyn SettingHandlerFactory + Send + Sync>>,
messenger_factory: service::message::Factory,
max_attempts: u64,
teardown_timeout: Duration,
request_timeout: Option<Duration>,
retry_on_timeout: bool,
) -> Result<service::message::Signature, Error> {
let (messenger, receptor) = messenger_factory
.create(MessengerType::Addressable(service::Address::Handler(setting_type)))
.await
.map_err(Error::new)?;
let service_signature = receptor.get_signature();
// TODO(fxbug.dev/67536): Remove receptors below as their logic is
// migrated to the MessageHub defined above.
let event_publisher = event::Publisher::create(
&messenger_factory,
MessengerType::Addressable(service::Address::EventSource(
event::Address::SettingProxy(setting_type),
)),
)
.await;
let (proxy_request_sender, proxy_request_receiver) =
futures::channel::mpsc::unbounded::<ProxyRequest>();
// We must create handle here rather than return back the value as we
// reference the proxy in the async tasks below.
let mut proxy = Self {
setting_type,
handler_factory,
next_request_id: 0,
client_signature: None,
active_request: None,
pending_requests: VecDeque::new(),
listen_requests: Vec::new(),
messenger_factory,
messenger,
signature: service_signature,
event_publisher: event_publisher,
proxy_request_sender,
max_attempts,
teardown_timeout,
request_timeout: request_timeout,
retry_on_timeout,
teardown_cancellation: None,
};
// Main task loop for receiving and processing incoming messages.
fasync::Task::spawn(async move {
let receptor_fuse = receptor.fuse();
let proxy_fuse = proxy_request_receiver.fuse();
futures::pin_mut!(receptor_fuse, proxy_fuse);
loop {
futures::select! {
// Handles requests from the service MessageHub and
// communication from the setting controller.
event = receptor_fuse.select_next_some() => {
proxy.process_service_event(event).await;
}
// Handles messages for enqueueing requests and processing
// results on the main event loop for proxy.
request = proxy_fuse.select_next_some() => {
proxy.process_proxy_request(request).await;
}
}
}
})
.detach();
Ok(service_signature)
}
async fn process_service_event(&mut self, event: service::message::MessageEvent) {
if let MessageEvent::Message(payload, client) = event {
match payload {
service::Payload::Setting(Payload::Request(request)) => {
self.process_service_request(request, client).await;
}
service::Payload::Controller(setting_handler::Payload::Event(event)) => {
// Messages received after the client signature
// has been changed will be ignored.
if Some(client.get_author()) != self.client_signature {
return;
}
match event {
Event::Changed(setting_info) => {
self.notify(setting_info);
}
Event::Exited(result) => {
self.process_exit(result);
}
}
}
_ => {
panic!("Unexpected message");
}
}
}
}
async fn process_proxy_request(&mut self, request: ProxyRequest) {
match request {
ProxyRequest::Add(request) => {
self.add_request(request);
}
ProxyRequest::Execute(recreate_handler) => {
self.execute_next_request(recreate_handler).await;
}
ProxyRequest::RemoveActive => {
self.remove_active_request();
}
ProxyRequest::TeardownTimeout => {
self.start_teardown_timeout().await;
}
ProxyRequest::Teardown => self.teardown_if_needed().await,
ProxyRequest::Retry => {
self.retry();
}
ProxyRequest::HandleResult(result) => {
self.handle_result(result);
}
ProxyRequest::Listen(event) => {
self.handle_listen(event).await;
}
ProxyRequest::EndListen(request_info) => {
self.handle_end_listen(request_info).await;
}
}
}
async fn process_service_request(
&mut self,
request: HandlerRequest,
message_client: service::message::MessageClient,
) {
let id = self.next_request_id;
self.next_request_id += 1;
self.process_request(RequestInfo {
setting_request: request,
id,
client: Client::Service(message_client),
})
.await;
}
async fn get_handler_signature(
&mut self,
force_create: bool,
) -> Option<service::message::Signature> {
if force_create || self.client_signature.is_none() {
self.client_signature = self
.handler_factory
.lock()
.await
.generate(self.setting_type, self.messenger_factory.clone(), self.signature.clone())
.await
.map_or(None, Some);
}
self.client_signature
}
/// Returns whether there is an active listener across the various
/// listening clients.
fn is_listening(&self) -> bool {
!self.listen_requests.is_empty()
}
/// Informs the Switchboard when the controller has indicated the setting
/// has changed.
fn notify(&self, setting_info: SettingInfo) {
if !self.is_listening() {
return;
}
// Notify each listener on the service MessageHub.
for request in &self.listen_requests {
request.reply(Ok(Some(setting_info.clone())));
}
}
fn process_exit(&mut self, result: ExitResult) {
// Log the exit
self.event_publisher.send_event(event::Event::Handler(
self.setting_type,
event::handler::Event::Exit(result),
));
// Clear the setting handler client signature
self.client_signature = None;
// If there is an active request, process the error
if self.active_request.is_some() {
self.proxy_request_sender
.unbounded_send(ProxyRequest::HandleResult(Err(ControllerError::ExitError)))
.ok();
}
// If there is an active listener, forefully refetch
if self.is_listening() {
self.request(ProxyRequest::Listen(ListenEvent::Restart));
}
}
/// Ensures we first have an active controller (spun up by
/// get_handler_signature if not already active) before adding the request
/// to the proxy's queue.
async fn process_request(&mut self, request: RequestInfo) {
match self.get_handler_signature(false).await {
None => {
request.reply(Err(ControllerError::UnhandledType(self.setting_type)));
}
Some(_) => {
self.request(ProxyRequest::Add(request));
}
}
}
/// Adds a request to the request queue for this setting.
///
/// If this is the first request in the queue, processing will begin immediately.
///
/// Should only be called on the main task spawned in [SettingProxy::create](#method.create).
fn add_request(&mut self, request: RequestInfo) {
if let Some(teardown_cancellation) = self.teardown_cancellation.take() {
let _ = teardown_cancellation.send(());
}
self.pending_requests.push_back(request);
// If this is the first request (no active request or pending requests),
// request the controller begin execution of requests. Otherwise, the
// controller is already executing requests and will eventually process
// this new request.
if self.pending_requests.len() == 1 && self.active_request.is_none() {
self.request(ProxyRequest::Execute(false));
}
}
/// Sends a request to be processed by the proxy. Requests are sent as
/// messages and marshalled onto a single event loop to ensure proper
/// ordering.
fn request(&self, request: ProxyRequest) {
self.proxy_request_sender.unbounded_send(request).ok();
}
/// Sends an update to the controller about whether or not it should be
/// listening.
///
/// # Arguments
///
/// * `force_recreate_controller` - a bool representing whether the
/// controller should be recreated regardless if it is currently running.
async fn send_listen_update(&mut self, force_recreate_controller: bool) {
let optional_handler_signature =
self.get_handler_signature(force_recreate_controller).await;
if optional_handler_signature.is_none() {
return;
}
let handler_signature =
optional_handler_signature.expect("handler signature should be present");
self.messenger
.message(
setting_handler::Payload::Command(Command::ChangeState(if self.is_listening() {
State::Listen
} else {
State::EndListen
}))
.into(),
Audience::Messenger(handler_signature),
)
.send()
.ack();
self.request(ProxyRequest::TeardownTimeout);
}
// TODO(fxbug.dev/67536): Remove this method once no more communication
// happens over the core MessageHub.
/// Notifies handler in the case the notification listener count is
/// non-zero and we aren't already listening for changes or there
/// are no more listeners and we are actively listening.
async fn handle_listen(&mut self, event: ListenEvent) {
self.send_listen_update(ListenEvent::Restart == event).await;
}
/// Notifies handler in the case the notification listener count is
/// non-zero and we aren't already listening for changes or there
/// are no more listeners and we are actively listening.
async fn handle_end_listen(&mut self, request: RequestInfo) {
let was_listening = self.is_listening();
if let Some(pos) = self.listen_requests.iter().position(|target| *target == request) {
self.listen_requests.remove(pos);
} else {
return;
}
if was_listening != self.is_listening() {
self.send_listen_update(false).await;
}
}
/// Evaluates the supplied result for the current active request. Based
/// on the return result, also determines whether the request should be
/// retried. Based on this determination, the function will request from
/// the proxy whether to retry the request or remove the request (and send
/// response).
fn handle_result(&mut self, mut result: SettingHandlerResult) {
let active_request = self.active_request.as_mut().expect("request should be present");
let mut retry = false;
if matches!(result, Err(ControllerError::ExternalFailure(..)))
|| matches!(result, Err(ControllerError::ExitError))
{
result = Err(ControllerError::IrrecoverableError);
retry = true;
} else if matches!(result, Err(ControllerError::TimeoutError)) {
publish!(
self,
event::handler::Event::Request(
event::handler::Action::Timeout,
active_request.get_request()
)
);
retry = self.retry_on_timeout;
}
active_request.last_result = Some(result);
if retry {
self.request(ProxyRequest::Retry);
} else {
self.request(ProxyRequest::RemoveActive);
}
}
/// Removes the active request for this setting.
///
/// Should only be called once a request is finished processing.
///
/// Should only be called on the main task spawned in [SettingProxy::create](#method.create).
fn remove_active_request(&mut self) {
let mut removed_request = self.active_request.take().expect("request should be present");
// Send result back to original caller if present.
if let Some(result) = removed_request.last_result.take() {
removed_request.request.reply(result)
}
// If there are still requests to process, then request for the next to
// be processed. Otherwise request teardown.
if !self.pending_requests.is_empty() {
self.request(ProxyRequest::Execute(false));
} else {
self.request(ProxyRequest::TeardownTimeout);
}
}
/// Processes the next request in the queue of pending requests.
///
/// If the queue is empty, nothing happens.
///
/// Should only be called on the main task spawned in [SettingProxy::create](#method.create).
async fn execute_next_request(&mut self, recreate_handler: bool) {
if self.active_request.is_none() {
// Add the request to the queue of requests to process.
self.active_request = Some(ActiveRequest {
request: self
.pending_requests
.pop_front()
.expect("execute should only be called with present requests"),
attempts: 0,
last_result: None,
});
}
// Recreating signature is always honored, even if the request is not.
let signature = self
.get_handler_signature(recreate_handler)
.await
.expect("failed to generate handler signature");
// since we borrow self as mutable for active_request, we must retrieve
// the listening state (which borrows immutable) before.
let was_listening = self.is_listening();
let active_request =
self.active_request.as_mut().expect("active request should be present");
active_request.attempts += 1;
// Note that we must copy these values as we are borrowing self for
// active_requests and self is needed to remove active below.
let request = active_request.get_request();
if matches!(request, Request::Listen) {
let info = active_request.get_info();
// Add a callback when the client side goes out of scope
let proxy_request_sender = self.proxy_request_sender.clone();
info.bind_to_scope(Box::new(move |request_info| {
proxy_request_sender.unbounded_send(ProxyRequest::EndListen(request_info)).ok();
}))
.await;
// Add the request to tracked listen requests.
self.listen_requests.push(info.clone());
// Listening requests must be acknowledged as they are long-living.
info.acknowledge().await;
// If listening state has changed, update state.
if was_listening != self.is_listening() {
self.send_listen_update(false).await;
}
// Request the active request be removed as it is now tracked
// elsewhere.
self.request(ProxyRequest::RemoveActive);
return;
}
// If we have exceeded the maximum number of attempts, remove this
// request from the queue.
if active_request.attempts > self.max_attempts {
publish!(
self,
event::handler::Event::Request(
event::handler::Action::AttemptsExceeded,
request.clone()
)
);
self.request(ProxyRequest::RemoveActive);
return;
}
publish!(
self,
event::handler::Event::Request(event::handler::Action::Execute, request.clone())
);
let mut receptor = self
.messenger
.message(
setting_handler::Payload::Command(Command::HandleRequest(request.clone())).into(),
Audience::Messenger(signature),
)
.set_timeout(self.request_timeout)
.send();
let proxy_request_sender_clone = self.proxy_request_sender.clone();
fasync::Task::spawn(async move {
while let Some(message_event) = receptor.next().await {
let handler_result = match message_event {
MessageEvent::Message(
service::Payload::Controller(setting_handler::Payload::Result(result)),
_,
) => Some(result),
MessageEvent::Status(Status::Undeliverable) => {
Some(Err(ControllerError::IrrecoverableError))
}
MessageEvent::Status(Status::Timeout) => {
Some(Err(ControllerError::TimeoutError))
}
_ => None,
};
if let Some(result) = handler_result {
// Mark the request as having been handled after retries have been
// attempted and the client has been notified.
proxy_request_sender_clone
.unbounded_send(ProxyRequest::HandleResult(result))
.ok();
return;
}
}
})
.detach();
}
/// Requests the active request to be tried again, forcefully recreating the
/// handler.
fn retry(&mut self) {
publish!(
self,
event::handler::Event::Request(
event::handler::Action::Retry,
self.active_request
.as_ref()
.expect("active request should be present")
.get_request(),
)
);
self.request(ProxyRequest::Execute(true));
}
fn has_active_work(&self) -> bool {
self.active_request.is_some()
|| !self.pending_requests.is_empty()
|| self.is_listening()
|| self.client_signature.is_none()
}
async fn start_teardown_timeout(&mut self) {
if self.has_active_work() {
return;
}
let (cancellation_tx, cancellation_rx) = futures::channel::oneshot::channel();
if self.teardown_cancellation.is_some() {
// Do not overwrite the cancellation. We do not want to extend it if it's already
// counting down.
return;
}
self.teardown_cancellation = Some(cancellation_tx);
let sender = self.proxy_request_sender.clone();
let teardown_timeout = self.teardown_timeout;
fasync::Task::spawn(async move {
let timeout = fuchsia_async::Timer::new(crate::clock::now() + teardown_timeout).fuse();
futures::pin_mut!(cancellation_rx, timeout);
futures::select! {
_ = cancellation_rx => {
// Exit the loop and do not send teardown message when cancellation received.
return;
}
_ = timeout => {}, // no-op
}
sender.unbounded_send(ProxyRequest::Teardown).ok();
})
.detach();
}
/// Transitions the controller for the [setting_type] to the Teardown phase
/// and removes it from the active_controllers.
async fn teardown_if_needed(&mut self) {
if self.has_active_work() {
return;
}
let signature = self.client_signature.take().expect("signature should be set");
let mut controller_receptor = self
.messenger
.message(
setting_handler::Payload::Command(Command::ChangeState(State::Teardown)).into(),
Audience::Messenger(signature),
)
.send();
// Wait for the teardown phase to be over before continuing.
if controller_receptor.next().await != Some(MessageEvent::Status(Status::Received)) {
fx_log_err!("Failed to tear down {:?} controller", self.setting_type);
}
// This ensures that the client event loop for the corresponding controller is
// properly stopped. Without this, the client event loop will run forever.
self.messenger_factory.delete(signature);
publish!(self, event::handler::Event::Teardown);
}
}