blob: c277bcd0d9744350dff9b82c89c5cbd8b0ade24a [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::message::action_fuse::ActionFuseBuilder;
use crate::message::action_fuse::ActionFuseHandle;
use crate::message::base::{
filter::Filter, messenger, role, ActionSender, Address, Attribution, Audience, Fingerprint,
Message, MessageAction, MessageClientId, MessageError, MessageType, MessengerAction,
MessengerId, MessengerType, Payload, Role, Signature, Status,
use crate::message::beacon::{Beacon, BeaconBuilder};
use crate::message::delegate::Delegate;
use crate::message::messenger::{Messenger, MessengerClient};
use crate::trace::TracingNonce;
use crate::{trace, trace_guard};
use anyhow::format_err;
use fuchsia_async as fasync;
use fuchsia_syslog::{fx_log_err, fx_log_warn};
use futures::lock::Mutex;
use futures::StreamExt;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::iter::FromIterator;
use std::sync::Arc;
/// Type definition for a handle to the MessageHub. There is a single instance
/// of a hub per communication ecosystem and therefore held behind an Arc mutex.
pub type MessageHubHandle<P, A, R> = Arc<Mutex<MessageHub<P, A, R>>>;
/// Type definition for exit message sender.
type ExitSender = futures::channel::mpsc::UnboundedSender<()>;
#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
#[error("Failed to send response for operation: {0:?}")]
ResponseSendFail(Cow<'static, str>),
#[error("Messenger not present")]
/// `Broker` captures the information necessary to process messages to a broker.
struct Broker<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
/// The `MessengerId` associated with the broker so that it can be distinguished from other
/// messengers.
messenger_id: MessengerId,
/// A condition that is applied to a message to determine whether it should be directed to the
/// broker.
filter: Option<Filter<P, A, R>>,
impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> PartialEq for Broker<P, A, R> {
fn eq(&self, other: &Self) -> bool {
// Since each broker has a unique [`MessengerId`], it is implied that any brokers that share
// the same [`MessengerId`] are the same, having matching filters as well.
self.messenger_id == other.messenger_id
/// The MessageHub controls the message flow for a set of messengers. It
/// processes actions upon messages, incorporates brokers, and signals receipt
/// of messages.
pub struct MessageHub<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
/// A sender given to messengers to signal actions upon the MessageHub.
action_tx: ActionSender<P, A, R>,
/// Address mapping for looking up messengers. Used for sending messages
/// to an addressable recipient.
addresses: HashMap<A, MessengerId>,
/// MessengerId mapping to roles. This mapping allows the `MessageHub`
/// to remove a messenger from role memberships upon removal.
messengers: HashMap<MessengerId, messenger::Roles<R>>,
/// Role mapping for looking up which messengers belong to a particular
/// role.
roles: HashMap<role::Signature<R>, HashSet<MessengerId>>,
/// Mapping of registered messengers (including brokers) to beacons. Used for
/// delivering messages from a resolved address or a list of participants.
beacons: HashMap<MessengerId, Beacon<P, A, R>>,
/// An ordered set of messengers who will be forwarded messages.
brokers: Vec<Broker<P, A, R>>,
/// The next id to be given to a messenger.
next_id: MessengerId,
/// The next id to be given to a `MessageClient`.
next_message_client_id: MessageClientId,
/// Indicates whether the messenger channel has closed.
messenger_channel_closed: bool,
/// Generator for creating new roles.
role_generator: role::Generator,
/// Sender to signal when the hub should exit.
exit_tx: ExitSender,
impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> MessageHub<P, A, R> {
/// Returns a new MessageHub for the given types.
pub(crate) fn create(fuse: Option<ActionFuseHandle>) -> Delegate<P, A, R> {
let (action_tx, mut action_rx) = futures::channel::mpsc::unbounded::<(
MessageAction<P, A, R>,
Option<Beacon<P, A, R>>,
let (messenger_tx, mut messenger_rx) =
futures::channel::mpsc::unbounded::<MessengerAction<P, A, R>>();
// A channel used by the MessageHub to listen to requests for
// role-related actions.
let (role_tx, mut role_rx) = futures::channel::mpsc::unbounded::<role::Action<R>>();
let (exit_tx, mut exit_rx) = futures::channel::mpsc::unbounded::<()>();
let mut hub = MessageHub {
next_id: 0,
next_message_client_id: 0,
beacons: HashMap::new(),
addresses: HashMap::new(),
messengers: HashMap::new(),
roles: HashMap::new(),
brokers: Vec::new(),
messenger_channel_closed: false,
role_generator: role::Generator::new(),
fasync::Task::spawn(async move {
// Released when exiting scope to signal completion.
let _scope_fuse = fuse;
let nonce = fuchsia_trace::generate_nonce();
trace!(nonce, "message hub");
loop {
// We must prioritize the action futures. Exit actions
// take absolute priority. Message actions are ordered before
// messenger in case the messenger is subsequently deleted.
futures::select_biased! {
_ = => {
message_action = action_rx.select_next_some() => {
"message action"
let (fingerprint, action, beacon) = message_action;
hub.process_request(nonce, fingerprint, action, beacon).await;
messenger_action = => {
"messenger action"
match messenger_action {
Some(action) => {
hub.process_messenger_request(nonce, action).await;
None => {
hub.messenger_channel_closed = true;
role_action = role_rx.select_next_some() => {
"role action"
if hub.process_role_request(nonce, role_action).is_err() {
fx_log_err!("failed to process role action");
Delegate::new(messenger_tx, role_tx)
fn check_exit(&self) {
if self.messenger_channel_closed && self.beacons.is_empty() {
// We can ignore the result. If exit_tx fails to send, the task has already ended.
let _ = self.exit_tx.unbounded_send(());
// Determines whether the beacon belongs to a broker.
fn is_broker(&self, messenger_id: MessengerId) -> bool {
self.brokers.iter().any(|broker| broker.messenger_id == messenger_id)
// Derives the underlying MessengerId from a Signature.
fn resolve_messenger_id(&self, signature: &Signature<A>) -> Result<MessengerId, Error> {
Ok(match signature {
Signature::Anonymous(id) => *id,
Signature::Address(address) => {
/// Internally routes a message to the next appropriate receiver. New messages
/// are routed based on the intended recipient(s), while replies follow the
/// return path of the source message. The provided sender id represents the
/// id of the current messenger possessing the message and not necessarily
/// the original author.
async fn send_to_next(
&mut self,
nonce: TracingNonce,
sender_id: MessengerId,
message: Message<P, A, R>,
) {
trace!(nonce, "send_to_next");
let mut recipients = vec![];
let message_type = message.get_type();
let mut require_delivery = false;
// Replies have a predetermined return path.
if let MessageType::Reply(source) = message_type {
// The original author of the reply will be the first participant after brokers in
// the reply's return path. Otherwise, identify current sender in the source return path
// and forward to next participant.
let source_return_path = source.get_return_path();
let mut target_index = None;
let source_return_path_messenger_ids: HashSet<MessengerId> =
source_return_path.iter().map(|beacon| beacon.get_messenger_id()).collect();
// Identify participating brokers. This brokers must:
// 1. Not be already participating in the return path (with a spawned observer)
// 2. Not be the author of the reply.
// 3. Have a matching filter.
let broker_ids: Vec<_> = self
.filter(|broker| {
&& self
.map_or(true, |id| id != broker.messenger_id)
&& broker.filter.as_ref().map_or(true, |filter| filter.matches(&message))
.map(|broker| broker.messenger_id)
let mut return_path: Vec<Beacon<P, A, R>> = broker_ids
.map(|broker_id| {
self.beacons.get(&broker_id).expect("beacon should resolve").clone()
// The return path places the participating brokers before the participants from the
// source message's reply path.
let last_index = return_path.len() - 1;
if self.is_broker(sender_id) && !source_return_path_messenger_ids.contains(&sender_id) {
// If the sender is in the return path as a broker and not a participant in the
// source message's return path, determine next broker to forward to.
let mut candidate_index = self
.position(|broker| broker.messenger_id == sender_id)
.expect("broker should be found")
+ 1;
// A candidate broker is one that is after the sending broker, has a filter
// matching the current message, and is not in the return path already.
while candidate_index < self.brokers.len() && target_index.is_none() {
target_index = broker_ids.iter().position(|broker_id| {
*broker_id == self.brokers[candidate_index].messenger_id
&& !source_return_path_messenger_ids.contains(&broker_id)
candidate_index += 1;
// If we can't find a next broker, we should skip over those considered.
if target_index.is_none() {
target_index = Some(broker_ids.len());
} else if sender_id == message.get_return_path()[0].get_messenger_id()
&& !matches!(message.get_attribution(), Attribution::Derived(..))
// If this is the reply's original author, send to the first
// messenger in the original message's return path.
target_index = Some(0);
// Mark source message as delivered. In the case the sender is the
// original intended audience, this will be a no-op. However, if the
// reply comes beforehand, this will ensure the message is properly
// acknowledged.
} else {
for (index, beacon) in return_path.iter().enumerate().take(last_index) {
if beacon.get_messenger_id() == sender_id {
target_index = Some(index + 1);
if let Some(index) = target_index {
if index == last_index {
// Ack current message if being sent to intended recipient.
} else if let Some(beacon) = self.beacons.get(&sender_id) {
let author_id = self
.expect("messenger should be present");
// If the message is not a reply, determine if the current sender is a broker.
// In the case of a broker, the message should be forwarded to the next
// broker.
let mut target_messengers: Vec<_> = {
// The author cannot participate as a broker.
let iter = self
.filter(|&broker| {
broker.messenger_id != author_id
&& broker
.map_or(true, |filter| filter.matches(&message))
.map(|broker| broker.messenger_id);
let should_find_sender = {
let beacon_messenger_id = beacon.get_messenger_id();
beacon_messenger_id != author_id && self.is_broker(beacon_messenger_id)
if should_find_sender {
// Ignore until we find the matching broker, then move the next broker one over.
iter.skip_while(|&id| id != sender_id).skip(1).take(1).collect()
} else {
// If no broker was added, the original target now should participate.
if target_messengers.is_empty() {
if let MessageType::Origin(audience) = message_type {
if let Ok((resolved_messengers, delivery_required)) =
self.resolve_audience(sender_id, &audience)
target_messengers.append(&mut Vec::from_iter(resolved_messengers));
require_delivery |= delivery_required;
} else {
// This error will occur if the sender specifies a non-existent
// address.
if audience.contains(&Audience::Broadcast) {
// Broadcasts don't require any audience.
// Translate selected messengers into beacon
for messenger in target_messengers {
if let Some(beacon) = self.beacons.get(&messenger) {
let mut successful_delivery = None;
// Send message to each specified recipient.
for recipient in recipients {
if recipient.deliver(message.clone(), self.next_message_client_id).await.is_ok() {
self.next_message_client_id += 1;
if successful_delivery.is_none() {
successful_delivery = Some(true);
} else {
successful_delivery = Some(false);
if require_delivery {
.report_status(if let Some(true) = successful_delivery {
} else {
/// Resolves the audience into a set of MessengerIds. Also returns whether
/// delivery is required (broadcasts for example don't require delivery
/// confirmation). If there is an issue resolving an audience, an error
/// is returned. Errors should halt any further processing on the audience
/// set.
fn resolve_audience(
sender_id: MessengerId,
audience: &Audience<A, R>,
) -> Result<(HashSet<MessengerId>, bool), anyhow::Error> {
let mut return_set = HashSet::new();
let mut delivery_required = false;
match audience {
Audience::Address(address) => {
delivery_required = true;
if let Some(&messenger_id) = self.addresses.get(&address) {
let _ = return_set.insert(messenger_id);
} else {
return Err(format_err!("could not resolve address"));
Audience::Role(role) => {
if let Some(messengers) = self.roles.get(role) {
Audience::Group(group) => {
for audience in &group.audiences {
let resolved_audience = self.resolve_audience(sender_id, &audience)?;
delivery_required |= resolved_audience.1;
Audience::Messenger(signature) => {
delivery_required = true;
match signature {
Signature::Address(address) => {
if let Some(&messenger_id) = self.addresses.get(&address) {
let _ = return_set.insert(messenger_id);
} else {
return Err(format_err!("could not resolve signature"));
Signature::Anonymous(id) => {
let _ = return_set.insert(*id);
Audience::Broadcast => {
// Gather all messengers
for &id in self.beacons.keys() {
if id != sender_id && !self.is_broker(id) {
let _ = return_set.insert(id);
Ok((return_set, delivery_required))
async fn process_messenger_request(
&mut self,
nonce: TracingNonce,
action: MessengerAction<P, A, R>,
) {
match action {
MessengerAction::Create(messenger_descriptor, responder, messenger_tx) => {
"process messenger request create",
"messenger_type" => format!("{:?}", messenger_descriptor.messenger_type).as_str()
let mut optional_address = None;
if let MessengerType::Addressable(address) = messenger_descriptor.messenger_type {
if self.addresses.contains_key(&address) {
// Ignore the result since an error would imply the other side is already
// closed.
let _ = responder.send(Err(MessageError::AddressConflict { address }));
optional_address = Some(address);
let id = self.next_id;
let signature = if let Some(address) = optional_address {
} else {
let messenger =
Messenger::new(Fingerprint { id, signature }, self.action_tx.clone());
// Create fuse to delete Messenger.
let fuse = ActionFuseBuilder::new()
.add_action(Box::new(move || {
// Do not send deletion request if other side is closed.
if messenger_tx.is_closed() {
// ActionFuse drop method might cause the send failed.
.unwrap_or_else(|_| {
"messenger_tx failed to send delete action for signature: {:?}",
self.next_id += 1;
let (beacon, receptor) =
let _ = self.beacons.insert(id, beacon);
match messenger_descriptor.messenger_type {
MessengerType::Broker(filter) => {
self.brokers.push(Broker { messenger_id: id, filter });
MessengerType::Addressable(address) => {
let _ = self.addresses.insert(address, id);
MessengerType::Unbound => {
// We do not track Unbounded messengers.
// Track roles
for role in &messenger_descriptor.roles {
let _ = self.roles.entry(*role).or_insert_with(HashSet::new).insert(id);
// Update descriptor mapping and role records
let _ = self.messengers.insert(id, messenger_descriptor.roles);
let response_result =
responder.send(Ok((MessengerClient::new(messenger, fuse), receptor)));
if let Err(_) = response_result {
// TODO( Track whether this is common, if so, bubble the error
// up.
"Receiving end of oneshot closed while trying to create messenger client \
for client with id: {}",
MessengerAction::CheckPresence(signature, responder) => {
trace!(nonce, "process messenger request check presence");
let _ = responder.send(Ok(self.resolve_messenger_id(&signature).is_ok()));
MessengerAction::DeleteBySignature(signature) => {
trace!(nonce, "process messenger request delete");
fn delete_by_signature(&mut self, signature: Signature<A>) {
let id = self.resolve_messenger_id(&signature).expect("messenger should be present");
// Clean up roles
if let Some(roles) = self.messengers.remove(&id) {
for role in roles {
// Remove messenger from each role it belongs to. Remove the
// role as well if it no longer has any members.
if let Some(messengers) = self.roles.get_mut(&role) {
let _ = messengers.remove(&id);
if messengers.is_empty() {
let _ = self.roles.remove(&role);
// These are all safe if the containers don't contain any items matching `id`.
let _ = self.beacons.remove(&id);
self.brokers.retain(|broker| id != broker.messenger_id);
if let Signature::Address(address) = signature {
let _ = self.addresses.remove(&address);
// Translates messenger requests into actions upon the MessageHub.
async fn process_request(
&mut self,
nonce: TracingNonce,
fingerprint: Fingerprint<A>,
action: MessageAction<P, A, R>,
beacon: Option<Beacon<P, A, R>>,
) {
let (mut outgoing_message, _guard) = match action {
MessageAction::Send(payload, message_type, timestamp) => {
let guard = trace_guard!(
"process request send",
"payload" => format!("{:?}", payload).as_str()
(Message::new(fingerprint, timestamp, payload, message_type), guard)
MessageAction::Forward(forwarded_message) => {
let guard = trace_guard!(nonce, "process request forward");
if let Some(beacon) = self.beacons.get(& {
match forwarded_message.get_type() {
MessageType::Origin(audience) => {
// Can't forward messages meant for forwarder
if Audience::Messenger(fingerprint.signature) == *audience {
// Ignore forward requests from leafs in broadcast
if !self.is_broker(beacon.get_messenger_id()) {
MessageType::Reply(source) => {
if let Some(recipient) = source.get_return_path().last() {
// If the reply recipient drops the message, do not forward.
if recipient.get_messenger_id() == {
} else {
// Every reply should have a return path.
} else {
(forwarded_message, guard)
if let Some(handle) = beacon {
self.send_to_next(nonce,, outgoing_message).await;
fn process_role_request(
&mut self,
nonce: TracingNonce,
action: role::Action<R>,
) -> Result<(), Error> {
trace!(nonce, "process role request");
match action {
// Handle creating a new role. Currently there is no way this call
// can fail. A signature for the generated role is handed back
// through the provided response sender.
role::Action::Create(result_sender) => result_sender
.map(|_| ())
.map_err(|_| Error::ResponseSendFail("create role".into())),