| // Copyright 2022 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use addr::TargetAddr; |
| use anyhow::{Context as _, Result}; |
| use async_lock::Mutex; |
| use compat_info::CompatibilityInfo; |
| use discovery::{DiscoverySources, TargetEvent, TargetHandle, TargetState}; |
| use errors::{ffx_bail, FfxError}; |
| use ffx_config::{keys::TARGET_DEFAULT_KEY, EnvironmentContext}; |
| use fidl::{endpoints::create_proxy, prelude::*}; |
| use fidl_fuchsia_developer_ffx::{ |
| self as ffx, DaemonError, DaemonProxy, TargetAddrInfo, TargetCollectionMarker, |
| TargetCollectionProxy, TargetInfo, TargetIp, TargetMarker, TargetQuery, |
| }; |
| use fidl_fuchsia_developer_remotecontrol::{RemoteControlMarker, RemoteControlProxy}; |
| use fidl_fuchsia_net as net; |
| use fuchsia_async::Timer; |
| use futures::{select, Future, FutureExt, StreamExt, TryStreamExt}; |
| use itertools::Itertools; |
| use netext::IsLocalAddr; |
| use std::cmp::Ordering; |
| use std::collections::HashSet; |
| use std::net::{IpAddr, Ipv6Addr}; |
| use std::net::{SocketAddr, SocketAddrV6}; |
| use std::sync::Arc; |
| use std::time::Duration; |
| use thiserror::Error; |
| use timeout::timeout; |
| use tracing::{debug, info}; |
| |
| mod desc; |
| mod fidl_pipe; |
| mod overnet_connector; |
| mod query; |
| mod ssh_connector; |
| |
/// SSH port applied by `resolve_target_address` to the address it returns: always for queries
/// that are already an address, and for discovered addresses whose port is 0.
const SSH_PORT_DEFAULT: u16 = 22;
| |
| pub use desc::{Description, FastbootInterface}; |
| pub use fidl_pipe::create_overnet_socket; |
| pub use fidl_pipe::FidlPipe; |
| pub use query::TargetInfoQuery; |
| |
| /// Re-export of [`fidl_fuchsia_developer_ffx::TargetProxy`] for ease of use |
| pub use fidl_fuchsia_developer_ffx::TargetProxy; |
| |
/// Config key read by `open_target_with_fut`. When it resolves to `true` and a target string was
/// given, that string is registered as an inline fastboot target before the target is opened.
const FASTBOOT_INLINE_TARGET: &str = "ffx.fastboot.inline_target";
/// Config key holding how long (in milliseconds) `resolve_target_query` lets local discovery run
/// when no exact match is found earlier.
const CONFIG_LOCAL_DISCOVERY_TIMEOUT: &str = "discovery.timeout";
| |
| /// Attempt to connect to RemoteControl on a target device using a connection to a daemon. |
| /// |
| /// The optional |target| is a string matcher as defined in fuchsia.developer.ffx.TargetQuery |
| /// fidl table. |
| #[tracing::instrument] |
| pub async fn get_remote_proxy( |
| target_spec: Option<String>, |
| daemon_proxy: DaemonProxy, |
| proxy_timeout: Duration, |
| mut target_info: Option<&mut Option<TargetInfo>>, |
| env_context: &EnvironmentContext, |
| ) -> Result<RemoteControlProxy> { |
| let (target_proxy, target_proxy_fut) = |
| open_target_with_fut(target_spec.clone(), daemon_proxy, proxy_timeout, env_context)?; |
| let mut target_proxy_fut = target_proxy_fut.boxed_local().fuse(); |
| let (remote_proxy, remote_server_end) = create_proxy::<RemoteControlMarker>()?; |
| let mut open_remote_control_fut = |
| target_proxy.open_remote_control(remote_server_end).boxed_local().fuse(); |
| let res = loop { |
| select! { |
| res = open_remote_control_fut => { |
| match res { |
| Err(e) => { |
| // Getting here is most likely the result of a PEER_CLOSED error, which |
| // may be because the target_proxy closure has propagated faster than |
| // the error (which can happen occasionally). To counter this, wait for |
| // the target proxy to complete, as it will likely only need to be |
| // polled once more (open_remote_control_fut partially depends on it). |
| target_proxy_fut.await?; |
| return Err(e.into()); |
| } |
| Ok(r) => break(r), |
| } |
| } |
| res = target_proxy_fut => { |
| res?; |
| if let Some(ref mut info_out) = target_info { |
| **info_out = Some(target_proxy.identity().await?); |
| } |
| } |
| } |
| }; |
| let target_spec = target_spec.as_ref().map(ToString::to_string); |
| match res { |
| Ok(_) => Ok(remote_proxy), |
| Err(err) => Err(anyhow::Error::new(FfxError::TargetConnectionError { |
| err, |
| target: target_spec, |
| logs: Some(target_proxy.get_ssh_logs().await?), |
| })), |
| } |
| } |
| |
| /// Attempt to connect to a target given a connection to a daemon. |
| /// |
| /// The returned future must be polled to completion. It is returned separately |
| /// from the TargetProxy to enable immediately pushing requests onto the TargetProxy |
| /// before connecting to the target completes. |
| /// |
| /// The optional |target| is a string matcher as defined in fuchsia.developer.ffx.TargetQuery |
| /// fidl table. |
| #[tracing::instrument] |
| pub fn open_target_with_fut<'a, 'b: 'a>( |
| target: Option<String>, |
| daemon_proxy: DaemonProxy, |
| target_timeout: Duration, |
| env_context: &'b EnvironmentContext, |
| ) -> Result<(TargetProxy, impl Future<Output = Result<()>> + 'a)> { |
| let (tc_proxy, tc_server_end) = create_proxy::<TargetCollectionMarker>()?; |
| let (target_proxy, target_server_end) = create_proxy::<TargetMarker>()?; |
| let t_clone = target.clone(); |
| let target_collection_fut = async move { |
| daemon_proxy |
| .connect_to_protocol( |
| TargetCollectionMarker::PROTOCOL_NAME, |
| tc_server_end.into_channel(), |
| ) |
| .await? |
| .map_err(|err| FfxError::DaemonError { err, target: t_clone })?; |
| Result::<()>::Ok(()) |
| }; |
| let t_clone = target.clone(); |
| let target_handle_fut = async move { |
| let is_fastboot_inline = env_context.get(FASTBOOT_INLINE_TARGET).await.unwrap_or(false); |
| if is_fastboot_inline { |
| if let Some(ref serial_number) = target { |
| tracing::trace!("got serial number: {serial_number}"); |
| timeout(target_timeout, tc_proxy.add_inline_fastboot_target(&serial_number)) |
| .await??; |
| } |
| } |
| timeout( |
| target_timeout, |
| tc_proxy.open_target( |
| &TargetQuery { string_matcher: t_clone.clone(), ..Default::default() }, |
| target_server_end, |
| ), |
| ) |
| .await |
| .map_err(|_| FfxError::DaemonError { err: DaemonError::Timeout, target: t_clone })?? |
| .map_err(|err| FfxError::OpenTargetError { err, target })?; |
| Result::<()>::Ok(()) |
| }; |
| let fut = async move { |
| let ((), ()) = futures::try_join!(target_collection_fut, target_handle_fut)?; |
| Ok(()) |
| }; |
| |
| Ok((target_proxy, fut)) |
| } |
| |
| pub(crate) fn target_addr_info_to_socket(ti: &TargetAddrInfo) -> SocketAddr { |
| let (target_ip, port) = match ti { |
| TargetAddrInfo::Ip(a) => (a.clone(), 0), |
| TargetAddrInfo::IpPort(ip) => (TargetIp { ip: ip.ip, scope_id: ip.scope_id }, ip.port), |
| }; |
| let socket = match target_ip { |
| TargetIp { ip: net::IpAddress::Ipv4(net::Ipv4Address { addr }), .. } => { |
| SocketAddr::new(IpAddr::from(addr), port) |
| } |
| TargetIp { ip: net::IpAddress::Ipv6(net::Ipv6Address { addr }), scope_id, .. } => { |
| SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(addr), port, 0, scope_id)) |
| } |
| }; |
| socket |
| } |
| |
| pub async fn is_discovery_enabled(ctx: &EnvironmentContext) -> bool { |
| !ffx_config::is_usb_discovery_disabled(ctx).await |
| || !ffx_config::is_mdns_discovery_disabled(ctx).await |
| } |
| |
/// Returns `true` only when both names are present and equal; a missing name never matches.
fn non_empty_match_name(on1: &Option<String>, on2: &Option<String>) -> bool {
    matches!((on1, on2), (Some(left), Some(right)) if left == right)
}
| |
| fn non_empty_match_addr(state: &discovery::TargetState, sa: &Option<SocketAddr>) -> bool { |
| match (state, sa) { |
| (discovery::TargetState::Product(addrs), Some(sa)) => { |
| addrs.iter().any(|a| a.ip() == sa.ip()) |
| } |
| _ => false, |
| } |
| } |
| |
| // Descriptions are used for matching against a TargetInfoQuery |
| fn handle_to_description(handle: &discovery::TargetHandle) -> Description { |
| let addresses = match &handle.state { |
| discovery::TargetState::Product(target_addr) => target_addr.clone(), |
| _ => vec![], |
| }; |
| Description { nodename: handle.node_name.clone(), addresses, ..Default::default() } |
| } |
| |
| pub async fn resolve_target_query( |
| query: TargetInfoQuery, |
| ctx: &EnvironmentContext, |
| ) -> Result<Vec<discovery::TargetHandle>> { |
| // Get nodename, in case we're trying to find an exact match |
| let (qname, qaddr) = match query { |
| TargetInfoQuery::NodenameOrSerial(ref s) => (Some(s.clone()), None), |
| TargetInfoQuery::Addr(ref a) => (None, Some(a.clone())), |
| _ => (None, None), |
| }; |
| let sources = DiscoverySources::MDNS |
| | DiscoverySources::USB |
| | DiscoverySources::MANUAL |
| | DiscoverySources::EMULATOR; |
| |
| let filter = move |handle: &TargetHandle| { |
| let description = handle_to_description(handle); |
| query.match_description(&description) |
| }; |
| let stream = discovery::wait_for_devices(filter, true, false, sources).await?; |
| let discovery_delay = ctx.get(CONFIG_LOCAL_DISCOVERY_TIMEOUT).await.unwrap_or(1000); |
| let delay = Duration::from_millis(discovery_delay); |
| |
| // This is tricky. We want the stream to complete immediately if we find |
| // a target whose name matches the query exactly. Otherwise, run until the |
| // timer fires. |
| // We can't use `Stream::wait_until()`, because that would require us |
| // to return true for the found item, and false for the _next_ item. |
| // But there may be no next item, so the stream would end up waiting for |
| // the timer anyway. Instead, we create two futures: the timer, and one |
| // that is ready when we find the name we're looking for. Then we use |
| // `Stream::take_until()`, waiting until _either_ of those futures is ready |
| // (by using `race()`). The only remaining tricky part is that we need to |
| // examine each event to determine if it matches what we're looking for -- |
| // so we interpose a closure via `Stream::map()` that examines each item, |
| // before returning them unmodified. |
| // Oh, and once we've got a set of results, if any of them are Err, cause |
| // the whole thing to be an Err. We could stop the race early in case of |
| // failure by using the same technique, I suppose. |
| let target_events: Result<Vec<TargetEvent>> = { |
| let timer = Timer::new(delay).fuse(); |
| let found_target_event = async_utils::event::Event::new(); |
| let found_it = found_target_event.wait().fuse(); |
| let results: Vec<Result<_>> = stream |
| .map(move |ev| { |
| if let Ok(TargetEvent::Added(ref h)) = ev { |
| if non_empty_match_name(&h.node_name, &qname) { |
| found_target_event.signal(); |
| } else if non_empty_match_addr(&h.state, &qaddr) { |
| found_target_event.signal(); |
| } |
| ev |
| } else { |
| unreachable!() |
| } |
| }) |
| .take_until(futures_lite::future::race(timer, found_it)) |
| .collect() |
| .await; |
| // Fail if any results are Err |
| let r: Result<Vec<_>> = results.into_iter().collect(); |
| r |
| }; |
| |
| // Extract handles from Added events |
| let added_handles: Vec<_> = |
| target_events? |
| .into_iter() |
| .map(|e| { |
| if let discovery::TargetEvent::Added(handle) = e { |
| handle |
| } else { |
| unreachable!() |
| } |
| }) |
| .collect(); |
| |
| // Sometimes libdiscovery returns multiple Added events for the same target (I think always |
| // user emulators). The information is always the same, let's just extract the unique entries. |
| let unique_handles = added_handles.into_iter().collect::<HashSet<_>>(); |
| Ok(unique_handles.into_iter().collect()) |
| } |
| |
| /// Attempts to resolve the query into a target's ssh-able address. Returns Some(_) if a target has been |
| /// found, None otherwise. |
| pub async fn resolve_target_address( |
| target_spec: Option<String>, |
| env_context: &EnvironmentContext, |
| ) -> Result<Option<SocketAddr>> { |
| let query = TargetInfoQuery::from(target_spec.clone()); |
| // If it's already an address, return it |
| if let TargetInfoQuery::Addr(a) = query { |
| let scope_id = if let SocketAddr::V6(addr) = a { addr.scope_id() } else { 0 }; |
| return Ok(Some(TargetAddr::new(a.ip(), scope_id, SSH_PORT_DEFAULT).into())); |
| } |
| let handles = resolve_target_query(query, env_context).await?; |
| if handles.len() == 0 { |
| return Ok(None); |
| } |
| if handles.len() > 1 { |
| return Err(FfxError::DaemonError { |
| err: DaemonError::TargetAmbiguous, |
| target: target_spec, |
| } |
| .into()); |
| } |
| let target = &handles[0]; |
| |
| match &target.state { |
| TargetState::Product(ref addresses) => { |
| if addresses.is_empty() { |
| return Err(anyhow::anyhow!( |
| "Target discovered but does not contain addresses: {target:?}" |
| )); |
| } |
| let mut addrs_sorted = addresses |
| .into_iter() |
| .map(SocketAddr::from) |
| .sorted_by(|a1, a2| { |
| match (a1.ip().is_link_local_addr(), a2.ip().is_link_local_addr()) { |
| (true, true) | (false, false) => Ordering::Equal, |
| (true, false) => Ordering::Less, |
| (false, true) => Ordering::Greater, |
| } |
| }) |
| .collect::<Vec<_>>(); |
| let mut sock: SocketAddr = addrs_sorted.pop().unwrap(); |
| if sock.port() == 0 { |
| sock.set_port(SSH_PORT_DEFAULT) |
| } |
| Ok(Some(sock)) |
| } |
| state => Err(anyhow::anyhow!("Target discovered but not in the correct state: {state:?}")), |
| } |
| } |
| |
| /// If daemon discovery is disabled, attempts to resolve the query into an |
| /// _address_ string query that can be passed to the daemon. If already an |
| /// address, just return it. Otherwise, perform discovery to find the address. |
| /// Returns Some(_) if a target has been found, None otherwise. |
| pub async fn maybe_locally_resolve_target_spec( |
| target_spec: Option<String>, |
| env_context: &EnvironmentContext, |
| ) -> Result<Option<String>> { |
| if is_discovery_enabled(env_context).await { |
| Ok(target_spec) |
| } else { |
| let query = TargetInfoQuery::from(target_spec.clone()); |
| let addr = match query { |
| TargetInfoQuery::Addr(addr) => Some(addr), |
| _ => resolve_target_address(target_spec, env_context).await?, |
| }; |
| Ok(addr.map(|addr| addr.to_string())) |
| } |
| } |
| |
/// Error returned by the "knock" functions, split by whether the caller should keep retrying.
#[derive(Debug, Error)]
pub enum KnockError {
    /// A failure the caller should raise instead of knocking again.
    #[error("critical error encountered: {0:?}")]
    CriticalError(anyhow::Error),
    /// A failure after which the caller may knock again. Because of `#[from]`, applying `?` to
    /// an `anyhow::Error` inside a function returning `KnockError` produces this variant.
    #[error("non-critical error encountered: {0:?}")]
    NonCriticalError(#[from] anyhow::Error),
}
| |
/// Timeout that `knock_target` passes to `knock_target_with_timeout`.
const RCS_TIMEOUT: Duration = Duration::from_secs(3);
| |
/// Attempts to "knock" a target to determine if it is up and connectable via RCS.
///
/// This is `knock_target_with_timeout` called with the default `RCS_TIMEOUT`.
///
/// This is intended to be run in a loop, with a non-critical error implying the caller
/// should call again, and a critical error implying the caller should raise the error
/// and no longer loop.
pub async fn knock_target(target: &TargetProxy) -> Result<(), KnockError> {
    knock_target_with_timeout(target, RCS_TIMEOUT).await
}
| |
| /// Attempts to "knock" a target to determine if it is up and connectable via RCS, within |
| /// a specified timeout. |
| /// |
| /// This is intended to be run in a loop, with a non-critical error implying the caller |
| /// should call again, and a critical error implying the caller should raise the error |
| /// and no longer loop. |
| pub async fn knock_target_with_timeout( |
| target: &TargetProxy, |
| rcs_timeout: Duration, |
| ) -> Result<(), KnockError> { |
| let (rcs_proxy, remote_server_end) = create_proxy::<RemoteControlMarker>() |
| .map_err(|e| KnockError::NonCriticalError(e.into()))?; |
| timeout(rcs_timeout, target.open_remote_control(remote_server_end)) |
| .await |
| .context("timing out")? |
| .context("opening remote_control")? |
| .map_err(|e| anyhow::anyhow!("open remote control err: {:?}", e))?; |
| rcs::knock_rcs(&rcs_proxy) |
| .await |
| .map_err(|e| KnockError::NonCriticalError(anyhow::anyhow!("{e:?}"))) |
| } |
| |
| /// Same as `knock_target_with_timeout` but takes a `TargetCollection` and an |
| /// optional target name and finds the target to knock. Uses the configured |
| /// default target if `target_name` is `None`. |
| pub async fn knock_target_by_name( |
| target_name: &Option<String>, |
| target_collection_proxy: &TargetCollectionProxy, |
| open_timeout: Duration, |
| rcs_timeout: Duration, |
| ) -> Result<(), KnockError> { |
| let (target_proxy, target_remote) = |
| create_proxy::<TargetMarker>().map_err(|e| KnockError::NonCriticalError(e.into()))?; |
| |
| timeout::timeout( |
| open_timeout, |
| target_collection_proxy.open_target( |
| &TargetQuery { string_matcher: target_name.clone(), ..Default::default() }, |
| target_remote, |
| ), |
| ) |
| .await |
| .map_err(|_e| { |
| KnockError::NonCriticalError(errors::ffx_error!("Timeout opening target.").into()) |
| })? |
| .map_err(|e| { |
| KnockError::CriticalError( |
| errors::ffx_error!("Lost connection to the Daemon. Full context:\n{}", e).into(), |
| ) |
| })? |
| .map_err(|e| { |
| KnockError::CriticalError(errors::ffx_error!("Error opening target: {:?}", e).into()) |
| })?; |
| |
| knock_target_with_timeout(&target_proxy, rcs_timeout).await |
| } |
| |
| struct OvernetClient { |
| node: Arc<overnet_core::Router>, |
| } |
| |
| impl OvernetClient { |
| async fn locate_remote_control_node(&self) -> Result<overnet_core::NodeId> { |
| let lpc = self.node.new_list_peers_context().await; |
| let node_id; |
| 'found: loop { |
| let new_peers = lpc.list_peers().await?; |
| for peer in &new_peers { |
| let peer_has_remote_control = |
| peer.services.contains(&RemoteControlMarker::PROTOCOL_NAME.to_string()); |
| if peer_has_remote_control { |
| node_id = peer.node_id; |
| break 'found; |
| } |
| } |
| } |
| Ok(node_id) |
| } |
| |
| /// This is the remote control proxy that should be used for everything. |
| /// |
| /// If this is dropped, it will close the FidlPipe connection. |
| pub(crate) async fn connect_remote_control(&self) -> Result<RemoteControlProxy> { |
| let (server, client) = fidl::Channel::create(); |
| let node_id = self.locate_remote_control_node().await?; |
| let _ = self |
| .node |
| .connect_to_service(node_id, RemoteControlMarker::PROTOCOL_NAME, server) |
| .await?; |
| let proxy = RemoteControlProxy::new(fidl::AsyncChannel::from_channel(client)); |
| Ok(proxy) |
| } |
| } |
| |
| /// Represents a direct (no daemon) connection to a Fuchsia target. |
| pub struct DirectConnection { |
| overnet: OvernetClient, |
| fidl_pipe: FidlPipe, |
| rcs_proxy: Mutex<Option<RemoteControlProxy>>, |
| } |
| |
| impl DirectConnection { |
| pub async fn new(target_spec: String, context: EnvironmentContext) -> Result<Self> { |
| let addr = resolve_target_address(Some(target_spec.clone()), &context) |
| .await? |
| .ok_or_else(|| anyhow::anyhow!("Unable to resolve address of target '{target_spec}'")) |
| .context("resolving target address")?; |
| let node = overnet_core::Router::new(None)?; |
| let fidl_pipe = FidlPipe::new(context.clone(), addr, node.clone()) |
| .await |
| .context("starting fidl pipe")?; |
| Ok(Self { overnet: OvernetClient { node }, fidl_pipe, rcs_proxy: Default::default() }) |
| } |
| |
| pub async fn rcs_proxy(&self) -> Result<RemoteControlProxy> { |
| let mut rcs = self.rcs_proxy.lock().await; |
| if rcs.is_none() { |
| *rcs = Some( |
| self.overnet |
| .connect_remote_control() |
| .await |
| .map_err(|e| self.wrap_connection_errors(e).context("getting RCS proxy"))?, |
| ); |
| } |
| Ok(rcs.as_ref().unwrap().clone()) |
| } |
| |
| pub async fn knock_rcs(&self) -> Result<Option<CompatibilityInfo>, KnockError> { |
| let mut rcs = self.rcs_proxy.lock().await; |
| // These are the two places where errors can propagate to the user. For other code using |
| // the FIDL pipe it might be trickier to ensure that errors from the pipe are caught. |
| // For things like FHO integration these errors will probably just be handled outside of |
| // the main subtool. |
| if rcs.is_none() { |
| *rcs = Some(self.overnet.connect_remote_control().await.map_err(|e| { |
| KnockError::CriticalError( |
| self.wrap_connection_errors(e).context("finding RCS proxy"), |
| ) |
| })?); |
| } |
| let res = rcs::knock_rcs(rcs.as_ref().unwrap()).await.map_err(|e| anyhow::anyhow!("{e:?}")); |
| match res { |
| Ok(()) => Ok(self.fidl_pipe.compatibility_info()), |
| Err(e) => Err(KnockError::NonCriticalError( |
| self.wrap_connection_errors(e).context("getting RCS proxy"), |
| )), |
| } |
| } |
| |
| /// Takes a given connection error and, if there have been underlying connection errors, adds |
| /// additional context to the passed error, else leaves the error the same. |
| /// |
| /// This function is used to overcome some of the shortcomings around FIDL errors, as on the |
| /// host they are being used to simulate what is essentially a networked connection, and not an |
| /// OS-backed operation (like when using FIDL on a Fuchsia device). |
| pub fn wrap_connection_errors(&self, e: anyhow::Error) -> anyhow::Error { |
| if let Some(pipe_errors) = self.fidl_pipe.try_drain_errors() { |
| return anyhow::anyhow!("{e:?}\n{pipe_errors:?}"); |
| } |
| e |
| } |
| } |
| |
| /// Identical to the above "knock_target" but does not use the daemon. |
| /// |
| /// Unlike other errors, this is not intended to be run in a tight loop. |
| pub async fn knock_target_daemonless( |
| target_spec: String, |
| context: &EnvironmentContext, |
| ) -> Result<Option<CompatibilityInfo>, KnockError> { |
| let conn = DirectConnection::new(target_spec, context.clone()).await?; |
| conn.knock_rcs().await |
| } |
| |
| /// Get the target specifier. This uses the normal config mechanism which |
| /// supports flexible config values: it can be a string naming the target, or |
| /// a list of strings, in which case the first valid entry is used. (The most |
| /// common use of this functionality would be to specify an array of environment |
| /// variables, e.g. ["$FUCHSIA_TARGET_ADDR", "FUCHSIA_NODENAME"]). |
| /// The result is a string which can be turned into a `TargetInfoQuery` to match |
| /// against the available targets (by name, address, etc). We don't return the query |
| /// itself, because some callers assume the specifier is the name of the target, |
| /// for the purposes of error messages, etc. E.g. The repo server only works if |
| /// an explicit _name_ is provided. In other contexts, it is valid for the specifier |
| /// to be a substring, a network address, etc. |
| pub async fn get_target_specifier(context: &EnvironmentContext) -> Result<Option<String>> { |
| let target_spec = context.get(TARGET_DEFAULT_KEY).await?; |
| match target_spec { |
| Some(ref target) => info!("Target specifier: ['{target:?}']"), |
| None => debug!("No target specified"), |
| } |
| Ok(target_spec) |
| } |
| |
| pub async fn add_manual_target( |
| target_collection_proxy: &TargetCollectionProxy, |
| addr: IpAddr, |
| scope_id: u32, |
| port: u16, |
| wait: bool, |
| ) -> Result<()> { |
| let ip = match addr { |
| IpAddr::V6(i) => net::IpAddress::Ipv6(net::Ipv6Address { addr: i.octets().into() }), |
| IpAddr::V4(i) => net::IpAddress::Ipv4(net::Ipv4Address { addr: i.octets().into() }), |
| }; |
| let addr = if port > 0 { |
| ffx::TargetAddrInfo::IpPort(ffx::TargetIpPort { ip, port, scope_id }) |
| } else { |
| ffx::TargetAddrInfo::Ip(ffx::TargetIp { ip, scope_id }) |
| }; |
| |
| let (client, mut stream) = |
| fidl::endpoints::create_request_stream::<ffx::AddTargetResponder_Marker>() |
| .context("create endpoints")?; |
| target_collection_proxy |
| .add_target( |
| &addr, |
| &ffx::AddTargetConfig { verify_connection: Some(wait), ..Default::default() }, |
| client, |
| ) |
| .context("calling AddTarget")?; |
| let res = if let Ok(Some(req)) = stream.try_next().await { |
| match req { |
| ffx::AddTargetResponder_Request::Success { .. } => Ok(()), |
| ffx::AddTargetResponder_Request::Error { err, .. } => Err(err), |
| } |
| } else { |
| ffx_bail!("ffx lost connection to the daemon before receiving a response."); |
| }; |
| res.map_err(|e| { |
| let err = e.connection_error.unwrap(); |
| let logs = e.connection_error_logs.map(|v| v.join("\n")); |
| let target = Some(format!("{addr:?}")); |
| FfxError::TargetConnectionError { err, target, logs }.into() |
| }) |
| } |
| |
| #[cfg(test)] |
| mod test { |
| use super::*; |
| use crate::overnet_connector::{OvernetConnection, OvernetConnector}; |
| use async_channel::Receiver; |
| use ffx_config::{macro_deps::serde_json::Value, test_init, ConfigLevel}; |
| use fidl_fuchsia_developer_remotecontrol as rcs; |
| use fuchsia_async::Task; |
| |
| fn create_overnet_circuit(router: Arc<overnet_core::Router>) -> fidl::AsyncSocket { |
| let (local_socket, remote_socket) = fidl::Socket::create_stream(); |
| let local_socket = fidl::AsyncSocket::from_socket(local_socket); |
| |
| let socket = fidl::AsyncSocket::from_socket(remote_socket); |
| let (mut rx, mut tx) = futures::AsyncReadExt::split(socket); |
| Task::spawn(async move { |
| let (errors_sender, errors) = futures::channel::mpsc::unbounded(); |
| if let Err(e) = futures::future::join( |
| circuit::multi_stream::multi_stream_node_connection_to_async( |
| router.circuit_node(), |
| &mut rx, |
| &mut tx, |
| true, |
| circuit::Quality::NETWORK, |
| errors_sender, |
| "client".to_owned(), |
| ), |
| errors |
| .map(|e| { |
| eprintln!("A client circuit stream failed: {e:?}"); |
| }) |
| .collect::<()>(), |
| ) |
| .map(|(result, ())| result) |
| .await |
| { |
| if let circuit::Error::ConnectionClosed(msg) = e { |
| eprintln!("testing overnet link closed: {:?}", msg); |
| } else { |
| eprintln!("error handling Overnet link: {:?}", e); |
| } |
| } |
| }) |
| .detach(); |
| |
| local_socket |
| } |
| |
| #[fuchsia::test] |
| async fn test_get_empty_default_target() { |
| let env = test_init().await.unwrap(); |
| let target_spec = get_target_specifier(&env.context).await.unwrap(); |
| assert_eq!(target_spec, None); |
| } |
| |
| #[fuchsia::test] |
| async fn test_set_default_target() { |
| let env = test_init().await.unwrap(); |
| env.context |
| .query(TARGET_DEFAULT_KEY) |
| .level(Some(ConfigLevel::User)) |
| .set(Value::String("some_target".to_owned())) |
| .await |
| .unwrap(); |
| |
| let target_spec = get_target_specifier(&env.context).await.unwrap(); |
| assert_eq!(target_spec, Some("some_target".to_owned())); |
| } |
| |
| #[fuchsia::test] |
| async fn test_default_first_target_in_array() { |
| let env = test_init().await.unwrap(); |
| let ts: Vec<Value> = ["t1", "t2"].iter().map(|s| Value::String(s.to_string())).collect(); |
| env.context |
| .query(TARGET_DEFAULT_KEY) |
| .level(Some(ConfigLevel::User)) |
| .set(Value::Array(ts)) |
| .await |
| .unwrap(); |
| |
| let target_spec = get_target_specifier(&env.context).await.unwrap(); |
| assert_eq!(target_spec, Some("t1".to_owned())); |
| } |
| |
| #[fuchsia::test] |
| async fn test_default_missing_env_ignored() { |
| let env = test_init().await.unwrap(); |
| let ts: Vec<Value> = |
| ["$THIS_BETTER_NOT_EXIST", "t2"].iter().map(|s| Value::String(s.to_string())).collect(); |
| env.context |
| .query(TARGET_DEFAULT_KEY) |
| .level(Some(ConfigLevel::User)) |
| .set(Value::Array(ts)) |
| .await |
| .unwrap(); |
| |
| let target_spec = get_target_specifier(&env.context).await.unwrap(); |
| assert_eq!(target_spec, Some("t2".to_owned())); |
| } |
| |
| #[fuchsia::test] |
| async fn test_default_env_present() { |
| std::env::set_var("MY_LITTLE_TMPKEY", "t1"); |
| let env = test_init().await.unwrap(); |
| let ts: Vec<Value> = |
| ["$MY_LITTLE_TMPKEY", "t2"].iter().map(|s| Value::String(s.to_string())).collect(); |
| env.context |
| .query(TARGET_DEFAULT_KEY) |
| .level(Some(ConfigLevel::User)) |
| .set(Value::Array(ts)) |
| .await |
| .unwrap(); |
| |
| let target_spec = get_target_specifier(&env.context).await.unwrap(); |
| assert_eq!(target_spec, Some("t1".to_owned())); |
| std::env::remove_var("MY_LITTLE_TMPKEY"); |
| } |
| |
/// Selects what `FakeOvernet` does with the server channel of an `OpenCapability` request.
#[derive(Debug, Clone, Eq, PartialEq)]
enum FakeOvernetBehavior {
    /// Drop the server channel right away.
    CloseRcsImmediately,
    /// Keep the server channel open by serving it from a detached task.
    KeepRcsOpen,
}
| |
/// Test double implementing `OvernetConnector` on top of an in-process Overnet router.
#[derive(Debug)]
struct FakeOvernet {
    /// Router on which the fake RemoteControl service is registered.
    circuit_node: Arc<overnet_core::Router>,
    /// Handed to each `OvernetConnection` as its `errors` receiver.
    error_receiver: Receiver<anyhow::Error>,
    /// How `OpenCapability` requests are answered.
    behavior: FakeOvernetBehavior,
}
| |
| impl FakeOvernet { |
| async fn handle_transaction( |
| req: rcs::RemoteControlRequest, |
| behavior: &FakeOvernetBehavior, |
| ) { |
| match req { |
| rcs::RemoteControlRequest::OpenCapability { server_channel, responder, .. } => { |
| match behavior { |
| FakeOvernetBehavior::KeepRcsOpen => { |
| // We're just going to assume this capability is always going to be |
| // RCS, and avoid string matching for the sake of avoiding changes |
| // to monikers and/or capability connecting. |
| let mut stream = rcs::RemoteControlRequestStream::from_channel( |
| fidl::AsyncChannel::from_channel(server_channel), |
| ); |
| // This task is here to ensure the channel stays open, but won't |
| // necessarily need do anything. |
| Task::spawn(async move { |
| while let Ok(Some(req)) = stream.try_next().await { |
| eprintln!("Got a request: {req:?}") |
| } |
| }) |
| .detach(); |
| } |
| FakeOvernetBehavior::CloseRcsImmediately => { |
| drop(server_channel); |
| } |
| } |
| responder.send(Ok(())).unwrap(); |
| } |
| rcs::RemoteControlRequest::EchoString { value, responder } => { |
| responder.send(&value).unwrap() |
| } |
| _ => panic!("Received an unexpected request: {req:?}"), |
| } |
| } |
| } |
| |
| impl OvernetConnector for FakeOvernet { |
| async fn connect(&mut self) -> Result<OvernetConnection> { |
| let circuit_socket = create_overnet_circuit(self.circuit_node.clone()); |
| let (rcs_sender, rcs_receiver) = async_channel::unbounded(); |
| self.circuit_node |
| .register_service( |
| rcs::RemoteControlMarker::PROTOCOL_NAME.to_owned(), |
| move |channel| { |
| let _ = rcs_sender.try_send(channel).unwrap(); |
| Ok(()) |
| }, |
| ) |
| .await |
| .unwrap(); |
| let behavior = self.behavior.clone(); |
| let rcs_task = Task::local(async move { |
| while let Ok(channel) = rcs_receiver.recv().await { |
| let mut stream = rcs::RemoteControlRequestStream::from_channel( |
| fidl::AsyncChannel::from_channel(channel), |
| ); |
| while let Ok(Some(req)) = stream.try_next().await { |
| Self::handle_transaction(req, &behavior).await; |
| } |
| } |
| }); |
| let (circuit_reader, circuit_writer) = tokio::io::split(circuit_socket); |
| Ok(OvernetConnection { |
| output: Box::new(tokio::io::BufReader::new(circuit_reader)), |
| input: Box::new(circuit_writer), |
| errors: self.error_receiver.clone(), |
| compat: None, |
| main_task: Some(rcs_task), |
| }) |
| } |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_overnet_rcs_knock() { |
| let node = overnet_core::Router::new(None).unwrap(); |
| let overnet_socket = create_overnet_socket(node.clone()).unwrap(); |
| let (reader, writer) = tokio::io::split(overnet_socket); |
| let circuit_node = overnet_core::Router::new(None).unwrap(); |
| let (_sender, error_receiver) = async_channel::unbounded(); |
| let circuit = FakeOvernet { |
| circuit_node: circuit_node.clone(), |
| error_receiver, |
| behavior: FakeOvernetBehavior::KeepRcsOpen, |
| }; |
| let fidl_pipe = |
| FidlPipe::start_internal("127.0.0.1:22".parse().unwrap(), reader, writer, circuit) |
| .await |
| .unwrap(); |
| let conn = DirectConnection { |
| overnet: OvernetClient { node }, |
| fidl_pipe, |
| rcs_proxy: Default::default(), |
| }; |
| assert!(conn.knock_rcs().await.is_ok()); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_overnet_rcs_knock_failure_disconnect() { |
| let node = overnet_core::Router::new(None).unwrap(); |
| let overnet_socket = create_overnet_socket(node.clone()).unwrap(); |
| let (reader, writer) = tokio::io::split(overnet_socket); |
| let circuit_node = overnet_core::Router::new(None).unwrap(); |
| let (error_sender, error_receiver) = async_channel::unbounded(); |
| let circuit = FakeOvernet { |
| circuit_node: circuit_node.clone(), |
| error_receiver, |
| behavior: FakeOvernetBehavior::CloseRcsImmediately, |
| }; |
| let fidl_pipe = |
| FidlPipe::start_internal("[::1]:22".parse().unwrap(), reader, writer, circuit) |
| .await |
| .unwrap(); |
| let conn = DirectConnection { |
| overnet: OvernetClient { node }, |
| fidl_pipe, |
| rcs_proxy: Default::default(), |
| }; |
| error_sender.send(anyhow::anyhow!("kaboom")).await.unwrap(); |
| let err = conn.knock_rcs().await; |
| assert!(err.is_err()); |
| let err_string = err.unwrap_err().to_string(); |
| assert!(err_string.contains("kaboom")); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_overnet_rcs_echo_multiple_times() { |
| let node = overnet_core::Router::new(None).unwrap(); |
| let overnet_socket = create_overnet_socket(node.clone()).unwrap(); |
| let (reader, writer) = tokio::io::split(overnet_socket); |
| let circuit_node = overnet_core::Router::new(None).unwrap(); |
| let (_sender, error_receiver) = async_channel::unbounded(); |
| let circuit = FakeOvernet { |
| circuit_node: circuit_node.clone(), |
| error_receiver, |
| behavior: FakeOvernetBehavior::KeepRcsOpen, |
| }; |
| let fidl_pipe = |
| FidlPipe::start_internal("127.0.0.1:22".parse().unwrap(), reader, writer, circuit) |
| .await |
| .unwrap(); |
| let conn = DirectConnection { |
| overnet: OvernetClient { node }, |
| fidl_pipe, |
| rcs_proxy: Default::default(), |
| }; |
| let rcs = conn.rcs_proxy().await.unwrap(); |
| assert_eq!(rcs.echo_string("foobart").await.unwrap(), "foobart".to_owned()); |
| let rcs2 = conn.rcs_proxy().await.unwrap(); |
| assert_eq!(rcs2.echo_string("foobarr").await.unwrap(), "foobarr".to_owned()); |
| drop(rcs); |
| drop(rcs2); |
| let rcs3 = conn.rcs_proxy().await.unwrap(); |
| assert_eq!(rcs3.echo_string("foobarz").await.unwrap(), "foobarz".to_owned()); |
| } |
| } |