blob: f5cb21e4c24cb127b127da9edc67e7edd17e5f76 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::agent::base::{AgentError, Authority, BlueprintHandle, Context, Invocation, Lifespan};
use crate::internal::agent;
use crate::internal::event;
use crate::internal::switchboard;
use crate::message::base::{Audience, MessengerType};
use crate::service_context::ServiceContextHandle;
use crate::switchboard::base::SettingType;
use anyhow::{format_err, Error};
use async_trait::async_trait;
use std::collections::HashSet;
/// AuthorityImpl is the default implementation of the Authority trait. It
/// keeps track of the agents that have been registered and provides the
/// ability to invoke them sequentially or simultaneously for a given
/// lifespan.
pub struct AuthorityImpl {
// Signatures of the messengers created for registered agents, in
// registration order. Invocations are addressed to these signatures.
agent_signatures: Vec<agent::message::Signature>,
// Factory used to create the authority's own messenger as well as the
// messenger/receptor pair created for each registered agent.
messenger_factory: agent::message::Factory,
// Factory passed to agents for communicating with the switchboard.
switchboard_messenger_factory: switchboard::message::Factory,
// The authority's own messenger, used to send invocations to agents.
messenger: agent::message::Messenger,
// Factory for event messengers; a clone is handed to each agent's context.
event_factory: event::message::Factory,
// Setting types treated as available; a clone of this set is handed to
// each agent's context.
available_components: HashSet<SettingType>,
}
impl AuthorityImpl {
    /// Builds an `AuthorityImpl` that has no registered agents yet.
    ///
    /// An unbound messenger is created from `messenger_factory` and kept for
    /// sending invocations; the receptor created alongside it is discarded.
    /// The factories and `available_components` are stored as given so they
    /// can later be handed to agents during registration.
    ///
    /// # Errors
    ///
    /// Returns an error if the authority's messenger cannot be created.
    pub async fn create(
        messenger_factory: agent::message::Factory,
        switchboard_messenger_factory: switchboard::message::Factory,
        event_factory: event::message::Factory,
        available_components: HashSet<SettingType>,
    ) -> Result<AuthorityImpl, Error> {
        // The underlying failure is intentionally replaced with a fixed
        // message, matching what callers have always observed.
        let (messenger, _) = messenger_factory
            .create(MessengerType::Unbound)
            .await
            .map_err(|_| anyhow::format_err!("could not create agent messenger for authority"))?;

        Ok(AuthorityImpl {
            agent_signatures: Vec::new(),
            messenger_factory,
            switchboard_messenger_factory,
            messenger,
            event_factory,
            available_components,
        })
    }

    /// Invokes each registered agent for the given lifespan, in registration
    /// order.
    ///
    /// If `sequential` is true, the invocation for the next agent is only sent
    /// once the current agent's reply has been received and accepted. When
    /// `sequential` is false, every agent is sent its invocation without
    /// waiting, and the replies are then awaited one by one in the same order.
    /// In both modes the returned future does not complete successfully until
    /// every invocation has been acknowledged.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `process_payload` for a reply.
    /// In sequential mode, agents after the failing one are not invoked.
    pub async fn execute_lifespan(
        &self,
        lifespan: Lifespan,
        service_context: ServiceContextHandle,
        sequential: bool,
    ) -> Result<(), Error> {
        let mut pending_receptors = Vec::new();

        for signature in &self.agent_signatures {
            let mut receptor = self
                .messenger
                .message(
                    agent::Payload::Invocation(Invocation {
                        lifespan: lifespan.clone(),
                        service_context: service_context.clone(),
                    }),
                    Audience::Messenger(signature.clone()),
                )
                .send();

            if sequential {
                // Block on this agent's reply before invoking the next one.
                process_payload(receptor.next_payload().await)?;
            } else {
                pending_receptors.push(receptor);
            }
        }

        // Pending receptors are only present for non-sequential execution. In
        // that case wait for each reply in turn.
        for mut receptor in pending_receptors {
            process_payload(receptor.next_payload().await)?;
        }

        Ok(())
    }
}
/// Interprets the reply to an agent invocation.
///
/// A completion that reports success, or that reports
/// `AgentError::UnhandledLifespan`, is treated as an acknowledgement and
/// yields `Ok(())`. Every other outcome, including a failure to receive a
/// reply at all, yields an "invocation failed" error.
fn process_payload(
    payload: Result<(agent::Payload, agent::message::Client), Error>,
) -> Result<(), Error> {
    // A reply that could not be received is reported the same way as a
    // rejected invocation.
    let (response, _client) = payload.map_err(|_| format_err!("invocation failed"))?;

    match response {
        agent::Payload::Complete(Ok(_))
        | agent::Payload::Complete(Err(AgentError::UnhandledLifespan)) => Ok(()),
        _ => Err(format_err!("invocation failed")),
    }
}
#[async_trait]
impl Authority for AuthorityImpl {
    /// Registers the agent described by `blueprint` with this authority.
    ///
    /// A new unbound messenger/receptor pair is created for the agent. The
    /// receptor is handed to the agent through its `Context`, together with
    /// the blueprint's descriptor and clones of the switchboard messenger
    /// factory, the event factory and the set of available components. The
    /// messenger's signature is recorded only after the blueprint's `create`
    /// call has completed, so that later lifespan executions can address the
    /// agent.
    ///
    /// # Errors
    ///
    /// Returns an error if the agent's messenger cannot be created; in that
    /// case the blueprint is not invoked and nothing is recorded.
    async fn register(&mut self, blueprint: BlueprintHandle) -> Result<(), Error> {
        // The underlying failure is intentionally replaced with a fixed
        // message, matching what callers have always observed.
        let (messenger, receptor) = self
            .messenger_factory
            .create(MessengerType::Unbound)
            .await
            .map_err(|_| format_err!("could not register"))?;
        let signature = messenger.get_signature();

        blueprint
            .create(
                Context::new(
                    receptor,
                    blueprint.get_descriptor(),
                    self.switchboard_messenger_factory.clone(),
                    self.event_factory.clone(),
                    self.available_components.clone(),
                )
                .await,
            )
            .await;

        self.agent_signatures.push(signature);
        Ok(())
    }
}