blob: 2b6d7321f88be62770ae83927517c391e3d1687c [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::constants::{get_socket, RETRY_DELAY},
crate::ssh::build_ssh_command,
crate::target::{ConnectionState, TargetAddr, WeakTarget},
anyhow::{anyhow, Context, Result},
ascendd::Ascendd,
async_std::io::prelude::BufReadExt,
async_std::prelude::StreamExt,
fuchsia_async::{Task, Timer},
futures::channel::oneshot,
futures::future::FutureExt,
futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
hoist::OvernetInstance,
std::future::Future,
std::io,
std::os::unix::io::{FromRawFd, IntoRawFd},
std::process::{Child, Stdio},
std::sync::Arc,
std::time::Duration,
};
/// Converts a synchronous handle that owns a raw file descriptor into an
/// `async_std::fs::File` wrapping that same descriptor.
///
/// `sync` is consumed; its descriptor is moved into the returned file.
fn async_from_sync<S: IntoRawFd>(sync: S) -> async_std::fs::File {
    let raw_fd = sync.into_raw_fd();
    // SAFETY: `into_raw_fd` transfers ownership of the descriptor to the
    // caller, so the `File` built here is the only owner of `raw_fd`.
    unsafe { async_std::fs::File::from_raw_fd(raw_fd) }
}
/// Handle to a spawned task whose output is an `anyhow::Result<()>`.
type TaskHandle = Task<Result<()>>;
/// A child process together with the background tasks and cancellation
/// channel that accompany it.
///
/// When built via `HostPipeChild::new`, the tasks shuttle data between the
/// child's stdio and a local overnet pipe. Dropping the value signals
/// cancellation, kills and waits on the child, and then blocks on each task.
struct HostPipeChild {
// The child process itself; `kill` and `wait` are forwarded to it.
inner: Child,
// These are wrapped in Option so drop(&mut self) can consume them
// Sender whose message makes the reader task (as built by `new`) stop.
cancel_tx: Option<oneshot::Sender<()>>,
// In `new`: task copying the child's stdout into the overnet pipe.
writer_handle: Option<TaskHandle>,
// In `new`: task copying the overnet pipe into the child's stdin.
reader_handle: Option<TaskHandle>,
// In `new`: task logging each line of the child's stderr.
logger_handle: Option<TaskHandle>,
}
/// Forwards bytes from `reader` to `writer` until `reader` reports end of
/// stream (a zero-length read).
///
/// Data is moved in chunks of at most 2048 bytes. Every chunk is written in
/// full and the writer is flushed before the next read is issued.
///
/// # Errors
///
/// Returns the first I/O error produced by a read, write or flush.
async fn latency_sensitive_copy(
    reader: &mut (impl AsyncRead + Unpin),
    writer: &mut (impl AsyncWrite + Unpin),
) -> std::io::Result<()> {
    let mut chunk = [0u8; 2048];
    loop {
        match reader.read(&mut chunk).await? {
            // A zero-length read marks the end of the stream.
            0 => return Ok(()),
            len => {
                writer.write_all(&chunk[..len]).await?;
                writer.flush().await?;
            }
        }
    }
}
impl HostPipeChild {
pub async fn new(ascendd: Arc<Ascendd>, addrs: Vec<TargetAddr>) -> Result<HostPipeChild> {
let mut inner = build_ssh_command(addrs, vec!["remote_control_runner"])
.await?
.stdout(Stdio::piped())
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.context("running target overnet pipe")?;
let (mut pipe_rx, mut pipe_tx) = futures::AsyncReadExt::split(
overnet_pipe(&*ascendd).context("creating local overnet pipe")?,
);
let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
let stdout_pipe =
inner.stdout.take().ok_or(anyhow!("unable to get stdout from target pipe"))?;
let writer_handle = Task::spawn(async move {
latency_sensitive_copy(&mut async_from_sync(stdout_pipe), &mut pipe_tx).await?;
log::info!("exiting onet pipe writer task (client -> ascendd)");
Ok(())
});
let stdin_pipe =
inner.stdin.take().ok_or(anyhow!("unable to get stdin from target pipe"))?;
let reader_handle = Task::spawn(async move {
let mut stdin_pipe = async_from_sync(stdin_pipe);
let mut cancel_rx = cancel_rx.fuse();
futures::select! {
copy_res = latency_sensitive_copy(&mut pipe_rx, &mut stdin_pipe).fuse() => copy_res?,
_ = cancel_rx => (),
};
log::info!("exiting onet pipe reader task (ascendd -> client)");
Ok(())
});
let stderr_pipe =
inner.stderr.take().ok_or(anyhow!("unable to stderr from target pipe"))?;
let logger_handle = Task::spawn(async move {
let stderr_pipe = async_from_sync(stderr_pipe);
let mut stderr_lines = async_std::io::BufReader::new(stderr_pipe).lines();
while let Some(line) = stderr_lines.next().await {
log::info!("SSH stderr: {}", line?);
}
Ok(())
});
Ok(HostPipeChild {
inner,
cancel_tx: Some(cancel_tx),
writer_handle: Some(writer_handle),
reader_handle: Some(reader_handle),
logger_handle: Some(logger_handle),
})
}
pub fn kill(&mut self) -> io::Result<()> {
self.inner.kill()
}
pub fn wait(&mut self) -> io::Result<std::process::ExitStatus> {
self.inner.wait()
}
}
impl Drop for HostPipeChild {
    /// Tears the child down: signals cancellation, kills and waits on the
    /// process, then blocks on the reader, writer and logger tasks (in that
    /// order) and traces their results.
    fn drop(&mut self) {
        // Ignores whether the receiver has been closed, this is just to
        // un-stick it from an attempt at reading from Ascendd.
        if let Some(cancel) = self.cancel_tx.take() {
            let _ = cancel.send(());
        }

        // Failures here are logged rather than propagated; drop cannot fail.
        if let Err(e) = self.kill() {
            log::warn!("failed to kill HostPipeChild: {:?}", e);
        }
        if let Err(e) = self.wait() {
            log::warn!("failed to clean up HostPipeChild: {:?}", e);
        }

        let reader_result = self.reader_handle.take().map(futures::executor::block_on);
        let writer_result = self.writer_handle.take().map(futures::executor::block_on);
        let logger_result = self.logger_handle.take().map(futures::executor::block_on);

        log::trace!(
            "Dropped HostPipeChild. Writer result: '{:?}'; reader result: '{:?}'; logger result: '{:?}'",
            writer_result,
            reader_result,
            logger_result,
        );
    }
}
/// Field-less type whose associated functions run the host-pipe connection
/// loop for a target; the functions return futures rather than an instance.
pub struct HostPipeConnection {}
impl HostPipeConnection {
    /// Returns a future that runs the host-pipe loop for `target`, launching
    /// children with `HostPipeChild::new` and pausing `RETRY_DELAY` between
    /// relaunch attempts.
    pub fn new(target: WeakTarget) -> impl Future<Output = Result<(), String>> + Send {
        HostPipeConnection::new_with_cmd(target, HostPipeChild::new, RETRY_DELAY)
    }

