blob: 543b11367b695495fa0fad2108054ad32031e854 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Watch request handling.
//!
//! This mod defines the components for handling hanging-get, or "watch", [Requests](Request). These
//! requests return a value to the requester when a value different from the previously returned /
//! value is available. This pattern is common across the various setting service interfaces.
//! Since there is context involved between watch requests, these job workloads are [Sequential].
//!
//! Users of these components define three implementations to create "watch"-related jobs. First,
//! implementations of [From<SettingInfo>] and [From<Error>] are needed. Since these requests will
//! always return a value on success, the request handling automatically converts the [SettingInfo].
//! The built-in conversion to the user type with the [From] trait implementation helps reduce the
//! explicit conversion in the responding code. Lastly, the user must implement [Responder], which
//! returns a [Result] converted from the [Response](crate::handler::base::Response) returned from
//! the setting service.
use crate::base::{SettingInfo, SettingType};
use crate::handler::base::{Error, Payload, Request};
use crate::job::data::{self, Data, Key};
use crate::job::work::{Error as WorkError, Load, Sequential};
use crate::job::Job;
use crate::job::Signature;
use crate::message::base::Audience;
use crate::message::receptor::Receptor;
use crate::service::{message, Address, Payload as ServicePayload, Role};
use crate::trace;
use crate::trace::TracingNonce;
use async_trait::async_trait;
use fuchsia_syslog::fx_log_warn;
use futures::channel::oneshot;
use futures::FutureExt;
use std::collections::HashMap;
use std::marker::PhantomData;
/// The key used to store the last value sent. This cache is scoped to the
/// [Job's Signature](Signature).
const LAST_VALUE_KEY: &str = "LAST_VALUE";
/// A custom function used to compare an existing setting value with a new one to determine if
/// listeners should be notified. If true is returned, listeners will be notified.
pub(crate) struct ChangeFunction {
function: Box<dyn Fn(&SettingInfo, &SettingInfo) -> bool + Send + Sync + 'static>,
id: u64,
}
/// This struct can be used to manage construction of [ChangeFunctions](ChangeFunction). It tracks
/// the ids of all ChangeFunctions it has generated, so that each new one is unique from the others.
/// Given that [Signatures](Signature) use the responder type and this generated id, only one
/// ChangeFunctionGenerator needs to be managed per fidl interface.
pub(crate) struct ChangeFunctionGenerator {
next_id: u64,
}
impl ChangeFunctionGenerator {
#[allow(dead_code)]
fn new() -> Self {
Self { next_id: 0 }
}
}
impl ChangeFunctionGenerator {
// TODO(fxbug.dev/79044): remove allow dead_code once used
#[allow(dead_code)]
fn make<F>(&mut self, function: F) -> ChangeFunction
where
F: Fn(&'_ SettingInfo, &'_ SettingInfo) -> bool + Send + Sync + 'static,
{
let id = self.next_id;
self.next_id += 1;
ChangeFunction { function: Box::new(function), id }
}
}
/// [Responder] is a trait for handing back results of a watch request. It is unique from other
/// work responders, since [Work] consumers expect a value to be present on success. The Responder
/// specifies the conversions for [Response](crate::handler::base::Response).
pub trait Responder<
R: From<SettingInfo> + Send + Sync + 'static,
E: From<Error> + Send + Sync + 'static,
>
{
fn respond(self, response: Result<R, E>);
}
pub struct Work<
R: From<SettingInfo> + Send + Sync + 'static,
E: From<Error> + Send + Sync + 'static,
T: Responder<R, E> + Send + Sync + 'static,
> {
setting_type: SettingType,
signature: Signature,
responder: T,
cancelation_rx: oneshot::Receiver<()>,
change_function: Option<ChangeFunction>,
_response_type: PhantomData<R>,
_error_type: PhantomData<E>,
}
impl<
R: From<SettingInfo> + Send + Sync + 'static,
E: From<Error> + Send + Sync + 'static,
T: Responder<R, E> + Send + Sync + 'static,
> Work<R, E, T>
{
fn new(setting_type: SettingType, responder: T, cancelation_rx: oneshot::Receiver<()>) -> Self
where
T: 'static,
{
Self {
setting_type,
signature: Signature::new::<T>(),
responder,
cancelation_rx,
change_function: None,
_response_type: PhantomData,
_error_type: PhantomData,
}
}
pub(crate) fn new_job(setting_type: SettingType, responder: T) -> Job
where
T: 'static,
{
let (cancelation_tx, cancelation_rx) = oneshot::channel();
let work = Self::new(setting_type, responder, cancelation_rx);
Job::from((work, cancelation_tx))
}
// TODO(fxbug.dev/79044): remove allow dead_code once used
#[allow(dead_code)]
pub(crate) fn with_change_function(
setting_type: SettingType,
responder: T,
cancelation_rx: oneshot::Receiver<()>,
change_function: ChangeFunction,
) -> Self {
Self {
setting_type,
signature: Signature::with::<T>(change_function.id),
responder,
cancelation_rx,
change_function: Some(change_function),
_response_type: PhantomData,
_error_type: PhantomData,
}
}
async fn get_next(
&mut self,
receptor: &mut Receptor<ServicePayload, Address, Role>,
) -> Result<Result<Payload, anyhow::Error>, WorkError> {
let receptor = receptor.next_of::<Payload>().fuse();
let mut cancelation_rx = &mut self.cancelation_rx;
futures::pin_mut!(receptor);
futures::select! {
result = receptor => Ok(result.map(|(payload, _)| payload)),
_ = cancelation_rx => Err(WorkError::Canceled),
}
}
/// Returns a non-empty value when the last response should be returned to the caller. The lack
/// of a response indicates the watched value has not changed and watching will continue.
fn process_response(
&self,
response: Result<Payload, anyhow::Error>,
store: &mut HashMap<Key, Data>,
) -> Option<Result<SettingInfo, Error>> {
match response {
Ok(Payload::Response(Ok(Some(setting_info)))) => {
let key = Key::Identifier(LAST_VALUE_KEY);
let return_val = match (store.get(&key), self.change_function.as_ref()) {
// Apply the change function to determine if we should notify listeners.
(Some(Data::SettingInfo(info)), Some(change_function))
if !(change_function.function)(info, &setting_info) =>
{
None
}
// No change function used, compare the new info with the old.
(Some(Data::SettingInfo(info)), None) if *info == setting_info => None,
_ => Some(Ok(setting_info)),
};
if let Some(Ok(ref info)) = return_val {
let _ = store.insert(key, Data::SettingInfo(info.clone()));
}
return_val
}
Ok(Payload::Response(Err(error))) => Some(Err(error)),
Err(error) => {
fx_log_warn!(
"An error occurred while watching {:?}:{:?}",
self.setting_type,
error
);
Some(Err(match error.root_cause().downcast_ref::<Error>() {
Some(error) => error.clone(),
_ => crate::handler::base::Error::CommunicationError,
}))
}
_ => {
panic!("invalid variant {:?}", response);
}
}
}
}
#[async_trait]
impl<
R: From<SettingInfo> + Send + Sync + 'static,
E: From<Error> + Send + Sync + 'static,
T: Responder<R, E> + Send + Sync + 'static,
> Sequential for Work<R, E, T>
{
async fn execute(
mut self: Box<Self>,
messenger: message::Messenger,
store_handle: data::StoreHandle,
nonce: TracingNonce,
) -> Result<(), WorkError> {
trace!(nonce, "Sequential Work execute");
// Lock store for Job signature group.
let mut store = store_handle.lock().await;
// Begin listening for changes before fetching current value to ensure no changes are
// missed.
let mut listen_receptor = messenger
.message(
Payload::Request(Request::Listen).into(),
Audience::Address(Address::Handler(self.setting_type)),
)
.send();
// Get current value.
let mut get_receptor = messenger
.message(
Payload::Request(Request::Get).into(),
Audience::Address(Address::Handler(self.setting_type)),
)
.send();
// If a value was returned from the get call and considered updated (no existing or
// different), return new value immediately.
trace!(nonce, "Get first response");
let next_payload = self.get_next(&mut get_receptor).await?;
if let Some(response) = self.process_response(next_payload, &mut store) {
self.responder.respond(response.map(R::from).map_err(E::from));
return Ok(());
}
// Otherwise, loop a watch until an updated value is available
loop {
trace!(nonce, "Get looped response");
let next_payload = self.get_next(&mut listen_receptor).await?;
if let Some(response) = self.process_response(next_payload, &mut store) {
self.responder.respond(response.map(R::from).map_err(E::from));
return Ok(());
}
}
}
}
impl<
R: From<SettingInfo> + Send + Sync + 'static,
E: From<Error> + Send + Sync + 'static,
T: Responder<R, E> + Send + Sync + 'static,
> From<(Work<R, E, T>, oneshot::Sender<()>)> for Job
{
fn from((work, cancelation_tx): (Work<R, E, T>, oneshot::Sender<()>)) -> Job {
let signature = work.signature;
Job::new_with_cancellation(Load::Sequential(Box::new(work), signature), cancelation_tx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::base::{SettingInfo, UnknownInfo};
use crate::message::base::MessengerType;
use crate::message::MessageHubUtil;
use crate::service::{Address, MessageHub};
use assert_matches::assert_matches;
use fuchsia_async as fasync;
use futures::channel::oneshot::Sender;
use futures::lock::Mutex;
use std::sync::Arc;
struct TestResponder {
sender: Sender<Result<SettingInfo, Error>>,
}
impl TestResponder {
pub(crate) fn new(sender: Sender<Result<SettingInfo, Error>>) -> Self {
Self { sender }
}
}
impl Responder<SettingInfo, Error> for TestResponder {
fn respond(self, response: Result<SettingInfo, Error>) {
self.sender.send(response).expect("send should succeed");
}
}
#[test]
fn change_function_ids_do_not_match() {
let mut generator = ChangeFunctionGenerator::new();
let closure = |_: &SettingInfo, _: &SettingInfo| true;
let change_fn1 = generator.make(closure);
let change_fn2 = generator.make(closure);
assert_ne!(change_fn1.id, change_fn2.id);
let signature1 = Signature::with::<()>(change_fn1.id);
let signature2 = Signature::with::<()>(change_fn2.id);
assert_ne!(signature1, signature2);
let signature3 = Signature::with::<()>(change_fn1.id);
assert_eq!(signature1, signature3);
}
#[fuchsia_async::run_until_stalled(test)]
async fn test_watch_basic_functionality() {
// Create store for job.
let store_handle = Arc::new(Mutex::new(HashMap::new()));
let get_info = SettingInfo::Unknown(UnknownInfo(true));
let listen_info = SettingInfo::Unknown(UnknownInfo(false));
// Make sure the first job execution returns the initial value (retrieved through get).
verify_watch(
store_handle.clone(),
listen_info.clone(),
get_info.clone(),
get_info.clone(),
None,
)
.await;
// Make sure the second job execution returns the value returned through watching (listen
// value).
verify_watch(
store_handle.clone(),
listen_info.clone(),
get_info.clone(),
listen_info.clone(),
None,
)
.await;
}
async fn verify_watch(
store_handle: data::StoreHandle,
listen_info: SettingInfo,
get_info: SettingInfo,
expected_info: SettingInfo,
change_function: Option<ChangeFunction>,
) {
// Create MessageHub for communication between components.
let message_hub_delegate = MessageHub::create_hub();
// Create mock handler endpoint to receive request.
let mut handler_receiver = message_hub_delegate
.create(MessengerType::Addressable(Address::Handler(SettingType::Unknown)))
.await
.expect("handler messenger should be created")
.1;
let (response_tx, response_rx) =
futures::channel::oneshot::channel::<Result<SettingInfo, Error>>();
let (_cancelation_tx, cancelation_rx) = oneshot::channel();
let work = match change_function {
None => Box::new(Work::new(
SettingType::Unknown,
TestResponder::new(response_tx),
cancelation_rx,
)),
Some(change_function) => Box::new(Work::with_change_function(
SettingType::Unknown,
TestResponder::new(response_tx),
cancelation_rx,
change_function,
)),
};
// Execute work on async task.
let work_messenger = message_hub_delegate
.create(MessengerType::Unbound)
.await
.expect("messenger should be created")
.0;
let work_messenger_signature = work_messenger.get_signature();
fasync::Task::spawn(async move {
let _ = work.execute(work_messenger, store_handle, 0).await;
})
.detach();
// Ensure the listen request is received from the right sender.
let (listen_request, listen_client) = handler_receiver
.next_of::<Payload>()
.await
.expect("should successfully receive a listen request");
assert_matches!(listen_request, Payload::Request(Request::Listen));
assert!(listen_client.get_author() == work_messenger_signature);
// Listen should be followed by a get request.
let (get_request, get_client) = handler_receiver
.next_of::<Payload>()
.await
.expect("should successfully receive a get request");
assert_matches!(get_request, Payload::Request(Request::Get));
assert!(get_client.get_author() == work_messenger_signature);
// Reply to the get request.
let _ = get_client.reply(Payload::Response(Ok(Some(get_info))).into()).send();
let _ = listen_client.reply(Payload::Response(Ok(Some(listen_info))).into()).send();
assert_matches!(response_rx.await.expect("should receive successful response"),
Ok(x) if x == expected_info);
}
// This test verifies that custom change functions work by using a custom change function that
// always says a new value is different, even if the actual value is unchanged.
#[fuchsia_async::run_until_stalled(test)]
async fn test_custom_change_function() {
// Create store for job.
let store_handle = Arc::new(Mutex::new(HashMap::new()));
// Pre-fill the storage with the value so that the initial get will not trigger a response.
let unchanged_info = SettingInfo::Unknown(UnknownInfo(true));
let _ = store_handle
.lock()
.await
.insert(Key::Identifier(LAST_VALUE_KEY), Data::SettingInfo(unchanged_info.clone()));
let mut change_function_generator = ChangeFunctionGenerator::new();
verify_watch(
store_handle,
// Send the same value on both the get and listen requests so that the default change
// function would not trigger a response to the client.
unchanged_info.clone(),
unchanged_info.clone(),
unchanged_info,
// Use a custom change function that always reports a change.
Some(
change_function_generator.make(move |_old: &SettingInfo, _new: &SettingInfo| true),
),
)
.await;
}
#[fuchsia_async::run_until_stalled(test)]
async fn test_error_propagation() {
// Create MessageHub for communication between components.
let message_hub_delegate = MessageHub::create_hub();
let (response_tx, response_rx) = oneshot::channel::<Result<SettingInfo, Error>>();
let (_cancelation_tx, cancelation_rx) = oneshot::channel::<()>();
// Create a listen request to a non-existent end-point.
let work = Box::new(Work::new(
SettingType::Unknown,
TestResponder::new(response_tx),
cancelation_rx,
));
let work_messenger = message_hub_delegate
.create(MessengerType::Unbound)
.await
.expect("messenger should be created")
.0;
// Execute work on async task.
fasync::Task::spawn(async move {
let _ = work.execute(work_messenger, Arc::new(Mutex::new(HashMap::new())), 0).await;
})
.detach();
// Ensure an error is returned by the executed work.
assert_matches!(response_rx.await.expect("should receive successful response"),
Err(x) if x == crate::handler::base::Error::CommunicationError);
}
}