blob: dc4ee5acd879e1bd663822efc63f701a7736eedf [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::sync::Arc;
use anyhow::Error;
use fidl::endpoints::{Request, ServiceMarker};
use fuchsia_async as fasync;
use futures::future::LocalBoxFuture;
use futures::lock::Mutex;
use futures::{FutureExt, StreamExt, TryStreamExt};
use crate::internal::switchboard;
use crate::message::base::Audience;
use crate::switchboard::base::{
SettingRequest, SettingResponse, SettingResponseResult, SettingType, SwitchboardError,
};
use crate::switchboard::hanging_get_handler::{HangingGetHandler, Sender};
use crate::ExitSender;
use std::hash::Hash;
pub type RequestResultCreator<'a, S> = LocalBoxFuture<'a, Result<Option<Request<S>>, Error>>;
type ChangeFunction<T> = Box<dyn Fn(&T, &T) -> bool + Send + Sync + 'static>;
/// The RequestContext is passed to each request callback to provide resources,
/// such as the switchboard and hanging get functionality. Note that we do not
/// directly expose the hanging get handler so that we can better control its
/// lifetime.
pub struct RequestContext<T, ST, K = String>
where
T: From<SettingResponse> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
switchboard_messenger: switchboard::message::Messenger,
hanging_get_handler: Arc<Mutex<HangingGetHandler<T, ST, K>>>,
exit_tx: ExitSender,
}
impl<T, ST, K> RequestContext<T, ST, K>
where
T: From<SettingResponse> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
pub async fn request(
&self,
setting_type: SettingType,
request: SettingRequest,
) -> SettingResponseResult {
let mut receptor = self
.switchboard_messenger
.message(
switchboard::Payload::Action(switchboard::Action::Request(setting_type, request)),
Audience::Address(switchboard::Address::Switchboard),
)
.send();
if let Ok((switchboard::Payload::Action(switchboard::Action::Response(result)), _)) =
receptor.next_payload().await
{
return result;
}
Err(SwitchboardError::CommunicationError)
}
pub async fn watch(&self, responder: ST, close_on_error: bool) {
let mut hanging_get_lock = self.hanging_get_handler.lock().await;
hanging_get_lock
.watch(responder, if close_on_error { Some(self.exit_tx.clone()) } else { None })
.await;
}
pub async fn watch_with_change_fn(
&self,
change_function_key: K,
change_function: ChangeFunction<T>,
responder: ST,
close_on_error: bool,
) {
let mut hanging_get_lock = self.hanging_get_handler.lock().await;
hanging_get_lock
.watch_with_change_fn(
Some(change_function_key),
change_function,
responder,
if close_on_error { Some(self.exit_tx.clone()) } else { None },
)
.await;
}
}
#[macro_export]
macro_rules! request_respond {
($context:ident, $responder:ident, $setting_type:expr, $request:expr, $success:expr, $error:expr, $marker:expr) => {
match $context.request($setting_type, $request).await {
Ok(_) => $responder.send(&mut $success),
_ => $responder.send(&mut $error),
}
.log_fidl_response_error($marker);
};
}
impl<T, ST, K> Clone for RequestContext<T, ST, K>
where
T: From<SettingResponse> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
fn clone(&self) -> RequestContext<T, ST, K> {
RequestContext {
switchboard_messenger: self.switchboard_messenger.clone(),
hanging_get_handler: self.hanging_get_handler.clone(),
exit_tx: self.exit_tx.clone(),
}
}
}
/// RequestCallback closures are handed a request and the surrounding
/// context. They are expected to hand back a future that returns when the
/// request is processed. The returned value is a result with an optional
/// request, containing None if not processed and the original request
/// otherwise.
pub type RequestCallback<S, T, ST, K> =
Box<dyn Fn(RequestContext<T, ST, K>, Request<S>) -> RequestResultCreator<'static, S>>;
pub type RequestStream<S> = <S as ServiceMarker>::RequestStream;
/// A processing unit is an entity that is able to process a stream request and
/// indicate whether the request was consumed.
trait ProcessingUnit<S>
where
S: ServiceMarker,
{
fn process(
&self,
switchboard_messenger: switchboard::message::Messenger,
request: Request<S>,
exit_tx: ExitSender,
) -> RequestResultCreator<'static, S>;
}
/// SettingProcessingUnit is a concrete implementation of the ProcessingUnit
/// trait, allowing a RequestCallback to participate in stream request
/// processing. The SettingProcessingUnit maintains a hanging get handler keyed
/// to the constructed type.
struct SettingProcessingUnit<S, T, ST, K>
where
S: ServiceMarker,
T: From<SettingResponse> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
callback: RequestCallback<S, T, ST, K>,
hanging_get_handler: Arc<Mutex<HangingGetHandler<T, ST, K>>>,
}
impl<S, T, ST, K> SettingProcessingUnit<S, T, ST, K>
where
S: ServiceMarker,
T: From<SettingResponse> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
async fn new(
setting_type: SettingType,
switchboard_messenger: switchboard::message::Messenger,
callback: RequestCallback<S, T, ST, K>,
) -> Self {
Self {
callback: callback,
hanging_get_handler: HangingGetHandler::create(
switchboard_messenger.clone(),
setting_type,
)
.await,
}
}
}
impl<S, T, ST, K> Drop for SettingProcessingUnit<S, T, ST, K>
where
S: ServiceMarker,
T: From<SettingResponse> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
fn drop(&mut self) {
let hanging_get_handler = self.hanging_get_handler.clone();
fasync::Task::local(async move {
hanging_get_handler.lock().await.close();
})
.detach();
}
}
impl<S, T, ST, K> ProcessingUnit<S> for SettingProcessingUnit<S, T, ST, K>
where
S: ServiceMarker,
T: From<SettingResponse> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
fn process(
&self,
switchboard_messenger: switchboard::message::Messenger,
request: Request<S>,
exit_tx: ExitSender,
) -> RequestResultCreator<'static, S> {
let context = RequestContext {
switchboard_messenger: switchboard_messenger.clone(),
hanging_get_handler: self.hanging_get_handler.clone(),
exit_tx: exit_tx.clone(),
};
return (self.callback)(context.clone(), request);
}
}
/// The FidlProcessor delegates request processing across a number of processing
/// units. There should be a single FidlProcessor per stream.
pub struct FidlProcessor<S>
where
S: ServiceMarker,
{
request_stream: RequestStream<S>,
switchboard_messenger: switchboard::message::Messenger,
processing_units: Vec<Box<dyn ProcessingUnit<S>>>,
}
impl<S> FidlProcessor<S>
where
S: ServiceMarker,
{
pub async fn new(
stream: RequestStream<S>,
switchboard_messenger: switchboard::message::Messenger,
) -> Self {
Self { request_stream: stream, switchboard_messenger, processing_units: Vec::new() }
}
pub async fn register<V, SV, K>(
&mut self,
setting_type: SettingType,
callback: RequestCallback<S, V, SV, K>,
) where
V: From<SettingResponse> + Send + Sync + 'static,
SV: Sender<V> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
let processing_unit = Box::new(
SettingProcessingUnit::<S, V, SV, K>::new(
setting_type,
self.switchboard_messenger.clone(),
callback,
)
.await,
);
self.processing_units.push(processing_unit);
}
// Process the stream. Note that we pass in the processor here as it cannot
// be used again afterwards.
pub async fn process(mut self) {
let (exit_tx, mut exit_rx) = futures::channel::mpsc::unbounded::<()>();
loop {
// Note that we create a fuse outside the select! to prevent it from
// being called from outside the select! macro.
let fused_stream = self.request_stream.try_next().fuse();
futures::pin_mut!(fused_stream);
futures::select! {
request = fused_stream => {
if let Ok(Some(mut req)) = request {
for processing_unit in &self.processing_units {
// If the processing unit consumes the request (an empty
// result is returned) or an error occurs, exit processing this
// request. Otherwise, hand the request to the next processing
// unit
match processing_unit.process(
self.switchboard_messenger.clone(),
req, exit_tx.clone()).await {
Ok(Some(return_request)) => {
req = return_request;
}
_ => {
break
}
}
}
} else {
return;
}
}
exit = exit_rx.next() => {
return;
}
}
}
}
}