blob: d3b6d84673d9f760679dfb66bbddb97f57f9fef3 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::message::action_fuse::{ActionFuseBuilder, ActionFuseHandle};
use crate::message::base::{
Address, Message, MessageClientId, MessageEvent, MessengerId, Payload, Role, Status,
};
use crate::message::message_client::MessageClient;
use crate::message::messenger::Messenger;
use crate::message::receptor::Receptor;
use anyhow::{format_err, Error};
use fuchsia_async::{self as fasync, DurationExt};
use fuchsia_zircon::Duration;
use futures::channel::mpsc::UnboundedSender;
use futures::future::TryFutureExt;
use futures::future::{AbortHandle, Abortable};
use futures::lock::Mutex;
use std::sync::Arc;
/// Helper for creating a beacon. The builder allows chaining additional fuses
pub struct BeaconBuilder<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
messenger: Messenger<P, A, R>,
chained_fuses: Vec<ActionFuseHandle>,
timeout: Option<Duration>,
}
impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> BeaconBuilder<P, A, R> {
pub(super) fn new(messenger: Messenger<P, A, R>) -> Self {
Self { messenger, chained_fuses: vec![], timeout: None }
}
pub(super) fn add_fuse(mut self, fuse: ActionFuseHandle) -> Self {
self.chained_fuses.push(fuse);
self
}
pub(super) fn set_timeout(mut self, duration: Option<Duration>) -> Self {
self.timeout = duration;
self
}
pub(super) fn build(self) -> (Beacon<P, A, R>, Receptor<P, A, R>) {
Beacon::create(self.messenger, self.chained_fuses, self.timeout)
}
}
/// A Beacon is the conduit for sending messages to a particular Receptor. An
/// instance may be cloned and passed around to other components. All copies of
/// a particular Beacon share a reference to an flag that signals whether the
/// Receptor is active, which controls whether future messages will be sent.
///
/// It is important to note that Beacons spawn from sending a Message. Status
/// and other context sent through the Beacon are in relation to this original
/// Message (either an origin or reply).
#[derive(Clone, Debug)]
pub struct Beacon<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
/// A reference to the associated Messenger. This is only used when delivering
/// a new message to a beacon, where a MessageClient (which references both
/// the recipient's Messenger and the message) must be created.
messenger: Messenger<P, A, R>,
/// The sender half of an internal channel established between the Beacon and
/// Receptor.
event_sender: UnboundedSender<MessageEvent<P, A, R>>,
/// Sentinel for secondary ActionFuses
sentinel: Arc<Mutex<Sentinel>>,
/// Timeout for firing if a response payload is not delivered in time.
timeout_abort_client: AbortHandle,
}
impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> Beacon<P, A, R> {
/// Creates a Beacon, Receptor tuple. The Messenger provided as an argument
/// will be associated with any delivered Message for reply purposes.
fn create(
messenger: Messenger<P, A, R>,
fuses: Vec<ActionFuseHandle>,
timeout: Option<Duration>,
) -> (Beacon<P, A, R>, Receptor<P, A, R>) {
let sentinel = Arc::new(Mutex::new(Sentinel::new()));
let (event_tx, event_rx) = futures::channel::mpsc::unbounded::<MessageEvent<P, A, R>>();
let (timeout_abort_client, timeout_abort_server) = AbortHandle::new_pair();
let signature = messenger.get_signature();
let beacon = Beacon {
messenger,
event_sender: event_tx.clone(),
sentinel: sentinel.clone(),
timeout_abort_client: timeout_abort_client.clone(),
};
// pass fuse to receptor to hold and set when it goes out of scope.
let receptor = Receptor::new(
signature,
event_rx,
ActionFuseBuilder::new()
.add_action(Box::new(move || {
let sentinel = sentinel.clone();
fasync::Task::spawn(async move {
timeout_abort_client.abort();
sentinel.lock().await.trigger().await;
})
.detach();
}))
.chain_fuses(fuses)
.build(),
);
if let Some(duration) = timeout {
let abortable_timeout = Abortable::new(
async move {
fuchsia_async::Timer::new(duration.after_now()).await;
// Panic if send failed, otherwise the client cannot abort processes.
event_tx
.unbounded_send(MessageEvent::Status(Status::Timeout))
.expect("Beacon::create, event_tx failed to send Timeout status message");
},
timeout_abort_server,
);
fasync::Task::spawn(abortable_timeout.unwrap_or_else(|_| ())).detach();
}
(beacon, receptor)
}
/// Sends the Status associated with the original message that spawned
/// this beacon.
pub(super) async fn status(&self, status: Status) -> Result<(), Error> {
if self.event_sender.unbounded_send(MessageEvent::Status(status)).is_err() {
return Err(format_err!("failed to deliver status"));
}
Ok(())
}
/// Delivers a response to the original message that spawned this Beacon.
pub(super) async fn deliver(
&self,
message: Message<P, A, R>,
client_id: MessageClientId,
) -> Result<(), Error> {
self.timeout_abort_client.abort();
if self
.event_sender
.unbounded_send(MessageEvent::Message(
message.payload().clone(),
MessageClient::new(client_id, message, self.messenger.clone()),
))
.is_err()
{
return Err(format_err!("failed to deliver message"));
}
Ok(())
}
/// Adds the specified fuse to the beacon's sentinel.
pub(super) async fn add_fuse(&mut self, fuse: ActionFuseHandle) {
self.sentinel.lock().await.add_fuse(fuse);
}
/// Returns the identifier for the associated Messenger.
pub(super) fn get_messenger_id(&self) -> MessengerId {
self.messenger.get_id()
}
}
/// Sentinel gathers actions fuses from other sources and releases them
/// on-demand.
struct Sentinel {
active: bool,
fuses: Vec<ActionFuseHandle>,
}
impl Sentinel {
/// Generates a new Sentinel.
fn new() -> Self {
Self { active: true, fuses: vec![] }
}
/// Adds a fuse if still active.
fn add_fuse(&mut self, fuse: ActionFuseHandle) {
// In the case we're not active anymore, do not add fuse.
if !self.active {
return;
}
self.fuses.push(fuse);
}
/// Removes all pending fuses.
async fn trigger(&mut self) {
self.active = false;
// Clear fuses, triggering them.
self.fuses.clear();
}
}