// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::constants::{get_socket, CURRENT_EXE_HASH, MDNS_BROADCAST_INTERVAL_SECS},
crate::discovery::{TargetFinder, TargetFinderConfig},
crate::events::{DaemonEvent, TargetInfo, WireTrafficType},
crate::fastboot::{client::Fastboot, spawn_fastboot_discovery},
crate::logger::streamer::GenericDiagnosticsStreamer,
crate::mdns::MdnsTargetFinder,
crate::onet::create_ascendd,
crate::target::{
ConnectionState, RcsConnection, SshAddrFetcher, Target, TargetCollection, TargetEvent,
ToFidlTarget,
},
anyhow::{anyhow, Context, Result},
ascendd::Ascendd,
async_trait::async_trait,
chrono::Utc,
ffx_core::{build_info, TryStreamUtilExt},
ffx_daemon_core::events::{self, EventHandler},
fidl::endpoints::ServiceMarker,
fidl_fuchsia_developer_bridge::{
DaemonError, DaemonRequest, DaemonRequestStream, DiagnosticsStreamError, FastbootError,
TargetAddrInfo,
},
fidl_fuchsia_developer_remotecontrol::{
ArchiveIteratorEntry, ArchiveIteratorError, ArchiveIteratorRequest, RemoteControlMarker,
},
fidl_fuchsia_overnet::{MeshControllerProxy, ServiceConsumerProxy, ServicePublisherProxy},
fidl_fuchsia_overnet_protocol::NodeId,
fuchsia_async::{Task, Timer},
futures::prelude::*,
hoist::{hoist, OvernetInstance},
std::collections::BTreeSet,
std::convert::TryInto,
std::iter::FromIterator,
std::sync::{Arc, Weak},
std::time::Duration,
};
// Daemon
#[derive(Clone)]
pub struct Daemon {
pub event_queue: events::Queue<DaemonEvent>,
target_collection: Arc<TargetCollection>,
ascendd: Arc<Ascendd>,
}
impl OvernetInstance for Daemon {
fn connect_as_service_consumer(&self) -> Result<ServiceConsumerProxy, anyhow::Error> {
self.ascendd.connect_as_service_consumer()
}
fn connect_as_service_publisher(&self) -> Result<ServicePublisherProxy, anyhow::Error> {
self.ascendd.connect_as_service_publisher()
}
fn connect_as_mesh_controller(&self) -> Result<MeshControllerProxy, anyhow::Error> {
self.ascendd.connect_as_mesh_controller()
}
}
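// Illustrative sketch (not called anywhere in this file): because `Daemon`
// implements `OvernetInstance`, a caller holding a daemon could enumerate
// overnet peers directly, e.g.:
//
//     let svc = daemon.connect_as_service_consumer()?;
//     let peers = svc.list_peers().await?;
//
// `spawn_onet_discovery` below applies the same pattern against the global
// hoist instance.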
// This is just for mocking config values for unit testing.
#[async_trait]
trait ConfigReader: Send + Sync {
async fn get(&self, q: &str) -> Result<ffx_config::Value, ffx_config::api::ConfigError>;
}
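// The production implementation reads through ffx_config; unit tests
// substitute fakes such as `FakeConfigReader` (in the test module below),
// which asserts on the expected query string and returns a canned value.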
#[derive(Default)]
struct DefaultConfigReader {}
#[async_trait]
impl ConfigReader for DefaultConfigReader {
async fn get(&self, q: &str) -> Result<ffx_config::Value, ffx_config::api::ConfigError> {
ffx_config::get(q).await
}
}
pub struct DaemonEventHandler {
    // This must be a weak ref, or else it will cause a circular reference.
    // Any handler pointing to shared state should generally hold a weak ref,
    // since the handler's parent state may be what holds the event queue
    // itself (as is the case with the target collection here).
target_collection: Weak<TargetCollection>,
config_reader: Arc<dyn ConfigReader>,
ascendd: Arc<Ascendd>,
}
impl DaemonEventHandler {
fn new(ascendd: Arc<Ascendd>, target_collection: Weak<TargetCollection>) -> Self {
Self { ascendd, target_collection, config_reader: Arc::new(DefaultConfigReader::default()) }
}
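    // Merges an mdns-discovered target into the collection, starts the host
    // pipe if the target's nodename matches the `target.default` config value,
    // and finally marks the connection state as Mdns.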
fn handle_mdns<'a>(
&self,
t: TargetInfo,
tc: &'a TargetCollection,
cr: Weak<dyn ConfigReader>,
) -> impl Future<Output = ()> + 'a {
log::trace!("Found new target via mdns: {}", t.nodename);
tc.merge_insert(Target::from_target_info(self.ascendd.clone(), t)).then(|target| {
async move {
let nodename = target.nodename().await;
let t_clone = target.clone();
let autoconnect_fut = async move {
if let Some(cr) = cr.upgrade() {
let n = cr.get("target.default").await.ok();
if let Some(n) = n {
if n.as_str()
.and_then(|n| nodename.as_ref().map(|x| x == n))
.unwrap_or(false)
{
log::trace!(
"Doing autoconnect for default target: {}",
nodename.as_ref().map(|x| x.as_str()).unwrap_or("<unknown>")
);
t_clone.run_host_pipe().await;
}
}
}
};
let _: ((), ()) = futures::join!(target.run_mdns_monitor(), autoconnect_fut);
                // Update the state last so that any tasks waiting on this
                // state observe a fully-running target and there are no races.
target
.update_connection_state(|s| match s {
ConnectionState::Disconnected | ConnectionState::Mdns(_) => {
ConnectionState::Mdns(Utc::now())
}
_ => s,
})
.await;
}
})
}
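    // Converts an overnet peer node id into an RCS connection and merges the
    // resulting target into the collection, starting its logger. Failures are
    // logged and skipped, as the peer may have vanished mid-operation.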
async fn handle_overnet_peer(&self, node_id: u64, tc: &TargetCollection) {
        // It's possible that the target will be dropped in the middle of this
        // operation. Do not exit on error; just log and skip this round.
let res = {
match RcsConnection::new(self.ascendd.clone(), &mut NodeId { id: node_id })
.await
.context("unable to convert proxy to target")
{
Ok(r) => Target::from_rcs_connection(r)
.await
.map_err(|e| anyhow!("unable to convert proxy to target: {}", e)),
Err(e) => Err(e),
}
};
let target = match res {
Ok(t) => t,
Err(e) => {
log::error!("{:#?}", e);
return;
}
};
log::trace!("Found new target via overnet: {}", target.nodename_str().await);
tc.merge_insert(target).then(|target| async move { target.run_logger().await }).await;
}
async fn handle_fastboot(&self, t: TargetInfo, tc: &TargetCollection) {
log::trace!("Found new target via fastboot: {}", t.nodename);
tc.merge_insert(Target::from_target_info(self.ascendd.clone(), t.into()))
.then(|target| async move {
target
.update_connection_state(|s| match s {
ConnectionState::Disconnected | ConnectionState::Fastboot(_) => {
ConnectionState::Fastboot(Utc::now())
}
_ => s,
})
.await;
target.run_fastboot_monitor().await;
})
.await;
}
}
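// EventHandler contract, as used below: returning Ok(true) tells the queue
// this handler is finished and may be dropped; Ok(false) keeps it registered
// for future events.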
#[async_trait]
impl EventHandler<DaemonEvent> for DaemonEventHandler {
async fn on_event(&self, event: DaemonEvent) -> Result<bool> {
let tc = match self.target_collection.upgrade() {
Some(t) => t,
None => {
log::debug!("dropped target collection");
return Ok(true); // We're done, as the parent has been dropped.
}
};
match event {
DaemonEvent::WireTraffic(traffic) => match traffic {
WireTrafficType::Mdns(t) => {
self.handle_mdns(t, &tc, Arc::downgrade(&self.config_reader)).await;
}
WireTrafficType::Fastboot(t) => {
self.handle_fastboot(t, &tc).await;
}
},
DaemonEvent::OvernetPeer(node_id) => {
self.handle_overnet_peer(node_id, &tc).await;
}
_ => (),
}
// This handler is never done unless the target_collection is dropped,
// so always return false.
Ok(false)
}
}
impl Daemon {
pub async fn new() -> Result<Daemon> {
log::info!("Starting daemon overnet server");
let target_collection = Arc::new(TargetCollection::new());
let queue = events::Queue::new(&target_collection);
let ascendd = Arc::new(create_ascendd().await?);
let daemon_event_handler =
DaemonEventHandler::new(ascendd.clone(), Arc::downgrade(&target_collection));
queue.add_handler(daemon_event_handler).await;
target_collection.set_event_queue(queue.clone()).await;
Daemon::spawn_onet_discovery(queue.clone());
spawn_fastboot_discovery(queue.clone());
let config = TargetFinderConfig {
interface_discovery_interval: Duration::from_secs(1),
broadcast_interval: Duration::from_secs(MDNS_BROADCAST_INTERVAL_SECS),
mdns_ttl: 255,
};
let mut mdns = MdnsTargetFinder::new(&config)?;
mdns.start(queue.clone())?;
Ok(Daemon { target_collection: target_collection.clone(), event_queue: queue, ascendd })
}
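    // Waits for a target matching `n` (or any target, if `n` is None) to show
    // up on the event queue, then resolves it through the target collection.
    // With `n == None` and multiple connected targets this resolves to
    // `DaemonError::TargetAmbiguous` (see `test_get_default_target_ambiguous`).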
pub async fn get_default_target(&self, n: Option<String>) -> Result<Target, DaemonError> {
let n_clone = n.clone();
        // An infinite timeout is fine here: if the client drops the
        // connection, this will eventually be cleaned up. It is the client's
        // responsibility to enforce its own timeout(s).
self.event_queue
.wait_for(None, move |e| {
if let DaemonEvent::NewTarget(Some(n)) = e {
                    // If a name was specified, match only a new target whose
                    // name contains it; otherwise ANY new target satisfies
                    // the wait.
n_clone.as_ref().map(|s| n.contains(s)).unwrap_or(true)
} else {
false
}
})
.await
.map_err(|e| {
log::warn!("{}", e);
DaemonError::TargetStateError
})?;
// TODO(awdavies): It's possible something might happen between the new
// target event and now, so it would make sense to give the
// user some information on what happened: likely something
// to do with the target suddenly being forced out of the cache
// (this isn't a problem yet, but will be once more advanced
// lifetime tracking is implemented). If a name isn't specified it's
// possible a secondary/tertiary target showed up, and those cases are
// handled here.
self.target_collection.get_default(n).await
}
#[cfg(test)]
pub async fn new_for_test() -> Daemon {
let target_collection = Arc::new(TargetCollection::new());
let event_queue = events::Queue::new(&target_collection);
target_collection.set_event_queue(event_queue.clone()).await;
Daemon {
target_collection,
event_queue,
ascendd: Arc::new(create_ascendd().await.unwrap()),
}
}
pub async fn handle_requests_from_stream(&self, stream: DaemonRequestStream) -> Result<()> {
stream
.map_err(|e| anyhow!("reading FIDL stream: {:#}", e))
.try_for_each_concurrent_while_connected(None, |r| self.handle_request(r))
.await
}
pub fn spawn_onet_discovery(queue: events::Queue<DaemonEvent>) {
fuchsia_async::Task::spawn(async move {
loop {
let svc = match hoist().connect_as_service_consumer() {
Ok(svc) => svc,
Err(err) => {
log::info!("Overnet setup failed: {}, will retry in 1s", err);
Timer::new(Duration::from_secs(1)).await;
continue;
}
};
loop {
let peers = match svc.list_peers().await {
Ok(peers) => peers,
Err(err) => {
log::info!("Overnet peer discovery failed: {}, will retry", err);
Timer::new(Duration::from_secs(1)).await;
// break out of the peer discovery loop on error in
// order to reconnect, in case the error causes the
// overnet interface to go bad.
break;
}
};
for peer in peers {
                        let peer_has_rcs = peer.description.services.map_or(false, |s| {
                            s.iter().any(|name| *name == RemoteControlMarker::NAME)
                        });
if peer_has_rcs {
queue.push(DaemonEvent::OvernetPeer(peer.id.id)).await.unwrap_or_else(
|err| {
log::warn!(
"Overnet discovery failed to enqueue event: {}",
err
);
},
);
}
}
Timer::new(Duration::from_millis(100)).await;
}
}
})
.detach();
}
pub async fn handle_request(&self, req: DaemonRequest) -> Result<()> {
log::debug!("daemon received request: {:?}", req);
match req {
DaemonRequest::Crash { .. } => panic!("instructed to crash by client!"),
DaemonRequest::EchoString { value, responder } => {
log::info!("Received echo request for string {:?}", value);
responder.send(value.as_ref()).context("error sending response")?;
log::info!("echo response sent successfully");
}
DaemonRequest::ListTargets { value, responder } => {
log::info!("Received list target request for '{:?}'", value);
responder
.send(
&mut future::join_all(match value.as_ref() {
"" => self
.target_collection
.targets()
.await
.drain(..)
.map(|t| {
async move {
if t.is_connected().await {
Some(t.to_fidl_target().await)
} else {
None
}
}
.boxed()
})
.collect(),
_ => match self.target_collection.get_connected(value).await {
Some(t) => {
vec![async move { Some(t.to_fidl_target().await) }.boxed()]
}
None => vec![],
},
})
                        .await
                        .drain(..)
                        .flatten()
                        .collect::<Vec<_>>()
                        .drain(..),
)
.context("error sending response")?;
}
DaemonRequest::GetRemoteControl { target, remote, responder } => {
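                // A target in fastboot cannot serve RCS, so that case is
                // rejected up front; otherwise the host pipe is started (if it
                // isn't already) and the request blocks until RCS activates.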
let target = match self.get_default_target(target).await {
Ok(t) => t,
Err(e) => {
responder.send(&mut Err(e)).context("sending error response")?;
return Ok(());
}
};
if matches!(target.get_connection_state().await, ConnectionState::Fastboot(_)) {
                let nodename =
                    target.nodename().await.unwrap_or_else(|| "<No Nodename>".to_string());
log::warn!("Attempting to connect to RCS on a fastboot target: {}", nodename);
responder
.send(&mut Err(DaemonError::TargetInFastboot))
.context("sending error response")?;
return Ok(());
}
// Ensure auto-connect has at least started.
target.run_host_pipe().await;
match target.events.wait_for(None, |e| e == TargetEvent::RcsActivated).await {
Ok(()) => (),
Err(e) => {
log::warn!("{}", e);
responder
.send(&mut Err(DaemonError::RcsConnectionError))
.context("sending error response")?;
return Ok(());
}
}
let mut rcs = match target.rcs().await {
Some(r) => r,
None => {
log::warn!("rcs dropped after event fired");
responder
.send(&mut Err(DaemonError::TargetStateError))
.context("sending error response")?;
return Ok(());
}
};
let mut response = rcs
.copy_to_channel(remote.into_channel())
.map_err(|_| DaemonError::RcsConnectionError);
responder.send(&mut response).context("error sending response")?;
}
DaemonRequest::GetFastboot { target, fastboot, responder } => {
let target = match self.get_default_target(target).await {
Ok(t) => t,
Err(e) => {
log::warn!("{:?}", e);
responder
.send(&mut Err(FastbootError::TargetError))
.context("sending error response")?;
return Ok(());
}
};
// TODO(fxb/69285): reboot device into fastboot
if !matches!(target.get_connection_state().await, ConnectionState::Fastboot(_)) {
                let nodename =
                    target.nodename().await.unwrap_or_else(|| "<No Nodename>".to_string());
log::warn!("Attempting to run fastboot on a non-fastboot target: {}", nodename);
responder
.send(&mut Err(FastbootError::NonFastbootDevice))
.context("sending error response")?;
return Ok(());
}
let mut fastboot_manager = Fastboot::new(target);
let stream = fastboot.into_stream()?;
fuchsia_async::Task::spawn(async move {
match fastboot_manager.0.handle_fastboot_requests_from_stream(stream).await {
Ok(_) => log::debug!("Fastboot proxy finished - client disconnected"),
Err(e) => {
log::error!("There was an error handling fastboot requests: {:?}", e)
}
}
})
.detach();
responder.send(&mut Ok(())).context("error sending response")?;
}
DaemonRequest::Quit { responder } => {
log::info!("Received quit request.");
match std::fs::remove_file(get_socket().await) {
Ok(()) => {}
Err(e) => log::error!("failed to remove socket file: {}", e),
}
if cfg!(test) {
panic!("quit() should not be invoked in test code");
}
// It is desirable for the client to receive an ACK for the quit
// request. As Overnet has a potentially complicated routing
// path, it is tricky to implement some notion of a bounded
// "flush" for this response, however in practice it is only
// necessary here to wait long enough for the message to likely
// leave the local process before exiting. Enqueue a detached
// timer to shut down the daemon before sending the response.
// This is detached because once the client receives the
                // response, the client will disconnect its socket. If the
// local reactor observes this disconnection before the timer
// expires, an in-line timer wait would never fire, and the
// daemon would never exit.
Task::spawn(
Timer::new(std::time::Duration::from_millis(20)).map(|_| std::process::exit(0)),
)
.detach();
responder.send(true).context("error sending response")?;
}
DaemonRequest::GetSshAddress { responder, target, timeout } => {
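                // Polls the target's address list until an SSH-capable
                // address appears, bounded by the caller-supplied timeout
                // (given in nanoseconds).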
let fut = async move {
let target = self.get_default_target(target).await?;
let poll_duration = std::time::Duration::from_millis(15);
loop {
let addrs = target.addrs().await;
if let Some(addr) = (&addrs).to_ssh_addr() {
let res: TargetAddrInfo = addr.into();
return Ok(res);
}
Timer::new(poll_duration).await;
}
};
return responder
.send(&mut match async_std::future::timeout(
Duration::from_nanos(timeout.try_into()?),
fut,
)
.await
{
Ok(mut r) => r,
Err(_) => Err(DaemonError::Timeout),
})
.context("sending client response");
}
DaemonRequest::GetVersionInfo { responder } => {
return responder.send(build_info()).context("sending GetVersionInfo response");
}
DaemonRequest::AddTarget { ip, responder } => {
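                // Manually added targets skip discovery entirely: the target
                // is inserted into the collection, moved straight to the
                // Manual connection state, and a host pipe is started
                // immediately.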
self.target_collection
.merge_insert(Target::new_with_addrs(
self.ascendd.clone(),
None,
BTreeSet::from_iter(Some(ip.into()).into_iter()),
))
.then(|target| async move {
target
.update_connection_state(|s| match s {
ConnectionState::Disconnected => ConnectionState::Manual,
_ => {
log::warn!("New manual target in unexpected state {:?}", s);
s
}
})
.await;
target.run_host_pipe().await
})
.await;
responder.send(&mut Ok(())).context("error sending response")?;
}
DaemonRequest::RemoveTarget { target_id, responder } => {
let result = self.target_collection.remove_target(target_id.clone()).await;
responder.send(&mut Ok(result)).context("error sending response")?;
}
DaemonRequest::GetHash { responder } => {
let hash: String =
ffx_config::get((CURRENT_EXE_HASH, ffx_config::ConfigLevel::Runtime)).await?;
responder.send(&hash).context("error sending response")?;
}
DaemonRequest::StreamDiagnostics { target, parameters, iterator, responder } => {
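                // Bridges the target's diagnostics streamer to the
                // client-supplied ArchiveIterator, serving entries one
                // GetNext request at a time.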
let target = match self.get_default_target(target.clone()).await {
Ok(t) => t,
Err(e) => {
log::warn!(
"got error fetching target with filter '{}': {:?}",
                        target.as_deref().unwrap_or(""),
e
);
responder
.send(&mut Err(DiagnosticsStreamError::TargetMatchFailed))
.context("sending error response")?;
return Ok(());
}
};
let stream = target.stream_info();
if parameters.stream_mode.is_none() {
log::info!("StreamDiagnostics failed: stream mode is required");
return responder
.send(&mut Err(DiagnosticsStreamError::MissingParameter))
.context("sending missing parameter response");
}
let mut log_iterator =
stream.stream_entries(parameters.stream_mode.unwrap()).await?;
let task = Task::spawn(async move {
let mut iter_stream = iterator.into_stream()?;
while let Some(request) = iter_stream.next().await {
match request? {
ArchiveIteratorRequest::GetNext { responder } => {
let res = log_iterator.iter().await?;
match res {
Some(Ok(entry)) => {
// TODO(jwing): implement truncation or migrate to a socket-based
// API.
responder.send(&mut Ok(vec![ArchiveIteratorEntry {
data: Some(serde_json::to_string(&entry)?),
truncated_chars: Some(0),
..ArchiveIteratorEntry::EMPTY
}]))?;
}
Some(Err(e)) => {
log::warn!("got error streaming diagnostics: {}", e);
responder
.send(&mut Err(ArchiveIteratorError::DataReadFailed))?;
}
None => {
responder.send(&mut Ok(vec![]))?;
}
}
}
}
}
Ok::<(), anyhow::Error>(())
});
responder.send(&mut Ok(()))?;
task.await?;
}
}
Ok(())
}
}
////////////////////////////////////////////////////////////////////////////////
// tests
#[cfg(test)]
mod test {
use {
super::*,
anyhow::bail,
async_std::future::timeout,
fidl_fuchsia_developer_bridge as bridge,
fidl_fuchsia_developer_bridge::{DaemonMarker, FastbootMarker},
fidl_fuchsia_developer_remotecontrol as rcs,
fidl_fuchsia_developer_remotecontrol::{RemoteControlMarker, RemoteControlProxy},
fidl_fuchsia_net as fidl_net,
fidl_fuchsia_net::{IpAddress, Ipv4Address, Subnet},
fidl_fuchsia_overnet_protocol::NodeId,
fuchsia_async::Task,
std::net::{SocketAddr, SocketAddrV6},
};
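    // Stand-ins for the real DaemonEventHandler: one fakes fastboot
    // discovery, the other fakes an RCS connection for mdns-discovered
    // targets.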
struct TestHookFakeFastboot {
tc: Weak<TargetCollection>,
ascendd: Arc<Ascendd>,
}
struct TestHookFakeRcs {
tc: Weak<TargetCollection>,
ascendd: Arc<Ascendd>,
}
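    // Test fixture bundling the daemon's event queue, its target collection,
    // and the task servicing the daemon request stream.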
struct TargetControl {
event_queue: events::Queue<DaemonEvent>,
tc: Arc<TargetCollection>,
_task: Task<()>,
}
impl TargetControl {
pub async fn send_mdns_discovery_event(&mut self, t: Target) {
            let nodename =
                t.nodename().await.expect("Should not send mDNS discovery for unnamed node.");
let nodename_clone = nodename.clone();
self.event_queue
.push(DaemonEvent::WireTraffic(WireTrafficType::Mdns(TargetInfo {
nodename: nodename.clone(),
addresses: t.addrs().await.iter().cloned().collect(),
..Default::default()
})))
.await
.unwrap();
self.event_queue
.wait_for(None, move |e| e == DaemonEvent::NewTarget(Some(nodename_clone.clone())))
.await
.unwrap();
self.tc
.get(nodename)
.await
.unwrap()
.events
.wait_for(None, |e| e == TargetEvent::RcsActivated)
.await
.unwrap();
}
pub async fn send_fastboot_discovery_event(&mut self, t: Target) {
let nodename =
t.nodename().await.expect("Should not send fastboot discovery for unnamed node.");
            let serial = t
                .serial()
                .await
                .expect("Should not send fastboot discovery for node without serial.");
let nodename_clone = nodename.clone();
self.event_queue
.push(DaemonEvent::WireTraffic(WireTrafficType::Fastboot(TargetInfo {
nodename: nodename.clone(),
serial: Some(serial.clone()),
..Default::default()
})))
.await
.unwrap();
self.event_queue
.wait_for(None, move |e| e == DaemonEvent::NewTarget(Some(nodename_clone.clone())))
.await
.unwrap();
}
}
#[async_trait]
impl EventHandler<DaemonEvent> for TestHookFakeFastboot {
async fn on_event(&self, event: DaemonEvent) -> Result<bool> {
let tc = match self.tc.upgrade() {
Some(t) => t,
None => return Ok(true),
};
match event {
DaemonEvent::WireTraffic(WireTrafficType::Fastboot(t)) => {
tc.merge_insert(Target::from_target_info(self.ascendd.clone(), t.into()))
.then(|target| async move {
target
.update_connection_state(|_| ConnectionState::Fastboot(Utc::now()))
.await;
})
.await;
}
DaemonEvent::NewTarget(_) => {}
_ => panic!("unexpected event"),
}
Ok(false)
}
}
#[async_trait]
impl EventHandler<DaemonEvent> for TestHookFakeRcs {
async fn on_event(&self, event: DaemonEvent) -> Result<bool> {
let tc = match self.tc.upgrade() {
Some(t) => t,
None => return Ok(true),
};
match event {
DaemonEvent::WireTraffic(WireTrafficType::Mdns(t)) => {
tc.merge_insert(Target::from_target_info(self.ascendd.clone(), t)).await;
}
DaemonEvent::NewTarget(Some(n)) => {
let rcs = RcsConnection::new_with_proxy(
self.ascendd.clone(),
setup_fake_target_service(n),
&NodeId { id: 0u64 },
);
tc.merge_insert(Target::from_rcs_connection(rcs).await.unwrap()).await;
}
_ => panic!("unexpected event"),
}
Ok(false)
}
}
async fn spawn_daemon_server_with_target_ctrl_for_fastboot(
stream: DaemonRequestStream,
) -> TargetControl {
let d = Daemon::new_for_test().await;
d.event_queue
.add_handler(TestHookFakeFastboot {
ascendd: d.ascendd.clone(),
tc: Arc::downgrade(&d.target_collection),
})
.await;
let event_clone = d.event_queue.clone();
let res = TargetControl {
event_queue: event_clone,
tc: d.target_collection.clone(),
_task: Task::spawn(async move {
d.handle_requests_from_stream(stream)
.await
.unwrap_or_else(|err| log::warn!("Fatal error handling request: {:?}", err));
}),
};
res
}
async fn spawn_daemon_server_with_target_ctrl(stream: DaemonRequestStream) -> TargetControl {
let d = Daemon::new_for_test().await;
d.event_queue
.add_handler(TestHookFakeRcs {
ascendd: d.ascendd.clone(),
tc: Arc::downgrade(&d.target_collection),
})
.await;
let event_clone = d.event_queue.clone();
let res = TargetControl {
event_queue: event_clone,
tc: d.target_collection.clone(),
_task: Task::spawn(async move {
d.handle_requests_from_stream(stream)
.await
.unwrap_or_else(|err| log::warn!("Fatal error handling request: {:?}", err));
}),
};
res
}
async fn spawn_daemon_server_with_fake_target(
nodename: &str,
stream: DaemonRequestStream,
) -> (TargetControl, Arc<Ascendd>) {
let mut res = spawn_daemon_server_with_target_ctrl(stream).await;
let ascendd = Arc::new(create_ascendd().await.unwrap());
let fake_target = Target::new(ascendd.clone(), nodename);
fake_target
.addrs_insert(
SocketAddr::V6(SocketAddrV6::new("fe80::1".parse().unwrap(), 0, 0, 1)).into(),
)
.await;
res.send_mdns_discovery_event(fake_target).await;
(res, ascendd)
}
async fn spawn_daemon_server_with_fake_fastboot_target(
nodename: &str,
stream: DaemonRequestStream,
) -> (TargetControl, Arc<Ascendd>) {
let mut res = spawn_daemon_server_with_target_ctrl_for_fastboot(stream).await;
let ascendd = Arc::new(create_ascendd().await.unwrap());
let fake_target = Target::new_with_serial(ascendd.clone(), nodename, "florp");
res.send_fastboot_discovery_event(fake_target).await;
(res, ascendd)
}
fn setup_fake_target_service(nodename: String) -> RemoteControlProxy {
let (proxy, mut stream) =
fidl::endpoints::create_proxy_and_stream::<RemoteControlMarker>().unwrap();
fuchsia_async::Task::spawn(async move {
while let Ok(Some(req)) = stream.try_next().await {
match req {
rcs::RemoteControlRequest::IdentifyHost { responder } => {
let result: Vec<Subnet> = vec![Subnet {
addr: IpAddress::Ipv4(Ipv4Address { addr: [254, 254, 0, 1] }),
prefix_len: 24,
}];
                        let nodename =
                            if nodename.is_empty() { None } else { Some(nodename.clone()) };
responder
.send(&mut Ok(rcs::IdentifyHostResponse {
nodename,
addresses: Some(result),
..rcs::IdentifyHostResponse::EMPTY
}))
.context("sending testing response")
.unwrap();
}
                    _ => panic!("unexpected request"),
}
}
})
.detach();
proxy
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_getting_rcs_multiple_targets_mdns_with_empty_selector_should_err() -> Result<()> {
let (daemon_proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap();
let (_, remote_server_end) = fidl::endpoints::create_proxy::<RemoteControlMarker>()?;
let (mut ctrl, ascendd) = spawn_daemon_server_with_fake_target("foobar", stream).await;
ctrl.send_mdns_discovery_event(Target::new(ascendd, "bazmumble")).await;
if let Ok(_) = daemon_proxy.get_remote_control(None, remote_server_end).await.unwrap() {
panic!("failure expected for multiple targets");
}
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_getting_rcs_multiple_targets_mdns_with_unclear_selector_should_not_err(
) -> Result<()> {
let (daemon_proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap();
let (_, remote_server_end) = fidl::endpoints::create_proxy::<RemoteControlMarker>()?;
let (mut ctrl, ascendd) = spawn_daemon_server_with_fake_target("foobar", stream).await;
ctrl.send_mdns_discovery_event(Target::new(ascendd, "bazmumble")).await;
if let Err(_) = daemon_proxy.get_remote_control(Some(""), remote_server_end).await.unwrap()
{
panic!("failure expected for multiple targets");
}
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_getting_rcs_multiple_targets_mdns_with_correct_selector_should_not_err(
) -> Result<()> {
let (daemon_proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap();
let (_, remote_server_end) = fidl::endpoints::create_proxy::<RemoteControlMarker>()?;
let (mut ctrl, ascendd) = spawn_daemon_server_with_fake_target("foobar", stream).await;
ctrl.send_mdns_discovery_event(Target::new(ascendd, "bazmumble")).await;
if let Err(_) =
daemon_proxy.get_remote_control(Some("foobar"), remote_server_end).await.unwrap()
{
panic!("failure unexpected for multiple targets with a matching selector");
}
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_getting_rcs_multiple_targets_mdns_with_incorrect_selector_should_err(
) -> Result<()> {
let (daemon_proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap();
let (_, remote_server_end) = fidl::endpoints::create_proxy::<RemoteControlMarker>()?;
let (mut ctrl, ascendd) = spawn_daemon_server_with_fake_target("foobar", stream).await;
ctrl.send_mdns_discovery_event(Target::new(ascendd, "bazmumble")).await;
if let Ok(_) = timeout(Duration::from_millis(10), async move {
daemon_proxy.get_remote_control(Some("rando"), remote_server_end).await.unwrap()
})
.await
{
panic!("failure expected for multiple targets with a mismatched selector");
}
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_list_targets_mdns_discovery() -> Result<()> {
let (daemon_proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap();
let (mut ctrl, ascendd) = spawn_daemon_server_with_fake_target("foobar", stream).await;
ctrl.send_mdns_discovery_event(Target::new(ascendd.clone(), "baz")).await;
ctrl.send_mdns_discovery_event(Target::new(ascendd.clone(), "quux")).await;
let res = daemon_proxy.list_targets("").await.unwrap();
// Daemon server contains one fake target plus these two.
assert_eq!(res.len(), 3);
let has_nodename =
|v: &Vec<bridge::Target>, s: &str| v.iter().any(|x| x.nodename.as_ref().unwrap() == s);
assert!(has_nodename(&res, "foobar"));
assert!(has_nodename(&res, "baz"));
assert!(has_nodename(&res, "quux"));
let res = daemon_proxy.list_targets("mlorp").await.unwrap();
assert!(!has_nodename(&res, "foobar"));
assert!(!has_nodename(&res, "baz"));
assert!(!has_nodename(&res, "quux"));
let res = daemon_proxy.list_targets("foobar").await.unwrap();
assert!(has_nodename(&res, "foobar"));
assert!(!has_nodename(&res, "baz"));
assert!(!has_nodename(&res, "quux"));
let res = daemon_proxy.list_targets("baz").await.unwrap();
assert!(!has_nodename(&res, "foobar"));
assert!(has_nodename(&res, "baz"));
assert!(!has_nodename(&res, "quux"));
let res = daemon_proxy.list_targets("quux").await.unwrap();
assert!(!has_nodename(&res, "foobar"));
assert!(!has_nodename(&res, "baz"));
assert!(has_nodename(&res, "quux"));
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_get_ssh_address() -> Result<()> {
let (daemon_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<DaemonMarker>()?;
let _ctrl = spawn_daemon_server_with_fake_target("foobar", stream).await;
let timeout = std::i64::MAX;
let r = daemon_proxy.get_ssh_address(Some("foobar"), timeout).await?;
// This is from the `spawn_daemon_server_with_fake_target` impl.
let want = Ok(bridge::TargetAddrInfo::Ip(bridge::TargetIp {
ip: fidl_net::IpAddress::Ipv6(fidl_net::Ipv6Address {
addr: [254, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
}),
scope_id: 1,
}));
assert_eq!(r, want);
let r = daemon_proxy.get_ssh_address(Some("toothpaste"), 10000).await?;
assert_eq!(r, Err(DaemonError::Timeout));
Ok(())
}
struct FakeConfigReader {
query_expected: String,
value: String,
}
#[async_trait]
impl ConfigReader for FakeConfigReader {
async fn get(&self, q: &str) -> Result<ffx_config::Value, ffx_config::api::ConfigError> {
assert_eq!(q, self.query_expected);
Ok(ffx_config::Value::String(self.value.clone()))
}
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_daemon_mdns_event_handler() {
let ascendd = Arc::new(create_ascendd().await.unwrap());
let t = Target::new(ascendd.clone(), "this-town-aint-big-enough-for-the-three-of-us");
let tc = Arc::new(TargetCollection::new());
tc.merge_insert(t.clone()).await;
let handler = DaemonEventHandler {
ascendd: ascendd.clone(),
target_collection: Arc::downgrade(&tc),
config_reader: Arc::new(FakeConfigReader {
query_expected: "target.default".to_owned(),
value: "florp".to_owned(),
}),
};
assert!(!handler
.on_event(DaemonEvent::WireTraffic(WireTrafficType::Mdns(TargetInfo {
nodename: t.nodename().await.expect("mdns target should always have a name."),
..Default::default()
})))
.await
.unwrap());
// This is a bit of a hack to check the target state without digging
// into internals. This also avoids running into a race waiting for an
// event to be propagated.
t.update_connection_state(|s| {
if let ConnectionState::Mdns(ref _t) = s {
s
} else {
panic!("state not updated by event, should be set to MDNS state.");
}
})
.await;
assert_eq!(
t.task_manager.task_snapshot(crate::target::TargetTaskType::HostPipe).await,
ffx_daemon_core::task::TaskSnapshot::NotRunning
);
// This handler will now return the value of the default target as
// intended.
let handler = DaemonEventHandler {
ascendd,
target_collection: Arc::downgrade(&tc),
config_reader: Arc::new(FakeConfigReader {
query_expected: "target.default".to_owned(),
value: "this-town-aint-big-enough-for-the-three-of-us".to_owned(),
}),
};
assert!(!handler
.on_event(DaemonEvent::WireTraffic(WireTrafficType::Mdns(TargetInfo {
nodename: t.nodename().await.expect("Handling Mdns traffic for unnamed node"),
..Default::default()
})))
.await
.unwrap());
assert_eq!(
t.task_manager.task_snapshot(crate::target::TargetTaskType::HostPipe).await,
ffx_daemon_core::task::TaskSnapshot::Running
);
// TODO(awdavies): RCS, Fastboot, etc. events.
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_add_target() {
let (daemon_proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap();
let _ctrl = spawn_daemon_server_with_target_ctrl(stream).await;
let mut info = bridge::TargetAddrInfo::Ip(bridge::TargetIp {
ip: fidl_net::IpAddress::Ipv6(fidl_net::Ipv6Address {
addr: [254, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
}),
scope_id: 0,
});
daemon_proxy.add_target(&mut info).await.unwrap().unwrap();
let mut got = daemon_proxy.list_targets("").await.unwrap().into_iter();
let target = got.next().expect("Got no targets after adding a target.");
assert!(got.next().is_none());
assert!(target.nodename.is_none());
assert_eq!(target.addresses.as_ref().unwrap().len(), 1);
assert_eq!(target.addresses.unwrap()[0], info);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_get_default_target_empty() {
let ascendd = Arc::new(create_ascendd().await.unwrap());
let d = Daemon::new_for_test().await;
let nodename = "where-is-my-hasenpfeffer";
let t = Target::new_autoconnected(ascendd.clone(), nodename).await;
d.target_collection.merge_insert(t.clone()).await;
assert_eq!(nodename, d.get_default_target(None).await.unwrap().nodename().await.unwrap());
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_get_default_target_query() {
let ascendd = Arc::new(create_ascendd().await.unwrap());
let d = Daemon::new_for_test().await;
let nodename = "where-is-my-hasenpfeffer";
let t = Target::new_autoconnected(ascendd.clone(), nodename).await;
d.target_collection.merge_insert(t.clone()).await;
assert_eq!(
nodename,
d.get_default_target(Some(nodename.to_string()))
.await
.unwrap()
.nodename()
.await
.unwrap()
);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_get_default_target_ambiguous() {
let ascendd = Arc::new(create_ascendd().await.unwrap());
let d = Daemon::new_for_test().await;
let t = Target::new_autoconnected(ascendd.clone(), "where-is-my-hasenpfeffer").await;
let t2 = Target::new_autoconnected(ascendd.clone(), "it-is-rabbit-season").await;
d.target_collection.merge_insert(t.clone()).await;
d.target_collection.merge_insert(t2.clone()).await;
assert_eq!(Err(DaemonError::TargetAmbiguous), d.get_default_target(None).await);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fastboot_on_rcs_error_msg() -> Result<()> {
let (daemon_proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap();
let (_, fastboot_server_end) = fidl::endpoints::create_proxy::<FastbootMarker>()?;
let _ctrl = spawn_daemon_server_with_fake_target("foobar", stream).await;
let got = daemon_proxy.get_fastboot(None, fastboot_server_end).await?;
match got {
Err(FastbootError::NonFastbootDevice) => Ok(()),
_ => bail!("Expecting non fastboot device error message."),
}
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_rcs_on_fastboot_error_msg() -> Result<()> {
let (daemon_proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap();
let (_, remote_server_end) = fidl::endpoints::create_proxy::<RemoteControlMarker>()?;
let _ctrl = spawn_daemon_server_with_fake_fastboot_target("foobar", stream).await;
let got = daemon_proxy.get_remote_control(None, remote_server_end).await?;
match got {
Err(DaemonError::TargetInFastboot) => Ok(()),
_ => bail!("Expecting target in fastboot error message."),
}
}
}