// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{anyhow, Context, Error},
chrono::{DateTime, Utc},
IdentifyHostError, InterfaceAddress, IpAddress, RemoteControlMarker, RemoteControlProxy,
futures::lock::{Mutex, MutexGuard},
std::collections::{HashMap, HashSet},
std::fmt::{Debug, Display},
std::net::{IpAddr, SocketAddr},
#[derive(Debug, Clone)]
pub struct RCSConnection {
pub proxy: RemoteControlProxy,
pub overnet_id: NodeId,
pub enum RCSConnectionError {
/// There is something wrong with the FIDL connection.
/// There is an error from within RCS itself.
/// There is an error with the output from RCS.
impl Display for RCSConnectionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RCSConnectionError::FidlConnectionError(ferr) => {
write!(f, "fidl connection error: {}", ferr)
RCSConnectionError::RemoteControlError(ierr) => write!(f, "internal error: {:?}", ierr),
RCSConnectionError::TargetError(error) => write!(f, "general error: {}", error),
impl RCSConnection {
pub async fn new(id: &mut NodeId) -> Result<Self, Error> {
let (s, p) = fidl::Channel::create().context("failed to create zx channel")?;
let _result = RCSConnection::connect_to_service(id, s)?;
let proxy = RemoteControlProxy::new(
fidl::AsyncChannel::from_channel(p).context("failed to make async channel")?,
Ok(Self { proxy, overnet_id: id.clone() })
pub fn copy_to_channel(&mut self, channel: fidl::Channel) -> Result<(), Error> {
RCSConnection::connect_to_service(&mut self.overnet_id, channel)
fn connect_to_service(overnet_id: &mut NodeId, channel: fidl::Channel) -> Result<(), Error> {
let svc = hoist::connect_as_service_consumer()?;
svc.connect_to_service(overnet_id, RemoteControlMarker::NAME, channel)
.map_err(|e| anyhow!("Error connecting to RCS: {}", e))
// For testing.
pub fn new_with_proxy(proxy: RemoteControlProxy, id: &NodeId) -> Self {
Self { proxy, overnet_id: id.clone() }
pub struct TargetState {
pub rcs: Option<RCSConnection>,
pub overnet_started: bool,
// Note that Child is not Send, so it needs its own Mutex. We expose
// the mutex here so that operations on the Option can be atomic (e.g. 'replace if None').
pub host_pipe: Mutex<Option<Child>>,
impl TargetState {
pub fn new() -> Self {
Self { rcs: None, overnet_started: false, host_pipe: Mutex::new(None) }
pub struct Target {
// Nodename of the target (immutable).
pub nodename: String,
pub last_response: Mutex<DateTime<Utc>>,
pub state: Mutex<TargetState>,
pub addrs: Mutex<HashSet<TargetAddr>>,
impl Target {
pub fn new(nodename: &str, t: DateTime<Utc>) -> Self {
Self {
nodename: nodename.to_string(),
last_response: Mutex::new(t),
state: Mutex::new(TargetState::new()),
addrs: Mutex::new(HashSet::new()),
// TODO(fxb/50708) remove this once we've resolved the possible deadlocks that result
// without it.
pub async fn clone_addrs(&self) -> HashSet<TargetAddr> {
let addrs = self.addrs.lock().await;
return addrs.clone();
pub async fn to_string_async(&self) -> String {
// Need to hold onto the state for the duration of the format
// function to ensure that it doesn't change abruptly.
let state = self.state.lock().await;
"{} [{}] [overnet_started: {}] [overnet_peer_id: {}] {}",
.map(|addr| addr.to_string())
.join(", "),
match state.rcs.as_ref() {
Some(s) =>,
None => String::from("not connected"),
pub async fn from_rcs_connection(r: RCSConnection) -> Result<Self, RCSConnectionError> {
let fidl_target = match r.proxy.identify_host().await {
Ok(res) => match res {
Ok(target) => target,
Err(e) => return Err(RCSConnectionError::RemoteControlError(e)),
Err(e) => return Err(RCSConnectionError::FidlConnectionError(e)),
let nodename = fidl_target
.ok_or(RCSConnectionError::TargetError(anyhow!("nodename required")))?;
// TODO(awdavies): Merge target addresses once the scope_id is picked
// up properly, else there will be duplicate link-local addresses that
// aren't usable.
let target = Target::new(nodename.as_ref(), Utc::now());
// Forces drop of target state mutex so that target can be returned.
let mut target_state = target.state.lock().await;
// If we're here, then overnet must have been started.
target_state.overnet_started = true;
target_state.rcs = Some(r);
pub async fn wait_for_state_with_rcs(
retries: u32,
retry_delay: Duration,
) -> Result<MutexGuard<'_, TargetState>, Error> {
// TODO(awdavies): It would make more sense to have something
// like std::sync::CondVar here, but there is no implementation
// in futures or async_std yet.
for _ in 0..retries {
let state_guard = self.state.lock().await;
match &state_guard.rcs {
Some(_) => return Ok(state_guard),
None => {
Err(anyhow!("Waiting for RCS timed out"))
impl Debug for Target {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Target {{ {:?} }}", self.nodename)
#[derive(Hash, Clone, Debug, Copy, Eq, PartialEq)]
pub struct TargetAddr {
ip: IpAddr,
scope_id: u32,
impl From<InterfaceAddress> for TargetAddr {
fn from(i: InterfaceAddress) -> Self {
// TODO(awdavies): Figure out if it's possible to get the scope_id from
// this address.
match i.ip_address {
IpAddress::Ipv4(ip4) => SocketAddr::from((ip4.addr, 0)).into(),
IpAddress::Ipv6(ip6) => SocketAddr::from((ip6.addr, 0)).into(),
impl From<(IpAddr, u32)> for TargetAddr {
fn from(f: (IpAddr, u32)) -> Self {
Self { ip: f.0, scope_id: f.1 }
impl From<SocketAddr> for TargetAddr {
fn from(s: SocketAddr) -> Self {
Self {
ip: s.ip(),
scope_id: match s {
SocketAddr::V6(addr) => addr.scope_id(),
_ => 0,
impl TargetAddr {
pub fn scope_id(&self) -> u32 {
pub fn ip(&self) -> IpAddr {
impl Display for TargetAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.ip())?;
match self.ip {
IpAddr::V6(addr) => {
if addr.is_ll() && self.scope_id() > 0 {
write!(f, "%{}", self.scope_id())?;
_ => (),
pub enum TargetQuery {
impl TargetQuery {
pub async fn matches(&self, t: &Arc<Target>) -> bool {
match self {
Self::Nodename(nodename) => *nodename == t.nodename,
Self::Addr(addr) => {
let addrs = t.addrs.lock().await;
// Try to do the lookup if either the scope_id is non-zero (and
// the IP address is IPv6 OR if the address is just IPv4 (as the
// scope_id is always zero in this case).
if addr.scope_id != 0 || addr.ip.is_ipv4() {
return addrs.contains(addr);
// Currently there's no way to parse an IP string w/ a scope_id,
// so if we're at this stage the address is IPv6 and has been
// probably parsed with a string (or it's a global IPv6 addr).
for target_addr in addrs.iter() {
if target_addr.ip == addr.ip {
return true;
Self::OvernetId(id) => match &t.state.lock().await.rcs {
Some(rcs) => == *id,
None => false,
impl From<&str> for TargetQuery {
fn from(s: &str) -> Self {
impl From<String> for TargetQuery {
fn from(s: String) -> Self {
match s.parse::<IpAddr>() {
Ok(a) => Self::Addr((a, 0).into()),
Err(_) => Self::Nodename(s),
impl From<TargetAddr> for TargetQuery {
fn from(t: TargetAddr) -> Self {
impl From<u64> for TargetQuery {
fn from(id: u64) -> Self {
pub struct TargetCollection {
inner: RwLock<HashMap<String, Arc<Target>>>,
impl TargetCollection {
pub fn new() -> Self {
Self { inner: RwLock::new(HashMap::new()) }
pub async fn inner_lock(
) -> async_std::sync::RwLockWriteGuard<'_, HashMap<String, Arc<Target>>> {
pub async fn targets(&self) -> Vec<Arc<Target>> {|t| t.clone()).collect()
pub async fn merge_insert(&self, t: Target) -> Arc<Target> {
let mut inner = self.inner.write().await;
// TODO(awdavies): better merging (using more indices for matching).
match inner.get(&t.nodename) {
Some(to_update) => {
let mut addrs = to_update.addrs.lock().await;
let mut last_response = to_update.last_response.lock().await;
// Ignore out-of-order packets.
if *last_response < *t.last_response.lock().await {
*last_response = *t.last_response.lock().await;
// TODO(awdavies): Create a merge function just for state.
let mut state = to_update.state.lock().await;
if state.rcs.is_none() {
state.rcs = t.state.lock().await.rcs.clone();
None => {
let t = Arc::new(t);
inner.insert(t.nodename.clone(), t.clone());
pub async fn get(&self, t: TargetQuery) -> Option<Arc<Target>> {
for target in {
if t.matches(target).await {
return Some(target.clone());
pub trait TryIntoTarget: Sized {
type Error;
/// Attempts, given a source socket address, to determine whether the
/// received message was from a Fuchsia target, and if so, what kind. Attempts
/// to fill in as much information as possible given the message, consuming
/// the underlying object in the process.
fn try_into_target(self, src: SocketAddr) -> Result<Target, Self::Error>;
mod test {
use super::*;
use std::net::{Ipv4Addr, Ipv6Addr};
use chrono::offset::TimeZone;
use fidl;
use fidl_fuchsia_developer_remotecontrol as rcs;
use futures::executor::block_on;
use futures::prelude::*;
impl PartialEq for TargetState {
/// This is a very loose eq function for now, might need to be updated
/// later, but this shouldn't be used outside of tests. Compares that
/// another option is not None.
fn eq(&self, other: &Self) -> bool {
match self.rcs {
Some(_) => match other.rcs {
Some(_) => true,
_ => false,
None => match other.rcs {
None => true,
_ => false,
impl Clone for Target {
fn clone(&self) -> Self {
Self {
nodename: self.nodename.clone(),
last_response: Mutex::new(block_on(self.last_response.lock()).clone()),
state: Mutex::new(block_on(self.state.lock()).clone()),
addrs: Mutex::new(block_on(self.addrs.lock()).clone()),
impl Clone for TargetState {
fn clone(&self) -> Self {
Self {
rcs: self.rcs.clone(),
overnet_started: self.overnet_started,
// host_pipe is not used in tests.
host_pipe: Mutex::new(None),
fn fake_now() -> DateTime<Utc> {
Utc.ymd(2014, 10, 31).and_hms(9, 10, 12)
fn fake_elapsed() -> DateTime<Utc> {
Utc.ymd(2014, 11, 2).and_hms(13, 2, 1)
impl PartialEq for Target {
fn eq(&self, o: &Target) -> bool {
self.nodename == o.nodename
&& *block_on(self.last_response.lock()) == *block_on(o.last_response.lock())
&& *block_on(self.addrs.lock()) == *block_on(o.addrs.lock())
&& *block_on(self.state.lock()) == *block_on(o.state.lock())
fn test_target_collection_insert_new() {
hoist::run(async move {
let tc = TargetCollection::new();
let nodename = String::from("what");
let t = Target::new(&nodename, fake_now());
assert_eq!(&*tc.get(nodename.clone().into()).await.unwrap(), &t.clone());
match tc.get("oihaoih".into()).await {
Some(_) => panic!("string lookup should return Nobne"),
_ => (),
fn test_target_collection_merge() {
hoist::run(async move {
let tc = TargetCollection::new();
let nodename = String::from("bananas");
let t1 = Target::new(&nodename, fake_now());
let t2 = Target::new(&nodename, fake_elapsed());
let a1 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));
let a2 = IpAddr::V6(Ipv6Addr::new(
0xfe80, 0xcafe, 0xf00d, 0xf000, 0xb412, 0xb455, 0x1337, 0xfeed,
t1.addrs.lock().await.insert((a1.clone(), 1).into());
t2.addrs.lock().await.insert((a2.clone(), 1).into());
let merged_target = tc.get(nodename.clone().into()).await.unwrap();
assert_ne!(&*merged_target, &t1);
assert_ne!(&*merged_target, &t2);
assert_eq!(merged_target.addrs.lock().await.len(), 2);
assert_eq!(*merged_target.last_response.lock().await, fake_elapsed());
assert!(merged_target.addrs.lock().await.contains(&(a1, 1).into()));
assert!(merged_target.addrs.lock().await.contains(&(a2, 1).into()));
fn setup_fake_remote_control_service(
send_internal_error: bool,
nodename_response: String,
) -> RemoteControlProxy {
let (proxy, mut stream) =
hoist::spawn(async move {
while let Ok(req) = stream.try_next().await {
match req {
Some(rcs::RemoteControlRequest::IdentifyHost { responder }) => {
if send_internal_error {
let _ = responder
.send(&mut Err(rcs::IdentifyHostError::ListInterfacesFailed))
.context("sending testing error response")
} else {
let result: Vec<rcs::InterfaceAddress> = vec![rcs::InterfaceAddress {
ip_address: rcs::IpAddress::Ipv4(rcs::Ipv4Address {
addr: [192, 168, 0, 1],
prefix_len: 24,
let nodename = if nodename_response.len() == 0 {
} else {
.send(&mut Ok(rcs::IdentifyHostResponse {
addresses: Some(result),
.context("sending testing response")
_ => assert!(false),
fn test_target_from_rcs_connection_internal_err() {
// TODO(awdavies): Do some form of PartialEq implementation for
// the RCSConnectionError enum to avoid the nested matches.
hoist::run(async move {
let conn = RCSConnection::new_with_proxy(
setup_fake_remote_control_service(true, "foo".to_owned()),
&NodeId { id: 123 },
match Target::from_rcs_connection(conn).await {
Ok(_) => assert!(false),
Err(e) => match e {
RCSConnectionError::RemoteControlError(rce) => match rce {
rcs::IdentifyHostError::ListInterfacesFailed => (),
_ => assert!(false),
_ => assert!(false),
fn test_target_from_rcs_connection_nodename_none() {
hoist::run(async move {
let conn = RCSConnection::new_with_proxy(
setup_fake_remote_control_service(false, "".to_owned()),
&NodeId { id: 123456 },
match Target::from_rcs_connection(conn).await {
Ok(_) => assert!(false),
Err(e) => match e {
RCSConnectionError::TargetError(_) => (),
_ => assert!(false),
fn test_target_from_rcs_connection_no_err() {
hoist::run(async move {
let conn = RCSConnection::new_with_proxy(
setup_fake_remote_control_service(false, "foo".to_owned()),
&NodeId { id: 1234 },
match Target::from_rcs_connection(conn).await {
Ok(t) => {
assert_eq!(t.nodename, "foo");
assert_eq!(t.state.lock().await.rcs.as_ref().unwrap(), 1234u64);
// For now there shouldn't be any addresses put in here, as
// there's not a consistent way to convert them yet.
assert_eq!(t.addrs.lock().await.len(), 0);
Err(_) => assert!(false),
fn test_target_query_matches_nodename() {
hoist::run(async move {
let query = TargetQuery::from("foo");
let target = Arc::new(Target::new("foo", Utc::now()));
fn test_target_by_overnet_id() {
hoist::run(async move {
const ID: u64 = 12345;
let conn = RCSConnection::new_with_proxy(
setup_fake_remote_control_service(false, "foo".to_owned()),
&NodeId { id: ID },
let t = Target::from_rcs_connection(conn).await.unwrap();
let tc = TargetCollection::new();
assert_eq!(*tc.get(ID.into()).await.unwrap(), t);
fn test_target_by_addr() {
hoist::run(async move {
let addr: TargetAddr = (IpAddr::from([192, 168, 0, 1]), 0).into();
let t = Target::new("foo", Utc::now());
let tc = TargetCollection::new();
assert_eq!(*tc.get(addr.into()).await.unwrap(), t);
assert_eq!(*tc.get("".into()).await.unwrap(), t);
let addr: TargetAddr =
(IpAddr::from([0xfe80, 0x0, 0x0, 0x0, 0xdead, 0xbeef, 0xbeef, 0xbeef]), 3).into();
let t = Target::new("fooberdoober", Utc::now());
assert_eq!(*tc.get("fe80::dead:beef:beef:beef".into()).await.unwrap(), t);
assert_eq!(*tc.get(addr.clone().into()).await.unwrap(), t);
assert_eq!(*tc.get("fooberdoober".into()).await.unwrap(), t);
fn test_target_debug_string() {
hoist::run(async move {
let t = Target::new("foo", fake_now());
assert_eq!(t.to_string_async().await, "foo [] [overnet_started: false] [overnet_peer_id: not connected] Fri, 31 Oct 2014 09:10:12 +0000");
t.addrs.lock().await.insert((IpAddr::from([192, 168, 1, 1]), 0).into());
t.state.lock().await.rcs = Some(RCSConnection::new_with_proxy(
setup_fake_remote_control_service(false, "foo".to_owned()),
&NodeId { id: 2u64 },
assert_eq!(t.to_string_async().await, "foo [] [overnet_started: false] [overnet_peer_id: 2] Fri, 31 Oct 2014 09:10:12 +0000");
fn test_wait_for_rcs() {
hoist::run(async move {
let t = Arc::new(Target::new("foo", Utc::now()));
assert!(t.wait_for_state_with_rcs(0, Duration::from_millis(1)).await.is_err());
assert!(t.wait_for_state_with_rcs(1, Duration::from_millis(1)).await.is_err());
assert!(t.wait_for_state_with_rcs(10, Duration::from_millis(1)).await.is_err());
let t_clone = t.clone();
hoist::spawn(async move {
let mut state = t_clone.state.lock().await;
let conn = RCSConnection::new_with_proxy(
setup_fake_remote_control_service(false, "foo".to_owned()),
&NodeId { id: 5u64 },
state.overnet_started = true;
state.rcs = Some(conn);
// Adds a few hundred thousands as this is a race test.
assert!(t.wait_for_state_with_rcs(500000, Duration::from_millis(1)).await.is_ok());