blob: 383173449fa5b1aa0e9a8c991fa76aa201e3c31c [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::message::action_fuse::ActionFuseHandle;
use crate::message::base::{
default, messenger, role, ActionSender, Address, Audience, CreateMessengerResult, Fingerprint,
Message, MessageAction, MessageError, MessageType, MessengerAction, MessengerActionSender,
MessengerId, MessengerType, Payload, Role, Signature,
};
use crate::message::beacon::Beacon;
use crate::message::message_builder::MessageBuilder;
use fuchsia_syslog::fx_log_warn;
use std::collections::HashSet;
use std::convert::identity;
/// `Builder` is the default way for creating a new messenger. Beyond the base
/// messenger type, this helper allows for roles to be associated as well
/// during construction.
pub struct Builder<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
/// The sender for sending messenger creation requests to the MessageHub.
messenger_action_tx: MessengerActionSender<P, A, R>,
/// The type of messenger to be created. Along with roles, the messenger
/// type determines what audiences the messenger is included in.
messenger_type: MessengerType<P, A, R>,
/// The roles to associate with this messenger.
roles: HashSet<role::Signature<R>>,
}
impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> Builder<P, A, R> {
/// Creates a new builder for constructing a messenger of the given
/// type.
pub(super) fn new(
messenger_action_tx: MessengerActionSender<P, A, R>,
messenger_type: MessengerType<P, A, R>,
) -> Self {
Self { messenger_action_tx, messenger_type, roles: HashSet::new() }
}
/// Includes the specified role in the list of roles to be associated with
/// the new messenger.
pub(crate) fn add_role(mut self, role: role::Signature<R>) -> Self {
let _ = self.roles.insert(role);
self
}
/// Constructs a messenger based on specifications supplied.
pub(crate) async fn build(self) -> CreateMessengerResult<P, A, R> {
let (tx, rx) = futures::channel::oneshot::channel::<CreateMessengerResult<P, A, R>>();
// Panic if send failed since a messenger cannot be created.
self.messenger_action_tx
.unbounded_send(MessengerAction::Create(
messenger::Descriptor { messenger_type: self.messenger_type, roles: self.roles },
tx,
self.messenger_action_tx.clone(),
))
.expect("Builder::build, messenger_action_tx failed to send message");
rx.await.map_err(|_| MessageError::Unexpected).and_then(identity)
}
}
/// TargetedMessengerClient is a wrapper around [`MessengerClient`] that limits
/// the audience of sent messages to a preset target.
#[derive(Clone, Debug)]
pub struct TargetedMessengerClient<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
// TODO(fxbug.dev/84729)
#[allow(unused)]
client: MessengerClient<P, A, R>,
// TODO(fxbug.dev/84729)
#[allow(unused)]
audience: Audience<A, R>,
}
impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static>
TargetedMessengerClient<P, A, R>
{
#[cfg(test)]
pub(crate) fn new(client: MessengerClient<P, A, R>, audience: Audience<A, R>) -> Self {
Self { client, audience }
}
/// Creates a MessageBuilder for a new message with the specified payload.
#[cfg(test)]
pub(crate) fn message(&self, payload: P) -> MessageBuilder<P, A, R> {
self.client.message(payload, self.audience.clone())
}
}
/// MessengerClient is a wrapper around a messenger with a fuse.
#[derive(Clone, Debug)]
pub struct MessengerClient<
P: Payload + 'static,
A: Address + 'static,
R: Role + 'static = default::Role,
> {
messenger: Messenger<P, A, R>,
// TODO(fxbug.dev/84729)
#[allow(unused)]
fuse: ActionFuseHandle,
}
impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> MessengerClient<P, A, R> {
pub(super) fn new(
messenger: Messenger<P, A, R>,
fuse: ActionFuseHandle,
) -> MessengerClient<P, A, R> {
MessengerClient { messenger, fuse }
}
/// Creates a MessageBuilder for a new message with the specified payload
/// and audience.
pub(crate) fn message(&self, payload: P, audience: Audience<A, R>) -> MessageBuilder<P, A, R> {
MessageBuilder::new(payload, MessageType::Origin(audience), self.messenger.clone())
}
/// Returns the signature of the client that will handle any sent messages.
pub fn get_signature(&self) -> Signature<A> {
self.messenger.get_signature()
}
}
/// Messengers provide clients the ability to send messages to other registered
/// clients. They can only be created through a MessageHub.
#[derive(Clone, Debug)]
pub struct Messenger<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
fingerprint: Fingerprint<A>,
action_tx: ActionSender<P, A, R>,
}
impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> Messenger<P, A, R> {
pub(super) fn new(
fingerprint: Fingerprint<A>,
action_tx: ActionSender<P, A, R>,
) -> Messenger<P, A, R> {
Messenger { fingerprint, action_tx }
}
/// Returns the identification for this Messenger.
pub(super) fn get_id(&self) -> MessengerId {
self.fingerprint.id
}
/// Forwards the message to the next Messenger. Note that this method is
/// private and only called through the MessageClient.
pub(super) fn forward(&self, message: Message<P, A, R>, beacon: Option<Beacon<P, A, R>>) {
self.transmit(MessageAction::Forward(message), beacon);
}
/// Tranmits a given action to the message hub. This is a common utility
/// method to be used for immediate actions (forwarding, observing) and
/// deferred actions as well (sending, replying).
pub(super) fn transmit(&self, action: MessageAction<P, A, R>, beacon: Option<Beacon<P, A, R>>) {
// Do not transmit if the message hub has exited.
if self.action_tx.is_closed() {
return;
}
// Log info. transmit is called by forward. However, forward might fail if there is no next
// Messenger exists.
self.action_tx.unbounded_send((self.fingerprint, action, beacon)).unwrap_or_else(|_| {
fx_log_warn!("Messenger::transmit, action_tx failed to send message")
});
}
pub(super) fn get_signature(&self) -> Signature<A> {
self.fingerprint.signature
}
}