blob: 68bc2827396cb1f9dc5c94e0cb6905eea86ef268 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::Error;
use fidl::endpoints::{Request, ServiceMarker};
use futures::future::LocalBoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use crate::base::{SettingInfo, SettingType};
use crate::fidl_processor::policy::{
PolicyProcessingUnit, RequestCallback as PolicyRequestCallback,
};
use crate::fidl_processor::settings::{
RequestCallback as SettingsRequestCallback, SettingProcessingUnit,
};
use crate::hanging_get_handler::Sender;
use crate::service;
use crate::ExitSender;
use std::hash::Hash;
pub type RequestResultCreator<'a, S> = LocalBoxFuture<'a, Result<Option<Request<S>>, Error>>;
pub type RequestStream<S> = <S as ServiceMarker>::RequestStream;
/// `ProcessingUnit` is an entity that is able to process a stream request and indicate whether the
/// request was consumed.
pub trait ProcessingUnit<S>
where
S: ServiceMarker,
{
/// Processes a request provided by the fidl processor.
///
/// If the request is not processed, this method should return the original `request` that was
/// passed in. If the request was successfully processed, `None` should be returned. If an error
/// was encountered, an appropriate error should be returned.
///
/// Parameters:
/// `messenger`: a message hub connection that can be used to send and receive messages
/// `service_messenger`: a MessageHub messenger to send messages.
/// `request`: the request to process
/// `exit_tx`: a channel to indicate that the connection is closed and for processing to stop
fn process(
&self,
service_messenger: service::message::Messenger,
request: Request<S>,
exit_tx: ExitSender,
) -> RequestResultCreator<'static, S>;
}
/// `BaseFidlProcessor` delegates request processing for setting requests across a number of
/// processing units. There should be a single FidlProcessor per stream.
pub struct BaseFidlProcessor<S>
where
S: ServiceMarker,
{
request_stream: RequestStream<S>,
service_messenger: service::message::Messenger,
processing_units: Vec<Box<dyn ProcessingUnit<S>>>,
}
impl<S> BaseFidlProcessor<S>
where
S: ServiceMarker,
{
pub fn new(
request_stream: RequestStream<S>,
service_messenger: service::message::Messenger,
) -> Self {
Self { request_stream, service_messenger, processing_units: Vec::new() }
}
#[cfg(test)]
pub(crate) fn with_processing_units(
request_stream: RequestStream<S>,
service_messenger: service::message::Messenger,
processing_units: Vec<Box<dyn ProcessingUnit<S>>>,
) -> Self {
Self { request_stream, service_messenger, processing_units }
}
// Process the stream. Note that we pass in the processor here as it cannot
// be used again afterwards.
pub async fn process(mut self) {
let (exit_tx, mut exit_rx) = futures::channel::mpsc::unbounded::<()>();
loop {
// Note that we create a fuse outside the select! to prevent it from
// being called from outside the select! macro.
let fused_stream = self.request_stream.try_next().fuse();
futures::pin_mut!(fused_stream);
futures::select! {
request = fused_stream => {
if let Ok(Some(mut req)) = request {
for processing_unit in &self.processing_units {
// If the processing unit consumes the request (an empty
// result is returned) or an error occurs, exit processing this
// request. Otherwise, hand the request to the next processing
// unit
match processing_unit.process(
self.service_messenger.clone(),
req, exit_tx.clone()).await {
Ok(Some(return_request)) => {
req = return_request;
}
_ => {
break
}
}
}
} else {
return;
}
}
_ = exit_rx.next() => {
return;
}
}
}
}
}
/// Wraps [`BaseFidlProcessor`] for use with FIDL APIs in the fuchsia.settings namespace that send
/// and receive messages through the service MessageHub.
///
/// [`BaseFidlProcessor`]: struct.BaseFidlProcessor.html
pub struct SettingsFidlProcessor<S>
where
S: ServiceMarker,
{
base_processor: BaseFidlProcessor<S>,
}
impl<S> SettingsFidlProcessor<S>
where
S: ServiceMarker,
{
pub async fn new(
stream: RequestStream<S>,
service_messenger: service::message::Messenger,
) -> Self {
Self { base_processor: BaseFidlProcessor::new(stream, service_messenger) }
}
/// Registers a fidl processing unit for setting requests.
pub async fn register<V, SV, K>(
&mut self,
setting_type: SettingType,
callback: SettingsRequestCallback<S, V, SV, K>,
) where
V: From<SettingInfo> + Send + Sync + 'static,
SV: Sender<V> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
let processing_unit = Box::new(
SettingProcessingUnit::<S, V, SV, K>::new(
setting_type,
self.base_processor.service_messenger.clone(),
callback,
)
.await,
);
self.base_processor.processing_units.push(processing_unit);
}
pub async fn process(self) {
self.base_processor.process().await
}
}
/// Wraps [`BaseFidlProcessor`] for use with FIDL APIs in the fuchsia.settings.policy namespace that
/// send and receive messages through the policy message hub.
///
/// [`BaseFidlProcessor`]: struct.BaseFidlProcessor.html
pub struct PolicyFidlProcessor<S>
where
S: ServiceMarker,
{
base_processor: BaseFidlProcessor<S>,
}
impl<S> PolicyFidlProcessor<S>
where
S: ServiceMarker,
{
pub async fn new(
stream: RequestStream<S>,
service_messenger: service::message::Messenger,
) -> Self {
Self { base_processor: BaseFidlProcessor::new(stream, service_messenger) }
}
/// Registers a fidl processing unit for policy requests.
pub async fn register(&mut self, callback: PolicyRequestCallback<S>) {
let processing_unit = Box::new(PolicyProcessingUnit::<S>::new(callback));
self.base_processor.processing_units.push(processing_unit);
}
pub async fn process(self) {
self.base_processor.process().await
}
}