blob: a8c90ff64affbcef54194d100d529aa8e7cc4452 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::sync::Arc;
use fidl::endpoints::{Request as FidlRequest, ServiceMarker};
use fuchsia_async as fasync;
use futures::lock::Mutex;
use crate::base::{SettingInfo, SettingType};
use crate::fidl_processor::processor::{ProcessingUnit, RequestResultCreator};
use crate::handler::base::{Error, Payload as HandlerPayload, Request as SettingRequest, Response};
use crate::hanging_get_handler::{HangingGetHandler, Sender};
use crate::message::base::Audience;
use crate::service;
use crate::ExitSender;
use std::hash::Hash;
/// Convenience macro to make a setting request and send the result to a responder.
///
/// The expansion `.await`s the request, so it must be used inside an `async`
/// context.
///
/// Arguments:
/// * `$context` - identifier of a value exposing an async
///   `request(setting_type, request)` method (e.g. a `RequestContext`).
/// * `$responder` - identifier of the responder whose `send` is invoked with
///   the outcome.
/// * `$setting_type` - expression passed as the first argument to `request`.
/// * `$request` - expression passed as the second argument to `request`.
/// * `$success` - expression sent (as `&mut`) when the request returns `Ok`;
///   the `Ok` payload itself is discarded.
/// * `$error` - expression sent (as `&mut`) in every other case.
/// * `$marker` - type implementing `ServiceMarker`; its `DEBUG_NAME` is handed
///   to `log_fidl_response_error`, which is called on the result of `send`.
///
/// Only the expression for the branch that is taken is evaluated.
#[macro_export]
macro_rules! request_respond {
(
$context:ident,
$responder:ident,
$setting_type:expr,
$request:expr,
$success:expr,
$error:expr,
$marker:ty $(,)?
) => {{
// Bring the traits used below into scope at the expansion site.
use ::fidl::endpoints::ServiceMarker;
use $crate::fidl_common::FidlResponseErrorLogger;
match $context.request($setting_type, $request).await {
Ok(_) => $responder.send(&mut $success),
_ => $responder.send(&mut $error),
}
.log_fidl_response_error(<$marker as ServiceMarker>::DEBUG_NAME);
}};
}
/// `RequestCallback` closures are handed a request and the surrounding
/// context. They are expected to hand back a future that returns when the
/// request is processed. The returned value is a result with an optional
/// request, containing None if not processed and the original request
/// otherwise.
///
/// NOTE(review): the `None`/original-request wording above may be inverted;
/// verify against the consumers of `RequestResultCreator` whether `None`
/// actually signals that the request was consumed.
///
/// The type parameters mirror those of [`RequestContext`], with `S` being the
/// marker of the FIDL service whose requests are handled.
pub type RequestCallback<S, T, ST, K> =
Box<dyn Fn(RequestContext<T, ST, K>, FidlRequest<S>) -> RequestResultCreator<'static, S>>;
/// Boxed, thread-safe predicate over two borrowed `T` values, as accepted by
/// `RequestContext::watch_with_change_fn`.
///
/// NOTE(review): presumably the arguments are the previous and the new value
/// and the result says whether the change should be reported; confirm against
/// `HangingGetHandler::watch_with_change_fn`.
type ChangeFunction<T> = Box<dyn Fn(&T, &T) -> bool + Send + Sync + 'static>;
/// `RequestContext` is passed to each request callback to provide the
/// resources needed to service a request: issuing setting requests and
/// registering hanging get watchers.
///
/// Note that we do not directly expose the hanging get handler so that we can
/// better control its lifetime.
///
/// Type parameters:
/// * `T` - value type handed to watchers; must be constructible from
///   `SettingInfo`.
/// * `ST` - sender type able to deliver a `T` (see `Sender`).
/// * `K` - key type identifying change functions; defaults to `String`.
pub struct RequestContext<T, ST, K = String>
where
T: From<SettingInfo> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
/// Messenger used by `request` to reach the setting handlers.
service_messenger: crate::service::message::Messenger,
/// Reference-counted hanging get handler guarded by an async mutex; the
/// `watch*` methods lock it to register responders.
hanging_get_handler: Arc<Mutex<HangingGetHandler<T, ST, K>>>,
/// Exit sender; a clone is handed to the hanging get handler when a watch
/// is registered with `close_on_error` set.
exit_tx: ExitSender,
}
impl<T, ST, K> RequestContext<T, ST, K>
where
T: From<SettingInfo> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
pub async fn request(&self, setting_type: SettingType, request: SettingRequest) -> Response {
let mut receptor = self
.service_messenger
.message(
service::Payload::Setting(HandlerPayload::Request(request)),
Audience::Address(service::Address::Handler(setting_type)),
)
.send();
if let Ok((service::Payload::Setting(HandlerPayload::Response(result)), _)) =
receptor.next_payload().await
{
return result;
}
Err(Error::CommunicationError)
}
pub async fn watch(&self, responder: ST, close_on_error: bool) {
let mut hanging_get_lock = self.hanging_get_handler.lock().await;
hanging_get_lock
.watch(responder, if close_on_error { Some(self.exit_tx.clone()) } else { None })
.await;
}
pub async fn watch_with_change_fn(
&self,
change_function_key: K,
change_function: ChangeFunction<T>,
responder: ST,
close_on_error: bool,
) {
let mut hanging_get_lock = self.hanging_get_handler.lock().await;
hanging_get_lock
.watch_with_change_fn(
Some(change_function_key),
change_function,
responder,
if close_on_error { Some(self.exit_tx.clone()) } else { None },
)
.await;
}
}
impl<T, ST, K> Clone for RequestContext<T, ST, K>
where
T: From<SettingInfo> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
fn clone(&self) -> RequestContext<T, ST, K> {
RequestContext {
service_messenger: self.service_messenger.clone(),
hanging_get_handler: self.hanging_get_handler.clone(),
exit_tx: self.exit_tx.clone(),
}
}
}
/// `SettingProcessingUnit` is a concrete implementation of the ProcessingUnit
/// trait, allowing a [`RequestCallback`] to participate in stream request
/// processing. The SettingProcessingUnit maintains a hanging get handler keyed
/// to the constructed type.
///
/// The hanging get handler is closed asynchronously when the unit is dropped.
///
/// [`RequestCallback`]: type.RequestCallback.html
pub struct SettingProcessingUnit<S, T, ST, K>
where
S: ServiceMarker,
T: From<SettingInfo> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
/// Callback invoked by `process` with a fresh `RequestContext` and the
/// incoming request.
callback: RequestCallback<S, T, ST, K>,
/// Hanging get handler created in `new`; a handle to it is placed in every
/// `RequestContext` built by `process`.
hanging_get_handler: Arc<Mutex<HangingGetHandler<T, ST, K>>>,
}
impl<S, T, ST, K> SettingProcessingUnit<S, T, ST, K>
where
    S: ServiceMarker,
    T: From<SettingInfo> + Send + Sync + 'static,
    ST: Sender<T> + Send + Sync + 'static,
    K: Eq + Hash + Clone + Send + Sync + 'static,
{
    /// Builds a processing unit for `setting_type`.
    ///
    /// A hanging get handler is created from `messenger` and `setting_type`
    /// and stored alongside `callback`, which will be invoked for each
    /// request passed to `process`.
    pub(crate) async fn new(
        setting_type: SettingType,
        messenger: service::message::Messenger,
        callback: RequestCallback<S, T, ST, K>,
    ) -> Self {
        // `messenger` is owned and not used again, so it is moved into the
        // handler instead of being cloned and then dropped.
        let hanging_get_handler = HangingGetHandler::create(messenger, setting_type).await;
        Self { callback, hanging_get_handler }
    }
}
impl<S, T, ST, K> Drop for SettingProcessingUnit<S, T, ST, K>
where
S: ServiceMarker,
T: From<SettingInfo> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
fn drop(&mut self) {
let hanging_get_handler = self.hanging_get_handler.clone();
fasync::Task::local(async move {
hanging_get_handler.lock().await.close();
})
.detach();
}
}
impl<S, T, ST, K> ProcessingUnit<S> for SettingProcessingUnit<S, T, ST, K>
where
S: ServiceMarker,
T: From<SettingInfo> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
fn process(
&self,
service_messenger: crate::service::message::Messenger,
request: FidlRequest<S>,
exit_tx: ExitSender,
) -> RequestResultCreator<'static, S> {
let context = RequestContext {
service_messenger: service_messenger.clone(),
hanging_get_handler: self.hanging_get_handler.clone(),
exit_tx: exit_tx.clone(),
};
return (self.callback)(context.clone(), request);
}
}