blob: 8b604b6161474ebba71d927a8031f4a35ddc8664 [file] [log] [blame]
// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use async_trait::async_trait;
use futures::StreamExt;
use futures::lock::Mutex;
use log::warn;
use std::sync::Arc;
use {
fidl_fuchsia_example_power as fexample, fidl_fuchsia_power_system as fsag,
fuchsia_trace as ftrace,
};
const TRACE_NAME_FLUSH: &std::ffi::CStr = c"flush-count-dirty";
/// You probably don't want to create one of these directly, but instead use
/// |SequenceServer| to create one for you and then get a reference to it from
/// |SequenceServer.get_message_tracker|.
///
/// Server components can use this to help manage sending batons to clients
/// at appropriate times. Server components should call |message_sent| when
/// they send a message to a client. Servers should call |set_requester|
/// whenever server receives a hanging-GET for a baton from a client.
pub struct MessageSendTracker {
sent_messages: u64,
// The value of |sent_messages| when we last passed a baton
last_baton_pass: u64,
baton_request: Option<fexample::MessageSourceReceiveBatonResponder>,
sag: fsag::ActivityGovernorProxy,
baton: Option<zx::EventPair>,
resumed: bool,
terminated: bool,
}
#[derive(Debug)]
pub enum MessageTrackerError {
Terminated,
}
impl MessageSendTracker {
pub async fn new(sag: fsag::ActivityGovernorProxy) -> Self {
let me = Self {
sent_messages: 0,
last_baton_pass: 0,
baton_request: None,
sag,
baton: None,
resumed: false,
terminated: false,
};
me
}
/// Inform the tracker a message was sent, equivalent to |messages_sent(1)|.
/// If the system is suspended, a wake lease is taken to guarantee the
/// message can be processed. This is the behavior we probably want if we
/// this message came from a waking interrupt. This means after this
/// function returns, the caller can safely ack an interrupt related to
/// this event.
pub async fn message_sent(&mut self) -> Result<(), MessageTrackerError> {
self.messages_sent(1).await
}
/// Inform the tracker that |message_count| messages were sent. If the
/// system is suspended, a wake lease is taken to guarantee the message
/// can be processed. This is the behavior we probably want if we this
/// message came from a waking interrupt. This means after this function
/// returns, the caller can safely ack an interrupt related to this event.
pub async fn messages_sent(&mut self, message_count: u64) -> Result<(), MessageTrackerError> {
ftrace::instant!(
crate::TRACE_CATEGORY,
c"messages_sent",
ftrace::Scope::Process,
"count" => message_count
);
if self.terminated {
return Err(MessageTrackerError::Terminated);
}
self.sent_messages += message_count;
// If we're not resumed we want to resume the system so we can make
// sure the message will be processed.
if !self.resumed {
ftrace::instant!(
crate::TRACE_CATEGORY,
c"send-during-suspension",
ftrace::Scope::Process
);
self.flush().await;
// Flush acquires a wake lease, which guarantees we are or will be
// resumed at some point, mark the state as such.
self.resumed();
}
Ok(())
}
/// Deposits a request for a baton that a client made. This is a
/// hanging-GET-style communication pattern and so the client receives a
/// response the next time a flush is triggered. If a flush was triggered
/// before |set_requester| was called, the baton is sent now based on the
/// current message index.
pub fn set_requester(&mut self, requester: fexample::MessageSourceReceiveBatonResponder) {
{
// Drop any previous requester
let _previous_requester = self.baton_request.take();
}
if let Some(baton) = self.baton.take() {
let id = self.sent_messages;
self.send_baton(id, requester, baton);
} else {
self.baton_request = Some(requester);
}
}
pub fn suspended(&mut self) {
self.resumed = false;
}
pub fn resumed(&mut self) {
self.resumed = true;
}
fn terminated(&mut self) {
self.terminated = true;
}
pub fn get_message_count(&self) -> u64 {
self.sent_messages
}
fn send_baton(
&mut self,
msg_id: u64,
request: fexample::MessageSourceReceiveBatonResponder,
baton: zx::EventPair,
) {
request
.send(fexample::LeaseBaton {
lease: Some(baton),
msg_index: Some(msg_id),
..Default::default()
})
.expect("send failed");
ftrace::duration_end!(crate::TRACE_CATEGORY, TRACE_NAME_FLUSH, "msg_id" => msg_id);
self.last_baton_pass = msg_id;
}
/// Triggers a response the previous request passed to |set_requester|, if
/// any, when |message(s)_sent| has been called since the previous time a
/// response was set to a requester. If there is no waiting request for
/// a baton, the baton is created, but not passed. It will be passed when
/// |seq_requester| is called next.
async fn flush(&mut self) {
if self.last_baton_pass < self.sent_messages {
let curr_offset = self.sent_messages;
ftrace::duration_begin!(
crate::TRACE_CATEGORY,
TRACE_NAME_FLUSH,
"msg_id" => curr_offset,
"baton_delta" => self.sent_messages - self.last_baton_pass
);
let baton = self
.sag
.acquire_wake_lease("optimistic-lease-baton")
.await
.expect("FIDL failed")
.expect("SAG returned error");
// If there is a hanging-GET for a baton, return the send the baton
// immediately, otherwise store it for later.
if let Some(req) = self.baton_request.take() {
self.send_baton(curr_offset, req, baton);
} else {
self.baton = Some(baton);
return;
}
}
}
}
/// Server components should use class to guarantee clients see all messages
/// sent by the server prior to system suspension. After creating a
/// |SequenceServer| server components should call |manage| to start this
/// management.
///
/// Then server components should use the |MessageSendTracker| returned from
/// |get_message_tracker| to record when messages are sent and deposit
/// hanging-GET requests for batons. Batons are sent to the client whenever the
/// system starts to suspend (as indicated by a SuspendStarted callback from
/// SystemActivityGovernor) *AND* messages count of sent messages is greater
/// than the last time a baton was sent to a the client *AND* a hanging-GET
/// request is pending.
pub struct SequenceServer {
baton_sender: Arc<Mutex<MessageSendTracker>>,
flusher: Option<crate::flush_trigger::FlushTrigger>,
sag: fsag::ActivityGovernorProxy,
}
#[async_trait]
impl crate::flush_trigger::FlushListener for SequenceServer {
async fn flush(&self) {
self.baton_sender.lock().await.flush().await;
}
}
impl SequenceServer {
/// Creates a SequenceServer, but does *not* kick off its logic. |manage|
/// MUST be called to start monitoring for suspend and managing baton hand-
/// offs.
pub async fn new(sag: fsag::ActivityGovernorProxy) -> Self {
let flusher = Some(crate::flush_trigger::FlushTrigger::new(sag.clone()));
let baton_sender = Arc::new(Mutex::new(MessageSendTracker::new(sag.clone()).await));
Self { flusher, baton_sender, sag }
}
/// Returns a future which manages baton passing and a reference to the
/// |MessageSendTracker| clients use to report message sends. The returned
/// future *must* be polled for as long as batons need to be delivered. The
/// future returns, yielding the |SequenceServer| if the channel to
/// ActivityGovernor passed to |new| closes.
///
/// To stop managing batons, simply drop the future. The
/// |MessageSendTracker| should not be used after the future returns.
pub fn manage(
self,
) -> (Arc<Mutex<MessageSendTracker>>, impl futures::Future<Output = Result<Self, fidl::Error>>)
{
let tracker = self.baton_sender.clone();
let fut = async move {
let mut sequence_server = self;
if let None = sequence_server.flusher {
warn!("No flusher available, aborting manage");
return Ok(sequence_server);
}
// Take the flush trigger because we can't borrow sequence_server
// twice to run the two futures.
let flusher = sequence_server.flusher.take().unwrap();
// Await the two futures for what we expect to be effectively
// forever.
let results = futures::future::join(
sequence_server.watch_system_state(),
flusher.run(&sequence_server),
)
.await;
// Both futures return errors only if we can't talk to SAG so probably if
// one has an error the other will have to same or would have the same
// error soon.
if let Err(e) = results.1 {
return Err(e);
}
if let Err(e) = results.0 {
return Err(e);
}
sequence_server.flusher = Some(flusher);
sequence_server.baton_sender.lock().await.terminated();
Ok(sequence_server)
};
(tracker, fut)
}
/// Watch the suspend/resume state of the system and
async fn watch_system_state(&self) -> Result<(), fidl::Error> {
let (client, server) = fidl::endpoints::create_endpoints::<fsag::SuspendBlockerMarker>();
let registration_lease = self
.sag
.register_suspend_blocker(fsag::ActivityGovernorRegisterSuspendBlockerRequest {
suspend_blocker: Some(client),
name: Some("suspend_watcher".into()),
..Default::default()
})
.await?
.expect("error registering suspend blocker");
drop(registration_lease);
let mut request_stream = server.into_stream();
while let Some(req) = request_stream.next().await {
match req {
Ok(fsag::SuspendBlockerRequest::BeforeSuspend { responder }) => {
ftrace::instant!(crate::TRACE_CATEGORY, c"suspended", ftrace::Scope::Process);
self.baton_sender.lock().await.suspended();
let _ = responder.send();
}
Ok(fsag::SuspendBlockerRequest::AfterResume { responder }) => {
ftrace::instant!(crate::TRACE_CATEGORY, c"resumed", ftrace::Scope::Process);
self.baton_sender.lock().await.resumed();
let _ = responder.send();
}
Ok(fsag::SuspendBlockerRequest::_UnknownMethod { .. }) => {
warn!("unrecognized listener method, ignoring");
}
Err(_) => {
warn!("Error receiving next items from stream");
}
}
}
Ok(())
}
}