    /// Runs the host-pipe loop using `cmd_func` to launch each child.
    ///
    /// On every iteration the weak `target` is upgraded, a child is launched
    /// for the target's current addresses, and the child is waited on.
    ///
    /// * If waiting succeeds, a target in the `Rcs` connection state is moved
    ///   to `Disconnected` and the function returns `Ok(())`.
    /// * If waiting fails, the error is logged and the loop retries after
    ///   `relaunch_command_delay`.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the target can no longer be upgraded, or if
    /// `cmd_func` fails (its error is converted to a string).
    async fn new_with_cmd<F>(
        target: WeakTarget,
        cmd_func: impl FnOnce(Arc<Ascendd>, Vec<TargetAddr>) -> F + Send + Copy + 'static,
        relaunch_command_delay: Duration,
    ) -> Result<(), String>
    where
        F: futures::Future<Output = Result<HostPipeChild>> + Send,
    {
        loop {
            log::debug!("Spawning new host-pipe instance");
            // Build the error string lazily; the upgrade normally succeeds on
            // every iteration.
            let target =
                target.upgrade().ok_or_else(|| "parent Arc<> lost. exiting".to_owned())?;
            let addrs = target.addrs().await;
            let mut cmd =
                cmd_func(target.ascendd().clone(), addrs).await.map_err(|e| e.to_string())?;

            // Attempts to run the command. If it exits successfully (disconnect due to peer
            // dropping) then will set the target to disconnected state. If
            // there was an error running the command for some reason, then
            // continue and attempt to run the command again.
            match Task::blocking(async move { cmd.wait() })
                .await
                .map_err(|e| format!("host-pipe error running try-wait: {}", e))
            {
                Ok(_) => {
                    target
                        .update_connection_state(|s| match s {
                            ConnectionState::Rcs(_) => ConnectionState::Disconnected,
                            _ => s,
                        })
                        .await;
                    log::debug!("rcs disconnected, exiting");
                    return Ok(());
                }
                Err(e) => log::debug!("running cmd: {:#?}", e),
            }

            // TODO(fxbug.dev/52038): Want an exponential backoff that
            // is sync'd with an explicit "try to start this again
            // anyway" channel using a select! between the two of them.
            Timer::new(relaunch_command_delay).await;
        }
    }
}
/// Builds an `Ascendd` whose `sockpath` option is the value returned by
/// `get_socket()`, with every other option left at its default, and with
/// async stdout as the second constructor argument.
pub async fn create_ascendd() -> Result<Ascendd> {
    log::info!("Starting ascendd");
    let sockpath = get_socket().await;
    let options = ascendd::Opt { sockpath: Some(sockpath), ..Default::default() };
    // TODO: this just prints serial output to stdout - ffx probably wants to take a more
    // nuanced approach here.
    let serial_out = Box::new(async_std::io::stdout());
    Ascendd::new(options, serial_out)
}
/// Creates a stream socket pair, attaches one end to `overnet_instance` as a
/// socket link through its mesh controller, and returns the other end as an
/// async socket.
///
/// # Errors
///
/// Fails if the socket pair cannot be created or converted, if the mesh
/// controller connection fails, or if attaching the link fails.
fn overnet_pipe(overnet_instance: &dyn OvernetInstance) -> Result<fidl::AsyncSocket> {
    let (near_end, far_end) = fidl::Socket::create(fidl::SocketOpts::STREAM)?;
    let near_end = fidl::AsyncSocket::from_socket(near_end)?;
    let mesh_controller = overnet_instance.connect_as_mesh_controller()?;
    mesh_controller.attach_socket_link(far_end)?;
    Ok(near_end)
}
#[cfg(test)]
mod test {
    use super::*;

