| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use crate::message::action_fuse::ActionFuseBuilder; |
| use crate::message::action_fuse::ActionFuseHandle; |
| use crate::message::base::{ |
| filter::Filter, messenger, role, ActionSender, Address, Attribution, Audience, Fingerprint, |
| Message, MessageAction, MessageClientId, MessageError, MessageType, MessengerAction, |
| MessengerId, MessengerType, Payload, Role, Signature, Status, |
| }; |
| use crate::message::beacon::{Beacon, BeaconBuilder}; |
| use crate::message::messenger::{Messenger, MessengerClient, MessengerFactory}; |
| use anyhow::format_err; |
| use fuchsia_async as fasync; |
| use fuchsia_syslog::fx_log_err; |
| use futures::lock::Mutex; |
| use futures::StreamExt; |
| use std::borrow::Cow; |
| use std::collections::{HashMap, HashSet}; |
| use std::iter::FromIterator; |
| use std::sync::Arc; |
| |
| /// Type definition for a handle to the MessageHub. There is a single instance |
| /// of a hub per communication ecosystem and therefore held behind an Arc mutex. |
| pub type MessageHubHandle<P, A, R> = Arc<Mutex<MessageHub<P, A, R>>>; |
| |
| /// Type definition for exit message sender. |
| type ExitSender = futures::channel::mpsc::UnboundedSender<()>; |
| |
| #[derive(thiserror::Error, Debug, Clone)] |
| pub enum Error { |
| #[error("Failed to send response for operation: {0:?}")] |
| ResponseSendFail(Cow<'static, str>), |
| #[error("Messenger not present")] |
| MessengerNotFound, |
| } |
| |
| /// `Broker` captures the information necessary to process messages to a broker. |
| #[derive(Clone)] |
| struct Broker<P: Payload + 'static, A: Address + 'static, R: Role + 'static> { |
| /// The `MessengerId` associated with the broker so that it can be distinguished from other |
| /// messengers. |
| messenger_id: MessengerId, |
| /// A condition that is applied to a message to determine whether it should be directed to the |
| /// broker. |
| filter: Option<Filter<P, A, R>>, |
| } |
| |
| impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> PartialEq for Broker<P, A, R> { |
| fn eq(&self, other: &Self) -> bool { |
| // Since each broker has a unique [`MessengerId`], it is implied that any brokers that share |
| // the same [`MessengerId`] are the same, having matching filters as well. |
| self.messenger_id == other.messenger_id |
| } |
| } |
| |
| /// The MessageHub controls the message flow for a set of messengers. It |
| /// processes actions upon messages, incorporates brokers, and signals receipt |
| /// of messages. |
| pub struct MessageHub<P: Payload + 'static, A: Address + 'static, R: Role + 'static> { |
| /// A sender given to messengers to signal actions upon the MessageHub. |
| action_tx: ActionSender<P, A, R>, |
| /// Address mapping for looking up messengers. Used for sending messages |
| /// to an addressable recipient. |
| addresses: HashMap<A, MessengerId>, |
| /// MessengerId mapping to roles. This mapping allows the `MessageHub` |
| /// to remove a messenger from role memberships upon removal. |
| messengers: HashMap<MessengerId, messenger::Roles<R>>, |
| /// Role mapping for looking up which messengers belong to a particular |
| /// role. |
| roles: HashMap<role::Signature<R>, HashSet<MessengerId>>, |
| /// Mapping of registered messengers (including brokers) to beacons. Used for |
| /// delivering messages from a resolved address or a list of participants. |
| beacons: HashMap<MessengerId, Beacon<P, A, R>>, |
| /// An ordered set of messengers who will be forwarded messages. |
| brokers: Vec<Broker<P, A, R>>, |
| /// The next id to be given to a messenger. |
| next_id: MessengerId, |
| /// The next id to be given to a `MessageClient`. |
| next_message_client_id: MessageClientId, |
| /// Indicates whether the messenger channel has closed. |
| messenger_channel_closed: bool, |
| /// Generator for creating new roles. |
| role_generator: role::Generator, |
| /// Sender to signal when the hub should exit. |
| exit_tx: ExitSender, |
| } |
| |
| impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> MessageHub<P, A, R> { |
| /// Returns a new MessageHub for the given types. |
| pub fn create(fuse: Option<ActionFuseHandle>) -> MessengerFactory<P, A, R> { |
| let (action_tx, mut action_rx) = futures::channel::mpsc::unbounded::<( |
| Fingerprint<A>, |
| MessageAction<P, A, R>, |
| Option<Beacon<P, A, R>>, |
| )>(); |
| let (messenger_tx, mut messenger_rx) = |
| futures::channel::mpsc::unbounded::<MessengerAction<P, A, R>>(); |
| |
| // A channel used by the MessageHub to listen to requests for |
| // role-related actions. |
| let (role_tx, mut role_rx) = futures::channel::mpsc::unbounded::<role::Action<R>>(); |
| |
| let (exit_tx, mut exit_rx) = futures::channel::mpsc::unbounded::<()>(); |
| |
| let mut hub = MessageHub { |
| next_id: 0, |
| next_message_client_id: 0, |
| action_tx, |
| beacons: HashMap::new(), |
| addresses: HashMap::new(), |
| messengers: HashMap::new(), |
| roles: HashMap::new(), |
| brokers: Vec::new(), |
| messenger_channel_closed: false, |
| role_generator: role::Generator::new(), |
| exit_tx, |
| }; |
| |
| fasync::Task::spawn(async move { |
| // Released when exiting scope to signal completion. |
| let _scope_fuse = fuse; |
| |
| loop { |
| // We must prioritize the action futures. Exit actions |
| // take absolute priority. Message actions are ordered before |
| // messenger in case the messenger is subsequently deleted. |
| futures::select_biased! { |
| _ = exit_rx.next() => { |
| break; |
| } |
| message_action = action_rx.select_next_some() => { |
| let (fingerprint, action, beacon) = message_action; |
| hub.process_request(fingerprint, action, beacon).await; |
| } |
| messenger_action = messenger_rx.next() => { |
| match messenger_action { |
| Some(action) => { |
| hub.process_messenger_request(action).await; |
| } |
| None => { |
| hub.messenger_channel_closed = true; |
| hub.check_exit(); |
| } |
| } |
| } |
| role_action = role_rx.select_next_some() => { |
| if hub.process_role_request(role_action).is_err() { |
| fx_log_err!("failed to process role action"); |
| } |
| } |
| } |
| } |
| }) |
| .detach(); |
| |
| MessengerFactory::new(messenger_tx, role_tx) |
| } |
| |
| fn check_exit(&self) { |
| if self.messenger_channel_closed && self.beacons.is_empty() { |
| self.exit_tx.unbounded_send(()).ok(); |
| } |
| } |
| |
| // Determines whether the beacon belongs to a broker. |
| fn is_broker(&self, messenger_id: MessengerId) -> bool { |
| self.brokers.iter().any(|broker| broker.messenger_id == messenger_id) |
| } |
| |
| // Derives the underlying MessengerId from a Signature. |
| fn resolve_messenger_id(&self, signature: &Signature<A>) -> Result<MessengerId, Error> { |
| Ok(match signature { |
| Signature::Anonymous(id) => *id, |
| Signature::Address(address) => { |
| *self.addresses.get(&address).ok_or(Error::MessengerNotFound)? |
| } |
| }) |
| } |
| |
| /// Internally routes a message to the next appropriate receiver. New messages |
| /// are routed based on the intended recipient(s), while replies follow the |
| /// return path of the source message. The provided sender id represents the |
| /// id of the current messenger possessing the message and not necessarily |
| /// the original author. |
| async fn send_to_next(&mut self, sender_id: MessengerId, message: Message<P, A, R>) { |
| let mut recipients = vec![]; |
| |
| let message_type = message.get_type(); |
| |
| let mut require_delivery = false; |
| |
| // Replies have a predetermined return path. |
| if let MessageType::Reply(source) = message_type { |
| // The original author of the reply will be the first participant after brokers in |
| // the reply's return path. Otherwise, identify current sender in the source return path |
| // and forward to next participant. |
| let source_return_path = source.get_return_path(); |
| let mut target_index = None; |
| |
| let source_return_path_messenger_ids: HashSet<MessengerId> = |
| source_return_path.iter().map(|beacon| beacon.get_messenger_id()).collect(); |
| |
| // Identify participating brokers. This brokers must: |
| // 1. Not be already participating in the return path (with a spawned observer) |
| // 2. Not be the author of the reply. |
| // 3. Have a matching filter. |
| let broker_ids: Vec<_> = self |
| .brokers |
| .iter() |
| .filter(|broker| { |
| !source_return_path_messenger_ids.contains(&broker.messenger_id) |
| && self |
| .resolve_messenger_id(&message.get_author()) |
| .map_or(true, |id| id != broker.messenger_id) |
| && broker.filter.as_ref().map_or(true, |filter| filter.matches(&message)) |
| }) |
| .map(|broker| broker.messenger_id) |
| .collect(); |
| |
| let mut return_path: Vec<Beacon<P, A, R>> = broker_ids |
| .iter() |
| .map(|broker_id| { |
| self.beacons.get(&broker_id).expect("beacon should resolve").clone() |
| }) |
| .collect(); |
| |
| // The return path places the participating brokers before the participants from the |
| // source message's reply path. |
| return_path.extend(source_return_path.iter().cloned()); |
| let last_index = return_path.len() - 1; |
| |
| if self.is_broker(sender_id) && !source_return_path_messenger_ids.contains(&sender_id) { |
| // If the sender is in the return path as a broker and not a participant in the |
| // source message's return path, determine next broker to forward to. |
| let mut candidate_index = self |
| .brokers |
| .iter() |
| .position(|broker| broker.messenger_id == sender_id) |
| .expect("broker should be found") |
| + 1; |
| |
| // A candidate broker is one that is after the sending broker, has a filter |
| // matching the current message, and is not in the return path already. |
| while candidate_index < self.brokers.len() && target_index.is_none() { |
| target_index = broker_ids.iter().position(|broker_id| { |
| *broker_id == self.brokers[candidate_index].messenger_id |
| && !source_return_path_messenger_ids.contains(&broker_id) |
| }); |
| |
| candidate_index += 1; |
| } |
| |
| // If we can't find a next broker, we should skip over those considered. |
| if target_index.is_none() { |
| target_index = Some(broker_ids.len()); |
| } |
| } else if sender_id == message.get_return_path()[0].get_messenger_id() |
| && !matches!(message.get_attribution(), Attribution::Derived(..)) |
| { |
| // If this is the reply's original author, send to the first |
| // messenger in the original message's return path. |
| target_index = Some(0); |
| |
| // Mark source message as delivered. In the case the sender is the |
| // original intended audience, this will be a no-op. However, if the |
| // reply comes beforehand, this will ensure the message is properly |
| // acknowledged. |
| source.report_status(Status::Received).await; |
| } else { |
| for (index, beacon) in return_path.iter().enumerate().take(last_index) { |
| if beacon.get_messenger_id() == sender_id { |
| target_index = Some(index + 1); |
| } |
| } |
| } |
| |
| if let Some(index) = target_index { |
| recipients.push(return_path[index].clone()); |
| |
| if index == last_index { |
| // Ack current message if being sent to intended recipient. |
| message.report_status(Status::Received).await; |
| } |
| } |
| } else if let Some(beacon) = self.beacons.get(&sender_id) { |
| let author_id = self |
| .resolve_messenger_id(&message.get_author()) |
| .expect("messenger should be present"); |
| |
| // If the message is not a reply, determine if the current sender is a broker. |
| // In the case of a broker, the message should be forwarded to the next |
| // broker. |
| let mut target_messengers: Vec<_> = { |
| // The author cannot participate as a broker. |
| let iter = self |
| .brokers |
| .iter() |
| .filter(|&broker| { |
| broker.messenger_id != author_id |
| && broker |
| .filter |
| .as_ref() |
| .map_or(true, |filter| filter.matches(&message)) |
| }) |
| .map(|broker| broker.messenger_id); |
| |
| let should_find_sender = { |
| let beacon_messenger_id = beacon.get_messenger_id(); |
| beacon_messenger_id != author_id && self.is_broker(beacon_messenger_id) |
| }; |
| |
| if should_find_sender { |
| // Ignore until we find the matching broker, then move the next broker one over. |
| iter.skip_while(|&id| id != sender_id).skip(1).take(1).collect() |
| } else { |
| iter.take(1).collect() |
| } |
| }; |
| |
| // If no broker was added, the original target now should participate. |
| if target_messengers.is_empty() { |
| if let MessageType::Origin(audience) = message_type { |
| if let Ok((resolved_messengers, delivery_required)) = |
| self.resolve_audience(sender_id, &audience) |
| { |
| target_messengers.append(&mut Vec::from_iter(resolved_messengers)); |
| require_delivery |= delivery_required; |
| } else { |
| // This error will occur if the sender specifies a non-existent |
| // address. |
| message.report_status(Status::Undeliverable).await; |
| } |
| |
| if audience.contains(&Audience::Broadcast) { |
| // Broadcasts don't require any audience. |
| message.report_status(Status::Broadcasted).await; |
| } |
| } |
| } |
| |
| // Translate selected messengers into beacon |
| for messenger in target_messengers { |
| if let Some(beacon) = self.beacons.get(&messenger) { |
| recipients.push(beacon.clone()); |
| } |
| } |
| } |
| |
| let mut successful_delivery = None; |
| // Send message to each specified recipient. |
| for recipient in recipients { |
| if recipient.deliver(message.clone(), self.next_message_client_id).await.is_ok() { |
| self.next_message_client_id += 1; |
| if successful_delivery.is_none() { |
| successful_delivery = Some(true); |
| } |
| } else { |
| successful_delivery = Some(false); |
| } |
| } |
| |
| if require_delivery { |
| message |
| .report_status(if let Some(true) = successful_delivery { |
| Status::Received |
| } else { |
| Status::Undeliverable |
| }) |
| .await; |
| } |
| } |
| |
| /// Resolves the audience into a set of MessengerIds. Also returns whether |
| /// delivery is required (broadcasts for example don't require delivery |
| /// confirmation). If there is an issue resolving an audience, an error |
| /// is returned. Errors should halt any further processing on the audience |
| /// set. |
| fn resolve_audience( |
| &self, |
| sender_id: MessengerId, |
| audience: &Audience<A, R>, |
| ) -> Result<(HashSet<MessengerId>, bool), anyhow::Error> { |
| let mut return_set = HashSet::new(); |
| let mut delivery_required = false; |
| |
| match audience { |
| Audience::Address(address) => { |
| delivery_required = true; |
| if let Some(&messenger_id) = self.addresses.get(&address) { |
| return_set.insert(messenger_id); |
| } else { |
| return Err(format_err!("could not resolve address")); |
| } |
| } |
| Audience::Role(role) => { |
| if let Some(messengers) = self.roles.get(role) { |
| return_set.extend(messengers); |
| } |
| } |
| Audience::Group(group) => { |
| for audience in &group.audiences { |
| let resolved_audience = self.resolve_audience(sender_id, &audience)?; |
| return_set.extend(resolved_audience.0); |
| delivery_required |= resolved_audience.1; |
| } |
| } |
| Audience::Messenger(signature) => { |
| delivery_required = true; |
| match signature { |
| Signature::Address(address) => { |
| if let Some(&messenger_id) = self.addresses.get(&address) { |
| return_set.insert(messenger_id); |
| } else { |
| return Err(format_err!("could not resolve signature")); |
| } |
| } |
| Signature::Anonymous(id) => { |
| return_set.insert(*id); |
| } |
| } |
| } |
| Audience::Broadcast => { |
| // Gather all messengers |
| for &id in self.beacons.keys() { |
| if id != sender_id && !self.is_broker(id) { |
| return_set.insert(id); |
| } |
| } |
| } |
| } |
| |
| Ok((return_set, delivery_required)) |
| } |
| |
| async fn process_messenger_request(&mut self, action: MessengerAction<P, A, R>) { |
| match action { |
| MessengerAction::Create(messenger_descriptor, responder, messenger_tx) => { |
| let mut optional_address = None; |
| if let MessengerType::Addressable(address) = messenger_descriptor.messenger_type { |
| if self.addresses.contains_key(&address) { |
| responder.send(Err(MessageError::AddressConflict { address })).ok(); |
| return; |
| } |
| optional_address = Some(address); |
| } |
| |
| let id = self.next_id; |
| let signature = if let Some(address) = optional_address { |
| Signature::Address(address) |
| } else { |
| Signature::Anonymous(id) |
| }; |
| |
| let messenger = |
| Messenger::new(Fingerprint { id, signature }, self.action_tx.clone()); |
| |
| // Create fuse to delete Messenger. |
| let fuse = ActionFuseBuilder::new() |
| .add_action(Box::new(move || { |
| messenger_tx |
| .unbounded_send(MessengerAction::DeleteBySignature(signature)) |
| .ok(); |
| })) |
| .build(); |
| |
| self.next_id += 1; |
| let (beacon, receptor) = |
| BeaconBuilder::new(messenger.clone()).add_fuse(Arc::clone(&fuse)).build(); |
| self.beacons.insert(id, beacon); |
| |
| match messenger_descriptor.messenger_type { |
| MessengerType::Broker(filter) => { |
| self.brokers.push(Broker { messenger_id: id, filter }); |
| } |
| MessengerType::Addressable(address) => { |
| self.addresses.insert(address, id); |
| } |
| MessengerType::Unbound => { |
| // We do not track Unbounded messengers. |
| } |
| } |
| |
| // Track roles |
| for role in &messenger_descriptor.roles { |
| self.roles.entry(*role).or_insert_with(HashSet::new).insert(id); |
| } |
| |
| // Update descriptor mapping and role records |
| self.messengers.insert(id, messenger_descriptor.roles); |
| |
| responder.send(Ok((MessengerClient::new(messenger, fuse), receptor))).ok(); |
| } |
| MessengerAction::CheckPresence(signature, responder) => { |
| responder.send(Ok(self.resolve_messenger_id(&signature).is_ok())).ok(); |
| } |
| MessengerAction::DeleteBySignature(signature) => self.delete_by_signature(signature), |
| } |
| } |
| |
| fn delete_by_signature(&mut self, signature: Signature<A>) { |
| let id = self.resolve_messenger_id(&signature).expect("messenger should be present"); |
| |
| // Clean up roles |
| if let Some(roles) = self.messengers.remove(&id) { |
| for role in roles { |
| // Remove messenger from each role it belongs to. Remove the |
| // role as well if it no longer has any members. |
| if let Some(messengers) = self.roles.get_mut(&role) { |
| messengers.remove(&id); |
| |
| if messengers.is_empty() { |
| self.roles.remove(&role); |
| } |
| } |
| } |
| } |
| |
| // These are all safe if the containers don't contain any items matching `id`. |
| self.beacons.remove(&id); |
| self.brokers.retain(|broker| id != broker.messenger_id); |
| if let Signature::Address(address) = signature { |
| self.addresses.remove(&address); |
| } |
| |
| self.check_exit(); |
| } |
| |
| // Translates messenger requests into actions upon the MessageHub. |
| async fn process_request( |
| &mut self, |
| fingerprint: Fingerprint<A>, |
| action: MessageAction<P, A, R>, |
| beacon: Option<Beacon<P, A, R>>, |
| ) { |
| let mut outgoing_message = match action { |
| MessageAction::Send(payload, message_type, timestamp) => { |
| Message::new(fingerprint, timestamp, payload, message_type) |
| } |
| MessageAction::Forward(forwarded_message) => { |
| if let Some(beacon) = self.beacons.get(&fingerprint.id) { |
| match forwarded_message.get_type() { |
| MessageType::Origin(audience) => { |
| // Can't forward messages meant for forwarder |
| if Audience::Messenger(fingerprint.signature) == *audience { |
| return; |
| } |
| // Ignore forward requests from leafs in broadcast |
| if !self.is_broker(beacon.get_messenger_id()) { |
| return; |
| } |
| } |
| MessageType::Reply(source) => { |
| if let Some(recipient) = source.get_return_path().last() { |
| // If the reply recipient drops the message, do not forward. |
| if recipient.get_messenger_id() == fingerprint.id { |
| return; |
| } |
| } else { |
| // Every reply should have a return path. |
| forwarded_message.report_status(Status::Undeliverable).await; |
| return; |
| } |
| } |
| } |
| } else { |
| forwarded_message.report_status(Status::Undeliverable).await; |
| return; |
| } |
| forwarded_message |
| } |
| }; |
| |
| if let Some(handle) = beacon { |
| outgoing_message.add_participant(handle); |
| } |
| |
| self.send_to_next(fingerprint.id, outgoing_message).await; |
| } |
| |
| fn process_role_request(&mut self, action: role::Action<R>) -> Result<(), Error> { |
| match action { |
| // Handle creating a new role. Currently there is no way this call |
| // can fail. A signature for the generated role is handed back |
| // through the provided response sender. |
| role::Action::Create(result_sender) => result_sender |
| .send(Ok(role::Response::Role(self.role_generator.generate()))) |
| .map(|_| ()) |
| .map_err(|_| Error::ResponseSendFail("create role".into())), |
| } |
| } |
| } |