blob: 54464d1c4a56e73cd747485814f9348f63739dfb [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::agent::{AgentError, BlueprintHandle, Context, Invocation, Lifespan, Payload};
use crate::base::SettingType;
use crate::message::base::{Audience, MessengerType};
use crate::monitor;
use crate::policy::PolicyType;
use crate::service;
use crate::service_context::ServiceContext;
use anyhow::{format_err, Context as _, Error};
use std::collections::HashSet;
use std::sync::Arc;
/// Authority provides the ability to execute agents sequentially or simultaneously for a given
/// stage.
pub(crate) struct Authority {
// This is a list of pairs of debug ids and agent addresses.
agent_signatures: Vec<(&'static str, service::message::Signature)>,
// Factory passed to agents for communicating with the service.
delegate: service::message::Delegate,
// Messenger
messenger: service::message::Messenger,
// Available components
available_components: HashSet<SettingType>,
available_policies: HashSet<PolicyType>,
// Available resource monitors
resource_monitor_actor: Option<monitor::environment::Actor>,
}
impl Authority {
pub(crate) async fn create(
delegate: service::message::Delegate,
available_components: HashSet<SettingType>,
available_policies: HashSet<PolicyType>,
resource_monitor_actor: Option<monitor::environment::Actor>,
) -> Result<Authority, Error> {
let (client, _) = delegate
.create(MessengerType::Unbound)
.await
.map_err(|_| anyhow::format_err!("could not create agent messenger for authority"))?;
Ok(Authority {
agent_signatures: Vec::new(),
delegate,
messenger: client,
available_components,
available_policies,
resource_monitor_actor,
})
}
pub(crate) async fn register(&mut self, blueprint: BlueprintHandle) {
let agent_receptor = self
.delegate
.create(MessengerType::Unbound)
.await
.expect("agent receptor should be created")
.1;
let signature = agent_receptor.get_signature();
blueprint
.create(
Context::new(
agent_receptor,
self.delegate.clone(),
self.available_components.clone(),
self.available_policies.clone(),
self.resource_monitor_actor.clone(),
)
.await,
)
.await;
self.agent_signatures.push((blueprint.debug_id(), signature));
}
/// Invokes each registered agent for a given lifespan. If sequential is true,
/// invocations will only proceed to the next agent once the current
/// invocation has been successfully acknowledged. When sequential is false,
/// agents will receive their invocations without waiting. However, the
/// overall completion (signaled through the receiver returned by the method),
/// will not return until all invocations have been acknowledged.
pub(crate) async fn execute_lifespan(
&self,
lifespan: Lifespan,
service_context: Arc<ServiceContext>,
sequential: bool,
) -> Result<(), Error> {
let mut pending_receptors = Vec::new();
for &(debug_id, signature) in &self.agent_signatures {
let mut receptor = self
.messenger
.message(
Payload::Invocation(Invocation {
lifespan,
service_context: Arc::clone(&service_context),
})
.into(),
Audience::Messenger(signature),
)
.send();
if sequential {
let result = process_payload(debug_id, receptor.next_of::<Payload>().await);
if result.is_err() {
return result;
}
} else {
pending_receptors.push((debug_id, receptor));
}
}
// Pending acks should only be present for non sequential execution. In
// this case wait for each to complete.
for (debug_id, mut receptor) in pending_receptors {
let result = process_payload(debug_id, receptor.next_of::<Payload>().await);
if result.is_err() {
return result;
}
}
Ok(())
}
}
fn process_payload(
debug_id: &str,
payload: Result<(Payload, service::message::MessageClient), Error>,
) -> Result<(), Error> {
match payload {
Ok((Payload::Complete(Ok(_) | Err(AgentError::UnhandledLifespan)), _)) => Ok(()),
Ok((Payload::Complete(result), _)) => {
result.with_context(|| format!("Invocation failed for {:?}", debug_id))
}
Ok(_) => Err(format_err!("Unexpected result for {:?}", debug_id)),
Err(e) => Err(e).with_context(|| format!("Invocation failed {:?}", debug_id)),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::agent::Blueprint;
use crate::message::message_hub::MessageHub;
use assert_matches::assert_matches;
use fuchsia_async as fasync;
#[fasync::run_until_stalled(test)]
async fn test_log() {
struct TestAgentBlueprint;
impl Blueprint for TestAgentBlueprint {
fn debug_id(&self) -> &'static str {
"test_agent"
}
fn create(&self, context: Context) -> futures::future::BoxFuture<'static, ()> {
Box::pin(async move {
let _ = &context;
let mut receptor = context.receptor;
fasync::Task::spawn(async move {
while let Ok((Payload::Invocation(_), client)) =
receptor.next_of::<Payload>().await
{
client
.reply(Payload::Complete(Err(AgentError::UnexpectedError)).into())
.send()
.ack();
}
})
.detach();
})
}
}
let delegate = MessageHub::create(None);
let mut authority = Authority::create(delegate, HashSet::new(), HashSet::new(), None)
.await
.expect("Should be able to create authority");
let agent = TestAgentBlueprint;
authority.register(Arc::new(agent)).await;
let result = authority
.execute_lifespan(
Lifespan::Initialization,
Arc::new(ServiceContext::new(None, None)),
false,
)
.await;
assert_matches!(result, Err(e) if format!("{:?}", e).contains("test_agent"));
}
}