blob: c2b4a4108fecb150be79249282bdc1dcfbf99eca [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::message::action_fuse::ActionFuseHandle;
use crate::message::base::{Address, MessageEvent, Payload, Role, Signature, Status};
use crate::message::message_client::MessageClient;
use anyhow::{format_err, Error};
use futures::channel::mpsc::UnboundedReceiver;
use futures::task::{Context, Poll};
use futures::Stream;
use futures::StreamExt;
use std::pin::Pin;
/// Receiving half of the unbounded `futures` mpsc channel over which
/// `MessageEvent`s are handed to a `Receptor`.
type EventReceiver<P, A, R> = UnboundedReceiver<MessageEvent<P, A, R>>;
/// A Receptor is a wrapper around a channel dedicated towards either receiving
/// top-level messages delivered to the recipient's address or replies to a
/// message the recipient sent previously. Receptors are always paired with a
/// Beacon.
///
/// Clients interact with the Receptor similar to a Receiver: it implements
/// `Stream`, yielding each `MessageEvent` as it arrives, and additionally
/// offers `next_payload` and `wait_for_acknowledge` as helpers that skip over
/// events the caller is not interested in.
pub struct Receptor<P: Payload + 'static, A: Address + 'static, R: Role + 'static> {
// Signature handed back to callers by `get_signature`.
signature: Signature<A>,
// Receiving end of the channel; every event surfaced by this receptor is
// polled from here.
event_rx: EventReceiver<P, A, R>,
// Fuse to be triggered when all receptors go out of scope. Never read; it is
// held only so that it is dropped together with the receptor.
_fuse: ActionFuseHandle,
}
/// Exposes the receptor as a stream of the `MessageEvent`s arriving on its
/// underlying channel.
impl<P, A, R> Stream for Receptor<P, A, R>
where
    P: Payload + 'static,
    A: Address + 'static,
    R: Role + 'static,
{
    type Item = MessageEvent<P, A, R>;

    /// Delegates directly to the wrapped receiver; the receptor adds no
    /// buffering or filtering of its own.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let receiver = Pin::new(&mut self.event_rx);
        receiver.poll_next(cx)
    }
}
impl<P: Payload + 'static, A: Address + 'static, R: Role + 'static> Receptor<P, A, R> {
    /// Builds a receptor from its parts.
    ///
    /// * `signature` - value later returned by `get_signature`.
    /// * `event_rx` - channel end from which events are read.
    /// * `fuse` - handle kept alive for as long as the receptor exists.
    pub(super) fn new(
        signature: Signature<A>,
        event_rx: EventReceiver<P, A, R>,
        fuse: ActionFuseHandle,
    ) -> Self {
        Self { signature, event_rx, _fuse: fuse }
    }

    /// Returns the signature of the top level messenger associated with this
    /// receptor.
    pub fn get_signature(&self) -> Signature<A> {
        self.signature
    }

    /// Returns the next pending payload together with the client it arrived
    /// with. Events other than a message or an `Undeliverable` status are
    /// consumed and skipped while waiting.
    ///
    /// # Errors
    ///
    /// Returns an error if an `Undeliverable` status is observed (the origin
    /// message, if any, was not delivered) or if the event stream ends before
    /// a message arrives.
    pub async fn next_payload(&mut self) -> Result<(P, MessageClient<P, A, R>), Error> {
        while let Some(event) = self.next().await {
            match event {
                MessageEvent::Message(payload, client) => {
                    return Ok((payload, client));
                }
                MessageEvent::Status(Status::Undeliverable) => {
                    return Err(format_err!("origin message not delivered"));
                }
                // Any other event is not relevant here; keep waiting.
                _ => {}
            }
        }

        // The stream ended without ever producing a message.
        Err(format_err!("could not retrieve payload"))
    }

    /// Waits until an `Acknowledged` status is received. Events other than an
    /// `Acknowledged` or `Undeliverable` status are consumed and skipped while
    /// waiting.
    ///
    /// # Errors
    ///
    /// Returns an error if an `Undeliverable` status is observed first or if
    /// the event stream ends before an acknowledgement arrives.
    pub async fn wait_for_acknowledge(&mut self) -> Result<(), Error> {
        while let Some(event) = self.next().await {
            match event {
                MessageEvent::Status(Status::Acknowledged) => {
                    return Ok(());
                }
                MessageEvent::Status(Status::Undeliverable) => {
                    return Err(format_err!("origin message not delivered"));
                }
                // Any other event is not relevant here; keep waiting.
                _ => {}
            }
        }

        // The stream ended without ever producing an acknowledgement.
        Err(format_err!("did not encounter acknowledged status"))
    }

    /// Consumes the receptor. The body is intentionally empty: taking `self`
    /// by value is what drops it.
    pub fn ack(self) {}
}
/// Pulls the payload out of an optional `MessageEvent`.
///
/// The argument is an `Option` so that the item produced by polling a
/// `Receptor` stream can be passed in directly. Yields `Some(payload)` only
/// for a `MessageEvent::Message`; a `None` input or any other event yields
/// `None`.
pub fn extract_payload<P: Payload + 'static, A: Address + 'static, R: Role + 'static>(
    event: Option<MessageEvent<P, A, R>>,
) -> Option<P> {
    match event {
        Some(MessageEvent::Message(payload, _)) => Some(payload),
        _ => None,
    }
}