// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Support for running the ServiceFs until stalled.
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use detect_stall::StallableRequestStream;
use fidl::endpoints::ServerEnd;
use futures::channel::oneshot::{self, Canceled};
use futures::future::FusedFuture;
use futures::{FutureExt, Stream, StreamExt};
use pin_project::pin_project;
use vfs::directory::immutable::connection::ImmutableConnection;
use vfs::directory::immutable::Simple;
use vfs::execution_scope::{ActiveGuard, ExecutionScope};
use vfs::ToObjectRequest;
use zx::Duration;
use {fidl_fuchsia_io as fio, fuchsia_async as fasync, fuchsia_zircon as zx};
use super::{ServiceFs, ServiceObjTrait};
/// The future type that resolves when an outgoing directory connection has stalled
/// for a timeout or completed.
///
/// As produced by `OutgoingConnector::serve`, the output is `Some(channel)` when the
/// connection was unbound after stalling, and `None` when the request stream completed.
type StalledFut = Pin<Box<dyn FusedFuture<Output = Option<zx::Channel>>>>;
/// A wrapper around the base [`ServiceFs`] that streams out capability connection requests.
/// Additionally, it will yield [`Item::Stalled`] if there is no work happening in the fs
/// and the main outgoing directory connection has not received messages for some time.
///
/// Use [`ServiceFs::until_stalled`] to produce an instance. Refer to details there.
pub struct StallableServiceFs<ServiceObjTy: ServiceObjTrait> {
/// The wrapped service fs. It is polled first on every call to `poll_next`.
fs: ServiceFs<ServiceObjTy>,
/// Serves (and later re-serves) the main outgoing directory connection. Its scope is also
/// the source of the [`ActiveGuard`] attached to every yielded request.
connector: OutgoingConnector,
/// Current position in the stall-detection state machine; see [`State`].
state: State,
/// Passed to the connector each time the outgoing directory connection is served.
debounce_interval: zx::Duration,
/// Set when the wrapped service fs finishes while the state machine is in the stalled
/// state. Once set, `poll_next` returns `Poll::Ready(None)` immediately.
is_terminated: bool,
/// The item yielded by a [`StallableServiceFs`] stream.
///
/// `Output` is the per-request payload. The `Stream` implementation for
/// [`StallableServiceFs`] instantiates it with `ServiceObjTy::Output`.
pub enum Item<Output> {
/// A new connection request to a capability. `ServiceObjTy::Output` contains more
/// information identifying the capability requested. The [`ActiveGuard`] should be
/// held alive as long as you are processing the connection, or doing any other work
/// where you would like to prevent the [`ServiceFs`] from shutting down.
Request(Output, ActiveGuard),
/// The [`ServiceFs`] has stalled. The unbound outgoing directory server endpoint will
/// be returned here. The stream will complete right after this. You should typically
/// escrow the server endpoint back to component manager, and then exit the component.
// Implementation detail below
/// We use a state machine to detect stalling. The general structure is:
///
/// - When the service fs is running, wait for the outgoing directory connection to stall.
/// - If the outgoing directory stalled, unbind it and wait for it to become readable.
/// - If it becomes readable, add the connection back to the service fs and go back to
///   waiting for a stall.
/// - If the service fs finished while the outgoing directory is unbound, we'll
///   complete the stream and return the endpoint to the user. Note that the service fs might
///   take a while to finish even after the outgoing directory has been unbound, due to
///   [`ActiveGuard`]s held by the user or due to other long-running connections.
enum State {
// The outgoing directory connection is being served. `stalled` resolves once that
// connection has been unbound after stalling, or once its request stream has completed.
Running { stalled: StalledFut },
// The outgoing directory connection is not being served.
// If the `channel` is `None`, the outgoing directory stream completed without stalling.
// We just need to wait for the `ServiceFs` to finish.
// If it is `Some`, it is a wait for `CHANNEL_READABLE` on the unbound server endpoint.
Stalled { channel: Option<fasync::OnSignals<'static, zx::Channel>> },
impl<ServiceObjTy: ServiceObjTrait + Send> Stream for StallableServiceFs<ServiceObjTy> {
type Item = Item<ServiceObjTy::Output>;
/// Drives the wrapped service fs and the stall-detection state machine.
///
/// Connection requests from the wrapped fs take priority: each one is yielded as an
/// [`Item::Request`] paired with an [`ActiveGuard`] obtained from the connector's scope.
/// When no request is ready, the [`State`] machine is advanced until it has to wait.
///
/// The stream terminates when the wrapped fs has finished while the state machine is in
/// the stalled state. If an unbound server endpoint is held at that point, it is yielded as
/// a final [`Item::Stalled`]; otherwise the stream ends with `None` right away.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// NOTE(review): `project()` is presumably generated by `pin_project` (imported at the top
// of the file); the attribute is not visible here — confirm.
let mut this = self.project();
if *this.is_terminated {
// The stream already completed on an earlier poll; keep reporting end-of-stream.
return Poll::Ready(None);
// Poll the underlying service fs to handle requests.
let poll_fs = this.fs.poll_next_unpin(cx);
if let Poll::Ready(Some(request)) = poll_fs {
// If there is some connection request, always return that to the user first.
return Poll::Ready(Some(Item::Request(request, this.connector.scope.active_guard())));
// If we get here, the underlying service fs is either finished, or pending.
// Poll in a loop until the state no longer changes.
loop {
match &mut this.state {
State::Running { stalled } => {
// `ready!` returns `Poll::Pending` from `poll_next` while the connection is still
// being served and has neither stalled nor completed.
let channel = std::task::ready!(stalled.as_mut().poll(cx));
// If the connection was unbound (`Some`), set up a wait for the endpoint to become
// readable again. A completed stream (`None`) leaves nothing to wait on.
let channel = channel
.map(|c| fasync::OnSignals::new(c.into(), zx::Signals::CHANNEL_READABLE));
// The state will be polled on the next loop iteration.
*this.state = State::Stalled { channel };
State::Stalled { channel } => {
if let Poll::Ready(None) = poll_fs {
// The service fs finished. Return the channel if we have it.
// When there is no channel this evaluates to `Poll::Ready(None)`, which ends the
// stream without an `Item::Stalled`.
*this.is_terminated = true;
return Poll::Ready(
channel.take().map(|wait| Item::Stalled(wait.take_handle().into())),
if channel.is_none() {
// The outgoing directory FIDL stream completed (client closed or
// errored) without stalling, but the service fs is processing
// other requests. Simply wait for that to finish.
return Poll::Pending;
// Otherwise, arrange to be polled again if the channel is readable.
// The `unwrap` cannot fail: the `is_none` case was handled just above.
let readable = channel.as_mut().unwrap().poll_unpin(cx);
let _ = std::task::ready!(readable);
// Server endpoint is readable again. Restore the connection.
let wait = channel.take().unwrap();
let stalled =
this.connector.serve(wait.take_handle().into(), *this.debounce_interval);
// The state will be polled on the next loop iteration.
*this.state = State::Running { stalled };
/// Bundles what is needed to serve, and later re-serve, the main outgoing directory
/// connection.
struct OutgoingConnector {
/// Flags used to build the object request for each served connection.
flags: fio::OpenFlags,
/// Scope on which each connection is spawned. Also used to create the [`ActiveGuard`]
/// attached to yielded requests.
scope: ExecutionScope,
/// Shared handle to the directory; a clone is moved into each spawned connection.
dir: Arc<Simple>,
impl OutgoingConnector {
/// Adds a stallable outgoing directory connection.
///
/// The connection for `server_end` is spawned on a clone of `self.scope`. Its outcome is
/// reported through the returned future:
///
/// - If the request stream completed, the returned future will resolve with `None`.
/// - If the request stream did not encounter new requests for `debounce_interval`, it will be
///   unbound, and the returned future will resolve with `Some(channel)`.
/// - If the oneshot sender is dropped without sending (`Canceled`), the returned future
///   also resolves with `None`.
fn serve(
&mut self,
server_end: ServerEnd<fio::DirectoryMarker>,
debounce_interval: Duration,
) -> StalledFut {
// The spawned connection reports its outcome through `unbound_sender`.
// NOTE(review): the returned future is presumably built from `unbound_receiver`; that
// expression is not visible here — confirm.
let (unbound_sender, unbound_receiver) = oneshot::channel();
let object_request = self.flags.to_object_request(server_end);
// Owned copies that are moved into the spawned closure below.
let scope = self.scope.clone();
let dir = self.dir.clone();
let flags = self.flags;
object_request.spawn(&scope.clone(), move |object_request_ref| {
async move {
move |stream| {
// This function will be called with the server endpoint when
// the directory request stream is stalled for `debounce_interval`
move |maybe_channel: Option<zx::Channel>| {
// A send error is deliberately ignored: it only means nobody is listening any more.
_ = unbound_sender.send(maybe_channel);
// Flatten the oneshot result: a dropped sender is treated like a completed stream.
.map(|result| match result {
Ok(maybe_channel) => maybe_channel,
Err(Canceled) => None,
impl<ServiceObjTy: ServiceObjTrait> StallableServiceFs<ServiceObjTy> {
/// Wraps `fs`, taking its queued outgoing directory connection out of `fs.channel_queue`
/// and serving it through an [`OutgoingConnector`] with the given `debounce_interval`.
///
/// The returned value starts in [`State::Running`] and is not terminated.
///
/// # Panics
///
/// Panics if `fs.channel_queue` is `None` ("Must not poll the original ServiceFs"), or if
/// the queue yields no connection.
pub(crate) fn new(mut fs: ServiceFs<ServiceObjTy>, debounce_interval: zx::Duration) -> Self {
let channel_queue =
fs.channel_queue.as_mut().expect("Must not poll the original ServiceFs");
// NOTE(review): the condition and message below read as the arguments of an assertion that
// exactly one connection is queued; the macro name is not visible here — confirm.
channel_queue.len() == 1,
"Must have exactly one connection to serve, \
e.g. did you call ServiceFs::take_and_serve_directory_handle?"
// Empty the queue and take its first entry.
// NOTE(review): `std::mem::take(channel_queue)` would be an equivalent spelling.
let server_end = std::mem::replace(channel_queue, vec![]).into_iter().next().unwrap();
// Build the connector from the wrapped fs's own flags, scope and directory.
let flags = ServiceFs::<ServiceObjTy>::base_connection_flags();
let scope = fs.scope.clone();
let dir = fs.dir.clone();
let mut connector = OutgoingConnector { flags, scope, dir };
// Start serving immediately.
let stalled = connector.serve(server_end, debounce_interval);
Self {
state: State::Running { stalled },
is_terminated: false,
mod tests {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use assert_matches::assert_matches;
use fasync::TestExecutor;
use fidl::endpoints::ClientEnd;
use fidl_fuchsia_component_client_test::{
ServiceAMarker, ServiceARequest, ServiceARequestStream,
use futures::future::BoxFuture;
use futures::{pin_mut, select, TryStreamExt};
use test_util::Counter;
use zx::AsHandleRef;
use super::*;
/// Request payload used by the tests. `MockServer::handle` destructures it as
/// `Requests::ServiceA(request_stream)`.
enum Requests {
/// Test double that consumes every [`Item`] produced by the fs under test.
struct MockServer {
/// Shared counter that the tests read with `.get()` to check how many calls were handled.
call_count: Arc<Counter>,
/// Shared flag, initially `false`.
/// NOTE(review): presumably set when an `Item::Stalled` is handled — the store is not
/// fully visible here; confirm.
stalled: Arc<AtomicBool>,
/// Holds the channel received through `Item::Stalled`, if any.
server_end: Arc<Mutex<Option<zx::Channel>>>,
impl MockServer {
/// Creates a mock server with a zero call count, a cleared `stalled` flag, and no stored
/// server endpoint.
fn new() -> Self {
let call_count = Arc::new(Counter::new(0));
let stalled = Arc::new(AtomicBool::new(false));
let server_end = Arc::new(Mutex::new(None));
Self { call_count, stalled, server_end }
/// Returns a future that processes one stream item.
///
/// - `Item::Request`: keeps the [`ActiveGuard`] alive while draining the `ServiceA` request
///   stream, replying to every `Foo` request. Draining stops when the stream ends or
///   yields an error.
/// - `Item::Stalled`: stores the returned channel in `server_end`.
fn handle(&self, item: Item<Requests>) -> BoxFuture<'static, ()> {
// Clone the shared state so the returned future can be `'static`.
let stalled = self.stalled.clone();
let call_count = self.call_count.clone();
let server_end = self.server_end.clone();
async move {
match item {
Item::Request(requests, active_guard) => {
// Bind the guard to a named local so it lives until this arm ends.
let _active_guard = active_guard;
let Requests::ServiceA(mut request_stream) = requests;
while let Ok(Some(request)) = request_stream.try_next().await {
match request {
// NOTE(review): `call_count` is presumably incremented here — the statement is not
// visible; confirm.
ServiceARequest::Foo { responder } => {;
// A failed reply (for example, the client already went away) is ignored.
let _ = responder.send();
Item::Stalled(channel) => {
*server_end.lock().unwrap() = Some(channel);, Ordering::SeqCst);
/// Takes the server endpoint recorded by `handle`; panics if none was recorded.
/// NOTE(review): the check against `client_end` is not visible here — confirm.
fn assert_fs_gave_back_server_end(self, client_end: ClientEnd<fio::DirectoryMarker>) {
let reclaimed_server_end: zx::Channel = self.server_end.lock().unwrap().take().unwrap();
/// Initializes fake time; creates VFS with a single mock server, and returns them.
///
/// Returns `(initial, mock_server, fs)`:
/// - `initial` is the fake time origin (`0` ns),
/// - `mock_server` is the [`MockServer`] whose clone handles the stream items,
/// - `fs` is a fused future that hands every stream item to the mock server, with no limit
///   on concurrency.
async fn setup_test(
server_end: ServerEnd<fio::DirectoryMarker>,
) -> (fasync::Time, MockServer, impl FusedFuture<Output = ()>) {
let initial = fasync::Time::from_nanos(0);
const IDLE_DURATION: Duration = Duration::from_nanos(1_000_000);
let mut fs = ServiceFs::new();
let mock_server = MockServer::new();
// The clone is moved into the per-item closure; the original is returned to the test.
let mock_server_clone = mock_server.clone();
let fs = fs
.for_each_concurrent(None, move |item| mock_server_clone.handle(item));
(initial, mock_server, fs)
/// Checks that the fs stays pending while FIDL connections are active, even after fake time
/// has advanced to twice `IDLE_DURATION`, and that the mock server ends up having counted
/// `NUM_FOO_REQUESTS` calls.
#[fuchsia::test(allow_stalls = false)]
async fn drain_request() {
const IDLE_DURATION: Duration = Duration::from_nanos(1_000_000);
const NUM_FOO_REQUESTS: usize = 10;
let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
let (initial, mock_server, fs) = setup_test(server_end).await;
let mut proxies = Vec::new();
for _ in 0..NUM_FOO_REQUESTS {
// Accept the connections.
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
// Active FIDL connections block idle, no matter the wait.
TestExecutor::advance_to(initial + (IDLE_DURATION * 2)).await;
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
// Make some requests.
for proxy in proxies.iter() {
select! {
result = => assert_matches!(result, Ok(_)),
_ = fs => unreachable!(),
// Dropping the FIDL connections frees the ServiceFs to complete.
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
// Requests were handled.
assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
/// With no requests at all, the fs future is pending at first and becomes ready once fake
/// time has advanced by `IDLE_DURATION`. No call is ever counted.
#[fuchsia::test(allow_stalls = false)]
async fn no_request() {
const IDLE_DURATION: Duration = Duration::from_nanos(1_000_000);
let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
let (initial, mock_server, fs) = setup_test(server_end).await;
// Before the idle duration has elapsed, the fs must still be running.
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
TestExecutor::advance_to(initial + IDLE_DURATION).await;
// After the idle duration, the fs future completes.
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
assert_eq!(mock_server.call_count.get(), 0);
/// The fs future becomes ready without any fake-time advance, and no call is counted.
/// NOTE(review): going by the test name, the client end is presumably closed before the
/// fs is polled — that step is not visible here; confirm.
#[fuchsia::test(allow_stalls = false)]
async fn outgoing_dir_client_closed() {
let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
let (_initial, mock_server, fs) = setup_test(server_end).await;
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
assert_eq!(mock_server.call_count.get(), 0);
/// A request issued before the fs exists is processed as soon as the fs is polled (call count
/// goes from 0 to 1). The fs then stays pending until fake time has advanced by
/// `IDLE_DURATION`, at which point it completes without counting any further call.
#[fuchsia::test(allow_stalls = false)]
async fn request_then_stalled() {
const IDLE_DURATION: Duration = Duration::from_nanos(1_000_000);
let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
let proxy =
let foo =;
// Nothing is serving the request yet, so the call cannot complete.
assert!(TestExecutor::poll_until_stalled(&mut foo).await.is_pending());
let (initial, mock_server, fs) = setup_test(server_end).await;
// Poll the fs to process the FIDL.
assert_eq!(mock_server.call_count.get(), 0);
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
assert_eq!(mock_server.call_count.get(), 1);
assert_matches!(foo.await, Ok(_));
// The request was answered; now wait out the idle duration.
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
TestExecutor::advance_to(initial + IDLE_DURATION).await;
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
assert_eq!(mock_server.call_count.get(), 1);
/// A request is made halfway through the idle window. After it has been handled the fs is
/// still pending, and it becomes ready once fake time reaches a full `IDLE_DURATION` past
/// that halfway point.
#[fuchsia::test(allow_stalls = false)]
async fn stalled_then_request() {
const IDLE_DURATION: Duration = Duration::from_nanos(1_000_000);
let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
let (initial, mock_server, fs) = setup_test(server_end).await;
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
// Advance to the middle of the idle window; the fs must not have stalled yet.
TestExecutor::advance_to(initial + (IDLE_DURATION / 2)).await;
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
let proxy =
select! {
result = => assert_matches!(result, Ok(_)),
_ = fs => unreachable!(),
assert_eq!(mock_server.call_count.get(), 1);
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
// One full idle duration after the halfway point, the fs completes.
TestExecutor::advance_to(initial + (IDLE_DURATION / 2) + IDLE_DURATION).await;
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
/// If periodic FIDL connections are made at an interval below the idle
/// duration, the service fs should not stall.
///
/// If periodic FIDL connections are made at an interval above the idle
/// duration, the service fs should stall.
///
/// The first phase uses an interval of `IDLE_DURATION / 2` and expects every request to
/// be counted. The second phase uses `IDLE_DURATION * 2` and expects the requests to stay
/// pending and the call count to remain unchanged.
#[fuchsia::test(allow_stalls = false)]
async fn periodic_requests() {
const IDLE_DURATION: Duration = Duration::from_nanos(1_000_000);
let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
let (mut current_time, mock_server, fs) = setup_test(server_end).await;
// Run the fs as a local task so it makes progress while the test issues requests.
let fs = fasync::Task::local(fs);
// Interval below the idle duration.
const NUM_FOO_REQUESTS: usize = 10;
for _ in 0..NUM_FOO_REQUESTS {
let request_interval = IDLE_DURATION / 2;
current_time += request_interval;
let proxy =
assert_matches!(, Ok(_));
assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
// Interval above the idle duration.
for _ in 0..NUM_FOO_REQUESTS {
let request_interval = IDLE_DURATION * 2;
current_time += request_interval;
let proxy =
let foo =;
assert_matches!(TestExecutor::poll_until_stalled(&mut foo).await, Poll::Pending);
// None of the second-phase requests were handled.
assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
/// If there are other connections to the outgoing directory, then the fs will not return unless
/// those connections are closed by the client. That's because we currently don't have a way to
/// escrow those connections, so we don't want to disrupt them.
///
/// The test checks that the fs is still pending at `IDLE_DURATION` and at
/// `IDLE_DURATION * 3` while the extra directory connection exists.
#[fuchsia::test(allow_stalls = false)]
async fn some_other_outgoing_dir_connection_blocks_stalling() {
const IDLE_DURATION: Duration = Duration::from_nanos(1_000_000);
let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
let (initial, mock_server, fs) = setup_test(server_end).await;
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
// We can open another connection that's not the main outgoing directory connection,
let svc = crate::directory::open_directory_no_describe(
fio::OpenFlags::RIGHT_READABLE | fio::OpenFlags::DIRECTORY,
TestExecutor::advance_to(initial + IDLE_DURATION).await;
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
// The listing is expected to contain exactly the one `ServiceA` entry.
Ok(ref entries)
if entries.len() == 1 && entries[0].name == "fuchsia.component.client.test.ServiceA"
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
// ... and the service fs won't stall even if we wait past the timeout.
TestExecutor::advance_to(initial + (IDLE_DURATION * 3)).await;
assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
// Closing that connection frees the fs to stall.
/// Emulates a component that receives a bunch of requests, processes them, and then stalls.
/// After that, if the outgoing directory is readable, serve it again. No request should be
/// dropped, and the fs should stall a bunch of times.
///
/// The emulated component loops: it builds a fresh `ServiceFs`, runs it with the mock
/// server as the handler, then picks up the server endpoint the mock server recorded. If no
/// endpoint was recorded, the loop ends and reports how many times it went around.
#[fuchsia::test(allow_stalls = false)]
async fn end_to_end() {
let initial = fasync::Time::from_nanos(0);
let mock_server = MockServer::new();
let mock_server_clone = mock_server.clone();
const MIN_REQUEST_INTERVAL: i64 = 10_000_000;
// The idle duration is five times the smallest request interval.
let idle_duration = Duration::from_nanos(MIN_REQUEST_INTERVAL * 5);
let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
let component_task = async move {
let mut server_end = Some(server_end);
let mut loop_count = 0;
loop {
let mut fs = ServiceFs::new();
let mock_server_clone = mock_server_clone.clone();
.for_each_concurrent(None, move |item| mock_server_clone.handle(item))
// If the fs stalled, the mock server has stored the returned endpoint.
let stalled_server_end = mock_server.server_end.lock().unwrap().take();
let Some(stalled_server_end) = stalled_server_end else {
// Client closed.
return loop_count;
// Reuse the returned endpoint for the next round of serving.
server_end = Some(stalled_server_end.into());
loop_count += 1;
let component_task = fasync::Task::local(component_task);
// Make connection requests at increasing intervals, starting from below the idle duration,
// to above the idle duration.
let mut deadline = initial;
const NUM_REQUESTS: usize = 30;
for delay_factor in 0..NUM_REQUESTS {
let proxy =
deadline += Duration::from_nanos(MIN_REQUEST_INTERVAL * (delay_factor as i64));
let loop_count = component_task.await;
// Why 25: there are 30 requests. The first 5 intervals are below the idle duration
// (delay factors 0 through 4 give intervals of 0 to 4 * MIN_REQUEST_INTERVAL, while the
// idle duration is 5 * MIN_REQUEST_INTERVAL).
assert_eq!(loop_count, 25);
assert_eq!(mock_server.call_count.get(), NUM_REQUESTS);