blob: 99bcfe996ef3a46284bc3ccd030269f79b304756 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;
use anyhow::{format_err, Error};
use fuchsia_async as fasync;
use fuchsia_inspect::{self as inspect, component, Property};
use fuchsia_syslog::fx_log_err;
use futures::StreamExt;
use crate::agent::{Context, Payload};
use crate::blueprint_definition;
use crate::clock;
use crate::handler::device_storage::DeviceStorageAccess;
use crate::message::base::{filter, role, MessageEvent, MessengerType};
use crate::policy::{self as policy_base, Payload as PolicyPayload, Request, Role};
use crate::service;
use crate::service::message::{Audience, MessageClient, Messenger, Signature};
const INSPECT_NODE_NAME: &str = "policy_values";
blueprint_definition!("inspect_policy", crate::agent::inspect_policy::InspectPolicyAgent::create);
/// An agent that listens in on messages sent on the message hub to policy handlers
/// to record their internal state to inspect.
pub struct InspectPolicyAgent {
messenger_client: Messenger,
inspect_node: inspect::Node,
policy_values: HashMap<&'static str, InspectPolicyInfo>,
/// Information about a policy to be written to inspect.
/// Inspect nodes and properties are not used, but need to be held as they're deleted from inspect
/// once they go out of scope.
struct InspectPolicyInfo {
/// Node of this info.
_node: inspect::Node,
/// Debug string representation of the state of this policy.
value: inspect::StringProperty,
/// Milliseconds since Unix epoch that this policy was modified.
timestamp: inspect::StringProperty,
impl DeviceStorageAccess for InspectPolicyAgent {
const STORAGE_KEYS: &'static [&'static str] = &[];
impl InspectPolicyAgent {
async fn create(context: Context) {
/// Create an agent to watch messages to the policy layer and record policy
/// state to the inspect child node. Agent starts immediately without
/// calling invocation, but acknowledges the invocation payload to
/// let the Authority know the agent starts properly.
pub async fn create_with_node(context: Context, inspect_node: inspect::Node) {
// We must take care not to observe all requests as the agent itself can
// issue messages and block on their response. If another message is
// passed to the agent during this wait, we will deadlock.
let (messenger_client, broker_receptor) = match context
filter::Condition::Custom(Arc::new(|message| {
matches!(message.payload(), service::Payload::Policy(PolicyPayload::Request(_)))
Ok(messenger) => messenger,
Err(err) => {
"broker listening to only policy requests could not be created: {:?}",
let mut agent = Self { messenger_client, inspect_node, policy_values: HashMap::new() };
fasync::Task::spawn(async move {
// Request initial values from all policy handlers.
let initial_get_receptor = agent
let initial_get_fuse = initial_get_receptor.fuse();
let broker_fuse = broker_receptor.fuse();
let agent_event = context.receptor.fuse();
futures::pin_mut!(initial_get_fuse, broker_fuse, agent_event);
loop {
futures::select! {
initial_get_message = initial_get_fuse.select_next_some() => {
// Received a reply to our initial broadcast to all policy handlers asking
// for their value.
intercepted_message = broker_fuse.select_next_some() => {
// Intercepted a policy request.
agent_message = agent_event.select_next_some() => {
if let MessageEvent::Message(
service::Payload::Agent(Payload::Invocation(_invocation)), client)
= agent_message {
// Since the agent runs at creation, there is no
// need to handle state here.
// This shouldn't ever be triggered since the inspect agent (and its receptors)
// should be active for the duration of the service. This is just a safeguard to
// ensure this detached task doesn't run forever if the receptors stop somehow.
complete => break,
/// Handles responses to the initial broadcast by the inspect agent to all policy handlers that
/// requests their state.
async fn handle_initial_get(&mut self, message: service::message::MessageEvent) {
if let MessageEvent::Message(payload, _) = message {
// Since the order for these events isn't guaranteed, don't overwrite responses obtained
// after intercepting a request with these initial values.
if let Err(err) = self.write_response_to_inspect(payload, true).await {
fx_log_err!("Failed write initial get response to inspect: {:?}", err);
/// Handles messages seen over the message hub and requests policy state from handlers as
/// needed.
async fn handle_intercepted_message(&mut self, message: service::message::MessageEvent) {
if let MessageEvent::Message(service::Payload::Policy(PolicyPayload::Request(_)), client) =
// When we see a request to a policy proxy, we assume that the policy will be modified,
// so we wait for the reply to get the signature of the proxy, then ask the proxy for
// its latest value.
match InspectPolicyAgent::watch_reply(client).await {
Ok(reply_signature) => {
if let Err(err) = self.request_and_write_to_inspect(reply_signature).await {
fx_log_err!("Failed request value from policy proxy: {:?}", err);
Err(err) => {
fx_log_err!("Failed to watch reply to request: {:?}", err);
/// Watches for the reply to a sent message and returns the author of the reply.
async fn watch_reply(mut client: MessageClient) -> Result<Signature, Error> {
let mut reply_receptor = client.spawn_observer();
reply_receptor.next_payload()|(_, reply_client)| reply_client.get_author())
/// Requests the policy state from a given signature for a policy handler and records the result
/// in inspect.
async fn request_and_write_to_inspect(&mut self, signature: Signature) -> Result<(), Error> {
// Send the request to the policy proxy.
let mut send_receptor = self
.message(PolicyPayload::Request(Request::Get).into(), Audience::Messenger(signature))
// Wait for a response from the policy proxy.
let (payload, _) = send_receptor.next_payload().await?;
self.write_response_to_inspect(payload, false).await
/// Writes a policy payload response to inspect.
/// ignore_if_present will silently not write the response to inspect if a value already exists
/// for the policy.
async fn write_response_to_inspect(
&mut self,
payload: service::Payload,
ignore_if_present: bool,
) -> Result<(), Error> {
let policy_info = if let Ok(PolicyPayload::Response(Ok(
))) = PolicyPayload::try_from(payload)
} else {
return Err(format_err!("did not receive policy state"));
// Convert the response to a string for inspect.
let (policy_name, inspect_str) = policy_info.for_inspect();
let timestamp = clock::inspect_format_now();
match self.policy_values.get_mut(policy_name) {
Some(policy_info) => {
if ignore_if_present {
// Value already present in inspect, ignore this response.
return Ok(());
// Value already known, just update its fields.
None => {
// Policy info not recorded yet, create a new inspect node.
let node = self.inspect_node.create_child(policy_name);
let value_prop = node.create_string("value", inspect_str);
let timestamp_prop = node.create_string("timestamp", timestamp.clone());
InspectPolicyInfo { _node: node, value: value_prop, timestamp: timestamp_prop },
mod tests {
use std::collections::HashSet;
use fuchsia_inspect as inspect;
use fuchsia_inspect::assert_inspect_tree;
use fuchsia_zircon::Time;
use futures::future::BoxFuture;
use futures::StreamExt;
use crate::service;
use crate::service::message::{create_hub, Audience};
use crate::agent::inspect_policy::{InspectPolicyAgent, INSPECT_NODE_NAME};
use crate::agent::Context;
use crate::audio::policy as audio_policy;
use crate::audio::policy::{PolicyId, StateBuilder, TransformFlags};
use crate::audio::types::AudioStreamType;
use crate::clock;
use crate::message::base::{role, MessageEvent, MessengerType, Status};
use crate::policy::{self as policy_base, Payload, PolicyInfo, Role, UnknownInfo};
use crate::tests::message_utils::verify_payload;
const GET_REQUEST: Payload = Payload::Request(policy_base::Request::Get);
async fn create_context() -> Context {
create_hub().create(MessengerType::Unbound).await.expect("should be present").1,
/// Verifies that inspect agent requests and writes state for each policy on start.
async fn test_write_policy_inspect_on_start() {
// Set the clock so that timestamps will always be 0.
let context = create_context().await;
// Create a receptor representing the policy proxy, with an appropriate role.
let (_, mut policy_receptor) = context
// Create the inspect agent.
let inspector = inspect::Inspector::new();
let inspect_node = inspector.root().create_child(INSPECT_NODE_NAME);
InspectPolicyAgent::create_with_node(context, inspect_node).await;
let expected_state = StateBuilder::new()
.add_property(AudioStreamType::Media, TransformFlags::TRANSFORM_MAX)
// Policy proxy receives a get request on start and returns the state.
let state_clone = expected_state.clone();
&mut policy_receptor,
Some(Box::new(|client| -> BoxFuture<'_, ()> {
Box::pin(async move {
let mut receptor = client
// Wait until the policy inspect agent receives the message and writes to
// inspect.
while let Some(event) = {
match event {
MessageEvent::Status(Status::Received) => {
_ => {}
// Inspect agent writes value to inspect.
assert_inspect_tree!(inspector, root: {
policy_values: {
"Audio": {
value: format!("{:?}", expected_state),
timestamp: "0.000000000",
/// Verifies that inspect agent intercepts policy requests and writes their values to inspect.
async fn test_write_inspect_on_changed() {
// Set the clock so that timestamps will always be 0.
let context = create_context().await;
// Create a receptor representing the policy proxy, with an appropriate role.
let (_, mut policy_receptor) = context
// Create a messenger on the policy message hub to send requests for the inspect agent to
// intercept.
let (policy_sender, _) =
// Create the inspect agent.
let inspector = inspect::Inspector::new();
let inspect_node = inspector.root().create_child(INSPECT_NODE_NAME);
InspectPolicyAgent::create_with_node(context, inspect_node).await;
// Starting state for audio policy.
let initial_state = StateBuilder::new()
.add_property(AudioStreamType::Media, TransformFlags::TRANSFORM_MAX)
// While this isn't a change in state that would happen in the real world, it's fine for
// testing.
let expected_state = StateBuilder::new()
.add_property(AudioStreamType::Background, TransformFlags::TRANSFORM_MIN)
// Policy proxy receives a get request on start and returns the initial state.
&mut policy_receptor,
Some(Box::new(|client| -> BoxFuture<'_, ()> {
Box::pin(async move {
// Send a message to the policy proxy. Inspect agent acts on any request and waits for a
// reply to know where to ask for the policy state so send a nonsensical request + reply.
let test_request: service::Payload = Payload::Request(policy_base::Request::Audio(
.message(test_request.clone(), Audience::Messenger(policy_receptor.get_signature()))
// Policy proxy receives a request from the policy_sender.
&mut policy_receptor,
Some(Box::new(|client| -> BoxFuture<'_, ()> {
Box::pin(async move {
// Policy proxy receives a get request from the inspect agent and returns the expected
// state.
let state_clone = expected_state.clone();
&mut policy_receptor,
Some(Box::new(|client| -> BoxFuture<'_, ()> {
Box::pin(async move {
let mut receptor = client
// Wait until the policy inspect agent receives the message and writes to
// inspect.
while let Some(event) = {
match event {
MessageEvent::Status(Status::Received) => {
_ => {}
// Inspect agent writes value to inspect.
assert_inspect_tree!(inspector, root: {
policy_values: {
"Audio": {
value: format!("{:?}", expected_state),
timestamp: "0.000000000",