| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::constants::{MAX_RETRY_COUNT, RETRY_DELAY, SOCKET}, |
| crate::discovery::{TargetFinder, TargetFinderConfig}, |
| crate::logger::setup_logger, |
| crate::mdns::MdnsTargetFinder, |
| crate::ok_or_continue, |
| crate::onet, |
| crate::ssh::build_ssh_command, |
| crate::target::{RCSConnection, Target, TargetCollection}, |
| anyhow::{anyhow, Context, Error}, |
| async_std::task, |
| async_trait::async_trait, |
| fidl::endpoints::{ClientEnd, RequestStream, ServiceMarker}, |
| fidl_fuchsia_developer_bridge::{ |
| DaemonError, DaemonMarker, DaemonRequest, DaemonRequestStream, |
| }, |
| fidl_fuchsia_developer_remotecontrol::RemoteControlMarker, |
| fidl_fuchsia_overnet::{ |
| ServiceConsumerProxyInterface, ServiceProviderRequest, ServiceProviderRequestStream, |
| }, |
| futures::channel::mpsc, |
| futures::lock::Mutex, |
| futures::prelude::*, |
| hoist::spawn, |
| std::rc::Rc, |
| std::sync::Arc, |
| std::time::Duration, |
| }; |
| |
| #[async_trait] |
| pub trait DiscoveryHook { |
| async fn on_new_target(&self, target: &Arc<Target>, tc: &Arc<TargetCollection>); |
| } |
| |
| #[derive(Default)] |
| struct RCSActivatorHook {} |
| |
| #[async_trait] |
| impl DiscoveryHook for RCSActivatorHook { |
| async fn on_new_target(&self, target: &Arc<Target>, _tc: &Arc<TargetCollection>) { |
| let addrs_clone = target.clone_addrs().await; |
| let mut state = target.state.lock().await; |
| if state.overnet_started { |
| return; |
| } |
| |
| { |
| let mut host_pipe = state.host_pipe.lock().await; |
| |
| if host_pipe.is_none() { |
| match onet::connect_to_onet(&target, addrs_clone).await { |
| Ok(c) => { |
| host_pipe.replace(c); |
| } |
| Err(e) => { |
| log::warn!( |
| "failed to start host-pipe process for '{}': {}", |
| target.nodename, |
| e |
| ); |
| return; |
| } |
| } |
| } |
| } |
| |
| match Daemon::start_remote_control(&target).await { |
| Ok(()) => state.overnet_started = true, |
| Err(e) => { |
| log::warn!("unable to start remote control for '{}': {}", target.nodename, e); |
| return; |
| } |
| } |
| } |
| } |
| |
| // Daemon |
| #[derive(Clone)] |
| pub struct Daemon { |
| target_collection: Arc<TargetCollection>, |
| |
| discovered_target_hooks: Arc<Mutex<Vec<Rc<dyn DiscoveryHook>>>>, |
| } |
| |
| impl Daemon { |
| pub async fn new() -> Result<Daemon, Error> { |
| log::info!("Starting daemon overnet server"); |
| let (tx, rx) = mpsc::unbounded::<Target>(); |
| let target_collection = Arc::new(TargetCollection::new()); |
| let discovered_target_hooks = Arc::new(Mutex::new(Vec::<Rc<dyn DiscoveryHook>>::new())); |
| Daemon::spawn_receiver_loop(rx, target_collection.clone(), discovered_target_hooks.clone()); |
| Daemon::spawn_onet_discovery(target_collection.clone()); |
| let mut d = Daemon { |
| target_collection: target_collection.clone(), |
| discovered_target_hooks: discovered_target_hooks.clone(), |
| }; |
| d.register_hook(RCSActivatorHook::default()).await; |
| |
| // MDNS must be started as late as possible to avoid races with registered |
| // hooks. |
| let config = |
| TargetFinderConfig { broadcast_interval: Duration::from_secs(120), mdns_ttl: 255 }; |
| let mdns = MdnsTargetFinder::new(&config)?; |
| mdns.start(&tx)?; |
| |
| Ok(d) |
| } |
| |
| pub async fn register_hook(&mut self, cb: impl DiscoveryHook + 'static) { |
| let mut hooks = self.discovered_target_hooks.lock().await; |
| hooks.push(Rc::new(cb)); |
| } |
| |
| pub fn spawn_receiver_loop( |
| mut rx: mpsc::UnboundedReceiver<Target>, |
| tc: Arc<TargetCollection>, |
| hooks: Arc<Mutex<Vec<Rc<dyn DiscoveryHook>>>>, |
| ) { |
| spawn(async move { |
| loop { |
| let target = rx.next().await.unwrap(); |
| let target_clone = tc.merge_insert(target).await; |
| let tc_clone = tc.clone(); |
| let hooks_clone = (*hooks.lock().await).clone(); |
| spawn(async move { |
| futures::future::join_all( |
| hooks_clone.iter().map(|hook| hook.on_new_target(&target_clone, &tc_clone)), |
| ) |
| .await; |
| }); |
| } |
| }); |
| } |
| |
| #[cfg(test)] |
| pub fn new_with_rx(rx: mpsc::UnboundedReceiver<Target>) -> Daemon { |
| let target_collection = Arc::new(TargetCollection::new()); |
| let discovered_target_hooks = Arc::new(Mutex::new(Vec::<Rc<dyn DiscoveryHook>>::new())); |
| Daemon::spawn_receiver_loop(rx, target_collection.clone(), discovered_target_hooks.clone()); |
| Daemon { target_collection, discovered_target_hooks } |
| } |
| |
| pub async fn handle_requests_from_stream( |
| &self, |
| mut stream: DaemonRequestStream, |
| quiet: bool, |
| ) -> Result<(), Error> { |
| while let Some(req) = stream.try_next().await? { |
| self.handle_request(req, quiet).await?; |
| } |
| Ok(()) |
| } |
| |
| pub fn spawn_onet_discovery(tc: Arc<TargetCollection>) { |
| spawn(async move { |
| let svc = hoist::connect_as_service_consumer().unwrap(); |
| loop { |
| let peers = svc.list_peers().await.unwrap(); |
| for mut peer in peers { |
| if peer.description.services.is_none() { |
| continue; |
| } |
| if peer |
| .description |
| .services |
| .unwrap() |
| .iter() |
| .find(|name| *name == RemoteControlMarker::NAME) |
| .is_none() |
| { |
| continue; |
| } |
| if tc.get(peer.id.id.into()).await.is_some() { |
| continue; |
| } |
| let remote_control_proxy = ok_or_continue!(RCSConnection::new(&mut peer.id) |
| .await |
| .context("unable to convert proxy to target")); |
| let target = ok_or_continue!( |
| Target::from_rcs_connection(remote_control_proxy).await, |
| "unable to convert proxy to target", |
| ); |
| tc.merge_insert(target).await; |
| } |
| } |
| }); |
| } |
| |
| async fn start_remote_control(target: &Target) -> Result<(), Error> { |
| for _ in 0..MAX_RETRY_COUNT { |
| let args = [ |
| "run", |
| "fuchsia-pkg://fuchsia.com/remote-control-runner#meta/remote-control-runner.cmx", |
| ]; |
| let mut cmd = build_ssh_command(target.clone_addrs().await, args.to_vec()).await?; |
| |
| let output = cmd |
| .stdin(std::process::Stdio::null()) |
| .output() |
| .context("Failed to SSH into device")?; |
| if output.stdout.starts_with(b"Successfully") { |
| return Ok(()); |
| } |
| task::sleep(RETRY_DELAY).await; |
| } |
| |
| Err(anyhow!("Starting RCS failed. Check target system logs for details.")) |
| } |
| |
| /// Attempts to get at most one target. If there is more than one target, |
| /// returns an error. |
| /// TODO(fxb/47843): Implement target lookup for commands to deprecate this |
| /// function, and as a result remove the inner_lock() function. |
| async fn target_from_cache(&self) -> Result<Arc<Target>, Error> { |
| let targets = self.target_collection.inner_lock().await; |
| if targets.len() > 1 { |
| return Err(anyhow!("more than one target")); |
| } |
| |
| match targets.values().next() { |
| Some(t) => Ok(t.clone()), |
| None => Err(anyhow!("no targets found")), |
| } |
| } |
| |
| pub async fn handle_request(&self, req: DaemonRequest, quiet: bool) -> Result<(), Error> { |
| log::debug!("daemon received request: {:?}", req); |
| match req { |
| DaemonRequest::EchoString { value, responder } => { |
| if !quiet { |
| log::info!("Received echo request for string {:?}", value); |
| } |
| responder.send(value.as_ref()).context("error sending response")?; |
| if !quiet { |
| log::info!("echo response sent successfully"); |
| } |
| } |
| DaemonRequest::ListTargets { value, responder } => { |
| if !quiet { |
| log::info!("Received list target request for '{:?}'", value); |
| } |
| // TODO(awdavies): Make this into a common format for easy |
| // parsing. |
| let response = match value.as_ref() { |
| "" => futures::future::join_all( |
| self.target_collection.targets().await.iter().map(|t| t.to_string_async()), |
| ) |
| .await |
| .join("\n"), |
| _ => format!( |
| "{}", |
| match self.target_collection.get(value.into()).await { |
| Some(t) => t.to_string_async().await, |
| None => String::new(), |
| } |
| ), |
| }; |
| responder.send(response.as_ref()).context("error sending response")?; |
| } |
| DaemonRequest::GetRemoteControl { remote, responder } => { |
| let target = match self.target_from_cache().await { |
| Ok(t) => t, |
| Err(e) => { |
| log::warn!("{}", e); |
| responder |
| .send(&mut Err(DaemonError::TargetCacheError)) |
| .context("sending error response")?; |
| return Ok(()); |
| } |
| }; |
| let mut target_state = |
| match target.wait_for_state_with_rcs(MAX_RETRY_COUNT, RETRY_DELAY).await { |
| Ok(state) => state, |
| Err(e) => { |
| log::warn!("{}", e); |
| responder |
| .send(&mut Err(DaemonError::TargetStateError)) |
| .context("sending error response")?; |
| return Ok(()); |
| } |
| }; |
| let mut response = target_state |
| .rcs |
| .as_mut() |
| .unwrap() |
| .copy_to_channel(remote.into_channel()) |
| .map_err(|_| DaemonError::RcsConnectionError); |
| responder.send(&mut response).context("error sending response")?; |
| } |
| DaemonRequest::Quit { responder } => { |
| if !quiet { |
| log::info!("Received quit request."); |
| } |
| responder.send(true).context("error sending response")?; |
| |
| task::sleep(std::time::Duration::from_millis(10)).await; |
| |
| match std::fs::remove_file(SOCKET) { |
| Ok(()) => {} |
| Err(e) => log::error!("failed to remove socket file: {}", e), |
| } |
| |
| let targets = self.target_collection.targets().await; |
| |
| for t in targets.iter() { |
| let t = t.clone(); |
| |
| let state = t.state.lock().await; |
| let mut child_lock = state.host_pipe.lock().await; |
| if let Some(mut child) = child_lock.take() { |
| child.kill()?; |
| } |
| } |
| |
| std::process::exit(0); |
| } |
| } |
| Ok(()) |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Overnet Server implementation |
| |
| async fn next_request( |
| stream: &mut ServiceProviderRequestStream, |
| ) -> Result<Option<ServiceProviderRequest>, Error> { |
| Ok(stream.try_next().await.context("error running service provider server")?) |
| } |
| |
| async fn exec_server(daemon: Daemon, quiet: bool) -> Result<(), Error> { |
| let (s, p) = fidl::Channel::create().context("failed to create zx channel")?; |
| let chan = fidl::AsyncChannel::from_channel(s).context("failed to make async channel")?; |
| let mut stream = ServiceProviderRequestStream::from_channel(chan); |
| hoist::publish_service(DaemonMarker::NAME, ClientEnd::new(p))?; |
| while let Some(ServiceProviderRequest::ConnectToService { |
| chan, |
| info: _, |
| control_handle: _control_handle, |
| }) = next_request(&mut stream).await? |
| { |
| if !quiet { |
| log::trace!("Received service request for service"); |
| } |
| let chan = |
| fidl::AsyncChannel::from_channel(chan).context("failed to make async channel")?; |
| let daemon_clone = daemon.clone(); |
| spawn(async move { |
| daemon_clone |
| .handle_requests_from_stream(DaemonRequestStream::from_channel(chan), quiet) |
| .await |
| .unwrap_or_else(|err| panic!("fatal error handling request: {:?}", err)); |
| }); |
| } |
| Ok(()) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // start |
| |
| pub fn is_daemon_running() -> bool { |
| // Try to connect directly to the socket. This will fail if nothing is listening on the other side |
| // (even if the path exists). |
| match std::os::unix::net::UnixStream::connect(SOCKET) { |
| Ok(_) => true, |
| Err(_) => false, |
| } |
| } |
| |
| pub async fn start() -> Result<(), Error> { |
| if is_daemon_running() { |
| return Ok(()); |
| } |
| setup_logger("ffx.daemon").await; |
| onet::start_ascendd().await; |
| let daemon = Daemon::new().await?; |
| exec_server(daemon, true).await |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // tests |
| |
| #[cfg(test)] |
| mod test { |
| use super::*; |
| use crate::target::TargetState; |
| use chrono::Utc; |
| use fidl::endpoints::create_proxy; |
| use fidl_fuchsia_developer_bridge::DaemonMarker; |
| use fidl_fuchsia_developer_remotecontrol::{ |
| RemoteControlMarker, RemoteControlProxy, RemoteControlRequest, |
| }; |
| use fidl_fuchsia_overnet_protocol::NodeId; |
| use std::collections::HashSet; |
| |
| struct TestHookFakeRCS { |
| ready_channel: mpsc::UnboundedSender<bool>, |
| } |
| |
| impl TestHookFakeRCS { |
| pub fn new(ready_channel: mpsc::UnboundedSender<bool>) -> Self { |
| Self { ready_channel } |
| } |
| } |
| |
| #[async_trait] |
| impl DiscoveryHook for TestHookFakeRCS { |
| async fn on_new_target(&self, target: &Arc<Target>, _tc: &Arc<TargetCollection>) { |
| let mut target_state = target.state.lock().await; |
| target_state.rcs = match &target_state.rcs { |
| Some(_) => panic!("fake RCS should be set at most once"), |
| None => Some(RCSConnection::new_with_proxy( |
| setup_fake_target_service(), |
| &NodeId { id: 0u64 }, |
| )), |
| }; |
| self.ready_channel.unbounded_send(true).unwrap(); |
| } |
| } |
| |
| struct TargetControlChannels { |
| target_ready_channel: mpsc::UnboundedReceiver<bool>, |
| target_detected_channel: mpsc::UnboundedSender<Target>, |
| } |
| |
| impl TargetControlChannels { |
| pub async fn send_target(&mut self, t: Target) { |
| self.target_detected_channel.unbounded_send(t).unwrap(); |
| assert!(self.next_target_ready().await); |
| } |
| |
| pub async fn next_target_ready(&mut self) -> bool { |
| self.target_ready_channel.next().await.unwrap() |
| } |
| } |
| |
| async fn spawn_daemon_server_with_target_ctrl( |
| stream: DaemonRequestStream, |
| ) -> TargetControlChannels { |
| let (target_in, target_out) = mpsc::unbounded::<Target>(); |
| let (target_ready_channel_in, target_ready_channel_out) = mpsc::unbounded::<bool>(); |
| spawn(async move { |
| let mut d = Daemon::new_with_rx(target_out); |
| d.register_hook(TestHookFakeRCS::new(target_ready_channel_in)).await; |
| d.handle_requests_from_stream(stream, false) |
| .await |
| .unwrap_or_else(|err| panic!("Fatal error handling request: {:?}", err)); |
| }); |
| |
| TargetControlChannels { |
| target_ready_channel: target_ready_channel_out, |
| target_detected_channel: target_in, |
| } |
| } |
| |
| async fn spawn_daemon_server_with_fake_target( |
| stream: DaemonRequestStream, |
| ) -> TargetControlChannels { |
| let mut res = spawn_daemon_server_with_target_ctrl(stream).await; |
| res.send_target(Target::new("foobar", Utc::now())).await; |
| res |
| } |
| |
| fn setup_fake_target_service() -> RemoteControlProxy { |
| let (proxy, mut stream) = |
| fidl::endpoints::create_proxy_and_stream::<RemoteControlMarker>().unwrap(); |
| |
| spawn(async move { |
| while let Ok(req) = stream.try_next().await { |
| match req { |
| Some(RemoteControlRequest::StartComponent { responder, .. }) => { |
| let _ = responder.send(&mut Ok(())).context("sending ok response"); |
| } |
| _ => assert!(false), |
| } |
| } |
| }); |
| |
| proxy |
| } |
| |
| #[test] |
| fn test_echo() { |
| let echo = "test-echo"; |
| let (daemon_proxy, stream) = |
| fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap(); |
| hoist::run(async move { |
| let _ctrl = spawn_daemon_server_with_target_ctrl(stream).await; |
| let echoed = daemon_proxy.echo_string(echo).await.unwrap(); |
| assert_eq!(echoed, echo); |
| }); |
| } |
| |
| #[test] |
| fn test_getting_rcs_multiple_targets() -> Result<(), Error> { |
| let (daemon_proxy, stream) = |
| fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap(); |
| let (_, remote_server_end) = create_proxy::<RemoteControlMarker>()?; |
| hoist::run(async move { |
| let mut ctrl = spawn_daemon_server_with_fake_target(stream).await; |
| ctrl.send_target(Target::new("bazmumble", Utc::now())).await; |
| match daemon_proxy.get_remote_control(remote_server_end).await.unwrap() { |
| Ok(_) => panic!("failure expected for multiple targets"), |
| _ => (), |
| } |
| }); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_list_targets() -> Result<(), Error> { |
| let (daemon_proxy, stream) = |
| fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap(); |
| hoist::run(async move { |
| let mut ctrl = spawn_daemon_server_with_fake_target(stream).await; |
| ctrl.send_target(Target::new("baz", Utc::now())).await; |
| ctrl.send_target(Target::new("quux", Utc::now())).await; |
| let res = daemon_proxy.list_targets("").await.unwrap(); |
| |
| // TODO(awdavies): This check is in lieu of having an |
| // established format for the list_targets output. |
| assert!(res.contains("foobar")); |
| assert!(res.contains("baz")); |
| assert!(res.contains("quux")); |
| |
| let res = daemon_proxy.list_targets("mlorp").await.unwrap(); |
| assert!(!res.contains("foobar")); |
| assert!(!res.contains("baz")); |
| assert!(!res.contains("quux")); |
| }); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_quit() -> Result<(), Error> { |
| let (daemon_proxy, stream) = |
| fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap(); |
| |
| if std::path::Path::new(SOCKET).is_file() { |
| std::fs::remove_file(SOCKET).unwrap(); |
| } |
| |
| hoist::run(async move { |
| let mut _ctrl = spawn_daemon_server_with_fake_target(stream).await; |
| let r = daemon_proxy.quit().await.unwrap(); |
| |
| assert!(r); |
| |
| assert!(!std::path::Path::new(SOCKET).is_file()); |
| }); |
| |
| Ok(()) |
| } |
| |
| struct TestHookFirst { |
| callbacks_done: mpsc::UnboundedSender<bool>, |
| } |
| |
| #[async_trait] |
| impl DiscoveryHook for TestHookFirst { |
| async fn on_new_target(&self, target: &Arc<Target>, tc: &Arc<TargetCollection>) { |
| // This will crash if the target isn't already inserted. |
| let t = tc.get(target.nodename.clone().into()).await.unwrap().clone(); |
| assert_eq!(t.nodename, "nothin"); |
| assert_eq!(*t.state.lock().await, TargetState::new()); |
| assert_eq!(*t.addrs.lock().await, HashSet::new()); |
| self.callbacks_done.unbounded_send(true).unwrap(); |
| } |
| } |
| |
| struct TestHookSecond { |
| callbacks_done: mpsc::UnboundedSender<bool>, |
| } |
| |
| #[async_trait] |
| impl DiscoveryHook for TestHookSecond { |
| async fn on_new_target(&self, _target: &Arc<Target>, _tc: &Arc<TargetCollection>) { |
| self.callbacks_done.unbounded_send(true).unwrap(); |
| } |
| } |
| |
| #[test] |
| fn test_receive_target() { |
| hoist::run(async move { |
| let (tx_from_callback, mut rx_from_callback) = mpsc::unbounded::<bool>(); |
| let (tx, rx) = mpsc::unbounded::<Target>(); |
| let mut daemon = Daemon::new_with_rx(rx); |
| daemon.register_hook(TestHookFirst { callbacks_done: tx_from_callback.clone() }).await; |
| daemon.register_hook(TestHookSecond { callbacks_done: tx_from_callback }).await; |
| tx.unbounded_send(Target::new("nothin", Utc::now())).unwrap(); |
| assert!(rx_from_callback.next().await.unwrap()); |
| assert!(rx_from_callback.next().await.unwrap()); |
| }); |
| } |
| } |