blob: eb149450a350564e93163efbad44ed8230f11d5c [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::message::action_fuse::ActionFuseHandle;
use crate::message::beacon::Beacon;
use crate::message::message_client::MessageClient;
use crate::message::messenger::MessengerClient;
use crate::message::receptor::Receptor;
use crate::message::Timestamp;
use futures::channel::mpsc::UnboundedSender;
use futures::channel::oneshot::Sender;
use std::collections::HashSet;
use std::fmt::Debug;
use std::hash::Hash;
use thiserror::Error;
/// Trait alias for types of data that can be used as the payload in a
/// MessageHub.
pub trait Payload: Clone + Debug + Send + Sync {}
impl<T: Clone + Debug + Send + Sync> Payload for T {}
/// Trait alias for types of data that can be used as an address in a
/// MessageHub.
pub trait Address: Copy + Debug + Eq + Hash + Unpin + Send + Sync {}
impl<T: Copy + Debug + Eq + Hash + Unpin + Send + Sync> Address for T {}
/// Trait alias for types of data that can be used as a role in a
/// MessageHub.
pub trait Role: Copy + Debug + Eq + Hash + Send + Sync {}
impl<T: Copy + Debug + Eq + Hash + Send + Sync> Role for T {}
/// A mod for housing common definitions for messengers. Messengers are
/// MessageHub participants, which are capable of sending and receiving
/// messages.
pub(super) mod messenger {
use super::{role, Address, MessengerType, Payload, Role};
use std::collections::HashSet;
pub type Roles<R> = HashSet<role::Signature<R>>;
/// `Descriptor` is a blueprint for creating a messenger. It is sent to the
/// MessageHub by clients, which interprets the information to build the
/// messenger.
#[derive(Clone)]
pub struct Descriptor<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
/// The type of messenger to be created. This determines how messages
/// can be directed to a messenger created from this Descriptor.
/// Please reference [Audience](crate::message::base::Audience) to see how these types map
/// to audience targets.
pub messenger_type: MessengerType<P, A, R>,
/// The roles to associate with this messenger. When a messenger
/// is associated with a given [`Role`], any message directed to that
/// [`Role`] will be delivered to the messenger.
pub roles: Roles<R>,
}
}
/// A MessageEvent defines the data that can be returned through a message
/// receptor.
#[derive(Debug, PartialEq)]
pub enum MessageEvent<P: Payload + 'static, A: Address + 'static, R: Role + 'static = default::Role>
{
/// A message that has been delivered, either as a new message directed at to
/// the recipient's address or a reply to a previously sent message
/// (dependent on the receptor's context).
Message(P, MessageClient<P, A, R>),
/// A status update for the message that spawned the receptor delivering this
/// update.
Status(Status),
}
/// This mod contains common definitions for working with [`Role`]. [`Role`]
/// defines a group which messengers can belong to and messages can be directed
/// to.
pub mod role {
use super::Role;
use futures::channel::mpsc::UnboundedSender;
use futures::channel::oneshot::Sender;
/// An enumeration of role-related actions that can be requested upon the
/// MessageHub.
#[allow(dead_code)]
pub(in crate::message) enum Action<R: Role + 'static> {
/// Creates an anonymous Role at runtime.
Create(ResultSender<R>),
}
/// A sender given to MessageHub clients to relay role-related requests.
pub(in crate::message) type ActionSender<R> = UnboundedSender<Action<R>>;
/// A sender passed along with some [`Action`] types in order to send back a
/// response.
pub(in crate::message) type ResultSender<R> = Sender<Result<Response<R>, Error>>;
/// The return value in response to an [`Action`] upon success.
pub(in crate::message) enum Response<R: Role + 'static> {
Role(Signature<R>),
}
/// The error types sent when `Action` fails.
#[derive(thiserror::Error, Debug, Clone, PartialEq)]
pub enum Error {
/// The MessageHub handed back a response type we weren't expecting.
#[error("Unexpected response")]
UnexpectedResponse,
/// There was an issue communicating back the response.
#[error("Communication Error")]
CommunicationError,
}
/// The public representation of a role. `Signature` is used for adding a
/// messenger to a particular role group and targeting a particular group
/// as the audience for an outbound message.
#[derive(PartialEq, Copy, Clone, Debug, Eq, Hash)]
pub struct Signature<R: Role + 'static> {
signature_type: SignatureType<R>,
}
impl<R: Role + 'static> Signature<R> {
/// Returns a `Signature` based on the a predefined role.
pub(crate) fn role(role: R) -> Self {
Self { signature_type: SignatureType::Role(role) }
}
/// Returns a `Signature` based on a generated or anonymous role. This
/// `Signature` can only be generated by the MessageHub.
pub(in crate::message) fn handle(handle: Handle) -> Self {
Self { signature_type: SignatureType::Anonymous(handle) }
}
}
/// An enumeration of role types used internally in [`Signature`] to
/// uniquely identify the role.
#[derive(PartialEq, Copy, Clone, Debug, Eq, Hash)]
enum SignatureType<R: Role + 'static> {
Role(R),
Anonymous(Handle),
}
/// A `Handle` is a reference to a role generated at runtime. `Handle`
/// should be transparent to the client and only produced as part of a
/// [`Signature`] variant through the MessageHub.
#[derive(PartialEq, Copy, Clone, Debug, Eq, Hash)]
pub(in crate::message) struct Handle {
id: usize,
}
impl Handle {
pub(super) fn new(id: usize) -> Self {
Handle { id }
}
}
/// `Generator` is a helper for generating roles at runtime. Each invocation
/// produces a [`Signature`] for a unique anonymous role.
pub(in crate::message) struct Generator {
next_id: usize,
}
impl Generator {
/// Instantiates a new `Generator`.
pub(in crate::message) fn new() -> Self {
Self { next_id: 0 }
}
/// Produces a `Signature` referencing a unique role at runtime.
pub(in crate::message) fn generate<R: Role + 'static>(&mut self) -> Signature<R> {
let handle = Handle::new(self.next_id);
self.next_id += 1;
Signature::handle(handle)
}
}
}
/// This mod contains the default type definitions for the MessageHub's type
/// parameters when not specified.
pub mod default {
/// `Address` provides a [`Address`] definition for message hubs not needing
/// an address.
#[derive(PartialEq, Copy, Clone, Debug, Eq, Hash)]
pub enum Address {}
/// `Role` provides a [`Role`] definition for message hubs not needing
/// roles.
#[derive(PartialEq, Copy, Clone, Debug, Eq, Hash)]
pub enum Role {}
}
#[derive(Error, Debug, Clone)]
pub enum MessageError<A: Address + 'static> {
#[error("Address conflig:{address:?} already exists")]
AddressConflict { address: A },
#[error("Unexpected Error")]
Unexpected,
}
/// The types of results possible from sending or replying.
#[derive(Copy, Clone, PartialEq, Debug)]
pub enum Status {
// Sent to some audience, potentially no one.
Broadcasted,
// Received by the intended address.
Received,
// Could not be delivered to the specified target.
// TODO(61469): add intended address to this enum.
Undeliverable,
// Acknowledged by a recipient.
Acknowledged,
Timeout,
}
/// The intended recipients for a message.
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
pub enum Audience<A: Address + 'static, R: Role + 'static = default::Role> {
// All non-broker messengers outside of the sender.
Broadcast,
// An Audience Group.
Group(group::Group<A, R>),
// The messenger at the specified address.
Address(A),
// The messenger with the specified signature.
Messenger(Signature<A>),
// A messenger who belongs to the specified role.
Role(role::Signature<R>),
}
impl<A: Address + 'static, R: Role + 'static> Audience<A, R> {
/// Indicates whether a message directed towards this `Audience` must match
/// to a messenger or if it's okay for the message to be delivered to no
/// one. For example, broadcasts are meant to be delivered to any
/// (potentially no) messenger.
pub fn requires_delivery(&self) -> bool {
match self {
Audience::Broadcast => false,
Audience::Role(_) => false,
Audience::Group(group) => {
group.audiences.iter().any(|audience| audience.requires_delivery())
}
Audience::Address(_) | Audience::Messenger(_) => true,
}
}
pub fn contains(&self, audience: &Audience<A, R>) -> bool {
audience.flatten().is_subset(&self.flatten())
}
pub fn flatten(&self) -> HashSet<Audience<A, R>> {
match self {
Audience::Group(group) => {
group.audiences.iter().map(|audience| audience.flatten()).flatten().collect()
}
_ => [self.clone()].into(),
}
}
}
pub mod group {
use super::{Address, Audience, Role};
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
pub struct Group<A: Address + 'static, R: Role + 'static> {
pub audiences: Vec<Audience<A, R>>,
}
impl<A: Address + 'static, R: Role + 'static> Group<A, R> {
pub fn contains(&self, audience: &Audience<A, R>) -> bool {
for target in &self.audiences {
if target == audience {
return true;
} else if let Audience::Group(group) = target {
if group.contains(audience) {
return true;
}
}
}
false
}
}
#[cfg(test)]
pub(crate) struct Builder<A: Address + 'static, R: Role + 'static> {
audiences: Vec<Audience<A, R>>,
}
#[cfg(test)]
impl<A: Address + 'static, R: Role + 'static> Builder<A, R> {
pub(crate) fn new() -> Self {
Self { audiences: vec![] }
}
pub(crate) fn add(mut self, audience: Audience<A, R>) -> Self {
self.audiences.push(audience);
self
}
pub(crate) fn build(self) -> Group<A, R> {
Group { audiences: self.audiences }
}
}
}
/// An identifier that can be used to send messages directly to a Messenger.
/// Included with Message instances.
#[derive(Copy, Clone, Debug, PartialEq, Hash, Eq)]
pub enum Signature<A> {
// Messenger at a given address.
Address(A),
// The messenger cannot be directly addressed.
Anonymous(MessengerId),
}
#[derive(Copy, Clone, Debug)]
pub struct Fingerprint<A> {
pub id: MessengerId,
pub signature: Signature<A>,
}
/// The messengers that can participate in messaging
#[derive(Clone, Debug)]
pub enum MessengerType<
P: Payload + 'static,
A: Address + 'static,
R: Role + 'static = default::Role,
> {
/// An endpoint in the messenger graph. Can have messages specifically
/// addressed to it and can author new messages.
Addressable(A),
/// A intermediary messenger. Will receive every forwarded message. Brokers
/// are able to send and reply to messages, but the main purpose is to observe
/// messages. An optional filter may be specified, which limits the messages
/// directed to this broker.
Broker(Option<filter::Filter<P, A, R>>),
/// A messenger that cannot be reached by an address.
Unbound,
}
pub mod filter {
use super::{Address, Audience, Message, MessageType, Payload, Role, Signature};
use core::fmt::{Debug, Formatter};
use std::sync::Arc;
/// `Condition` allows specifying a filter condition that must be true
/// for a filter to match.
#[derive(Clone)]
pub enum Condition<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
/// Matches on the message's intended audience as specified by the
/// sender.
Audience(Audience<A, R>),
/// Matches on the author's signature.
Author(Signature<A>),
/// Matches on a custom closure that may evaluate the sent message.
Custom(Arc<dyn Fn(&Message<P, A, R>) -> bool + Send + Sync>),
/// Matches on another filter and its conditions.
Filter(Filter<P, A, R>),
}
/// We must implement Debug since the `Condition::Custom` does not provide
/// a `Debug` implementation.
impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> Debug for Condition<P, A, R> {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
let condition = match self {
Condition::Audience(audience) => format!("audience:{:?}", audience),
Condition::Author(signature) => format!("author:{:?}", signature),
Condition::Custom(_) => "custom".to_string(),
Condition::Filter(filter) => format!("filter:{:?}", filter),
};
write!(f, "Condition: {:?}", condition)
}
}
// TODO(fxbug.dev/68663): investigate implementing std::ops::Bit* traits.
/// `Conjugation` dictates how multiple conditions are combined to determine
/// a match.
#[derive(Clone, Debug, PartialEq)]
pub enum Conjugation {
/// All conditions must match.
All,
/// Any condition may declare a match.
Any,
}
/// `Builder` provides a convenient way to construct a [`Filter`] based on
/// a number of conditions.
pub struct Builder<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
conjugation: Conjugation,
conditions: Vec<Condition<P, A, R>>,
}
impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> Builder<P, A, R> {
pub(crate) fn new(condition: Condition<P, A, R>, conjugation: Conjugation) -> Self {
Self { conjugation, conditions: vec![condition] }
}
/// Shorthand method to create a filter based on a single condition.
pub(crate) fn single(condition: Condition<P, A, R>) -> Filter<P, A, R> {
Builder::new(condition, Conjugation::All).build()
}
/// Adds an additional condition to the filter under construction.
pub(crate) fn append(mut self, condition: Condition<P, A, R>) -> Self {
self.conditions.push(condition);
self
}
pub(crate) fn build(self) -> Filter<P, A, R> {
Filter { conjugation: self.conjugation, conditions: self.conditions }
}
}
/// `Filter` is used by the `MessageHub` to determine whether an incoming
/// message should be directed to associated broker.
#[derive(Clone, Debug)]
pub struct Filter<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
conjugation: Conjugation,
conditions: Vec<Condition<P, A, R>>,
}
impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> Filter<P, A, R> {
pub(crate) fn matches(&self, message: &Message<P, A, R>) -> bool {
for condition in &self.conditions {
let match_found = match condition {
Condition::Audience(audience) => matches!(
message.get_type(),
MessageType::Origin(target) if target.contains(audience)),
Condition::Custom(check_fn) => (check_fn)(message),
Condition::Filter(filter) => filter.matches(&message),
Condition::Author(signature) => message.get_author().eq(signature),
};
if match_found {
if self.conjugation == Conjugation::Any {
return true;
}
} else if self.conjugation == Conjugation::All {
return false;
}
}
self.conjugation == Conjugation::All
}
}
}
/// MessageType captures details about the Message's source.
#[derive(Clone, Debug)]
pub enum MessageType<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
/// A completely new message that is intended for the specified audience.
Origin(Audience<A, R>),
/// A response to a previously received message. Note that the value must
/// be boxed to mitigate recursive sizing issues as MessageType is held by
/// Message.
Reply(Box<Message<P, A, R>>),
}
/// `Attribution` describes the relationship of the message path in relation
/// to the author.
#[derive(Clone, Debug)]
pub enum Attribution<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
/// `Source` attributed messages are the original messages to be sent on a
/// path. For example, a source attribution for an origin message type will
/// be authored by the original sender. In a reply message type, a source
/// attribution means the reply was authored by the original message's
/// intended target.
Source(MessageType<P, A, R>),
/// `Derived` attributed messages are messages that have been modified by
/// someone in the message path. They follow the same trajectory (audience
/// or return path), but their message has been altered. The supplied
/// signature is the messenger that modified the specified message.
Derived(Box<Message<P, A, R>>, Signature<A>),
}
/// The core messaging unit. A Message may be annotated by messengers, but is
/// not associated with a particular Messenger instance.
#[derive(Clone, Debug)]
pub struct Message<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
author: Fingerprint<A>,
// TODO(fxbug.dev/84729)
#[allow(unused)]
timestamp: Timestamp,
payload: P,
attribution: Attribution<P, A, R>,
// The return path is generated while the message is passed from messenger
// to messenger on the way to the intended recipient. It indicates the
// messengers that would like to be informed of replies to this message.
// The message author is always the last element in this vector. New
// participants are pushed to the front.
return_path: Vec<Beacon<P, A, R>>,
}
impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> Message<P, A, R> {
/// Returns a new Message instance. Only the MessageHub can mint new messages.
pub(super) fn new(
author: Fingerprint<A>,
timestamp: Timestamp,
payload: P,
attribution: Attribution<P, A, R>,
) -> Message<P, A, R> {
let mut return_path = vec![];
// A derived message adopts the return path of the original message.
if let Attribution::Derived(message, _) = &attribution {
return_path.extend(message.get_return_path().iter().cloned());
}
Message { author, timestamp, payload, attribution, return_path }
}
/// Adds an entity to be notified on any replies.
pub(super) fn add_participant(&mut self, participant: Beacon<P, A, R>) {
self.return_path.insert(0, participant);
}
#[cfg(test)]
pub(super) fn get_timestamp(&self) -> Timestamp {
self.timestamp
}
/// Returns the Signatures of messengers who have modified this message
/// through propagation.
#[cfg(test)]
pub(super) fn get_modifiers(&self) -> Vec<Signature<A>> {
let mut modifiers = vec![];
if let Attribution::Derived(origin, signature) = &self.attribution {
modifiers.push(*signature);
modifiers.extend(origin.get_modifiers());
}
modifiers
}
pub(crate) fn get_author(&self) -> Signature<A> {
match &self.attribution {
Attribution::Source(_) => self.author.signature,
Attribution::Derived(message, _) => message.get_author(),
}
}
/// Binds the action fuse to the author's receptor. The fuse will fire
/// once that receptor is released.
pub(super) async fn bind_to_author(&mut self, fuse: ActionFuseHandle) {
if let Some(beacon) = self.return_path.last_mut() {
beacon.add_fuse(fuse).await;
}
}
/// Returns the list of participants for the reply return path.
pub(super) fn get_return_path(&self) -> &Vec<Beacon<P, A, R>> {
&self.return_path
}
/// Returns the message's attribution, which identifies whether it has been modified by a source
/// other than the original author.
pub(crate) fn get_attribution(&self) -> &Attribution<P, A, R> {
&self.attribution
}
/// Returns the message's type.
pub(crate) fn get_type(&self) -> &MessageType<P, A, R> {
match &self.attribution {
Attribution::Source(message_type) => message_type,
Attribution::Derived(message, _) => message.get_type(),
}
}
/// Returns a reference to the message's payload.
pub(crate) fn payload(&self) -> &P {
&self.payload
}
/// Delivers the supplied status to all participants in the return path.
pub(super) async fn report_status(&self, status: Status) {
for beacon in &self.return_path {
// It's ok if the other end is already closed and does not get an update, so we can
// safely ignore the result here.
let _ = beacon.status(status).await;
}
}
}
/// Type definition for a sender handed by the MessageHub to messengers to
/// send actions.
pub(super) type ActionSender<P, A, R> =
UnboundedSender<(Fingerprint<A>, MessageAction<P, A, R>, Option<Beacon<P, A, R>>)>;
/// An internal identifier used by the MessageHub to identify messengers.
pub(super) type MessengerId = usize;
/// An internal identifier used by the `MessageHub` to identify `MessageClient`.
pub(super) type MessageClientId = usize;
pub(super) type CreateMessengerResult<P, A, R> =
Result<(MessengerClient<P, A, R>, Receptor<P, A, R>), MessageError<A>>;
/// Callback for handing back a messenger
pub(super) type MessengerSender<P, A, R> = Sender<CreateMessengerResult<P, A, R>>;
/// Callback for checking on messenger presence
#[cfg(test)]
pub(super) type MessengerPresenceSender<A> = Sender<MessengerPresenceResult<A>>;
#[cfg(test)]
pub(super) type MessengerPresenceResult<A> = Result<bool, MessageError<A>>;
/// Type definition for a sender handed by the MessageHub to spawned components
/// (messenger factories and messengers) to control messengers.
pub(super) type MessengerActionSender<P, A, R> = UnboundedSender<MessengerAction<P, A, R>>;
/// Internal representation of possible actions around a messenger.
pub(super) enum MessengerAction<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
/// Creates a top level messenger
Create(
messenger::Descriptor<P, A, R>,
MessengerSender<P, A, R>,
MessengerActionSender<P, A, R>,
),
#[cfg(test)]
/// Check whether a messenger exists for the given [`Signature`]
CheckPresence(Signature<A>, MessengerPresenceSender<A>),
/// Deletes a messenger by its [`Signature`]
DeleteBySignature(Signature<A>),
}
/// Internal representation for possible actions on a message.
#[derive(Debug)]
pub(super) enum MessageAction<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
// A new message sent to the specified audience.
Send(P, Attribution<P, A, R>, Timestamp),
// The message has been forwarded by the current holder.
Forward(Message<P, A, R>),
}