| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| super::ssh::build_ssh_command, |
| crate::target::Target, |
| crate::RETRY_DELAY, |
| anyhow::{anyhow, Context, Result}, |
| async_io::Async, |
| ffx_daemon_core::events, |
| ffx_daemon_events::{HostPipeErr, TargetEvent}, |
| fuchsia_async::{unblock, Task, Timer}, |
| futures::io::{copy_buf, AsyncBufRead, BufReader}, |
| futures_lite::io::AsyncBufReadExt, |
| futures_lite::stream::StreamExt, |
| hoist::OvernetInstance, |
| std::cell::RefCell, |
| std::collections::VecDeque, |
| std::fmt, |
| std::future::Future, |
| std::io, |
| std::net::SocketAddr, |
| std::process::{Child, Stdio}, |
| std::rc::Rc, |
| std::rc::Weak, |
| std::time::Duration, |
| }; |
| |
| const BUFFER_SIZE: usize = 65536; |
| |
| #[derive(Debug)] |
| pub struct LogBuffer { |
| buf: RefCell<VecDeque<String>>, |
| capacity: usize, |
| } |
| |
| impl LogBuffer { |
| pub fn new(capacity: usize) -> Self { |
| Self { buf: RefCell::new(VecDeque::with_capacity(capacity)), capacity } |
| } |
| |
| pub fn push_line(&self, line: String) { |
| let mut buf = self.buf.borrow_mut(); |
| if buf.len() == self.capacity { |
| buf.pop_front(); |
| } |
| |
| buf.push_back(line) |
| } |
| |
| pub fn lines(&self) -> Vec<String> { |
| let buf = self.buf.borrow_mut(); |
| buf.range(..).cloned().collect() |
| } |
| |
| pub fn clear(&self) { |
| let mut buf = self.buf.borrow_mut(); |
| buf.truncate(0); |
| } |
| } |
| |
| #[derive(Debug, Clone)] |
| pub struct HostAddr(String); |
| |
| impl fmt::Display for HostAddr { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| self.0.fmt(f) |
| } |
| } |
| |
| impl From<&str> for HostAddr { |
| fn from(s: &str) -> Self { |
| HostAddr(s.to_string()) |
| } |
| } |
| |
| impl From<String> for HostAddr { |
| fn from(s: String) -> Self { |
| HostAddr(s) |
| } |
| } |
| |
| struct HostPipeChild { |
| inner: Child, |
| task: Option<Task<()>>, |
| } |
| |
| impl HostPipeChild { |
| async fn new( |
| addr: SocketAddr, |
| id: u64, |
| stderr_buf: Rc<LogBuffer>, |
| event_queue: events::Queue<TargetEvent>, |
| ) -> Result<(Option<HostAddr>, HostPipeChild)> { |
| // Before running remote_control_runner, we look up the environment |
| // variable for $SSH_CONNECTION. This contains the IP address, including |
| // scope_id, of the ssh client from the perspective of the ssh server. |
| // This is useful because the target might need to use a specific |
| // interface to talk to the host device. |
| let mut ssh = build_ssh_command( |
| addr, |
| vec![ |
| "echo", |
| "++ $SSH_CONNECTION ++", |
| "&&", |
| "remote_control_runner", |
| format!("{}", id).as_str(), |
| ], |
| ) |
| .await?; |
| |
| log::debug!("Spawning new ssh instance: {:?}", ssh); |
| |
| let mut ssh = ssh |
| .stdout(Stdio::piped()) |
| .stdin(Stdio::piped()) |
| .stderr(Stdio::piped()) |
| .spawn() |
| .context("running target overnet pipe")?; |
| |
| let (pipe_rx, mut pipe_tx) = futures::AsyncReadExt::split( |
| overnet_pipe(hoist::hoist()).context("creating local overnet pipe")?, |
| ); |
| |
| let stdout = |
| Async::new(ssh.stdout.take().ok_or(anyhow!("unable to get stdout from target pipe"))?)?; |
| |
| let mut stdin = |
| Async::new(ssh.stdin.take().ok_or(anyhow!("unable to get stdin from target pipe"))?)?; |
| |
| let stderr = |
| Async::new(ssh.stderr.take().ok_or(anyhow!("unable to stderr from target pipe"))?)?; |
| |
| // Read the first line. This can be either either be an empty string "", |
| // which signifies the STDOUT has been closed, or the $SSH_CONNECTION |
| // value. |
| let mut stdout = BufReader::with_capacity(BUFFER_SIZE, stdout); |
| |
| let ssh_host_address = |
| match parse_ssh_connection(&mut stdout).await.context("reading ssh connection") { |
| Ok(Some(addr)) => Some(HostAddr(addr)), |
| Ok(None) => None, |
| Err(e) => { |
| log::error!("Failed to read ssh client address: {:?}", e); |
| None |
| } |
| }; |
| |
| let copy_in = async move { |
| if let Err(e) = copy_buf(stdout, &mut pipe_tx).await { |
| log::error!("SSH stdout read failure: {:?}", e); |
| } |
| }; |
| let copy_out = async move { |
| if let Err(e) = |
| copy_buf(BufReader::with_capacity(BUFFER_SIZE, pipe_rx), &mut stdin).await |
| { |
| log::error!("SSH stdin write failure: {:?}", e); |
| } |
| }; |
| |
| let log_stderr = async move { |
| let mut stderr_lines = futures_lite::io::BufReader::new(stderr).lines(); |
| while let Some(result) = stderr_lines.next().await { |
| match result { |
| Ok(line) => { |
| log::info!("SSH stderr: {}", line); |
| stderr_buf.push_line(line.clone()); |
| event_queue |
| .push(TargetEvent::SshHostPipeErr(HostPipeErr::from(line))) |
| .unwrap_or_else(|e| { |
| log::warn!("queueing host pipe err event: {:?}", e) |
| }); |
| } |
| Err(e) => log::error!("SSH stderr read failure: {:?}", e), |
| } |
| } |
| }; |
| |
| Ok(( |
| ssh_host_address, |
| HostPipeChild { |
| inner: ssh, |
| task: Some(Task::local(async move { |
| futures::join!(copy_in, copy_out, log_stderr); |
| })), |
| }, |
| )) |
| } |
| |
| fn kill(&mut self) -> io::Result<()> { |
| self.inner.kill() |
| } |
| |
| fn wait(&mut self) -> io::Result<std::process::ExitStatus> { |
| self.inner.wait() |
| } |
| } |
| |
| #[derive(Debug, thiserror::Error)] |
| enum ParseSshConnectionError { |
| #[error(transparent)] |
| Io(#[from] std::io::Error), |
| #[error("Parse error: {:?}", .0)] |
| Parse(String), |
| } |
| |
| async fn parse_ssh_connection<R: AsyncBufRead + Unpin>( |
| rdr: &mut R, |
| ) -> std::result::Result<Option<String>, ParseSshConnectionError> { |
| let mut line = String::new(); |
| rdr.read_line(&mut line).await.map_err(ParseSshConnectionError::Io)?; |
| |
| if line.is_empty() { |
| return Ok(None); |
| } |
| |
| let mut parts = line.split(' '); |
| |
| // The first part should be our anchor. |
| match parts.next() { |
| Some("++") => {} |
| Some(_) | None => { |
| return Err(ParseSshConnectionError::Parse(line)); |
| } |
| } |
| |
| // The next part should be the client address. This is left as a string since |
| // std::net::IpAddr does not support string scope_ids. |
| let client_address = if let Some(part) = parts.next() { |
| part |
| } else { |
| return Err(ParseSshConnectionError::Parse(line)); |
| }; |
| |
| // Followed by the client port. |
| let _client_port = if let Some(part) = parts.next() { |
| part |
| } else { |
| return Err(ParseSshConnectionError::Parse(line)); |
| }; |
| |
| // Followed by the server address. |
| let _server_address = if let Some(part) = parts.next() { |
| part |
| } else { |
| return Err(ParseSshConnectionError::Parse(line)); |
| }; |
| |
| // Followed by the server port. |
| let _server_port = if let Some(part) = parts.next() { |
| part |
| } else { |
| return Err(ParseSshConnectionError::Parse(line)); |
| }; |
| |
| // The last part should be our anchor. |
| match parts.next() { |
| Some("++\n") => {} |
| Some(_) | None => { |
| return Err(ParseSshConnectionError::Parse(line)); |
| } |
| } |
| |
| // Finally, there should be nothing left. |
| if let Some(_) = parts.next() { |
| return Err(ParseSshConnectionError::Parse(line)); |
| } |
| |
| Ok(Some(client_address.to_string())) |
| } |
| |
| impl Drop for HostPipeChild { |
| fn drop(&mut self) { |
| match self.inner.try_wait() { |
| Ok(Some(result)) => { |
| log::info!("HostPipeChild exited with {}", result); |
| } |
| Ok(None) => { |
| let _ = |
| self.kill().map_err(|e| log::warn!("failed to kill HostPipeChild: {:?}", e)); |
| let _ = self |
| .wait() |
| .map_err(|e| log::warn!("failed to clean up HostPipeChild: {:?}", e)); |
| } |
| Err(e) => { |
| log::warn!("failed to soft-wait HostPipeChild: {:?}", e); |
| // defensive kill & wait, both may fail. |
| let _ = |
| self.kill().map_err(|e| log::warn!("failed to kill HostPipeChild: {:?}", e)); |
| let _ = self |
| .wait() |
| .map_err(|e| log::warn!("failed to clean up HostPipeChild: {:?}", e)); |
| } |
| }; |
| |
| drop(self.task.take()); |
| } |
| } |
| |
| pub struct HostPipeConnection {} |
| |
| impl HostPipeConnection { |
| pub fn new(target: Weak<Target>) -> impl Future<Output = Result<()>> { |
| HostPipeConnection::new_with_cmd(target, HostPipeChild::new, RETRY_DELAY) |
| } |
| |
| async fn new_with_cmd<F>( |
| target: Weak<Target>, |
| cmd_func: impl FnOnce(SocketAddr, u64, Rc<LogBuffer>, events::Queue<TargetEvent>) -> F |
| + Copy |
| + 'static, |
| relaunch_command_delay: Duration, |
| ) -> Result<()> |
| where |
| F: futures::Future<Output = Result<(Option<HostAddr>, HostPipeChild)>>, |
| { |
| loop { |
| let target = target.upgrade().ok_or(anyhow!("Target has gone"))?; |
| let target_nodename = target.nodename(); |
| log::debug!("Spawning new host-pipe instance to target {:?}", target_nodename); |
| let log_buf = target.host_pipe_log_buffer(); |
| log_buf.clear(); |
| |
| let ssh_address = target.ssh_address().ok_or_else(|| { |
| anyhow!("target {:?} does not yet have an ssh address", target_nodename) |
| })?; |
| let (host_addr, mut cmd) = |
| cmd_func(ssh_address, target.id(), log_buf.clone(), target.events.clone()) |
| .await |
| .with_context(|| { |
| format!("creating host-pipe command to target {:?}", target_nodename) |
| })?; |
| |
| *target.ssh_host_address.borrow_mut() = host_addr; |
| |
| // Attempts to run the command. If it exits successfully (disconnect |
| // due to peer dropping) then will set the target to disconnected |
| // state. If there was an error running the command for some reason, |
| // then continue and attempt to run the command again. |
| let res = unblock(move || cmd.wait()).await.map_err(|e| { |
| anyhow!( |
| "host-pipe error to target {:?} running try-wait: {}", |
| target_nodename, |
| e.to_string() |
| ) |
| }); |
| log::debug!("host-pipe command res: {:?}", res); |
| |
| target.ssh_host_address.borrow_mut().take(); |
| |
| match res { |
| Ok(_) => { |
| return Ok(()); |
| } |
| Err(e) => log::debug!("running cmd on {:?}: {:#?}", target_nodename, e), |
| } |
| |
| // TODO(fxbug.dev/52038): Want an exponential backoff that |
| // is sync'd with an explicit "try to start this again |
| // anyway" channel using a select! between the two of them. |
| Timer::new(relaunch_command_delay).await; |
| } |
| } |
| } |
| |
| fn overnet_pipe(overnet_instance: &dyn OvernetInstance) -> Result<fidl::AsyncSocket> { |
| let (local_socket, remote_socket) = fidl::Socket::create(fidl::SocketOpts::STREAM)?; |
| let local_socket = fidl::AsyncSocket::from_socket(local_socket)?; |
| overnet_instance.connect_as_mesh_controller()?.attach_socket_link(remote_socket)?; |
| |
| Ok(local_socket) |
| } |
| |
| #[cfg(test)] |
| mod test { |
| use super::*; |
| use addr::TargetAddr; |
| use assert_matches::assert_matches; |
| use std::rc::Rc; |
| |
| const ERR_CTX: &'static str = "running fake host-pipe command for test"; |
| |
| impl HostPipeChild { |
| /// Implements some fake join handles that wait on a join command before |
| /// closing. The reader and writer handles don't do anything other than |
| /// spin until they receive a message to stop. |
| pub fn fake_new(child: Child) -> Self { |
| Self { inner: child, task: Some(Task::local(async {})) } |
| } |
| } |
| |
| async fn start_child_normal_operation( |
| _addr: SocketAddr, |
| _id: u64, |
| _buf: Rc<LogBuffer>, |
| _events: events::Queue<TargetEvent>, |
| ) -> Result<(Option<HostAddr>, HostPipeChild)> { |
| Ok(( |
| Some(HostAddr("127.0.0.1".to_string())), |
| HostPipeChild::fake_new( |
| std::process::Command::new("echo") |
| .arg("127.0.0.1 44315 192.168.1.1 22") |
| .stdout(Stdio::piped()) |
| .stdin(Stdio::piped()) |
| .spawn() |
| .context(ERR_CTX)?, |
| ), |
| )) |
| } |
| |
| async fn start_child_internal_failure( |
| _addr: SocketAddr, |
| _id: u64, |
| _buf: Rc<LogBuffer>, |
| _events: events::Queue<TargetEvent>, |
| ) -> Result<(Option<HostAddr>, HostPipeChild)> { |
| Err(anyhow!(ERR_CTX)) |
| } |
| |
| async fn start_child_ssh_failure( |
| _addr: SocketAddr, |
| _id: u64, |
| _buf: Rc<LogBuffer>, |
| events: events::Queue<TargetEvent>, |
| ) -> Result<(Option<HostAddr>, HostPipeChild)> { |
| events.push(TargetEvent::SshHostPipeErr(HostPipeErr::Unknown("foo".to_string()))).unwrap(); |
| Ok(( |
| Some(HostAddr("127.0.0.1".to_string())), |
| HostPipeChild::fake_new( |
| std::process::Command::new("echo") |
| .arg("127.0.0.1 44315 192.168.1.1 22") |
| .stdout(Stdio::piped()) |
| .stdin(Stdio::piped()) |
| .spawn() |
| .context(ERR_CTX)?, |
| ), |
| )) |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_host_pipe_start_and_stop_normal_operation() { |
| let target = crate::target::Target::new_with_addrs( |
| Some("flooooooooberdoober"), |
| [TargetAddr::new("192.168.1.1:22").unwrap()].into(), |
| ); |
| let res = HostPipeConnection::new_with_cmd( |
| Rc::downgrade(&target), |
| start_child_normal_operation, |
| Duration::default(), |
| ) |
| .await; |
| assert_matches!(res, Ok(_)); |
| // Shouldn't panic when dropped. |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_host_pipe_start_and_stop_internal_failure() { |
| // TODO(awdavies): Verify the error matches. |
| let target = crate::target::Target::new_with_addrs( |
| Some("flooooooooberdoober"), |
| [TargetAddr::new("192.168.1.1:22").unwrap()].into(), |
| ); |
| let res = HostPipeConnection::new_with_cmd( |
| Rc::downgrade(&target), |
| start_child_internal_failure, |
| Duration::default(), |
| ) |
| .await; |
| assert!(res.is_err()); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_host_pipe_start_and_stop_ssh_failure() { |
| let target = crate::target::Target::new_with_addrs( |
| Some("flooooooooberdoober"), |
| [TargetAddr::new("192.168.1.1:22").unwrap()].into(), |
| ); |
| let events = target.events.clone(); |
| let task = Task::local(async move { |
| events |
| .wait_for(None, |e| { |
| assert_matches!(e, TargetEvent::SshHostPipeErr(_)); |
| true |
| }) |
| .await |
| .unwrap(); |
| }); |
| // This is here to allow for the above task to get polled so that the `wait_for` can be |
| // placed on at the appropriate time (before the failure occurs in the function below). |
| futures_lite::future::yield_now().await; |
| let res = HostPipeConnection::new_with_cmd( |
| Rc::downgrade(&target), |
| start_child_ssh_failure, |
| Duration::default(), |
| ) |
| .await; |
| assert_matches!(res, Ok(_)); |
| // If things are not setup correctly this will hang forever. |
| task.await; |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_parse_ssh_connection_works() { |
| for (line, expected) in [ |
| (&""[..], None), |
| (&"++ 192.168.1.1 1234 10.0.0.1 22 ++\n"[..], Some("192.168.1.1".to_string())), |
| ( |
| &"++ fe80::111:2222:3333:444 56671 10.0.0.1 22 ++\n", |
| Some("fe80::111:2222:3333:444".to_string()), |
| ), |
| ( |
| &"++ fe80::111:2222:3333:444%ethxc2 56671 10.0.0.1 22 ++\n", |
| Some("fe80::111:2222:3333:444%ethxc2".to_string()), |
| ), |
| ] { |
| match parse_ssh_connection(&mut line.as_bytes()).await { |
| Ok(actual) => assert_eq!(expected, actual), |
| res => panic!( |
| "unexpected result for {:?}: expected {:?}, got {:?}", |
| line, expected, res |
| ), |
| } |
| } |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_parse_ssh_connection_errors() { |
| for line in [ |
| // Test for invalid anchors |
| &"192.168.1.1 1234 10.0.0.1 22"[..], |
| &"++192.168.1.1 1234 10.0.0.1 22++"[..], |
| &"++192.168.1.1 1234 10.0.0.1 22 ++"[..], |
| &"++ 192.168.1.1 1234 10.0.0.1 22++"[..], |
| &"## 192.168.1.1 1234 10.0.0.1 22 ##"[..], |
| // Truncation |
| &"++"[..], |
| &"++ 192.168.1.1"[..], |
| &"++ 192.168.1.1 1234"[..], |
| &"++ 192.168.1.1 1234 "[..], |
| &"++ 192.168.1.1 1234 10.0.0.1"[..], |
| &"++ 192.168.1.1 1234 10.0.0.1 22"[..], |
| &"++ 192.168.1.1 1234 10.0.0.1 22 "[..], |
| &"++ 192.168.1.1 1234 10.0.0.1 22 ++"[..], |
| ] { |
| match parse_ssh_connection(&mut line.as_bytes()).await { |
| Err(ParseSshConnectionError::Parse(actual)) => { |
| assert_eq!(line, actual); |
| } |
| res => panic!("unexpected result for {:?}: {:?}", line, res), |
| } |
| } |
| } |
| |
| #[test] |
| fn test_log_buffer_empty() { |
| let buf = LogBuffer::new(2); |
| assert!(buf.lines().is_empty()); |
| } |
| |
| #[test] |
| fn test_log_buffer() { |
| let buf = LogBuffer::new(2); |
| |
| buf.push_line(String::from("1")); |
| buf.push_line(String::from("2")); |
| buf.push_line(String::from("3")); |
| |
| assert_eq!(buf.lines(), vec![String::from("2"), String::from("3")]); |
| } |
| |
| #[test] |
| fn test_clear_log_buffer() { |
| let buf = LogBuffer::new(2); |
| |
| buf.push_line(String::from("1")); |
| buf.push_line(String::from("2")); |
| |
| buf.clear(); |
| |
| assert!(buf.lines().is_empty()); |
| } |
| } |