blob: 93d6ef7d1cf29ef86812c53dd2c40a4153734175 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::base::{HasSettingType, SettingInfo, SettingType};
use crate::handler::base::{Context, ControllerGenerateResult, Request};
use crate::handler::device_storage::DeviceStorageFactory;
use crate::message::base::Audience;
use crate::payload_convert;
use crate::service::message::{MessageClient, Messenger, Signature};
use crate::service_context::ServiceContext;
use async_trait::async_trait;
use core::convert::TryFrom;
use fuchsia_async as fasync;
use fuchsia_syslog::fx_log_err;
use futures::future::BoxFuture;
use futures::lock::Mutex;
use std::borrow::Cow;
use std::convert::TryInto;
use std::marker::PhantomData;
use std::sync::Arc;
use thiserror::Error;
pub type ExitResult = Result<(), ControllerError>;
pub type SettingHandlerResult = Result<Option<SettingInfo>, ControllerError>;
/// Return type from a controller after handling a state change.
pub type ControllerStateResult = Result<(), ControllerError>;
// The types of data that can be sent to and from a setting controller.
#[derive(Clone, Debug, PartialEq)]
pub enum Payload {
// Sent to the controller to request an action is taken.
Command(Command),
// Sent from the controller adhoc to indicate an event has happened.
Event(Event),
// Sent in response to a request.
Result(SettingHandlerResult),
}
payload_convert!(Controller, Payload);
/// An command sent to the controller to take a particular action.
#[derive(Debug, Clone, PartialEq)]
pub enum Command {
HandleRequest(Request),
ChangeState(State),
}
impl TryFrom<crate::handler::setting_handler::Payload> for Command {
type Error = &'static str;
fn try_from(value: crate::handler::setting_handler::Payload) -> Result<Self, Self::Error> {
match value {
crate::handler::setting_handler::Payload::Command(command) => Ok(command),
_ => Err("wrong payload type"),
}
}
}
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
pub enum State {
/// State of a controller immediately after it is created. Intended
/// to initialize state on the controller.
Startup,
/// State of a controller when at least one client is listening on
/// changes to the setting state.
Listen,
/// State of a controller when there are no more clients listening
/// on changes to the setting state.
EndListen,
/// State of a controller when there are no requests or listeners on
/// the setting type. Intended to tear down state before taking down
/// the controller.
Teardown,
}
/// Events are sent from the setting handler back to the parent
/// proxy to indicate changes that happen out-of-band (happening
/// outside of response to a Command above). They indicate a
/// change in the handler that should potentially be handled by
/// the proxy.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
// Sent when the publicly perceived values of the setting
// handler have been changed.
Changed(SettingInfo),
Exited(ExitResult),
}
pub trait StorageFactory: DeviceStorageFactory + Send + Sync {}
impl<T: DeviceStorageFactory + Send + Sync> StorageFactory for T {}
#[derive(Error, Debug, Clone, PartialEq)]
pub enum ControllerError {
#[error("Unimplemented Request:{1:?} for setting type: {0:?}")]
UnimplementedRequest(SettingType, Request),
#[error("Write failed. setting type: {0:?}")]
WriteFailure(SettingType),
#[error("Initialization failure: cause {0:?}")]
InitFailure(Cow<'static, str>),
#[error("Restoration of setting on controller startup failed: cause {0:?}")]
RestoreFailure(Cow<'static, str>),
#[error("Call to an external dependency {1:?} for setting type {0:?} failed. Request:{2:?}")]
ExternalFailure(SettingType, Cow<'static, str>, Cow<'static, str>),
#[error("Invalid input argument for setting type: {0:?} argument:{1:?} value:{2:?}")]
InvalidArgument(SettingType, Cow<'static, str>, Cow<'static, str>),
#[error(
"Incompatible argument values passed: {setting_type:?} argument:{main_arg:?} cannot be \
combined with arguments:[{other_args:?}] with respective values:[{values:?}]. {reason:?}"
)]
IncompatibleArguments {
setting_type: SettingType,
main_arg: Cow<'static, str>,
other_args: Cow<'static, str>,
values: Cow<'static, str>,
reason: Cow<'static, str>,
},
#[error("Unhandled type: {0:?}")]
UnhandledType(SettingType),
#[error("Unexpected error: {0:?}")]
UnexpectedError(Cow<'static, str>),
#[error("Undeliverable Request:{1:?} for setting type: {0:?}")]
UndeliverableError(SettingType, Request),
#[error("Unsupported request for setting type: {0:?}")]
UnsupportedError(SettingType),
#[error("Delivery error for type: {0:?} received by: {1:?}")]
DeliveryError(SettingType, SettingType),
#[error("Irrecoverable error")]
IrrecoverableError,
#[error("Timeout occurred")]
TimeoutError,
#[error("Exit occurred")]
ExitError,
}
pub type BoxedController = Box<dyn controller::Handle + Send + Sync>;
pub type BoxedControllerResult = Result<BoxedController, ControllerError>;
pub type GenerateController =
Box<dyn Fn(Arc<ClientImpl>) -> BoxFuture<'static, BoxedControllerResult> + Send + Sync>;
pub mod controller {
use super::*;
#[async_trait]
pub trait Create: Sized {
async fn create(client: Arc<ClientImpl>) -> Result<Self, ControllerError>;
}
#[async_trait]
pub trait Handle: Send {
async fn handle(&self, request: Request) -> Option<SettingHandlerResult>;
async fn change_state(&mut self, _state: State) -> Option<ControllerStateResult> {
None
}
}
}
pub struct ClientImpl {
notify: Mutex<bool>,
messenger: Messenger,
notifier_signature: Signature,
service_context: Arc<ServiceContext>,
setting_type: SettingType,
}
impl ClientImpl {
fn new(context: &Context) -> Self {
Self {
messenger: context.messenger.clone(),
setting_type: context.setting_type,
notifier_signature: context.notifier_signature.clone(),
notify: Mutex::new(false),
service_context: Arc::clone(&context.environment.service_context),
}
}
async fn process_request(
setting_type: SettingType,
controller: &BoxedController,
request: Request,
) -> SettingHandlerResult {
let result = controller.handle(request.clone()).await;
match result {
Some(response_result) => response_result,
None => Err(ControllerError::UnimplementedRequest(setting_type, request)),
}
}
pub(crate) async fn create(
mut context: Context,
generate_controller: GenerateController,
) -> ControllerGenerateResult {
let client = Arc::new(Self::new(&context));
let controller_result = generate_controller(Arc::clone(&client)).await;
if let Err(error) = controller_result {
return Err(anyhow::Error::new(error));
}
let mut controller = controller_result.unwrap();
// Process MessageHub requests
fasync::Task::spawn(async move {
while let Ok((payload, message_client)) = context.receptor.next_payload().await {
let setting_type = client.setting_type;
// Setting handlers should only expect commands
match Command::try_from(
Payload::try_from(payload).expect("should only receive handler payloads"),
)
.expect("should only receive commands")
{
// Rebroadcasting requires special handling. The handler will request the
// current value from controller and then notify the caller as if it was a
// change in value.
Command::HandleRequest(Request::Rebroadcast) => {
// Fetch the current value
let controller_reply =
Self::process_request(setting_type, &controller, Request::Get).await;
// notify proxy of value
if let Ok(Some(info)) = &controller_reply {
client.notify(Event::Changed(info.clone())).await;
}
reply(message_client, controller_reply);
}
Command::HandleRequest(request) => reply(
message_client,
Self::process_request(setting_type, &controller, request.clone()).await,
),
Command::ChangeState(state) => {
match state {
State::Startup => {
if let Some(Err(e)) = controller.change_state(state).await {
fx_log_err!(
"Failed startup phase for SettingType {:?} {}",
setting_type,
e
);
}
reply(message_client, Ok(None));
continue;
}
State::Listen => {
*client.notify.lock().await = true;
}
State::EndListen => {
*client.notify.lock().await = false;
}
State::Teardown => {
if let Some(Err(e)) = controller.change_state(state).await {
fx_log_err!(
"Failed teardown phase for SettingType {:?} {}",
setting_type,
e
);
}
reply(message_client, Ok(None));
continue;
}
}
controller.change_state(state).await;
}
}
}
})
.detach();
Ok(())
}
pub(crate) fn get_service_context(&self) -> Arc<ServiceContext> {
Arc::clone(&self.service_context)
}
pub(crate) async fn notify(&self, event: Event) {
let notify = self.notify.lock().await;
if *notify {
self.messenger
.message(Payload::Event(event).into(), Audience::Messenger(self.notifier_signature))
.send();
}
}
}
pub struct Handler<C: controller::Create + controller::Handle + Send + Sync + 'static> {
_data: PhantomData<C>,
}
impl<C: controller::Create + controller::Handle + Send + Sync + 'static> Handler<C> {
pub fn spawn(context: Context) -> BoxFuture<'static, ControllerGenerateResult> {
Box::pin(async move {
ClientImpl::create(
context,
Box::new(|proxy| {
Box::pin(async move {
let controller_result = C::create(proxy).await;
match controller_result {
Err(err) => Err(err),
Ok(controller) => Ok(Box::new(controller) as BoxedController),
}
})
}),
)
.await
})
}
}
/// `IntoHandlerResult` helps with converting a value into the result of a setting request.
pub trait IntoHandlerResult {
/// Converts `Self` into a `SettingHandlerResult` for use in a `Controller`.
fn into_handler_result(self) -> SettingHandlerResult;
}
impl IntoHandlerResult for SettingInfo {
fn into_handler_result(self) -> SettingHandlerResult {
Ok(Some(self))
}
}
pub mod persist {
use super::ClientImpl as BaseProxy;
use super::*;
use crate::base::SettingInfo;
use crate::handler::device_storage::DeviceStorageConvertible;
use crate::message::base::{Audience, MessageEvent};
use crate::service;
use crate::storage;
use futures::StreamExt;
pub trait Storage: DeviceStorageConvertible + Into<SettingInfo> + Send + Sync {}
impl<T: DeviceStorageConvertible + Into<SettingInfo> + Send + Sync> Storage for T {}
#[derive(PartialEq, Clone, Debug)]
/// Enum for describing whether writing affected persistent value.
pub enum UpdateState {
Unchanged,
Updated,
}
pub mod controller {
use super::ClientProxy;
use super::*;
#[async_trait]
pub trait Create: Sized {
async fn create(handler: ClientProxy) -> Result<Self, ControllerError>;
}
}
pub struct ClientProxy {
base: Arc<BaseProxy>,
setting_type: SettingType,
}
impl Clone for ClientProxy {
fn clone(&self) -> Self {
Self { base: Arc::clone(&self.base), setting_type: self.setting_type }
}
}
impl ClientProxy {
pub async fn new(base_proxy: Arc<BaseProxy>, setting_type: SettingType) -> Self {
Self { base: base_proxy, setting_type }
}
pub fn get_service_context(&self) -> Arc<ServiceContext> {
self.base.get_service_context()
}
pub async fn get_messenger(&self) -> Messenger {
self.base.messenger.clone()
}
pub async fn notify(&self, event: Event) {
self.base.notify(event).await;
}
pub async fn read_setting_info<T: HasSettingType>(&self) -> SettingInfo {
let mut receptor = self
.base
.messenger
.message(
storage::Payload::Request(storage::StorageRequest::Read(T::SETTING_TYPE))
.into(),
Audience::Address(service::Address::Storage),
)
.send();
while let Ok((payload, _)) = receptor.next_payload().await {
if let service::Payload::Storage(storage::Payload::Response(
storage::StorageResponse::Read(setting_info),
)) = payload
{
return setting_info;
} else {
panic!("Incorrect response received from storage: {:?}", payload);
}
}
panic!("Did not get a read response");
}
pub async fn read_setting<T: HasSettingType + TryFrom<SettingInfo>>(&self) -> T {
let setting_info = self.read_setting_info::<T>().await;
if let Ok(info) = setting_info.clone().try_into() {
info
} else {
panic!(
"Mismatching type during read. Expected {:?}, but got {:?}",
T::SETTING_TYPE,
setting_info
);
}
}
/// The argument `write_through` will block returning until the value has been completely
/// written to persistent store, rather than any temporary in-memory caching.
pub async fn write_setting(
&self,
setting_info: SettingInfo,
write_through: bool,
) -> Result<UpdateState, ControllerError> {
let setting_type = (&setting_info).into();
let mut receptor = self
.base
.messenger
.message(
storage::Payload::Request(storage::StorageRequest::Write(
setting_info.clone(),
write_through,
))
.into(),
Audience::Address(service::Address::Storage),
)
.send();
while let Some(response) = receptor.next().await {
if let MessageEvent::Message(
service::Payload::Storage(storage::Payload::Response(
storage::StorageResponse::Write(result),
)),
_,
) = response
{
if let Ok(UpdateState::Updated) = result {
self.notify(Event::Changed(setting_info)).await;
}
return result.map_err(|e| {
fx_log_err!("Failed to write setting: {:?}", e);
ControllerError::WriteFailure(setting_type)
});
}
}
panic!("Did not get a write response");
}
}
/// A trait for interpreting a `Result` into whether a notification occurred
/// and converting the `Result` into a `SettingHandlerResult`.
pub trait WriteResult: IntoHandlerResult {
/// Indicates whether a notification occurred as a result of the write.
fn notified(&self) -> bool;
}
impl WriteResult for Result<UpdateState, ControllerError> {
fn notified(&self) -> bool {
self.as_ref().map_or(false, |update_state| UpdateState::Updated == *update_state)
}
}
impl IntoHandlerResult for Result<UpdateState, ControllerError> {
fn into_handler_result(self) -> SettingHandlerResult {
self.map(|_| None)
}
}
pub struct Handler<C: controller::Create + super::controller::Handle + Send + Sync + 'static> {
_data: PhantomData<C>,
}
impl<C: controller::Create + super::controller::Handle + Send + Sync + 'static> Handler<C> {
pub fn spawn(context: Context) -> BoxFuture<'static, ControllerGenerateResult> {
Box::pin(async move {
let setting_type = context.setting_type;
ClientImpl::create(
context,
Box::new(move |proxy| {
Box::pin(async move {
let proxy = ClientProxy::new(proxy, setting_type).await;
let controller_result = C::create(proxy).await;
match controller_result {
Err(err) => Err(err),
Ok(controller) => Ok(Box::new(controller) as BoxedController),
}
})
}),
)
.await
})
}
}
}
pub fn reply(client: MessageClient, result: SettingHandlerResult) {
client.reply(Payload::Result(result).into()).send().ack();
}