blob: ad735bae1cffbd858410b22b3ff81877315850e4 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{anyhow, Result};
use async_net::unix::UnixStream;
use async_trait::async_trait;
use fidl_fuchsia_developer_ffx as ffx;
use fidl_fuchsia_gpu_agis as agis;
use protocols::prelude::*;
use std::path::Path;
#[ffx_protocol]
#[derive(Default, Debug)]
pub struct ListenerProtocol {
task_manager: tasks::TaskManager,
}
#[async_trait(?Send)]
impl FidlProtocol for ListenerProtocol {
type Protocol = ffx::ListenerMarker;
// Use a singleton within the daemon.
type StreamHandler = FidlStreamHandler<Self>;
async fn handle(
self: &ListenerProtocol,
ctx: &Context,
req: ffx::ListenerRequest,
) -> Result<(), anyhow::Error> {
match req {
ffx::ListenerRequest::Listen { responder, target_query, global_id } => {
// Retrieve the Connector proxy.
let (_target, connector) = ctx
.open_target_proxy_with_info::<agis::ConnectorMarker>(
target_query.string_matcher,
"/core/agis",
)
.await?;
// Retrieve the |ffx_socket| endpoint from the |connector|.
let ffx_socket = match connector.get_socket(global_id).await? {
Ok(socket) => fidl::handle::AsyncSocket::from_socket(socket),
Err(e) => {
responder.send(Err(e.clone()))?;
return Err(anyhow!("Connector::GetSocket error: {:?}", e));
}
};
self.task_manager.spawn(async move {
// Use async_net to establish a UnixStream and copy between ffx and unix
// sockets.
// Construct unix socket path.
let mut path: String = "/tmp/agis".to_owned();
path.push_str(&global_id.to_string());
let socket_path = Path::new(&path);
tracing::info!("FFX Daemon Agis is listening on {:?}", socket_path);
let unix_stream = match UnixStream::connect(socket_path).await {
Ok(stream) => stream,
Err(_) => {
tracing::error!("UnixStream::connect failed at {:?}", socket_path);
return;
}
};
// Split sockets for bi-directional communication.
let (read_half_ffx, mut write_half_ffx) =
futures::AsyncReadExt::split(ffx_socket);
let (read_half_unix, mut write_half_unix) =
futures::AsyncReadExt::split(unix_stream);
let unix_reader = futures::io::BufReader::with_capacity(
65536, /* buf size */
read_half_unix,
);
let ffx_reader = futures::io::BufReader::with_capacity(
65536, /* buf size */
read_half_ffx,
);
let ffx_copier = async {
// Read from ffx side, write to unix side.
match futures::io::copy_buf(ffx_reader, &mut write_half_unix).await {
Ok(_) => {
tracing::info!("ffx_to_unix copy succeeded");
}
Err(_) => {
tracing::error!("agis daemon: ffx_to_unix copy failed");
}
}
};
// Read from unix side, write to ffx side.
let unix_copier = async {
match futures::io::copy_buf(unix_reader, &mut write_half_ffx).await {
Ok(_) => {
tracing::info!("unix_to_ffx copy succeeded");
}
Err(_) => {
tracing::error!("agis daemon: unix_to_ffx copy failed");
}
}
};
let (_, _) = futures::join!(ffx_copier, unix_copier);
});
responder.send(Ok(()))?;
Ok(())
}
ffx::ListenerRequest::Shutdown { responder } => {
if self.task_manager.num_tasks() == 0 {
tracing::info!("no tasks to cancel");
return Ok(());
}
let tasks = self.task_manager.drain();
for t in tasks {
tracing::info!("cancelling task {:?}", t);
}
responder.send(Ok(()))?;
Ok(())
}
}
}
}