| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::constants::{get_socket, CURRENT_EXE_BUILDID}, |
| anyhow::{anyhow, bail, Context, Result}, |
| ascendd::Ascendd, |
| async_trait::async_trait, |
| ffx_build_version::build_info, |
| ffx_daemon_core::events::{self, EventHandler}, |
| ffx_daemon_events::{ |
| DaemonEvent, TargetConnectionState, TargetEvent, TargetInfo, WireTrafficType, |
| }, |
| ffx_daemon_protocols::create_protocol_register_map, |
| ffx_daemon_target::target::Target, |
| ffx_daemon_target::target_collection::TargetCollection, |
| ffx_daemon_target::zedboot::zedboot_discovery, |
| ffx_metrics::{add_daemon_launch_event, add_daemon_metrics_event}, |
| ffx_stream_util::TryStreamUtilExt, |
| fidl::{endpoints::ClientEnd, prelude::*}, |
| fidl_fuchsia_developer_ffx::{ |
| self as ffx, DaemonError, DaemonMarker, DaemonRequest, DaemonRequestStream, |
| RepositoryRegistryMarker, TargetCollectionMarker, |
| }, |
| fidl_fuchsia_developer_remotecontrol::{RemoteControlMarker, RemoteControlProxy}, |
| fidl_fuchsia_overnet::Peer, |
| fidl_fuchsia_overnet::{ServiceProviderRequest, ServiceProviderRequestStream}, |
| fidl_fuchsia_overnet_protocol::NodeId, |
| fuchsia_async::{Task, TimeoutExt, Timer}, |
| futures::prelude::*, |
| hoist::{hoist, OvernetInstance}, |
| protocols::{DaemonProtocolProvider, ProtocolError, ProtocolRegister}, |
| rcs::RcsConnection, |
| std::cell::Cell, |
| std::collections::HashSet, |
| std::hash::{Hash, Hasher}, |
| std::rc::Rc, |
| std::time::{Duration, Instant}, |
| }; |
| |
| // Daemon |
| |
| // This is just for mocking config values for unit testing. |
| #[async_trait(?Send)] |
| trait ConfigReader: Send + Sync { |
| async fn get(&self, q: &str) -> Result<Option<String>>; |
| } |
| |
| #[derive(Default)] |
| struct DefaultConfigReader {} |
| |
| #[async_trait(?Send)] |
| impl ConfigReader for DefaultConfigReader { |
| async fn get(&self, q: &str) -> Result<Option<String>> { |
| Ok(ffx_config::get(q).await?) |
| } |
| } |
| |
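| /// Handles DaemonEvents published to the daemon event queue, updating the target collection in |
| /// response to discovery traffic and Overnet peer changes. |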
| pub struct DaemonEventHandler { |
| target_collection: Rc<TargetCollection>, |
| } |
| |
| impl DaemonEventHandler { |
| fn new(target_collection: Rc<TargetCollection>) -> Self { |
| Self { target_collection } |
| } |
| |
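| /// Connects to RCS on the given Overnet peer, identifies the target behind it, and merges it |
| /// into the target collection. |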
| #[tracing::instrument(level = "info", skip(self))] |
| async fn handle_overnet_peer(&self, node_id: u64) { |
| let rcs = match RcsConnection::new(&mut NodeId { id: node_id }) { |
| Ok(rcs) => rcs, |
| Err(e) => { |
| log::error!("Target from Overnet {} failed to connect to RCS: {:?}", node_id, e); |
| return; |
| } |
| }; |
| |
| let target = match Target::from_rcs_connection(rcs).await { |
| Ok(target) => target, |
| Err(err) => { |
| log::error!("Target from Overnet {} could not be identified: {:?}", node_id, err); |
| return; |
| } |
| }; |
| |
| log::trace!("Target from Overnet {} is {}", node_id, target.nodename_str()); |
| let target = self.target_collection.merge_insert(target); |
| target.run_logger(); |
| } |
| |
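| /// Marks the target associated with the lost Overnet peer, if any, as disconnected. |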
| #[tracing::instrument(level = "info", skip(self))] |
| async fn handle_overnet_peer_lost(&self, node_id: u64) { |
| if let Some(target) = self |
| .target_collection |
| .targets() |
| .iter() |
| .find(|target| target.overnet_node_id() == Some(node_id)) |
| { |
| target.disconnect(); |
| } |
| } |
| |
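| /// Merges a fastboot-discovered target into the collection and refreshes its Fastboot connection |
| /// state unless it is already connected through another transport. |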
| fn handle_fastboot(&self, t: TargetInfo) { |
| log::trace!( |
| "Found new target via fastboot: {}", |
| t.nodename.clone().unwrap_or("<unknown>".to_string()) |
| ); |
| let target = self.target_collection.merge_insert(Target::from_target_info(t.into())); |
| target.update_connection_state(|s| match s { |
| TargetConnectionState::Disconnected | TargetConnectionState::Fastboot(_) => { |
| TargetConnectionState::Fastboot(Instant::now()) |
| } |
| _ => s, |
| }); |
| } |
| |
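| /// Merges a zedboot (netsvc) discovered target into the collection and refreshes its Zedboot |
| /// connection state unless it is already connected through another transport. |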
| async fn handle_zedboot(&self, t: TargetInfo) { |
| log::trace!( |
| "Found new target via zedboot: {}", |
| t.nodename.clone().unwrap_or("<unknown>".to_string()) |
| ); |
| let target = self.target_collection.merge_insert(Target::from_netsvc_target_info(t.into())); |
| target.update_connection_state(|s| match s { |
| TargetConnectionState::Disconnected | TargetConnectionState::Zedboot(_) => { |
| TargetConnectionState::Zedboot(Instant::now()) |
| } |
| _ => s, |
| }); |
| } |
| } |
| |
| #[async_trait(?Send)] |
| impl DaemonProtocolProvider for Daemon { |
| async fn open_protocol(&self, protocol_name: String) -> Result<fidl::Channel> { |
| let (server, client) = fidl::Channel::create().context("creating zx channel")?; |
| self.protocol_register |
| .open( |
| protocol_name, |
| protocols::Context::new(self.clone()), |
| fidl::AsyncChannel::from_channel(server)?, |
| ) |
| .await?; |
| Ok(client) |
| } |
| |
| async fn open_target_proxy( |
| &self, |
| target_identifier: Option<String>, |
| protocol_selector: fidl_fuchsia_diagnostics::Selector, |
| ) -> Result<fidl::Channel> { |
| let (_, channel) = |
| self.open_target_proxy_with_info(target_identifier, protocol_selector).await?; |
| Ok(channel) |
| } |
| |
| async fn get_target_event_queue( |
| &self, |
| target_identifier: Option<String>, |
| ) -> Result<(Rc<Target>, events::Queue<TargetEvent>)> { |
| let target = self |
| .get_target(target_identifier) |
| .await |
| .map_err(|e| anyhow!("{:#?}", e)) |
| .context("getting default target")?; |
| target.run_host_pipe(); |
| let events = target.events.clone(); |
| Ok((target, events)) |
| } |
| |
| async fn open_target_proxy_with_info( |
| &self, |
| target_identifier: Option<String>, |
| protocol_selector: fidl_fuchsia_diagnostics::Selector, |
| ) -> Result<(ffx::TargetInfo, fidl::Channel)> { |
| let target = self.get_rcs_ready_target(target_identifier).await?; |
| let rcs = target |
| .rcs() |
| .ok_or(anyhow!("rcs disconnected after event fired")) |
| .context("getting rcs instance")?; |
| let (server, client) = fidl::Channel::create().context("creating zx channel")?; |
| |
| // TODO(awdavies): Handle these errors properly so the client knows what happened. |
| rcs.proxy |
| .connect(protocol_selector, server) |
| .await |
| .context("FIDL connection")? |
| .map_err(|e| anyhow!("{:#?}", e)) |
| .context("proxy connect")?; |
| Ok((target.as_ref().into(), client)) |
| } |
| |
| async fn get_target_info(&self, target_identifier: Option<String>) -> Result<ffx::TargetInfo> { |
| let target = self |
| .get_target(target_identifier) |
| .await |
| .map_err(|e| anyhow!("{:#?}", e)) |
| .context("getting target")?; |
| Ok(target.as_ref().into()) |
| } |
| |
| #[tracing::instrument(level = "info", skip(self))] |
| async fn open_remote_control( |
| &self, |
| target_identifier: Option<String>, |
| ) -> Result<RemoteControlProxy> { |
| let target = self.get_rcs_ready_target(target_identifier).await?; |
| // RCS should be available now that the RcsActivated event has fired. |
| let mut rcs = target |
| .rcs() |
| .ok_or(anyhow!("rcs disconnected after event fired")) |
| .context("getting rcs instance")?; |
| let (proxy, remote) = fidl::endpoints::create_proxy::<RemoteControlMarker>()?; |
| rcs.copy_to_channel(remote.into_channel())?; |
| Ok(proxy) |
| } |
| |
| async fn daemon_event_queue(&self) -> events::Queue<DaemonEvent> { |
| self.event_queue.clone() |
| } |
| |
| async fn get_target_collection(&self) -> Result<Rc<TargetCollection>> { |
| Ok(self.target_collection.clone()) |
| } |
| } |
| |
| #[async_trait(?Send)] |
| impl EventHandler<DaemonEvent> for DaemonEventHandler { |
| async fn on_event(&self, event: DaemonEvent) -> Result<events::Status> { |
| log::info!("! DaemonEvent::{:?}", event); |
| |
| match event { |
| DaemonEvent::WireTraffic(traffic) => match traffic { |
| WireTrafficType::Mdns(t) => { |
| log::warn!("mdns traffic fired in daemon. This is deprecated: {:?}", t); |
| } |
| WireTrafficType::Fastboot(t) => { |
| self.handle_fastboot(t); |
| } |
| WireTrafficType::Zedboot(t) => { |
| self.handle_zedboot(t).await; |
| } |
| }, |
| DaemonEvent::OvernetPeer(node_id) => { |
| self.handle_overnet_peer(node_id).await; |
| } |
| DaemonEvent::OvernetPeerLost(node_id) => { |
| self.handle_overnet_peer_lost(node_id).await; |
| } |
| _ => (), |
| } |
| |
| // This handler is never done unless the target_collection is dropped. |
| Ok(events::Status::Waiting) |
| } |
| } |
| |
| #[derive(Clone)] |
| /// Defines the daemon object. This is used by "ffx daemon start". |
| /// |
| /// Typical usage is: |
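| /// |
| /// ```no_run |
| /// # async fn example() -> anyhow::Result<()> { |
| /// let mut daemon = ffx_daemon::Daemon::new(); |
| /// daemon.start().await |
| /// # } |
| /// ``` |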
| pub struct Daemon { |
| // The event queue is a collection of subscriptions to which DaemonEvents will be published. |
| event_queue: events::Queue<DaemonEvent>, |
| // All the targets currently known to the daemon. |
| // This may include targets the daemon has no access to. |
| target_collection: Rc<TargetCollection>, |
| // ascendd is the overnet daemon running on the Linux host. It manages the mesh and the |
| // connections to the devices and other peers (for example, a connection to the frontend). |
| // With ffx, ascendd is embedded within the ffx daemon (when ffx daemon is launched, we don’t |
| // need an extra process for ascendd). |
| ascendd: Rc<Cell<Option<Ascendd>>>, |
| // Handles the registered FIDL protocols and associated handles. This is initialized with the |
| // list of protocols defined in src/developer/ffx/daemon/protocols/BUILD.gn (the deps field in |
| // ffx_protocol) using the macro generate_protocol_map in |
| // src/developer/ffx/build/templates/protocols_macro.md. |
| protocol_register: ProtocolRegister, |
| // All the persistent long-running tasks spawned by the daemon. The tasks are standalone: they |
| // execute by themselves without any intervention from the daemon. The purpose of this vector is |
| // to keep their strong reference count positive until the daemon is dropped. |
| tasks: Vec<Rc<Task<()>>>, |
| } |
| |
| impl Daemon { |
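| /// Creates a daemon with an empty target collection and the generated protocol register. Nothing |
| /// runs until `start` is called. |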
| pub fn new() -> Daemon { |
| let target_collection = Rc::new(TargetCollection::new()); |
| let event_queue = events::Queue::new(&target_collection); |
| target_collection.set_event_queue(event_queue.clone()); |
| |
| Self { |
| target_collection, |
| event_queue, |
| protocol_register: ProtocolRegister::new(create_protocol_register_map()), |
| ascendd: Rc::new(Cell::new(None)), |
| tasks: Vec::new(), |
| } |
| } |
| |
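| /// Runs the daemon: logs startup info, starts the protocols, discovery, ascendd, and target |
| /// expiry, then serves incoming requests until the daemon is shut down. |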
| pub async fn start(&mut self) -> Result<()> { |
| self.log_startup_info().await?; |
| |
| self.start_protocols().await?; |
| self.start_discovery().await?; |
| self.start_ascendd().await?; |
| self.start_target_expiry(Duration::from_secs(1)); |
| self.serve().await |
| } |
| |
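| /// Logs build, commit, and PID information for this daemon instance and records a daemon launch |
| /// analytics event. |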
| async fn log_startup_info(&self) -> Result<()> { |
| let pid = std::process::id(); |
| let hash: String = |
| ffx_config::get((CURRENT_EXE_BUILDID, ffx_config::ConfigLevel::Runtime)).await?; |
| let version_info = build_info(); |
| let commit_hash = version_info.commit_hash.as_deref().unwrap_or("<unknown>"); |
| let commit_timestamp = |
| version_info.commit_timestamp.map(|t| t.to_string()).unwrap_or("<unknown>".to_owned()); |
| let build_version = version_info.build_version.as_deref().unwrap_or("<unknown>"); |
| |
| log::info!( |
| "Beginning daemon startup\nBuild Version: {}\nCommit Timestamp: {}\nCommit Hash: {}\nBinary Hash: {}\nPID: {}", |
| build_version, |
| commit_timestamp, |
| commit_hash, |
| hash, |
| pid |
| ); |
| add_daemon_launch_event().await; |
| Ok(()) |
| } |
| |
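| /// Eagerly starts the protocols that must run for the lifetime of the daemon: the repository |
| /// registry and the target collection. |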
| async fn start_protocols(&mut self) -> Result<()> { |
| let cx = protocols::Context::new(self.clone()); |
| let ((), ()) = futures::future::try_join( |
| self.protocol_register |
| .start(RepositoryRegistryMarker::PROTOCOL_NAME.to_string(), cx.clone()), |
| self.protocol_register.start(TargetCollectionMarker::PROTOCOL_NAME.to_string(), cx), |
| ) |
| .await?; |
| Ok(()) |
| } |
| |
| /// Awaits a target that has RCS active. |
| async fn get_rcs_ready_target(&self, target_query: Option<String>) -> Result<Rc<Target>> { |
| let target = self |
| .get_target(target_query) |
| .await |
| .map_err(|e| anyhow!("{:#?}", e)) |
| .context("getting default target")?; |
| if matches!(target.get_connection_state(), TargetConnectionState::Fastboot(_)) { |
| let nodename = target.nodename().unwrap_or("<No Nodename>".to_string()); |
| bail!("Attempting to open RCS on a fastboot target: {}", nodename); |
| } |
| if matches!(target.get_connection_state(), TargetConnectionState::Zedboot(_)) { |
| let nodename = target.nodename().unwrap_or("<No Nodename>".to_string()); |
| bail!("Attempting to connect to RCS on a zedboot target: {}", nodename); |
| } |
| // Ensure auto-connect has at least started. |
| target.run_host_pipe(); |
| target |
| .events |
| .wait_for(None, |e| e == TargetEvent::RcsActivated) |
| .await |
| .context("waiting for RCS activation")?; |
| Ok(target) |
| } |
| |
| /// Start all discovery tasks |
| async fn start_discovery(&mut self) -> Result<()> { |
| let daemon_event_handler = DaemonEventHandler::new(self.target_collection.clone()); |
| self.event_queue.add_handler(daemon_event_handler).await; |
| |
| // TODO: these tasks could and probably should be managed by the daemon |
| // instead of being detached. |
| Daemon::spawn_onet_discovery(self.event_queue.clone()); |
| let discovery = zedboot_discovery(self.event_queue.clone()).await?; |
| self.tasks.push(Rc::new(discovery)); |
| Ok(()) |
| } |
| |
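| /// Starts the embedded ascendd (overnet) node listening on the daemon socket. |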
| async fn start_ascendd(&mut self) -> Result<()> { |
| // Start the ascendd socket only after we have registered our protocols. |
| log::info!("Starting ascendd"); |
| |
| let ascendd = Ascendd::new( |
| ascendd::Opt { sockpath: Some(get_socket().await), ..Default::default() }, |
| // TODO: this just prints serial output to stdout - ffx probably wants to take a more |
| // nuanced approach here. |
| blocking::Unblock::new(std::io::stdout()), |
| ) |
| .await?; |
| |
| self.ascendd.replace(Some(ascendd)); |
| |
| Ok(()) |
| } |
| |
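| /// Spawns a task that periodically expires stale target connection state and removes ephemeral |
| /// manual targets once they have disconnected. |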
| fn start_target_expiry(&mut self, frequency: Duration) { |
| let target_collection = Rc::downgrade(&self.target_collection); |
| self.tasks.push(Rc::new(Task::local(async move { |
| loop { |
| Timer::new(frequency.clone()).await; |
| |
| match target_collection.upgrade() { |
| Some(target_collection) => { |
| for target in target_collection.targets() { |
| // Manually-added remote targets will not be discovered by mDNS, |
| // and as a result will not have host-pipe triggered automatically |
| // by the mDNS event handler. |
| if target.is_manual() { |
| target.run_host_pipe(); |
| } |
| target.expire_state(); |
| if target.is_manual() && !target.is_connected() { |
| // If a manual target has been allowed to transition to the |
| // "disconnected" state, it should be removed from the collection. |
| target_collection.remove_ephemeral_target(target); |
| } |
| } |
| } |
| None => return, |
| } |
| } |
| }))) |
| } |
| |
| /// Attempts to get the target matching the given string if one is provided, otherwise the |
| /// default target from the target collection. |
| async fn get_target(&self, matcher: Option<String>) -> Result<Rc<Target>, DaemonError> { |
| #[cfg(not(test))] |
| const GET_TARGET_TIMEOUT: Duration = Duration::from_secs(8); |
| #[cfg(test)] |
| const GET_TARGET_TIMEOUT: Duration = Duration::from_secs(1); |
| |
| // TODO(72818): make target match timeout configurable / parameterizable |
| self.target_collection |
| .wait_for_match(matcher) |
| .on_timeout(GET_TARGET_TIMEOUT, || match self.target_collection.is_empty() { |
| true => Err(DaemonError::TargetCacheEmpty), |
| false => Err(DaemonError::TargetNotFound), |
| }) |
| .await |
| } |
| |
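| /// Serves Daemon FIDL requests from a single client connection. Errors from individual requests |
| /// are logged rather than propagated. |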
| async fn handle_requests_from_stream(&self, stream: DaemonRequestStream) -> Result<()> { |
| stream |
| .map_err(|e| anyhow!("reading FIDL stream: {:#}", e)) |
| .try_for_each_concurrent_while_connected(None, |r| async { |
| let debug_req_string = format!("{:?}", r); |
| if let Err(e) = self.handle_request(r).await { |
| log::error!("error while handling request `{}`: {}", debug_req_string, e); |
| } |
| Ok(()) |
| }) |
| .await |
| } |
| |
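| /// Spawns a detached task that polls Overnet for peer changes and publishes OvernetPeer and |
| /// OvernetPeerLost events onto the given queue. |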
| fn spawn_onet_discovery(queue: events::Queue<DaemonEvent>) { |
| fuchsia_async::Task::local(async move { |
| let mut known_peers: HashSet<PeerSetElement> = Default::default(); |
| |
| loop { |
| let svc = match hoist().connect_as_service_consumer() { |
| Ok(svc) => svc, |
| Err(err) => { |
| log::info!("Overnet setup failed: {}, will retry in 1s", err); |
| Timer::new(Duration::from_secs(1)).await; |
| continue; |
| } |
| }; |
| loop { |
| match svc.list_peers().await { |
| Ok(new_peers) => { |
| known_peers = |
| Self::handle_overnet_peers(&queue, known_peers, new_peers); |
| } |
| Err(err) => { |
| log::info!("Overnet peer discovery failed: {}, will retry", err); |
| Timer::new(Duration::from_secs(1)).await; |
| // break out of the peer discovery loop on error in |
| // order to reconnect, in case the error causes the |
| // overnet interface to go bad. |
| break; |
| } |
| }; |
| } |
| } |
| }) |
| .detach(); |
| } |
| |
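| /// Diffs `peers` against `known_peers`, publishing an OvernetPeer event for each newly appeared |
| /// RCS-capable peer and an OvernetPeerLost event for each peer that has gone away. Returns the |
| /// updated set of known peers. |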
| fn handle_overnet_peers( |
| queue: &events::Queue<DaemonEvent>, |
| known_peers: HashSet<PeerSetElement>, |
| peers: Vec<Peer>, |
| ) -> HashSet<PeerSetElement> { |
| let mut new_peers: HashSet<PeerSetElement> = Default::default(); |
| for peer in peers { |
| new_peers.insert(PeerSetElement(peer)); |
| } |
| |
| for peer in new_peers.difference(&known_peers) { |
| let peer = &peer.0; |
| let peer_has_rcs = peer |
| .description |
| .services |
| .as_ref() |
| .map(|v| v.contains(&RemoteControlMarker::PROTOCOL_NAME.to_string())) |
| .unwrap_or(false); |
| if peer_has_rcs { |
| queue.push(DaemonEvent::OvernetPeer(peer.id.id)).unwrap_or_else(|err| { |
| log::warn!( |
| "Overnet discovery failed to enqueue event {:?}: {}", |
| DaemonEvent::OvernetPeer(peer.id.id), |
| err |
| ); |
| }); |
| } |
| } |
| |
| for peer in known_peers.difference(&new_peers) { |
| let peer = &peer.0; |
| queue.push(DaemonEvent::OvernetPeerLost(peer.id.id)).unwrap_or_else(|err| { |
| log::warn!( |
| "Overnet discovery failed to enqueue event {:?}: {}", |
| DaemonEvent::OvernetPeerLost(peer.id.id), |
| err |
| ); |
| }); |
| } |
| |
| new_peers |
| } |
| |
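| /// Dispatches a single DaemonRequest: Quit, GetVersionInfo, or ConnectToProtocol. |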
| async fn handle_request(&self, req: DaemonRequest) -> Result<()> { |
| log::debug!("daemon received request: {:?}", req); |
| |
| match req { |
| DaemonRequest::Quit { responder } => { |
| log::info!("Received quit request."); |
| |
| match std::fs::remove_file(get_socket().await) { |
| Ok(()) => {} |
| Err(e) => log::error!("failed to remove socket file: {}", e), |
| } |
| |
| if cfg!(test) { |
| panic!("quit() should not be invoked in test code"); |
| } |
| |
| self.protocol_register |
| .shutdown(protocols::Context::new(self.clone())) |
| .await |
| .unwrap_or_else(|e| log::error!("shutting down protocol register: {:?}", e)); |
| |
| add_daemon_metrics_event("quit").await; |
| |
| ffx_config::logging::disable_stdio_logging(); |
| |
| // It is desirable for the client to receive an ACK for the quit |
| // request. As Overnet has a potentially complicated routing |
| // path, it is tricky to implement some notion of a bounded |
| // "flush" for this response; however, in practice it is only |
| // necessary here to wait long enough for the message to likely |
| // leave the local process before exiting. Enqueue a detached |
| // timer to shut down the daemon before sending the response. |
| // This is detached because once the client receives the |
| // response, the client will disconnect its socket. If the |
| // local reactor observes this disconnection before the timer |
| // expires, an in-line timer wait would never fire, and the |
| // daemon would never exit. |
| Task::local( |
| Timer::new(std::time::Duration::from_millis(20)).map(|_| std::process::exit(0)), |
| ) |
| .detach(); |
| |
| responder.send(true).context("error sending response")?; |
| } |
| DaemonRequest::GetVersionInfo { responder } => { |
| let mut info = build_info(); |
| let build_id: String = |
| ffx_config::get((CURRENT_EXE_BUILDID, ffx_config::ConfigLevel::Runtime)) |
| .await?; |
| info.build_id = Some(build_id); |
| return responder.send(info).context("sending GetVersionInfo response"); |
| } |
| DaemonRequest::ConnectToProtocol { name, server_end, responder } => { |
| let name_for_analytics = name.clone(); |
| match self |
| .protocol_register |
| .open( |
| name, |
| protocols::Context::new(self.clone()), |
| fidl::AsyncChannel::from_channel(server_end)?, |
| ) |
| .await |
| { |
| Ok(()) => responder.send(&mut Ok(())).context("fidl response")?, |
| Err(e) => { |
| log::error!("{}", e); |
| match e { |
| ProtocolError::NoProtocolFound(_) => { |
| responder.send(&mut Err(DaemonError::ProtocolNotFound))? |
| } |
| ProtocolError::StreamOpenError(_) => { |
| responder.send(&mut Err(DaemonError::ProtocolOpenError))? |
| } |
| ProtocolError::BadRegisterState(_) |
| | ProtocolError::DuplicateTaskId(..) => { |
| responder.send(&mut Err(DaemonError::BadProtocolRegisterState))? |
| } |
| } |
| } |
| } |
| add_daemon_metrics_event( |
| format!("connect_to_protocol: {}", &name_for_analytics).as_str(), |
| ) |
| .await; |
| } |
| } |
| |
| Ok(()) |
| } |
| |
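| /// Publishes the Daemon protocol on Overnet and serves incoming client connections until the |
| /// provider stream closes. |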
| async fn serve(&self) -> Result<()> { |
| let (s, p) = fidl::Channel::create().context("failed to create zx channel")?; |
| let chan = fidl::AsyncChannel::from_channel(s).context("failed to make async channel")?; |
| let mut stream = ServiceProviderRequestStream::from_channel(chan); |
| |
| log::info!("Starting daemon overnet server"); |
| hoist::hoist().publish_service(DaemonMarker::PROTOCOL_NAME, ClientEnd::new(p))?; |
| |
| log::info!("Starting daemon serve loop"); |
| while let Some(ServiceProviderRequest::ConnectToService { |
| chan, |
| info: _, |
| control_handle: _control_handle, |
| }) = stream.try_next().await.context("error running protocol provider server")? |
| { |
| log::trace!("Received protocol request for protocol"); |
| let chan = |
| fidl::AsyncChannel::from_channel(chan).context("failed to make async channel")?; |
| let daemon_clone = self.clone(); |
| Task::local(async move { |
| daemon_clone |
| .handle_requests_from_stream(DaemonRequestStream::from_channel(chan)) |
| .await |
| .unwrap_or_else(|err| panic!("fatal error handling request: {:?}", err)); |
| }) |
| .detach(); |
| } |
| Ok(()) |
| } |
| } |
| |
| // PeerSetElement wraps an overnet Peer object for inclusion in a Set |
| // or other collection reliant on Eq and Hash, using the NodeId as the |
| // discriminator. |
| struct PeerSetElement(Peer); |
| impl PartialEq for PeerSetElement { |
| fn eq(&self, other: &Self) -> bool { |
| self.0.id == other.0.id |
| } |
| } |
| impl Eq for PeerSetElement {} |
| impl Hash for PeerSetElement { |
| fn hash<H: Hasher>(&self, state: &mut H) { |
| self.0.id.hash(state); |
| } |
| } |
| |
| #[cfg(test)] |
| mod test { |
| use { |
| super::*, |
| addr::TargetAddr, |
| assert_matches::assert_matches, |
| chrono::Utc, |
| ffx_daemon_target::target::TargetAddrEntry, |
| ffx_daemon_target::target::TargetAddrType, |
| fidl_fuchsia_developer_ffx::{DaemonMarker, DaemonProxy}, |
| fidl_fuchsia_developer_remotecontrol::RemoteControlMarker, |
| fidl_fuchsia_overnet_protocol::PeerDescription, |
| fuchsia_async::Task, |
| std::cell::RefCell, |
| std::collections::BTreeSet, |
| std::iter::FromIterator, |
| std::time::SystemTime, |
| }; |
| |
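| // Creates a Daemon and a DaemonProxy connected over an in-memory FIDL channel, along with the |
| // task serving the request stream. |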
| fn spawn_test_daemon() -> (DaemonProxy, Daemon, Task<Result<()>>) { |
| let d = Daemon::new(); |
| |
| let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap(); |
| |
| let d2 = d.clone(); |
| let task = Task::local(async move { d2.handle_requests_from_stream(stream).await }); |
| |
| (proxy, d, task) |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_open_rcs_on_fastboot_error() { |
| let (_proxy, daemon, _task) = spawn_test_daemon(); |
| let target = Target::new_with_serial("abc"); |
| daemon.target_collection.merge_insert(target); |
| let result = daemon.open_remote_control(None).await; |
| assert!(result.is_err()); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_open_rcs_on_zedboot_error() { |
| let (_proxy, daemon, _task) = spawn_test_daemon(); |
| let target = Target::new_with_netsvc_addrs( |
| Some("abc"), |
| BTreeSet::from_iter(vec![TargetAddr::new("[fe80::1%1]:22").unwrap()].into_iter()), |
| ); |
| daemon.target_collection.merge_insert(target); |
| let result = daemon.open_remote_control(None).await; |
| assert!(result.is_err()); |
| } |
| |
| struct FakeConfigReader { |
| query_expected: String, |
| value: String, |
| } |
| |
| #[async_trait(?Send)] |
| impl ConfigReader for FakeConfigReader { |
| async fn get(&self, q: &str) -> Result<Option<String>> { |
| assert_eq!(q, self.query_expected); |
| Ok(Some(self.value.clone())) |
| } |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_get_target_empty() { |
| let d = Daemon::new(); |
| let nodename = "where-is-my-hasenpfeffer"; |
| let t = Target::new_autoconnected(nodename); |
| d.target_collection.merge_insert(t.clone()); |
| assert_eq!(nodename, d.get_target(None).await.unwrap().nodename().unwrap()); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_get_target_query() { |
| let d = Daemon::new(); |
| let nodename = "where-is-my-hasenpfeffer"; |
| let t = Target::new_autoconnected(nodename); |
| d.target_collection.merge_insert(t.clone()); |
| assert_eq!( |
| nodename, |
| d.get_target(Some(nodename.to_string())).await.unwrap().nodename().unwrap() |
| ); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_get_target_collection_empty_error() { |
| let d = Daemon::new(); |
| assert_eq!(DaemonError::TargetCacheEmpty, d.get_target(None).await.unwrap_err()); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_get_target_ambiguous() { |
| let d = Daemon::new(); |
| let t = Target::new_autoconnected("where-is-my-hasenpfeffer"); |
| let t2 = Target::new_autoconnected("it-is-rabbit-season"); |
| d.target_collection.merge_insert(t.clone()); |
| d.target_collection.merge_insert(t2.clone()); |
| assert_eq!(DaemonError::TargetAmbiguous, d.get_target(None).await.unwrap_err()); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_target_expiry() { |
| let mut daemon = Daemon::new(); |
| let target = Target::new_named("goodbye-world"); |
| let then = Instant::now() - Duration::from_secs(10); |
| target.update_connection_state(|_| TargetConnectionState::Mdns(then)); |
| daemon.target_collection.merge_insert(target.clone()); |
| |
| assert_eq!(TargetConnectionState::Mdns(then), target.get_connection_state()); |
| |
| daemon.start_target_expiry(Duration::from_millis(1)); |
| |
| while target.get_connection_state() == TargetConnectionState::Mdns(then) { |
| futures_lite::future::yield_now().await |
| } |
| |
| assert_eq!(TargetConnectionState::Disconnected, target.get_connection_state()); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_ephemeral_target_expiry() { |
| let mut daemon = Daemon::new(); |
| let expiring_target = Target::new_with_addr_entries( |
| Some("goodbye-world"), |
| vec![TargetAddrEntry { |
| addr: TargetAddr::new("127.0.0.1:8088").unwrap(), |
| timestamp: Utc::now(), |
| addr_type: TargetAddrType::Manual(Some(SystemTime::now())), |
| }] |
| .into_iter(), |
| ); |
| expiring_target.set_ssh_port(Some(8022)); |
| |
| let persistent_target = Target::new_with_addr_entries( |
| Some("i-will-stick-around"), |
| vec![TargetAddrEntry { |
| addr: TargetAddr::new("127.0.0.1:8089").unwrap(), |
| timestamp: Utc::now(), |
| addr_type: TargetAddrType::Manual(None), |
| }] |
| .into_iter(), |
| ); |
| persistent_target.set_ssh_port(Some(8023)); |
| |
| let then = Instant::now() - Duration::from_secs(10); |
| expiring_target.update_connection_state(|_| TargetConnectionState::Mdns(then)); |
| persistent_target.update_connection_state(|_| TargetConnectionState::Mdns(then)); |
| |
| assert!(daemon.target_collection.is_empty()); |
| |
| daemon.target_collection.merge_insert(expiring_target.clone()); |
| daemon.target_collection.merge_insert(persistent_target.clone()); |
| |
| assert_eq!(TargetConnectionState::Mdns(then), expiring_target.get_connection_state()); |
| assert_eq!(TargetConnectionState::Mdns(then), persistent_target.get_connection_state()); |
| |
| daemon.start_target_expiry(Duration::from_millis(1)); |
| |
| while expiring_target.get_connection_state() == TargetConnectionState::Mdns(then) { |
| futures_lite::future::yield_now().await |
| } |
| while persistent_target.get_connection_state() == TargetConnectionState::Mdns(then) { |
| futures_lite::future::yield_now().await |
| } |
| assert_eq!(TargetConnectionState::Disconnected, expiring_target.get_connection_state()); |
| assert_matches!( |
| persistent_target.get_connection_state(), |
| TargetConnectionState::Manual(None) |
| ); |
| assert_eq!(daemon.target_collection.targets().len(), 1); |
| } |
| |
| struct NullDaemonEventSynthesizer(); |
| |
| #[async_trait(?Send)] |
| impl events::EventSynthesizer<DaemonEvent> for NullDaemonEventSynthesizer { |
| async fn synthesize_events(&self) -> Vec<DaemonEvent> { |
| return Default::default(); |
| } |
| } |
| |
| #[test] |
| fn test_handle_overnet_peers_known_peer_exclusion() { |
| let queue = events::Queue::<DaemonEvent>::new(&Rc::new(NullDaemonEventSynthesizer {})); |
| let mut known_peers: HashSet<PeerSetElement> = Default::default(); |
| |
| let peer1 = Peer { |
| description: PeerDescription { |
| services: None, |
| unknown_data: None, |
| ..PeerDescription::EMPTY |
| }, |
| id: NodeId { id: 1 }, |
| is_self: false, |
| }; |
| let peer2 = Peer { |
| description: PeerDescription { |
| services: None, |
| unknown_data: None, |
| ..PeerDescription::EMPTY |
| }, |
| id: NodeId { id: 2 }, |
| is_self: false, |
| }; |
| |
| let new_peers = |
| Daemon::handle_overnet_peers(&queue, known_peers, vec![peer1.clone(), peer2.clone()]); |
| assert!(new_peers.contains(&PeerSetElement(peer1.clone()))); |
| assert!(new_peers.contains(&PeerSetElement(peer2.clone()))); |
| |
| known_peers = new_peers; |
| |
| let new_peers = Daemon::handle_overnet_peers(&queue, known_peers, vec![]); |
| assert!(!new_peers.contains(&PeerSetElement(peer1.clone()))); |
| assert!(!new_peers.contains(&PeerSetElement(peer2.clone()))); |
| } |
| |
| struct DaemonEventRecorder { |
| /// All events observed by the handler will be logged into this field. |
| event_log: Rc<RefCell<Vec<DaemonEvent>>>, |
| } |
| #[async_trait(?Send)] |
| impl EventHandler<DaemonEvent> for DaemonEventRecorder { |
| async fn on_event(&self, event: DaemonEvent) -> Result<events::Status> { |
| self.event_log.borrow_mut().push(event); |
| Ok(events::Status::Waiting) |
| } |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_handle_overnet_peer_leave_and_return() { |
| let queue = events::Queue::<DaemonEvent>::new(&Rc::new(NullDaemonEventSynthesizer {})); |
| let mut known_peers: HashSet<PeerSetElement> = Default::default(); |
| |
| let peer1 = Peer { |
| description: PeerDescription { |
| services: Some(vec![RemoteControlMarker::PROTOCOL_NAME.to_string()]), |
| unknown_data: None, |
| ..PeerDescription::EMPTY |
| }, |
| id: NodeId { id: 1 }, |
| is_self: false, |
| }; |
| let peer2 = Peer { |
| description: PeerDescription { |
| services: Some(vec![RemoteControlMarker::PROTOCOL_NAME.to_string()]), |
| unknown_data: None, |
| ..PeerDescription::EMPTY |
| }, |
| id: NodeId { id: 2 }, |
| is_self: false, |
| }; |
| |
| // First the targets are discovered: |
| let new_peers = |
| Daemon::handle_overnet_peers(&queue, known_peers, vec![peer1.clone(), peer2.clone()]); |
| assert!(new_peers.contains(&PeerSetElement(peer1.clone()))); |
| assert!(new_peers.contains(&PeerSetElement(peer2.clone()))); |
| |
| known_peers = new_peers; |
| |
| // Make a new queue so we don't get any of the historical events. |
| let queue = events::Queue::<DaemonEvent>::new(&Rc::new(NullDaemonEventSynthesizer {})); |
| let event_log = Rc::new(RefCell::new(Vec::<DaemonEvent>::new())); |
| |
| // Now wire up the event handler; we want to assert that we observe OvernetPeerLost events for the leaving targets. |
| queue.add_handler(DaemonEventRecorder { event_log: event_log.clone() }).await; |
| |
| // Next the targets are lost: |
| let new_peers = Daemon::handle_overnet_peers(&queue, known_peers, vec![]); |
| assert!(!new_peers.contains(&PeerSetElement(peer1.clone()))); |
| assert!(!new_peers.contains(&PeerSetElement(peer2.clone()))); |
| |
| let start = Instant::now(); |
| while event_log.borrow().len() != 2 { |
| if Instant::now().duration_since(start) > Duration::from_secs(1) { |
| break; |
| } |
| futures_lite::future::yield_now().await; |
| } |
| |
| assert_eq!(event_log.borrow().len(), 2); |
| assert_matches!(event_log.borrow()[0], DaemonEvent::OvernetPeerLost(_)); |
| assert_matches!(event_log.borrow()[1], DaemonEvent::OvernetPeerLost(_)); |
| |
| known_peers = new_peers; |
| |
| assert_eq!(known_peers.len(), 0); |
| |
| // Make a new queue so we don't get any of the historical events. |
| let queue = events::Queue::<DaemonEvent>::new(&Rc::new(NullDaemonEventSynthesizer {})); |
| let event_log = Rc::new(RefCell::new(Vec::<DaemonEvent>::new())); |
| |
| // Now wire up the event handler; we want to assert that we observe OvernetPeer events for the returning targets. |
| queue.add_handler(DaemonEventRecorder { event_log: event_log.clone() }).await; |
| |
| // Now the targets return: |
| let new_peers = |
| Daemon::handle_overnet_peers(&queue, known_peers, vec![peer1.clone(), peer2.clone()]); |
| assert!(new_peers.contains(&PeerSetElement(peer1.clone()))); |
| assert!(new_peers.contains(&PeerSetElement(peer2.clone()))); |
| |
| let start = Instant::now(); |
| while event_log.borrow().len() != 2 { |
| if Instant::now().duration_since(start) > Duration::from_secs(1) { |
| break; |
| } |
| futures_lite::future::yield_now().await; |
| } |
| |
| // Ensure that we observed an OvernetPeer event for each target that returned. |
| assert_eq!(event_log.borrow().len(), 2); |
| assert_matches!(event_log.borrow()[0], DaemonEvent::OvernetPeer(_)); |
| assert_matches!(event_log.borrow()[1], DaemonEvent::OvernetPeer(_)); |
| } |
| } |