    /// Error context / message shared by the fake command launchers below.
    const ERR_CTX: &str = "running fake host-pipe command for test";

    impl HostPipeChild {
        /// Wraps `child` with fake task handles for tests.
        ///
        /// The writer task waits for the cancel message and then signals the
        /// reader task, which waits for that signal; neither moves any data.
        /// The logger task completes immediately.
        pub fn fake_new(child: Child) -> Self {
            let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
            let (other_cancel_tx, other_cancel_rx) = oneshot::channel::<()>();
            Self {
                inner: child,
                cancel_tx: Some(cancel_tx),
                writer_handle: Some(Task::spawn(async move {
                    cancel_rx.await.context("host-pipe fake test writer")?;
                    other_cancel_tx.send(()).unwrap();
                    Ok(())
                })),
                reader_handle: Some(Task::spawn(async move {
                    other_cancel_rx.await.context("host-pipe fake test reader")?;
                    Ok(())
                })),
                logger_handle: Some(Task::spawn(async { Ok(()) })),
            }
        }
    }

    /// Launcher that spawns `yes test-command` and wraps it in a fake child.
    async fn start_child_normal_operation(
        _ascendd: Arc<Ascendd>,
        _t: Vec<TargetAddr>,
    ) -> Result<HostPipeChild> {
        Ok(HostPipeChild::fake_new(
            std::process::Command::new("yes")
                .arg("test-command")
                .stdout(Stdio::piped())
                .stdin(Stdio::piped())
                .spawn()
                .context(ERR_CTX)?,
        ))
    }

    /// Launcher that always fails without spawning anything.
    async fn start_child_internal_failure(
        _ascendd: Arc<Ascendd>,
        _t: Vec<TargetAddr>,
    ) -> Result<HostPipeChild> {
        Err(anyhow!(ERR_CTX))
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_host_pipe_start_and_stop_normal_operation() {
        let ascendd = Arc::new(create_ascendd().await.unwrap());
        let target = crate::target::Target::new(ascendd, "flooooooooberdoober");
        // NOTE: this future is never polled, so the launcher is not invoked;
        // the test only covers constructing and dropping the future.
        let _conn = HostPipeConnection::new_with_cmd(
            target.downgrade(),
            start_child_normal_operation,
            Duration::default(),
        );
        // Shouldn't panic when dropped.
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_host_pipe_start_and_stop_internal_failure() {
        // TODO(awdavies): Verify the error matches.
        let ascendd = Arc::new(create_ascendd().await.unwrap());
        let target = crate::target::Target::new(ascendd, "flooooooooberdoober");
        let conn = HostPipeConnection::new_with_cmd(
            target.downgrade(),
            start_child_internal_failure,
            Duration::default(),
        );
        assert!(conn.await.is_err());
    }
}