blob: 67aa73dcf081ab019d803b01682c83fe51c0f1bf [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use addr::TargetAddr;
use anyhow::{Context as _, Result};
use async_lock::Mutex;
use compat_info::CompatibilityInfo;
use discovery::{DiscoverySources, TargetEvent, TargetHandle, TargetState};
use errors::{ffx_bail, FfxError};
use ffx_config::{keys::TARGET_DEFAULT_KEY, EnvironmentContext};
use fidl::{endpoints::create_proxy, prelude::*};
use fidl_fuchsia_developer_ffx::{
self as ffx, DaemonError, DaemonProxy, TargetAddrInfo, TargetCollectionMarker,
TargetCollectionProxy, TargetInfo, TargetIp, TargetMarker, TargetQuery,
};
use fidl_fuchsia_developer_remotecontrol::{RemoteControlMarker, RemoteControlProxy};
use fidl_fuchsia_net as net;
use fuchsia_async::Timer;
use futures::{select, Future, FutureExt, StreamExt, TryStreamExt};
use itertools::Itertools;
use netext::IsLocalAddr;
use std::cmp::Ordering;
use std::collections::HashSet;
use std::net::{IpAddr, Ipv6Addr};
use std::net::{SocketAddr, SocketAddrV6};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use timeout::timeout;
use tracing::{debug, info};
mod desc;
mod fidl_pipe;
mod overnet_connector;
mod query;
mod ssh_connector;
const SSH_PORT_DEFAULT: u16 = 22;
pub use desc::{Description, FastbootInterface};
pub use fidl_pipe::create_overnet_socket;
pub use fidl_pipe::FidlPipe;
pub use query::TargetInfoQuery;
/// Re-export of [`fidl_fuchsia_developer_ffx::TargetProxy`] for ease of use
pub use fidl_fuchsia_developer_ffx::TargetProxy;
const FASTBOOT_INLINE_TARGET: &str = "ffx.fastboot.inline_target";
const CONFIG_LOCAL_DISCOVERY_TIMEOUT: &str = "discovery.timeout";
/// Attempt to connect to RemoteControl on a target device using a connection to a daemon.
///
/// The optional |target| is a string matcher as defined in fuchsia.developer.ffx.TargetQuery
/// fidl table.
#[tracing::instrument]
pub async fn get_remote_proxy(
target_spec: Option<String>,
daemon_proxy: DaemonProxy,
proxy_timeout: Duration,
mut target_info: Option<&mut Option<TargetInfo>>,
env_context: &EnvironmentContext,
) -> Result<RemoteControlProxy> {
let (target_proxy, target_proxy_fut) =
open_target_with_fut(target_spec.clone(), daemon_proxy, proxy_timeout, env_context)?;
let mut target_proxy_fut = target_proxy_fut.boxed_local().fuse();
let (remote_proxy, remote_server_end) = create_proxy::<RemoteControlMarker>()?;
let mut open_remote_control_fut =
target_proxy.open_remote_control(remote_server_end).boxed_local().fuse();
let res = loop {
select! {
res = open_remote_control_fut => {
match res {
Err(e) => {
// Getting here is most likely the result of a PEER_CLOSED error, which
// may be because the target_proxy closure has propagated faster than
// the error (which can happen occasionally). To counter this, wait for
// the target proxy to complete, as it will likely only need to be
// polled once more (open_remote_control_fut partially depends on it).
target_proxy_fut.await?;
return Err(e.into());
}
Ok(r) => break(r),
}
}
res = target_proxy_fut => {
res?;
if let Some(ref mut info_out) = target_info {
**info_out = Some(target_proxy.identity().await?);
}
}
}
};
let target_spec = target_spec.as_ref().map(ToString::to_string);
match res {
Ok(_) => Ok(remote_proxy),
Err(err) => Err(anyhow::Error::new(FfxError::TargetConnectionError {
err,
target: target_spec,
logs: Some(target_proxy.get_ssh_logs().await?),
})),
}
}
/// Attempt to connect to a target given a connection to a daemon.
///
/// The returned future must be polled to completion. It is returned separately
/// from the TargetProxy to enable immediately pushing requests onto the TargetProxy
/// before connecting to the target completes.
///
/// The optional |target| is a string matcher as defined in fuchsia.developer.ffx.TargetQuery
/// fidl table.
#[tracing::instrument]
pub fn open_target_with_fut<'a, 'b: 'a>(
target: Option<String>,
daemon_proxy: DaemonProxy,
target_timeout: Duration,
env_context: &'b EnvironmentContext,
) -> Result<(TargetProxy, impl Future<Output = Result<()>> + 'a)> {
let (tc_proxy, tc_server_end) = create_proxy::<TargetCollectionMarker>()?;
let (target_proxy, target_server_end) = create_proxy::<TargetMarker>()?;
let t_clone = target.clone();
let target_collection_fut = async move {
daemon_proxy
.connect_to_protocol(
TargetCollectionMarker::PROTOCOL_NAME,
tc_server_end.into_channel(),
)
.await?
.map_err(|err| FfxError::DaemonError { err, target: t_clone })?;
Result::<()>::Ok(())
};
let t_clone = target.clone();
let target_handle_fut = async move {
let is_fastboot_inline = env_context.get(FASTBOOT_INLINE_TARGET).await.unwrap_or(false);
if is_fastboot_inline {
if let Some(ref serial_number) = target {
tracing::trace!("got serial number: {serial_number}");
timeout(target_timeout, tc_proxy.add_inline_fastboot_target(&serial_number))
.await??;
}
}
timeout(
target_timeout,
tc_proxy.open_target(
&TargetQuery { string_matcher: t_clone.clone(), ..Default::default() },
target_server_end,
),
)
.await
.map_err(|_| FfxError::DaemonError { err: DaemonError::Timeout, target: t_clone })??
.map_err(|err| FfxError::OpenTargetError { err, target })?;
Result::<()>::Ok(())
};
let fut = async move {
let ((), ()) = futures::try_join!(target_collection_fut, target_handle_fut)?;
Ok(())
};
Ok((target_proxy, fut))
}
pub(crate) fn target_addr_info_to_socket(ti: &TargetAddrInfo) -> SocketAddr {
let (target_ip, port) = match ti {
TargetAddrInfo::Ip(a) => (a.clone(), 0),
TargetAddrInfo::IpPort(ip) => (TargetIp { ip: ip.ip, scope_id: ip.scope_id }, ip.port),
};
let socket = match target_ip {
TargetIp { ip: net::IpAddress::Ipv4(net::Ipv4Address { addr }), .. } => {
SocketAddr::new(IpAddr::from(addr), port)
}
TargetIp { ip: net::IpAddress::Ipv6(net::Ipv6Address { addr }), scope_id, .. } => {
SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(addr), port, 0, scope_id))
}
};
socket
}
pub async fn is_discovery_enabled(ctx: &EnvironmentContext) -> bool {
!ffx_config::is_usb_discovery_disabled(ctx).await
|| !ffx_config::is_mdns_discovery_disabled(ctx).await
}
fn non_empty_match_name(on1: &Option<String>, on2: &Option<String>) -> bool {
match (on1, on2) {
(Some(n1), Some(n2)) => n1 == n2,
_ => false,
}
}
fn non_empty_match_addr(state: &discovery::TargetState, sa: &Option<SocketAddr>) -> bool {
match (state, sa) {
(discovery::TargetState::Product(addrs), Some(sa)) => {
addrs.iter().any(|a| a.ip() == sa.ip())
}
_ => false,
}
}
// Descriptions are used for matching against a TargetInfoQuery
fn handle_to_description(handle: &discovery::TargetHandle) -> Description {
let addresses = match &handle.state {
discovery::TargetState::Product(target_addr) => target_addr.clone(),
_ => vec![],
};
Description { nodename: handle.node_name.clone(), addresses, ..Default::default() }
}
pub async fn resolve_target_query(
query: TargetInfoQuery,
ctx: &EnvironmentContext,
) -> Result<Vec<discovery::TargetHandle>> {
// Get nodename, in case we're trying to find an exact match
let (qname, qaddr) = match query {
TargetInfoQuery::NodenameOrSerial(ref s) => (Some(s.clone()), None),
TargetInfoQuery::Addr(ref a) => (None, Some(a.clone())),
_ => (None, None),
};
let sources = DiscoverySources::MDNS
| DiscoverySources::USB
| DiscoverySources::MANUAL
| DiscoverySources::EMULATOR;
let filter = move |handle: &TargetHandle| {
let description = handle_to_description(handle);
query.match_description(&description)
};
let stream = discovery::wait_for_devices(filter, true, false, sources).await?;
let discovery_delay = ctx.get(CONFIG_LOCAL_DISCOVERY_TIMEOUT).await.unwrap_or(1000);
let delay = Duration::from_millis(discovery_delay);
// This is tricky. We want the stream to complete immediately if we find
// a target whose name matches the query exactly. Otherwise, run until the
// timer fires.
// We can't use `Stream::wait_until()`, because that would require us
// to return true for the found item, and false for the _next_ item.
// But there may be no next item, so the stream would end up waiting for
// the timer anyway. Instead, we create two futures: the timer, and one
// that is ready when we find the name we're looking for. Then we use
// `Stream::take_until()`, waiting until _either_ of those futures is ready
// (by using `race()`). The only remaining tricky part is that we need to
// examine each event to determine if it matches what we're looking for --
// so we interpose a closure via `Stream::map()` that examines each item,
// before returning them unmodified.
// Oh, and once we've got a set of results, if any of them are Err, cause
// the whole thing to be an Err. We could stop the race early in case of
// failure by using the same technique, I suppose.
let target_events: Result<Vec<TargetEvent>> = {
let timer = Timer::new(delay).fuse();
let found_target_event = async_utils::event::Event::new();
let found_it = found_target_event.wait().fuse();
let results: Vec<Result<_>> = stream
.map(move |ev| {
if let Ok(TargetEvent::Added(ref h)) = ev {
if non_empty_match_name(&h.node_name, &qname) {
found_target_event.signal();
} else if non_empty_match_addr(&h.state, &qaddr) {
found_target_event.signal();
}
ev
} else {
unreachable!()
}
})
.take_until(futures_lite::future::race(timer, found_it))
.collect()
.await;
// Fail if any results are Err
let r: Result<Vec<_>> = results.into_iter().collect();
r
};
// Extract handles from Added events
let added_handles: Vec<_> =
target_events?
.into_iter()
.map(|e| {
if let discovery::TargetEvent::Added(handle) = e {
handle
} else {
unreachable!()
}
})
.collect();
// Sometimes libdiscovery returns multiple Added events for the same target (I think always
// user emulators). The information is always the same, let's just extract the unique entries.
let unique_handles = added_handles.into_iter().collect::<HashSet<_>>();
Ok(unique_handles.into_iter().collect())
}
/// Attempts to resolve the query into a target's ssh-able address. Returns Some(_) if a target has been
/// found, None otherwise.
pub async fn resolve_target_address(
target_spec: Option<String>,
env_context: &EnvironmentContext,
) -> Result<Option<SocketAddr>> {
let query = TargetInfoQuery::from(target_spec.clone());
// If it's already an address, return it
if let TargetInfoQuery::Addr(a) = query {
let scope_id = if let SocketAddr::V6(addr) = a { addr.scope_id() } else { 0 };
return Ok(Some(TargetAddr::new(a.ip(), scope_id, SSH_PORT_DEFAULT).into()));
}
let handles = resolve_target_query(query, env_context).await?;
if handles.len() == 0 {
return Ok(None);
}
if handles.len() > 1 {
return Err(FfxError::DaemonError {
err: DaemonError::TargetAmbiguous,
target: target_spec,
}
.into());
}
let target = &handles[0];
match &target.state {
TargetState::Product(ref addresses) => {
if addresses.is_empty() {
return Err(anyhow::anyhow!(
"Target discovered but does not contain addresses: {target:?}"
));
}
let mut addrs_sorted = addresses
.into_iter()
.map(SocketAddr::from)
.sorted_by(|a1, a2| {
match (a1.ip().is_link_local_addr(), a2.ip().is_link_local_addr()) {
(true, true) | (false, false) => Ordering::Equal,
(true, false) => Ordering::Less,
(false, true) => Ordering::Greater,
}
})
.collect::<Vec<_>>();
let mut sock: SocketAddr = addrs_sorted.pop().unwrap();
if sock.port() == 0 {
sock.set_port(SSH_PORT_DEFAULT)
}
Ok(Some(sock))
}
state => Err(anyhow::anyhow!("Target discovered but not in the correct state: {state:?}")),
}
}
/// If daemon discovery is disabled, attempts to resolve the query into an
/// _address_ string query that can be passed to the daemon. If already an
/// address, just return it. Otherwise, perform discovery to find the address.
/// Returns Some(_) if a target has been found, None otherwise.
pub async fn maybe_locally_resolve_target_spec(
target_spec: Option<String>,
env_context: &EnvironmentContext,
) -> Result<Option<String>> {
if is_discovery_enabled(env_context).await {
Ok(target_spec)
} else {
let query = TargetInfoQuery::from(target_spec.clone());
let addr = match query {
TargetInfoQuery::Addr(addr) => Some(addr),
_ => resolve_target_address(target_spec, env_context).await?,
};
Ok(addr.map(|addr| addr.to_string()))
}
}
#[derive(Debug, Error)]
pub enum KnockError {
#[error("critical error encountered: {0:?}")]
CriticalError(anyhow::Error),
#[error("non-critical error encountered: {0:?}")]
NonCriticalError(#[from] anyhow::Error),
}
const RCS_TIMEOUT: Duration = Duration::from_secs(3);
/// Attempts to "knock" a target to determine if it is up and connectable via RCS.
///
/// This is intended to be run in a loop, with a non-critical error implying the caller
/// should call again, and a critical error implying the caller should raise the error
/// and no longer loop.
pub async fn knock_target(target: &TargetProxy) -> Result<(), KnockError> {
knock_target_with_timeout(target, RCS_TIMEOUT).await
}
/// Attempts to "knock" a target to determine if it is up and connectable via RCS, within
/// a specified timeout.
///
/// This is intended to be run in a loop, with a non-critical error implying the caller
/// should call again, and a critical error implying the caller should raise the error
/// and no longer loop.
pub async fn knock_target_with_timeout(
target: &TargetProxy,
rcs_timeout: Duration,
) -> Result<(), KnockError> {
let (rcs_proxy, remote_server_end) = create_proxy::<RemoteControlMarker>()
.map_err(|e| KnockError::NonCriticalError(e.into()))?;
timeout(rcs_timeout, target.open_remote_control(remote_server_end))
.await
.context("timing out")?
.context("opening remote_control")?
.map_err(|e| anyhow::anyhow!("open remote control err: {:?}", e))?;
rcs::knock_rcs(&rcs_proxy)
.await
.map_err(|e| KnockError::NonCriticalError(anyhow::anyhow!("{e:?}")))
}
/// Same as `knock_target_with_timeout` but takes a `TargetCollection` and an
/// optional target name and finds the target to knock. Uses the configured
/// default target if `target_name` is `None`.
pub async fn knock_target_by_name(
target_name: &Option<String>,
target_collection_proxy: &TargetCollectionProxy,
open_timeout: Duration,
rcs_timeout: Duration,
) -> Result<(), KnockError> {
let (target_proxy, target_remote) =
create_proxy::<TargetMarker>().map_err(|e| KnockError::NonCriticalError(e.into()))?;
timeout::timeout(
open_timeout,
target_collection_proxy.open_target(
&TargetQuery { string_matcher: target_name.clone(), ..Default::default() },
target_remote,
),
)
.await
.map_err(|_e| {
KnockError::NonCriticalError(errors::ffx_error!("Timeout opening target.").into())
})?
.map_err(|e| {
KnockError::CriticalError(
errors::ffx_error!("Lost connection to the Daemon. Full context:\n{}", e).into(),
)
})?
.map_err(|e| {
KnockError::CriticalError(errors::ffx_error!("Error opening target: {:?}", e).into())
})?;
knock_target_with_timeout(&target_proxy, rcs_timeout).await
}
struct OvernetClient {
node: Arc<overnet_core::Router>,
}
impl OvernetClient {
async fn locate_remote_control_node(&self) -> Result<overnet_core::NodeId> {
let lpc = self.node.new_list_peers_context().await;
let node_id;
'found: loop {
let new_peers = lpc.list_peers().await?;
for peer in &new_peers {
let peer_has_remote_control =
peer.services.contains(&RemoteControlMarker::PROTOCOL_NAME.to_string());
if peer_has_remote_control {
node_id = peer.node_id;
break 'found;
}
}
}
Ok(node_id)
}
/// This is the remote control proxy that should be used for everything.
///
/// If this is dropped, it will close the FidlPipe connection.
pub(crate) async fn connect_remote_control(&self) -> Result<RemoteControlProxy> {
let (server, client) = fidl::Channel::create();
let node_id = self.locate_remote_control_node().await?;
let _ = self
.node
.connect_to_service(node_id, RemoteControlMarker::PROTOCOL_NAME, server)
.await?;
let proxy = RemoteControlProxy::new(fidl::AsyncChannel::from_channel(client));
Ok(proxy)
}
}
/// Represents a direct (no daemon) connection to a Fuchsia target.
pub struct DirectConnection {
overnet: OvernetClient,
fidl_pipe: FidlPipe,
rcs_proxy: Mutex<Option<RemoteControlProxy>>,
}
impl DirectConnection {
pub async fn new(target_spec: String, context: EnvironmentContext) -> Result<Self> {
let addr = resolve_target_address(Some(target_spec.clone()), &context)
.await?
.ok_or_else(|| anyhow::anyhow!("Unable to resolve address of target '{target_spec}'"))
.context("resolving target address")?;
let node = overnet_core::Router::new(None)?;
let fidl_pipe = FidlPipe::new(context.clone(), addr, node.clone())
.await
.context("starting fidl pipe")?;
Ok(Self { overnet: OvernetClient { node }, fidl_pipe, rcs_proxy: Default::default() })
}
pub async fn rcs_proxy(&self) -> Result<RemoteControlProxy> {
let mut rcs = self.rcs_proxy.lock().await;
if rcs.is_none() {
*rcs = Some(
self.overnet
.connect_remote_control()
.await
.map_err(|e| self.wrap_connection_errors(e).context("getting RCS proxy"))?,
);
}
Ok(rcs.as_ref().unwrap().clone())
}
pub async fn knock_rcs(&self) -> Result<Option<CompatibilityInfo>, KnockError> {
let mut rcs = self.rcs_proxy.lock().await;
// These are the two places where errors can propagate to the user. For other code using
// the FIDL pipe it might be trickier to ensure that errors from the pipe are caught.
// For things like FHO integration these errors will probably just be handled outside of
// the main subtool.
if rcs.is_none() {
*rcs = Some(self.overnet.connect_remote_control().await.map_err(|e| {
KnockError::CriticalError(
self.wrap_connection_errors(e).context("finding RCS proxy"),
)
})?);
}
let res = rcs::knock_rcs(rcs.as_ref().unwrap()).await.map_err(|e| anyhow::anyhow!("{e:?}"));
match res {
Ok(()) => Ok(self.fidl_pipe.compatibility_info()),
Err(e) => Err(KnockError::NonCriticalError(
self.wrap_connection_errors(e).context("getting RCS proxy"),
)),
}
}
/// Takes a given connection error and, if there have been underlying connection errors, adds
/// additional context to the passed error, else leaves the error the same.
///
/// This function is used to overcome some of the shortcomings around FIDL errors, as on the
/// host they are being used to simulate what is essentially a networked connection, and not an
/// OS-backed operation (like when using FIDL on a Fuchsia device).
pub fn wrap_connection_errors(&self, e: anyhow::Error) -> anyhow::Error {
if let Some(pipe_errors) = self.fidl_pipe.try_drain_errors() {
return anyhow::anyhow!("{e:?}\n{pipe_errors:?}");
}
e
}
}
/// Identical to the above "knock_target" but does not use the daemon.
///
/// Unlike other errors, this is not intended to be run in a tight loop.
pub async fn knock_target_daemonless(
target_spec: String,
context: &EnvironmentContext,
) -> Result<Option<CompatibilityInfo>, KnockError> {
let conn = DirectConnection::new(target_spec, context.clone()).await?;
conn.knock_rcs().await
}
/// Get the target specifier. This uses the normal config mechanism which
/// supports flexible config values: it can be a string naming the target, or
/// a list of strings, in which case the first valid entry is used. (The most
/// common use of this functionality would be to specify an array of environment
/// variables, e.g. ["$FUCHSIA_TARGET_ADDR", "FUCHSIA_NODENAME"]).
/// The result is a string which can be turned into a `TargetInfoQuery` to match
/// against the available targets (by name, address, etc). We don't return the query
/// itself, because some callers assume the specifier is the name of the target,
/// for the purposes of error messages, etc. E.g. The repo server only works if
/// an explicit _name_ is provided. In other contexts, it is valid for the specifier
/// to be a substring, a network address, etc.
pub async fn get_target_specifier(context: &EnvironmentContext) -> Result<Option<String>> {
let target_spec = context.get(TARGET_DEFAULT_KEY).await?;
match target_spec {
Some(ref target) => info!("Target specifier: ['{target:?}']"),
None => debug!("No target specified"),
}
Ok(target_spec)
}
pub async fn add_manual_target(
target_collection_proxy: &TargetCollectionProxy,
addr: IpAddr,
scope_id: u32,
port: u16,
wait: bool,
) -> Result<()> {
let ip = match addr {
IpAddr::V6(i) => net::IpAddress::Ipv6(net::Ipv6Address { addr: i.octets().into() }),
IpAddr::V4(i) => net::IpAddress::Ipv4(net::Ipv4Address { addr: i.octets().into() }),
};
let addr = if port > 0 {
ffx::TargetAddrInfo::IpPort(ffx::TargetIpPort { ip, port, scope_id })
} else {
ffx::TargetAddrInfo::Ip(ffx::TargetIp { ip, scope_id })
};
let (client, mut stream) =
fidl::endpoints::create_request_stream::<ffx::AddTargetResponder_Marker>()
.context("create endpoints")?;
target_collection_proxy
.add_target(
&addr,
&ffx::AddTargetConfig { verify_connection: Some(wait), ..Default::default() },
client,
)
.context("calling AddTarget")?;
let res = if let Ok(Some(req)) = stream.try_next().await {
match req {
ffx::AddTargetResponder_Request::Success { .. } => Ok(()),
ffx::AddTargetResponder_Request::Error { err, .. } => Err(err),
}
} else {
ffx_bail!("ffx lost connection to the daemon before receiving a response.");
};
res.map_err(|e| {
let err = e.connection_error.unwrap();
let logs = e.connection_error_logs.map(|v| v.join("\n"));
let target = Some(format!("{addr:?}"));
FfxError::TargetConnectionError { err, target, logs }.into()
})
}
#[cfg(test)]
mod test {
use super::*;
use crate::overnet_connector::{OvernetConnection, OvernetConnector};
use async_channel::Receiver;
use ffx_config::{macro_deps::serde_json::Value, test_init, ConfigLevel};
use fidl_fuchsia_developer_remotecontrol as rcs;
use fuchsia_async::Task;
fn create_overnet_circuit(router: Arc<overnet_core::Router>) -> fidl::AsyncSocket {
let (local_socket, remote_socket) = fidl::Socket::create_stream();
let local_socket = fidl::AsyncSocket::from_socket(local_socket);
let socket = fidl::AsyncSocket::from_socket(remote_socket);
let (mut rx, mut tx) = futures::AsyncReadExt::split(socket);
Task::spawn(async move {
let (errors_sender, errors) = futures::channel::mpsc::unbounded();
if let Err(e) = futures::future::join(
circuit::multi_stream::multi_stream_node_connection_to_async(
router.circuit_node(),
&mut rx,
&mut tx,
true,
circuit::Quality::NETWORK,
errors_sender,
"client".to_owned(),
),
errors
.map(|e| {
eprintln!("A client circuit stream failed: {e:?}");
})
.collect::<()>(),
)
.map(|(result, ())| result)
.await
{
if let circuit::Error::ConnectionClosed(msg) = e {
eprintln!("testing overnet link closed: {:?}", msg);
} else {
eprintln!("error handling Overnet link: {:?}", e);
}
}
})
.detach();
local_socket
}
#[fuchsia::test]
async fn test_get_empty_default_target() {
let env = test_init().await.unwrap();
let target_spec = get_target_specifier(&env.context).await.unwrap();
assert_eq!(target_spec, None);
}
#[fuchsia::test]
async fn test_set_default_target() {
let env = test_init().await.unwrap();
env.context
.query(TARGET_DEFAULT_KEY)
.level(Some(ConfigLevel::User))
.set(Value::String("some_target".to_owned()))
.await
.unwrap();
let target_spec = get_target_specifier(&env.context).await.unwrap();
assert_eq!(target_spec, Some("some_target".to_owned()));
}
#[fuchsia::test]
async fn test_default_first_target_in_array() {
let env = test_init().await.unwrap();
let ts: Vec<Value> = ["t1", "t2"].iter().map(|s| Value::String(s.to_string())).collect();
env.context
.query(TARGET_DEFAULT_KEY)
.level(Some(ConfigLevel::User))
.set(Value::Array(ts))
.await
.unwrap();
let target_spec = get_target_specifier(&env.context).await.unwrap();
assert_eq!(target_spec, Some("t1".to_owned()));
}
#[fuchsia::test]
async fn test_default_missing_env_ignored() {
let env = test_init().await.unwrap();
let ts: Vec<Value> =
["$THIS_BETTER_NOT_EXIST", "t2"].iter().map(|s| Value::String(s.to_string())).collect();
env.context
.query(TARGET_DEFAULT_KEY)
.level(Some(ConfigLevel::User))
.set(Value::Array(ts))
.await
.unwrap();
let target_spec = get_target_specifier(&env.context).await.unwrap();
assert_eq!(target_spec, Some("t2".to_owned()));
}
#[fuchsia::test]
async fn test_default_env_present() {
std::env::set_var("MY_LITTLE_TMPKEY", "t1");
let env = test_init().await.unwrap();
let ts: Vec<Value> =
["$MY_LITTLE_TMPKEY", "t2"].iter().map(|s| Value::String(s.to_string())).collect();
env.context
.query(TARGET_DEFAULT_KEY)
.level(Some(ConfigLevel::User))
.set(Value::Array(ts))
.await
.unwrap();
let target_spec = get_target_specifier(&env.context).await.unwrap();
assert_eq!(target_spec, Some("t1".to_owned()));
std::env::remove_var("MY_LITTLE_TMPKEY");
}
#[derive(Debug, Clone, Eq, PartialEq)]
enum FakeOvernetBehavior {
CloseRcsImmediately,
KeepRcsOpen,
}
#[derive(Debug)]
struct FakeOvernet {
circuit_node: Arc<overnet_core::Router>,
error_receiver: Receiver<anyhow::Error>,
behavior: FakeOvernetBehavior,
}
impl FakeOvernet {
async fn handle_transaction(
req: rcs::RemoteControlRequest,
behavior: &FakeOvernetBehavior,
) {
match req {
rcs::RemoteControlRequest::OpenCapability { server_channel, responder, .. } => {
match behavior {
FakeOvernetBehavior::KeepRcsOpen => {
// We're just going to assume this capability is always going to be
// RCS, and avoid string matching for the sake of avoiding changes
// to monikers and/or capability connecting.
let mut stream = rcs::RemoteControlRequestStream::from_channel(
fidl::AsyncChannel::from_channel(server_channel),
);
// This task is here to ensure the channel stays open, but won't
// necessarily need do anything.
Task::spawn(async move {
while let Ok(Some(req)) = stream.try_next().await {
eprintln!("Got a request: {req:?}")
}
})
.detach();
}
FakeOvernetBehavior::CloseRcsImmediately => {
drop(server_channel);
}
}
responder.send(Ok(())).unwrap();
}
rcs::RemoteControlRequest::EchoString { value, responder } => {
responder.send(&value).unwrap()
}
_ => panic!("Received an unexpected request: {req:?}"),
}
}
}
impl OvernetConnector for FakeOvernet {
async fn connect(&mut self) -> Result<OvernetConnection> {
let circuit_socket = create_overnet_circuit(self.circuit_node.clone());
let (rcs_sender, rcs_receiver) = async_channel::unbounded();
self.circuit_node
.register_service(
rcs::RemoteControlMarker::PROTOCOL_NAME.to_owned(),
move |channel| {
let _ = rcs_sender.try_send(channel).unwrap();
Ok(())
},
)
.await
.unwrap();
let behavior = self.behavior.clone();
let rcs_task = Task::local(async move {
while let Ok(channel) = rcs_receiver.recv().await {
let mut stream = rcs::RemoteControlRequestStream::from_channel(
fidl::AsyncChannel::from_channel(channel),
);
while let Ok(Some(req)) = stream.try_next().await {
Self::handle_transaction(req, &behavior).await;
}
}
});
let (circuit_reader, circuit_writer) = tokio::io::split(circuit_socket);
Ok(OvernetConnection {
output: Box::new(tokio::io::BufReader::new(circuit_reader)),
input: Box::new(circuit_writer),
errors: self.error_receiver.clone(),
compat: None,
main_task: Some(rcs_task),
})
}
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_overnet_rcs_knock() {
let node = overnet_core::Router::new(None).unwrap();
let overnet_socket = create_overnet_socket(node.clone()).unwrap();
let (reader, writer) = tokio::io::split(overnet_socket);
let circuit_node = overnet_core::Router::new(None).unwrap();
let (_sender, error_receiver) = async_channel::unbounded();
let circuit = FakeOvernet {
circuit_node: circuit_node.clone(),
error_receiver,
behavior: FakeOvernetBehavior::KeepRcsOpen,
};
let fidl_pipe =
FidlPipe::start_internal("127.0.0.1:22".parse().unwrap(), reader, writer, circuit)
.await
.unwrap();
let conn = DirectConnection {
overnet: OvernetClient { node },
fidl_pipe,
rcs_proxy: Default::default(),
};
assert!(conn.knock_rcs().await.is_ok());
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_overnet_rcs_knock_failure_disconnect() {
let node = overnet_core::Router::new(None).unwrap();
let overnet_socket = create_overnet_socket(node.clone()).unwrap();
let (reader, writer) = tokio::io::split(overnet_socket);
let circuit_node = overnet_core::Router::new(None).unwrap();
let (error_sender, error_receiver) = async_channel::unbounded();
let circuit = FakeOvernet {
circuit_node: circuit_node.clone(),
error_receiver,
behavior: FakeOvernetBehavior::CloseRcsImmediately,
};
let fidl_pipe =
FidlPipe::start_internal("[::1]:22".parse().unwrap(), reader, writer, circuit)
.await
.unwrap();
let conn = DirectConnection {
overnet: OvernetClient { node },
fidl_pipe,
rcs_proxy: Default::default(),
};
error_sender.send(anyhow::anyhow!("kaboom")).await.unwrap();
let err = conn.knock_rcs().await;
assert!(err.is_err());
let err_string = err.unwrap_err().to_string();
assert!(err_string.contains("kaboom"));
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_overnet_rcs_echo_multiple_times() {
let node = overnet_core::Router::new(None).unwrap();
let overnet_socket = create_overnet_socket(node.clone()).unwrap();
let (reader, writer) = tokio::io::split(overnet_socket);
let circuit_node = overnet_core::Router::new(None).unwrap();
let (_sender, error_receiver) = async_channel::unbounded();
let circuit = FakeOvernet {
circuit_node: circuit_node.clone(),
error_receiver,
behavior: FakeOvernetBehavior::KeepRcsOpen,
};
let fidl_pipe =
FidlPipe::start_internal("127.0.0.1:22".parse().unwrap(), reader, writer, circuit)
.await
.unwrap();
let conn = DirectConnection {
overnet: OvernetClient { node },
fidl_pipe,
rcs_proxy: Default::default(),
};
let rcs = conn.rcs_proxy().await.unwrap();
assert_eq!(rcs.echo_string("foobart").await.unwrap(), "foobart".to_owned());
let rcs2 = conn.rcs_proxy().await.unwrap();
assert_eq!(rcs2.echo_string("foobarr").await.unwrap(), "foobarr".to_owned());
drop(rcs);
drop(rcs2);
let rcs3 = conn.rcs_proxy().await.unwrap();
assert_eq!(rcs3.echo_string("foobarz").await.unwrap(), "foobarz".to_owned());
}
}