| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use std::collections::HashMap; |
| use std::convert::TryFrom; |
| use std::sync::Arc; |
| |
| use anyhow::{format_err, Error}; |
| use fuchsia_async as fasync; |
| use fuchsia_inspect::{self as inspect, component, Property}; |
| use fuchsia_syslog::fx_log_err; |
| use futures::StreamExt; |
| |
| use crate::agent::{Context, Payload}; |
| use crate::blueprint_definition; |
| use crate::clock; |
| use crate::handler::device_storage::DeviceStorageAccess; |
| use crate::message::base::{filter, role, MessageEvent, MessengerType}; |
| use crate::policy::{self as policy_base, Payload as PolicyPayload, Request, Role}; |
| use crate::service; |
| use crate::service::message::{Audience, MessageClient, Messenger, Signature}; |
| |
| const INSPECT_NODE_NAME: &str = "policy_values"; |
| |
| blueprint_definition!("inspect_policy", crate::agent::inspect_policy::InspectPolicyAgent::create); |
| |
| /// An agent that listens in on messages sent on the message hub to policy handlers |
| /// to record their internal state to inspect. |
| pub struct InspectPolicyAgent { |
| messenger_client: Messenger, |
| inspect_node: inspect::Node, |
| policy_values: HashMap<&'static str, InspectPolicyInfo>, |
| } |
| |
| /// Information about a policy to be written to inspect. |
| /// |
| /// Inspect nodes and properties are not used, but need to be held as they're deleted from inspect |
| /// once they go out of scope. |
| struct InspectPolicyInfo { |
| /// Node of this info. |
| _node: inspect::Node, |
| |
| /// Debug string representation of the state of this policy. |
| value: inspect::StringProperty, |
| |
| /// Milliseconds since Unix epoch that this policy was modified. |
| timestamp: inspect::StringProperty, |
| } |
| |
| impl DeviceStorageAccess for InspectPolicyAgent { |
| const STORAGE_KEYS: &'static [&'static str] = &[]; |
| } |
| |
| impl InspectPolicyAgent { |
| async fn create(context: Context) { |
| Self::create_with_node( |
| context, |
| component::inspector().root().create_child(INSPECT_NODE_NAME), |
| ) |
| .await; |
| } |
| |
| /// Create an agent to watch messages to the policy layer and record policy |
| /// state to the inspect child node. Agent starts immediately without |
| /// calling invocation, but acknowledges the invocation payload to |
| /// let the Authority know the agent starts properly. |
| pub async fn create_with_node(context: Context, inspect_node: inspect::Node) { |
| // We must take care not to observe all requests as the agent itself can |
| // issue messages and block on their response. If another message is |
| // passed to the agent during this wait, we will deadlock. |
| let (messenger_client, broker_receptor) = match context |
| .messenger_factory |
| .create(MessengerType::Broker(Some(filter::Builder::single( |
| filter::Condition::Custom(Arc::new(|message| { |
| matches!(message.payload(), service::Payload::Policy(PolicyPayload::Request(_))) |
| })), |
| )))) |
| .await |
| { |
| Ok(messenger) => messenger, |
| Err(err) => { |
| fx_log_err!( |
| "broker listening to only policy requests could not be created: {:?}", |
| err |
| ); |
| return; |
| } |
| }; |
| |
| let mut agent = Self { messenger_client, inspect_node, policy_values: HashMap::new() }; |
| |
| fasync::Task::spawn(async move { |
| // Request initial values from all policy handlers. |
| let initial_get_receptor = agent |
| .messenger_client |
| .message( |
| PolicyPayload::Request(Request::Get).into(), |
| Audience::Role(role::Signature::role(service::Role::Policy( |
| Role::PolicyHandler, |
| ))), |
| ) |
| .send(); |
| |
| let initial_get_fuse = initial_get_receptor.fuse(); |
| let broker_fuse = broker_receptor.fuse(); |
| let agent_event = context.receptor.fuse(); |
| futures::pin_mut!(initial_get_fuse, broker_fuse, agent_event); |
| |
| loop { |
| futures::select! { |
| initial_get_message = initial_get_fuse.select_next_some() => { |
| // Received a reply to our initial broadcast to all policy handlers asking |
| // for their value. |
| agent.handle_initial_get(initial_get_message).await; |
| } |
| |
| intercepted_message = broker_fuse.select_next_some() => { |
| // Intercepted a policy request. |
| agent.handle_intercepted_message(intercepted_message).await; |
| } |
| |
| agent_message = agent_event.select_next_some() => { |
| if let MessageEvent::Message( |
| service::Payload::Agent(Payload::Invocation(_invocation)), client) |
| = agent_message { |
| // Since the agent runs at creation, there is no |
| // need to handle state here. |
| client.reply(Payload::Complete(Ok(())).into()).send().ack(); |
| } |
| } |
| |
| // This shouldn't ever be triggered since the inspect agent (and its receptors) |
| // should be active for the duration of the service. This is just a safeguard to |
| // ensure this detached task doesn't run forever if the receptors stop somehow. |
| complete => break, |
| } |
| } |
| }) |
| .detach(); |
| } |
| |
| /// Handles responses to the initial broadcast by the inspect agent to all policy handlers that |
| /// requests their state. |
| async fn handle_initial_get(&mut self, message: service::message::MessageEvent) { |
| if let MessageEvent::Message(payload, _) = message { |
| // Since the order for these events isn't guaranteed, don't overwrite responses obtained |
| // after intercepting a request with these initial values. |
| if let Err(err) = self.write_response_to_inspect(payload, true).await { |
| fx_log_err!("Failed write initial get response to inspect: {:?}", err); |
| } |
| } |
| } |
| |
| /// Handles messages seen over the message hub and requests policy state from handlers as |
| /// needed. |
| async fn handle_intercepted_message(&mut self, message: service::message::MessageEvent) { |
| if let MessageEvent::Message(service::Payload::Policy(PolicyPayload::Request(_)), client) = |
| message |
| { |
| // When we see a request to a policy proxy, we assume that the policy will be modified, |
| // so we wait for the reply to get the signature of the proxy, then ask the proxy for |
| // its latest value. |
| match InspectPolicyAgent::watch_reply(client).await { |
| Ok(reply_signature) => { |
| if let Err(err) = self.request_and_write_to_inspect(reply_signature).await { |
| fx_log_err!("Failed request value from policy proxy: {:?}", err); |
| } |
| } |
| Err(err) => { |
| fx_log_err!("Failed to watch reply to request: {:?}", err); |
| } |
| } |
| } |
| } |
| |
| /// Watches for the reply to a sent message and returns the author of the reply. |
| async fn watch_reply(mut client: MessageClient) -> Result<Signature, Error> { |
| let mut reply_receptor = client.spawn_observer(); |
| |
| reply_receptor.next_payload().await.map(|(_, reply_client)| reply_client.get_author()) |
| } |
| |
| /// Requests the policy state from a given signature for a policy handler and records the result |
| /// in inspect. |
| async fn request_and_write_to_inspect(&mut self, signature: Signature) -> Result<(), Error> { |
| // Send the request to the policy proxy. |
| let mut send_receptor = self |
| .messenger_client |
| .message(PolicyPayload::Request(Request::Get).into(), Audience::Messenger(signature)) |
| .send(); |
| |
| // Wait for a response from the policy proxy. |
| let (payload, _) = send_receptor.next_payload().await?; |
| |
| self.write_response_to_inspect(payload, false).await |
| } |
| |
| /// Writes a policy payload response to inspect. |
| /// |
| /// ignore_if_present will silently not write the response to inspect if a value already exists |
| /// for the policy. |
| async fn write_response_to_inspect( |
| &mut self, |
| payload: service::Payload, |
| ignore_if_present: bool, |
| ) -> Result<(), Error> { |
| let policy_info = if let Ok(PolicyPayload::Response(Ok( |
| policy_base::response::Payload::PolicyInfo(policy_info), |
| ))) = PolicyPayload::try_from(payload) |
| { |
| policy_info |
| } else { |
| return Err(format_err!("did not receive policy state")); |
| }; |
| |
| // Convert the response to a string for inspect. |
| let (policy_name, inspect_str) = policy_info.for_inspect(); |
| |
| let timestamp = clock::inspect_format_now(); |
| |
| match self.policy_values.get_mut(policy_name) { |
| Some(policy_info) => { |
| if ignore_if_present { |
| // Value already present in inspect, ignore this response. |
| return Ok(()); |
| } |
| // Value already known, just update its fields. |
| policy_info.timestamp.set(×tamp); |
| policy_info.value.set(&inspect_str); |
| } |
| None => { |
| // Policy info not recorded yet, create a new inspect node. |
| let node = self.inspect_node.create_child(policy_name); |
| let value_prop = node.create_string("value", inspect_str); |
| let timestamp_prop = node.create_string("timestamp", timestamp.clone()); |
| self.policy_values.insert( |
| policy_name, |
| InspectPolicyInfo { _node: node, value: value_prop, timestamp: timestamp_prop }, |
| ); |
| } |
| } |
| |
| Ok(()) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use std::collections::HashSet; |
| |
| use fuchsia_inspect as inspect; |
| use fuchsia_inspect::assert_inspect_tree; |
| use fuchsia_zircon::Time; |
| use futures::future::BoxFuture; |
| use futures::StreamExt; |
| |
| use crate::service; |
| use crate::service::message::{create_hub, Audience}; |
| |
| use crate::agent::inspect_policy::{InspectPolicyAgent, INSPECT_NODE_NAME}; |
| use crate::agent::Context; |
| use crate::audio::policy as audio_policy; |
| use crate::audio::policy::{PolicyId, StateBuilder, TransformFlags}; |
| use crate::audio::types::AudioStreamType; |
| use crate::clock; |
| use crate::message::base::{role, MessageEvent, MessengerType, Status}; |
| use crate::policy::{self as policy_base, Payload, PolicyInfo, Role, UnknownInfo}; |
| use crate::tests::message_utils::verify_payload; |
| |
| const GET_REQUEST: Payload = Payload::Request(policy_base::Request::Get); |
| |
| async fn create_context() -> Context { |
| Context::new( |
| create_hub().create(MessengerType::Unbound).await.expect("should be present").1, |
| create_hub(), |
| HashSet::new(), |
| None, |
| ) |
| .await |
| } |
| |
| /// Verifies that inspect agent requests and writes state for each policy on start. |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_write_policy_inspect_on_start() { |
| // Set the clock so that timestamps will always be 0. |
| clock::mock::set(Time::from_nanos(0)); |
| |
| let context = create_context().await; |
| // Create a receptor representing the policy proxy, with an appropriate role. |
| let (_, mut policy_receptor) = context |
| .messenger_factory |
| .messenger_builder(MessengerType::Unbound) |
| .add_role(role::Signature::role(service::Role::Policy(Role::PolicyHandler))) |
| .build() |
| .await |
| .unwrap(); |
| |
| // Create the inspect agent. |
| let inspector = inspect::Inspector::new(); |
| let inspect_node = inspector.root().create_child(INSPECT_NODE_NAME); |
| |
| InspectPolicyAgent::create_with_node(context, inspect_node).await; |
| |
| let expected_state = StateBuilder::new() |
| .add_property(AudioStreamType::Media, TransformFlags::TRANSFORM_MAX) |
| .build(); |
| |
| // Policy proxy receives a get request on start and returns the state. |
| let state_clone = expected_state.clone(); |
| verify_payload( |
| GET_REQUEST.into(), |
| &mut policy_receptor, |
| Some(Box::new(|client| -> BoxFuture<'_, ()> { |
| Box::pin(async move { |
| let mut receptor = client |
| .reply( |
| Payload::Response(Ok(policy_base::response::Payload::PolicyInfo( |
| PolicyInfo::Audio(state_clone), |
| ))) |
| .into(), |
| ) |
| .send(); |
| // Wait until the policy inspect agent receives the message and writes to |
| // inspect. |
| while let Some(event) = receptor.next().await { |
| match event { |
| MessageEvent::Status(Status::Received) => { |
| return; |
| } |
| _ => {} |
| } |
| } |
| }) |
| })), |
| ) |
| .await; |
| |
| // Inspect agent writes value to inspect. |
| assert_inspect_tree!(inspector, root: { |
| policy_values: { |
| "Audio": { |
| value: format!("{:?}", expected_state), |
| timestamp: "0.000000000", |
| } |
| } |
| }); |
| } |
| |
| /// Verifies that inspect agent intercepts policy requests and writes their values to inspect. |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_write_inspect_on_changed() { |
| // Set the clock so that timestamps will always be 0. |
| clock::mock::set(Time::from_nanos(0)); |
| |
| let context = create_context().await; |
| |
| // Create a receptor representing the policy proxy, with an appropriate role. |
| let (_, mut policy_receptor) = context |
| .messenger_factory |
| .messenger_builder(MessengerType::Unbound) |
| .add_role(role::Signature::role(service::Role::Policy(Role::PolicyHandler))) |
| .build() |
| .await |
| .unwrap(); |
| |
| // Create a messenger on the policy message hub to send requests for the inspect agent to |
| // intercept. |
| let (policy_sender, _) = |
| context.messenger_factory.create(MessengerType::Unbound).await.unwrap(); |
| |
| // Create the inspect agent. |
| let inspector = inspect::Inspector::new(); |
| let inspect_node = inspector.root().create_child(INSPECT_NODE_NAME); |
| InspectPolicyAgent::create_with_node(context, inspect_node).await; |
| |
| // Starting state for audio policy. |
| let initial_state = StateBuilder::new() |
| .add_property(AudioStreamType::Media, TransformFlags::TRANSFORM_MAX) |
| .build(); |
| |
| // While this isn't a change in state that would happen in the real world, it's fine for |
| // testing. |
| let expected_state = StateBuilder::new() |
| .add_property(AudioStreamType::Background, TransformFlags::TRANSFORM_MIN) |
| .build(); |
| |
| // Policy proxy receives a get request on start and returns the initial state. |
| verify_payload( |
| GET_REQUEST.into(), |
| &mut policy_receptor, |
| Some(Box::new(|client| -> BoxFuture<'_, ()> { |
| Box::pin(async move { |
| client |
| .reply( |
| Payload::Response(Ok(policy_base::response::Payload::PolicyInfo( |
| PolicyInfo::Audio(initial_state), |
| ))) |
| .into(), |
| ) |
| .send(); |
| }) |
| })), |
| ) |
| .await; |
| |
| // Send a message to the policy proxy. Inspect agent acts on any request and waits for a |
| // reply to know where to ask for the policy state so send a nonsensical request + reply. |
| let test_request: service::Payload = Payload::Request(policy_base::Request::Audio( |
| audio_policy::Request::RemovePolicy(PolicyId::create(0)), |
| )) |
| .into(); |
| policy_sender |
| .message(test_request.clone(), Audience::Messenger(policy_receptor.get_signature())) |
| .send(); |
| |
| // Policy proxy receives a request from the policy_sender. |
| verify_payload( |
| test_request.clone(), |
| &mut policy_receptor, |
| Some(Box::new(|client| -> BoxFuture<'_, ()> { |
| Box::pin(async move { |
| client |
| .reply( |
| Payload::Response(Ok(policy_base::response::Payload::PolicyInfo( |
| UnknownInfo(true).into(), |
| ))) |
| .into(), |
| ) |
| .send(); |
| }) |
| })), |
| ) |
| .await; |
| |
| // Policy proxy receives a get request from the inspect agent and returns the expected |
| // state. |
| let state_clone = expected_state.clone(); |
| verify_payload( |
| GET_REQUEST.into(), |
| &mut policy_receptor, |
| Some(Box::new(|client| -> BoxFuture<'_, ()> { |
| Box::pin(async move { |
| let mut receptor = client |
| .reply( |
| Payload::Response(Ok(policy_base::response::Payload::PolicyInfo( |
| PolicyInfo::Audio(state_clone), |
| ))) |
| .into(), |
| ) |
| .send(); |
| // Wait until the policy inspect agent receives the message and writes to |
| // inspect. |
| while let Some(event) = receptor.next().await { |
| match event { |
| MessageEvent::Status(Status::Received) => { |
| return; |
| } |
| _ => {} |
| } |
| } |
| }) |
| })), |
| ) |
| .await; |
| |
| // Inspect agent writes value to inspect. |
| assert_inspect_tree!(inspector, root: { |
| policy_values: { |
| "Audio": { |
| value: format!("{:?}", expected_state), |
| timestamp: "0.000000000", |
| } |
| } |
| }); |
| } |
| } |