// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{anyhow, Context, Result};
use fidl::endpoints::{create_proxy, ProtocolMarker};
use fuchsia_component::client;
use futures::channel::mpsc;
use futures::{future, Future, SinkExt, StreamExt};
use tracing::{debug, error};
use {fidl_fuchsia_power_broker as fpb, fidl_fuchsia_power_system as fps, fuchsia_async as fasync};
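// The name under which Timekeeper's power element is registered with the
// power broker.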
const ELEMENT_NAME: &str = "timekeeper-pe";
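// The only two power levels the element supports: fully on and fully off.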
const POWER_ON: u8 = 0xff;
const POWER_OFF: u8 = 0x00;
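// The execution state level that POWER_ON passively depends on.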
const REQUIRED_LEVEL: u8 = fps::ExecutionStateLevel::WakeHandling.into_primitive();
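// Connects to the system activity governor and the power broker, registers
// Timekeeper's power element, and returns the task that services its
// level-control channels.
//
// A minimal usage sketch (executor setup and error handling assumed):
//
//   let (send, _recv) = futures::channel::mpsc::channel(1);
//   let task = manage(send).await?;
//   task.await; // Runs until the level-control channels close.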
pub async fn manage(activity_signal: mpsc::Sender<super::Command>) -> Result<fasync::Task<()>> {
let governor_proxy = client::connect_to_protocol::<fps::ActivityGovernorMarker>()
.with_context(|| {
format!("while connecting to: {:?}", fps::ActivityGovernorMarker::DEBUG_NAME)
})?;
let topology_proxy = client::connect_to_protocol::<fpb::TopologyMarker>()
.with_context(|| format!("while connecting to: {:?}", fpb::TopologyMarker::DEBUG_NAME))?;
manage_internal(governor_proxy, topology_proxy, activity_signal, management_loop).await
}
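// The testable core of `manage`: takes already-connected proxies and a
// `loop_fn` that builds the level-control loop, so that tests can inject
// fakes for all three.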
async fn manage_internal<F, G>(
governor_proxy: fps::ActivityGovernorProxy,
topology_proxy: fpb::TopologyProxy,
mut activity: mpsc::Sender<super::Command>,
// Injected in tests.
loop_fn: F,
) -> Result<fasync::Task<()>>
where
G: Future<Output = fasync::Task<()>>,
F: Fn(fpb::CurrentLevelProxy, fpb::RequiredLevelProxy) -> G,
{
let power_elements = governor_proxy
.get_power_elements()
.await
.context("in a call to ActivityGovernor/GetPowerElements")?;
let _ignore = activity.send(super::Command::PowerManagement).await;
if let Some(execution_state) = power_elements.execution_state {
if let Some(token) = execution_state.passive_dependency_token {
let deps = vec![fpb::LevelDependency {
dependency_type: fpb::DependencyType::Passive,
dependent_level: POWER_ON,
requires_token: token,
requires_level_by_preference: vec![REQUIRED_LEVEL],
}];
let (current, current_level_channel) = create_proxy::<fpb::CurrentLevelMarker>()?;
let (required, required_level_channel) = create_proxy::<fpb::RequiredLevelMarker>()?;
let result = topology_proxy
.add_element(fpb::ElementSchema {
element_name: Some(ELEMENT_NAME.into()),
initial_current_level: Some(POWER_ON),
valid_levels: Some(vec![POWER_ON, POWER_OFF]),
dependencies: Some(deps),
active_dependency_tokens_to_register: Some(vec![]),
passive_dependency_tokens_to_register: Some(vec![]),
level_control_channels: Some(fpb::LevelControlChannels {
current: current_level_channel,
required: required_level_channel,
}),
..Default::default()
})
.await
.context("while calling fuchsia.power.broker.Topology/AddElement")?;
match result {
Ok(_element_control_ch) => {
return Ok(loop_fn(current, required).await);
}
Err(e) => return Err(anyhow!("error while adding element: {:?}", e)),
}
}
} else {
debug!(
"no execution state in power elements; power management integration is shutting down."
);
}
Ok(fasync::Task::local(async {}))
}
// Loop around and react to level control messages. Use separate tasks to ensure
// we can insert a power transition process in between.
//
// Returns the task spawned for transition control.
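//
// Per required-level update, the data flow is:
//
//   required.watch() -> rcv_task --(level, ack)--> send_task -> current.update(level)
//
// rcv_task then blocks on the ack until send_task has reported the level.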
async fn management_loop(
current: fpb::CurrentLevelProxy,
required: fpb::RequiredLevelProxy,
) -> fasync::Task<()> {
// The per-update ack Sender ensures that rcv_task does not watch for the
// next required level until send_task has finished reporting this one.
let (mut send, mut rcv) = mpsc::channel::<(u8, mpsc::Sender<()>)>(1);
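// rcv_task: a hanging-get loop that watches the broker's required level and
// forwards each new level, together with an ack channel, to send_task.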
let rcv_task = fasync::Task::local(async move {
loop {
let result = required.watch().await;
match result {
Ok(Ok(level)) => {
let (s, mut r) = mpsc::channel::<()>(1);
// For now, we just echo the power level back to the power broker.
if let Err(e) = send.send((level, s)).await {
error!("error while processing power level, bailing: {:?}", e);
break;
}
// Wait until send_task propagates the new required level.
r.next().await.unwrap();
}
Ok(Err(e)) => {
error!("error while watching level, bailing: {:?}", e);
break;
}
Err(e) => {
error!("error while watching level, bailing: {:?}", e);
break;
}
}
}
debug!("no longer watching required level");
});
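// send_task: reports each forwarded level back to the broker as the element's
// current level, then acks so that rcv_task can issue the next watch.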
let send_task = fasync::Task::local(async move {
while let Some((new_level, mut s)) = rcv.next().await {
match current.update(new_level).await {
Ok(Ok(())) => {
// Allow rcv_task to proceed.
s.send(()).await.unwrap();
}
Ok(Err(e)) => {
error!("error while watching level, bailing: {:?}", e);
break;
}
Err(e) => {
error!("error while watching level, bailing: {:?}", e);
break;
}
}
}
debug!("no longer reporting required level");
});
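// The returned task completes once both halves of the loop have exited.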
fasync::Task::local(async move {
future::join(rcv_task, send_task).await;
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Command;
use fidl::endpoints;
use fidl::endpoints::Proxy;
use fuchsia_zircon as zx;
use tracing::debug;
// Sends `value` via a clone of the sender on a detached task; returns immediately.
async fn async_send_via(s: &mut mpsc::Sender<u8>, value: u8) {
let mut c = s.clone();
fasync::Task::local(async move {
c.send(value).await.expect("always succeeds");
})
.detach();
}
// Waits for a value to be available on the receiver.
async fn block_recv_from(s: &mut mpsc::Receiver<u8>) -> u8 {
s.next().await.expect("always succeeds")
}
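// Verifies that every required level watched from the broker is echoed back,
// in order, as the element's current level.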
#[fuchsia::test]
async fn propagate_level() {
let (current, mut current_stream) =
endpoints::create_proxy_and_stream::<fpb::CurrentLevelMarker>()
.expect("always succeeds");
let (required, mut required_stream) =
endpoints::create_proxy_and_stream::<fpb::RequiredLevelMarker>()
.expect("always succeeds");
// Sends power levels from the test into the handler.
let (mut in_send, mut in_recv) = mpsc::channel(1);
// Receives power levels from the handler back into the test.
let (mut out_send, mut out_recv) = mpsc::channel(1);
// Serve the topology streams asynchronously.
fasync::Task::local(async move {
debug!("topology: start listening for requests.");
while let Some(next) = current_stream.next().await {
let request: fpb::CurrentLevelRequest = next.unwrap();
debug!("topology: request: {:?}", request);
match request {
fpb::CurrentLevelRequest::Update { current_level, responder, .. } => {
out_send.send(current_level).await.expect("always succeeds");
responder.send(Ok(())).unwrap();
}
_ => {
unimplemented!();
}
}
}
})
.detach();
fasync::Task::local(async move {
debug!("topology: start listening for requests.");
while let Some(next) = required_stream.next().await {
let request: fpb::RequiredLevelRequest = next.unwrap();
debug!("topology: request: {:?}", request);
match request {
fpb::RequiredLevelRequest::Watch { responder, .. } => {
// Emulate hanging get response: block on a new value, then report that
// value.
let new_level = in_recv.next().await.expect("always succeeds");
responder.send(Ok(new_level)).unwrap();
}
_ => {
unimplemented!();
}
}
}
})
.detach();
// Management loop is also asynchronous.
fasync::Task::local(async move {
management_loop(current, required).await.await;
})
.detach();
async_send_via(&mut in_send, POWER_ON).await;
assert_eq!(POWER_ON, block_recv_from(&mut out_recv).await);
async_send_via(&mut in_send, POWER_OFF).await;
assert_eq!(POWER_OFF, block_recv_from(&mut out_recv).await);
async_send_via(&mut in_send, POWER_ON).await;
assert_eq!(POWER_ON, block_recv_from(&mut out_recv).await);
}
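// A no-op management loop, injected into manage_internal in tests.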
async fn empty_loop(_: fpb::CurrentLevelProxy, _: fpb::RequiredLevelProxy) -> fasync::Task<()> {
fasync::Task::local(async move {})
}
// Get a client end for the given protocol marker T, assuming that it will
// never be used.
fn dummy_client_end<T>() -> endpoints::ClientEnd<T>
where
T: fidl::endpoints::ProtocolMarker,
<T as fidl::endpoints::ProtocolMarker>::Proxy: std::fmt::Debug,
{
let (p, _) = endpoints::create_proxy_and_stream::<T>().expect("never fails");
p.into_client_end().expect("never fails")
}
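// Verifies that manage_internal registers a power element with a fake broker
// and signals Command::PowerManagement on the activity channel.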
#[fuchsia::test]
async fn test_manage_internal() {
let (g_proxy, mut g_stream) =
endpoints::create_proxy_and_stream::<fps::ActivityGovernorMarker>()
.expect("infallible");
let (t_proxy, mut t_stream) =
endpoints::create_proxy_and_stream::<fpb::TopologyMarker>().expect("infallible");
let (activity_s, mut activity_r) = mpsc::channel::<Command>(1);
// Run the server side activity governor.
fasync::Task::local(async move {
while let Some(request) = g_stream.next().await {
match request {
Ok(fps::ActivityGovernorRequest::GetPowerElements { responder }) => {
let event = zx::Event::create();
responder
.send(fps::PowerElements {
execution_state: Some(fps::ExecutionState {
passive_dependency_token: Some(event),
..Default::default()
}),
..Default::default()
})
.expect("never fails");
}
Ok(_) | Err(_) => unimplemented!(),
}
}
debug!("governor server side test exiting");
})
.detach();
// Run the server side of the topology protocol.
fasync::Task::local(async move {
while let Some(request) = t_stream.next().await {
match request {
Ok(fpb::TopologyRequest::AddElement { payload: _, responder }) => {
responder.send(Ok(dummy_client_end())).expect("never fails");
}
Ok(_) | Err(_) => unimplemented!(),
}
}
})
.detach();
fasync::Task::local(async move {
manage_internal(g_proxy, t_proxy, activity_s, empty_loop).await.unwrap().await;
})
.detach();
activity_r.next().await.unwrap();
}
}