blob: 67743da63b72fa0b99c2c672f0291277a9557aa9 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use ffx_config::get;
use ffx_stream_util::TryStreamUtilExt;
use fidl::endpoints::ProtocolMarker;
use fidl_fuchsia_developer_ffx as ffx;
use fuchsia_async::Task;
use futures::TryStreamExt;
use mdns_discovery::{
discovery_loop, DiscoveryConfig, MdnsEnabledChecker, MdnsProtocol, MDNS_BROADCAST_INTERVAL,
MDNS_INTERFACE_DISCOVERY_INTERVAL, MDNS_TTL,
};
use protocols::prelude::*;
use std::rc::Rc;
// Default port to listen on for MDNS queries
const MDNS_PORT: u16 = 5353;
#[ffx_protocol]
#[derive(Default)]
pub struct Mdns {
mdns_task: Option<Task<()>>,
inner: Option<Rc<MdnsProtocol>>,
receiver: Option<async_channel::Receiver<ffx::MdnsEventType>>,
// If None defaults to MDNS_PORT
mdns_port: Option<u16>,
}
struct ConfigLoader;
#[async_trait(?Send)]
impl MdnsEnabledChecker for ConfigLoader {
async fn enabled(&self) -> bool {
get("discovery.mdns.enabled").await.unwrap_or(true)
}
}
#[async_trait(?Send)]
impl FidlProtocol for Mdns {
type Protocol = ffx::MdnsMarker;
type StreamHandler = FidlStreamHandler<Self>;
async fn handle(&self, _cx: &Context, req: ffx::MdnsRequest) -> Result<()> {
match req {
ffx::MdnsRequest::GetTargets { responder } => responder
.send(
&self.inner.as_ref().expect("inner state should be initalized").target_cache(),
)
.map_err(Into::into),
ffx::MdnsRequest::GetNextEvent { responder } => responder
.send(
self.receiver
.as_ref()
.expect("receiver should be initialized")
.recv()
.await
.ok()
.as_ref(),
)
.map_err(Into::into),
}
}
async fn start(&mut self, _cx: &Context) -> Result<()> {
let (sender, receiver) = async_channel::bounded::<ffx::MdnsEventType>(1);
let inner = Rc::new(MdnsProtocol { events_out: sender, target_cache: Default::default() });
self.inner.replace(inner.clone());
self.receiver.replace(receiver);
let inner = Rc::downgrade(&inner);
let mdns_port = self.mdns_port.unwrap_or(MDNS_PORT);
self.mdns_task.replace(Task::local(discovery_loop(
DiscoveryConfig {
socket_tasks: Default::default(),
mdns_protocol: inner,
discovery_interval: MDNS_INTERFACE_DISCOVERY_INTERVAL,
query_interval: MDNS_BROADCAST_INTERVAL,
ttl: MDNS_TTL,
mdns_port,
},
ConfigLoader {},
)));
Ok(())
}
async fn stop(&mut self, _cx: &Context) -> Result<()> {
self.mdns_task.take().ok_or(anyhow!("mdns_task never started"))?;
Ok(())
}
async fn serve<'a>(
&'a self,
cx: &'a Context,
stream: <Self::Protocol as ProtocolMarker>::RequestStream,
) -> Result<()> {
// This is necessary as we'll be hanging forever waiting on incoming
// traffic. This will exit early if the stream is closed at any point.
stream
.map_err(|err| anyhow!("{}", err))
.try_for_each_concurrent_while_connected(None, |req| self.handle(cx, req))
.await
}
}
#[cfg(test)]
mod tests {
use super::*;
use lazy_static::lazy_static;
use mdns::protocol::{
Class, DomainBuilder, EmbeddedPacketBuilder, MessageBuilder, RecordBuilder, Type,
};
use packet::serialize::{InnerPacketBuilder, Serializer};
use protocols::testing::FakeDaemonBuilder;
use std::cell::RefCell;
use std::net::{IpAddr, SocketAddr};
lazy_static! {
// This is copied from the //fuchsia/lib/src/mdns/rust/src/protocol.rs
// tests library.
static ref MDNS_PACKET: Vec<u8> = vec![
0x00, 0x00, 0x84, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x08, 0x5f,
0x66, 0x75, 0x63, 0x68, 0x73, 0x69, 0x61, 0x04, 0x5f, 0x75, 0x64, 0x70, 0x05, 0x6c,
0x6f, 0x63, 0x61, 0x6c, 0x00, 0x00, 0x0c, 0x00, 0x01, 0x00, 0x00, 0x11, 0x94, 0x00,
0x18, 0x15, 0x74, 0x68, 0x75, 0x6d, 0x62, 0x2d, 0x73, 0x65, 0x74, 0x2d, 0x68, 0x75,
0x6d, 0x61, 0x6e, 0x2d, 0x73, 0x68, 0x72, 0x65, 0x64, 0xc0, 0x0c, 0xc0, 0x2b, 0x00,
0x21, 0x80, 0x01, 0x00, 0x00, 0x00, 0x78, 0x00, 0x1e, 0x00, 0x00, 0x00, 0x00, 0x14,
0xe9, 0x15, 0x74, 0x68, 0x75, 0x6d, 0x62, 0x2d, 0x73, 0x65, 0x74, 0x2d, 0x68, 0x75,
0x6d, 0x61, 0x6e, 0x2d, 0x73, 0x68, 0x72, 0x65, 0x64, 0xc0, 0x1a, 0xc0, 0x2b, 0x00,
0x10, 0x80, 0x01, 0x00, 0x00, 0x11, 0x94, 0x00, 0x01, 0x00, 0xc0, 0x55, 0x00, 0x01,
0x80, 0x01, 0x00, 0x00, 0x00, 0x78, 0x00, 0x04, 0xac, 0x10, 0xf3, 0x26, 0xc0, 0x55,
0x00, 0x1c, 0x80, 0x01, 0x00, 0x00, 0x00, 0x78, 0x00, 0x10, 0xfe, 0x80, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x8e, 0xae, 0x4c, 0xff, 0xfe, 0xe9, 0xc9, 0xd3,
];
}
async fn wait_for_port_binds(proxy: &ffx::MdnsProxy) -> u16 {
if let Some(e) = proxy.get_next_event().await.unwrap() {
match *e {
ffx::MdnsEventType::SocketBound(ffx::MdnsBindEvent { port, .. }) => {
let p = port.unwrap();
assert_ne!(p, 0);
return p;
}
e => panic!(
"events should start with two port binds. encountered unrecognized event: {:?}",
e
),
}
}
panic!("no port bound");
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_mdns_get_targets_empty() {
let mdns = Rc::new(RefCell::new(Mdns {
inner: None,
receiver: None,
mdns_task: None,
mdns_port: Some(0),
}));
let daemon = FakeDaemonBuilder::new().inject_fidl_protocol::<Mdns>(mdns).build();
let proxy = daemon.open_proxy::<ffx::MdnsMarker>().await;
let targets = proxy.get_targets().await.unwrap();
assert_eq!(targets.len(), 0);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_mdns_stop() {
let daemon = FakeDaemonBuilder::new().build();
let protocol = Rc::new(RefCell::new(Mdns {
mdns_task: None,
inner: None,
receiver: None,
mdns_port: Some(0),
}));
let (proxy, task) = protocols::testing::create_proxy(protocol.clone(), &daemon).await;
drop(proxy);
task.await;
assert!(protocol.borrow().mdns_task.is_none());
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_mdns_bind_event_on_first_listen() {
let mdns = Rc::new(RefCell::new(Mdns {
inner: None,
receiver: None,
mdns_task: None,
mdns_port: Some(0),
}));
let daemon = FakeDaemonBuilder::new().inject_fidl_protocol::<Mdns>(mdns).build();
let proxy = daemon.open_proxy::<ffx::MdnsMarker>().await;
while let Some(e) = proxy.get_next_event().await.unwrap() {
if matches!(*e, ffx::MdnsEventType::SocketBound(_)) {
break;
}
}
}
#[fuchsia_async::run_singlethreaded(test)]
#[ignore] // TODO(b/297919461) -- re-enable or delete
async fn test_mdns_network_traffic_valid() {
let mdns = Rc::new(RefCell::new(Mdns {
inner: None,
receiver: None,
mdns_task: None,
mdns_port: Some(0),
}));
let daemon = FakeDaemonBuilder::new().inject_fidl_protocol::<Mdns>(mdns).build();
let proxy = daemon.open_proxy::<ffx::MdnsMarker>().await;
let bound_port = wait_for_port_binds(&proxy).await;
// Note: this and other tests are only using IPv4 due to some issues
// on Mac, wherein sending on the unspecified address leads to a "no
// route to host" error. For some reason using IPv6 with either the
// unspecified address or the localhost address leads in errors or
// hangs on Mac tests.
let socket = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)
.unwrap();
socket.set_ttl(1).unwrap();
socket.set_multicast_ttl_v4(1).unwrap();
let addr: SocketAddr = (std::net::Ipv4Addr::LOCALHOST, bound_port).into();
let my_ip = match addr.ip() {
IpAddr::V4(addr) => addr.octets(),
_ => panic!("expected ipv4 addr"),
};
let mut message = MessageBuilder::new(0, true);
let domain =
DomainBuilder::from_str("thumb-set-human-shred._fuchsia._udp.local").unwrap().bytes();
let ptr = RecordBuilder::new(
DomainBuilder::from_str("_fuchsia._udp.local").unwrap(),
Type::Ptr,
Class::In,
true,
1,
&domain,
);
message.add_answer(ptr);
let nodename = DomainBuilder::from_str("thumb-set-human-shred.local").unwrap();
let rec = RecordBuilder::new(nodename, Type::A, Class::In, true, 4500, &my_ip);
message.add_additional(rec);
let msg_bytes = message
.into_serializer()
.serialize_vec_outer()
.unwrap_or_else(|_| panic!("failed to serialize"))
.unwrap_b();
socket.send_to(msg_bytes.as_ref(), &addr.into()).unwrap();
while let Some(e) = proxy.get_next_event().await.unwrap() {
if matches!(*e, ffx::MdnsEventType::TargetFound(_),) {
break;
}
}
assert_eq!(
proxy.get_targets().await.unwrap().into_iter().next().unwrap().nodename.unwrap(),
"thumb-set-human-shred",
);
socket.send_to(msg_bytes.as_ref(), &addr.into()).unwrap();
while let Some(e) = proxy.get_next_event().await.unwrap() {
if matches!(*e, ffx::MdnsEventType::TargetRediscovered(_)) {
break;
}
}
assert_eq!(
proxy.get_targets().await.unwrap().into_iter().next().unwrap().nodename.unwrap(),
"thumb-set-human-shred",
);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_mdns_network_traffic_invalid() {
let mdns = Rc::new(RefCell::new(Mdns {
inner: None,
receiver: None,
mdns_task: None,
mdns_port: Some(0),
}));
let daemon = FakeDaemonBuilder::new().inject_fidl_protocol::<Mdns>(mdns).build();
let proxy = daemon.open_proxy::<ffx::MdnsMarker>().await;
let bound_port = wait_for_port_binds(&proxy).await;
let socket = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)
.unwrap();
socket.set_ttl(1).unwrap();
socket.set_multicast_ttl_v4(1).unwrap();
let addr: SocketAddr = (std::net::Ipv4Addr::UNSPECIFIED, bound_port).into();
// This is just a copy of the valid mdns packet but with a few bytes altered.
let packet: Vec<u8> = vec![
0x00, 0x00, 0x84, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x08, 0x5f,
0x66, 0x75, 0x63, 0x68, 0x73, 0x69, 0x61, 0x04, 0x5f, 0x75, 0x64, 0x70, 0x05, 0x6c,
0x6f, 0x63, 0x61, 0x6c, 0x00, 0x00, 0x0c, 0x00, 0x01, 0x00, 0x00, 0x11, 0x94, 0x00,
0x18, 0x95, 0x74, 0x68, 0x75, 0x6d, 0x62, 0x2d, 0x73, 0x65, 0x74, 0x2d, 0x68, 0x75,
0x6d, 0x61, 0x6e, 0x2d, 0x73, 0x68, 0x72, 0x65, 0x64, 0xc0, 0x0c, 0xc0, 0x2b, 0x00,
0x21, 0x80, 0x01, 0x00, 0x10, 0x02, 0x78, 0x00, 0x1e, 0x00, 0x00, 0x00, 0x00, 0x14,
0xe9, 0x15, 0x74, 0x68, 0x75, 0x6d, 0x62, 0x2d, 0x73, 0x65, 0x74, 0x2d, 0x68, 0x75,
0x6d, 0x61, 0x6e, 0x2d, 0x73, 0x68, 0x72, 0x65, 0x64, 0xc0, 0x1a, 0xc0, 0x2b, 0x00,
0x10, 0x80, 0x01, 0x00, 0x00, 0x11, 0x94, 0x00, 0x01, 0x00, 0xc0, 0x55, 0x00, 0x01,
0x80, 0x01, 0x00, 0x00, 0x00, 0x78, 0x00, 0x04, 0xac, 0x10, 0xf3, 0x26, 0xc0, 0x55,
0x00, 0x1c, 0x80, 0x01, 0x00, 0x00, 0x00, 0x78, 0x00, 0x10, 0xfe, 0x80, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x8e, 0xae, 0x4c, 0xff, 0xfe, 0xe9, 0xc9, 0xd3,
];
socket.send_to(&packet, &addr.into()).unwrap();
// This is here to un-stick the executor a bit, as otherwise the socket
// will not get read, and the code being tested will not get exercised.
//
// If this ever flakes it will be because somehow valid traffic made it
// onto the network.
fuchsia_async::Timer::new(std::time::Duration::from_millis(200)).await;
assert_eq!(proxy.get_targets().await.unwrap().len(), 0);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_mdns_network_traffic_wrong_protocol() {
let mdns = Rc::new(RefCell::new(Mdns {
inner: None,
receiver: None,
mdns_task: None,
mdns_port: Some(0),
}));
let daemon = FakeDaemonBuilder::new().inject_fidl_protocol::<Mdns>(mdns).build();
let proxy = daemon.open_proxy::<ffx::MdnsMarker>().await;
let bound_port = wait_for_port_binds(&proxy).await;
let socket = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)
.unwrap();
socket.set_ttl(1).unwrap();
socket.set_multicast_ttl_v4(1).unwrap();
let addr: SocketAddr = (std::net::Ipv4Addr::UNSPECIFIED, bound_port).into();
let domain = DomainBuilder::from_str("_nonsense._udp.local").unwrap();
let record = RecordBuilder::new(
domain,
Type::Ptr,
Class::In,
true,
4500,
&[0x03, 'f' as u8, 'o' as u8, 'o' as u8, 0],
);
let nodename = DomainBuilder::from_str("fuchsia_thing._fuchsia._udp.local").unwrap();
let other_record =
RecordBuilder::new(nodename, Type::A, Class::In, true, 4500, &[8, 8, 8, 8]);
let mut message = MessageBuilder::new(0, true);
message.add_additional(record);
message.add_additional(other_record);
let msg_bytes = message
.into_serializer()
.serialize_vec_outer()
.unwrap_or_else(|_| panic!("failed to serialize"))
.unwrap_b();
socket.send_to(msg_bytes.as_ref(), &addr.into()).unwrap();
// This is here to un-stick the executor a bit, as otherwise the socket
// will not get read, and the code being tested will not get exercised.
//
// If this ever flakes it will be because somehow valid traffic made it
// onto the network.
fuchsia_async::Timer::new(std::time::Duration::from_millis(200)).await;
assert_eq!(proxy.get_targets().await.unwrap().len(), 0);
}
#[fuchsia_async::run_singlethreaded(test)]
#[ignore] // TODO(b/297919461) -- re-enable or delete
async fn test_new_and_rediscovered_target() {
let daemon = FakeDaemonBuilder::new().build();
let protocol = Rc::new(RefCell::new(Mdns {
inner: None,
receiver: None,
mdns_task: None,
mdns_port: Some(0),
}));
let (proxy, _task) = protocols::testing::create_proxy(protocol.clone(), &daemon).await;
let svc_inner = protocol.borrow().inner.as_ref().unwrap().clone();
let nodename = "plop".to_owned();
// Skip port binding.
let _ = wait_for_port_binds(&proxy).await;
svc_inner
.handle_target(
ffx::TargetInfo { nodename: Some(nodename.clone()), ..Default::default() },
5000,
)
.await;
if let Some(e) = proxy.get_next_event().await.unwrap() {
assert!(matches!(*e, ffx::MdnsEventType::TargetFound(_),));
}
assert_eq!(
proxy.get_targets().await.unwrap().into_iter().next().unwrap().nodename.unwrap(),
nodename
);
svc_inner
.handle_target(
ffx::TargetInfo { nodename: Some(nodename.clone()), ..Default::default() },
5000,
)
.await;
if let Some(e) = proxy.get_next_event().await.unwrap() {
assert!(matches!(*e, ffx::MdnsEventType::TargetRediscovered(_),));
}
assert_eq!(
proxy.get_targets().await.unwrap().into_iter().next().unwrap().nodename.unwrap(),
nodename
);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_target_eviction() {
let daemon = FakeDaemonBuilder::new().build();
let protocol = Rc::new(RefCell::new(Mdns {
inner: None,
receiver: None,
mdns_task: None,
mdns_port: Some(0),
}));
let (proxy, _task) = protocols::testing::create_proxy(protocol.clone(), &daemon).await;
let svc_inner = protocol.borrow().inner.as_ref().unwrap().clone();
let nodename = "plop".to_owned();
// Skip port binding.
let _ = wait_for_port_binds(&proxy).await;
svc_inner
.handle_target(
ffx::TargetInfo { nodename: Some(nodename.clone()), ..Default::default() },
1,
)
.await;
// Hangs until the target expires.
let mut new_target_found = false;
while let Some(e) = proxy.get_next_event().await.unwrap() {
match *e {
ffx::MdnsEventType::TargetExpired(_) => {
break;
}
ffx::MdnsEventType::TargetFound(t) => {
assert_eq!(t.nodename.unwrap(), nodename);
new_target_found = true;
}
_ => {}
}
}
assert_eq!(proxy.get_targets().await.unwrap().len(), 0);
assert!(new_target_found);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_target_eviction_timer_override() {
let daemon = FakeDaemonBuilder::new().build();
let protocol = Rc::new(RefCell::new(Mdns {
inner: None,
receiver: None,
mdns_task: None,
mdns_port: Some(0),
}));
let (proxy, _task) = protocols::testing::create_proxy(protocol.clone(), &daemon).await;
let svc_inner = protocol.borrow().inner.as_ref().unwrap().clone();
let nodename = "plop".to_owned();
// Skip port binding.
let _ = wait_for_port_binds(&proxy).await;
svc_inner
.handle_target(
ffx::TargetInfo { nodename: Some(nodename.clone()), ..Default::default() },
50000,
)
.await;
while let Some(e) = proxy.get_next_event().await.unwrap() {
match *e {
ffx::MdnsEventType::TargetFound(t) => {
assert_eq!(t.nodename.unwrap(), nodename);
break;
}
_ => {}
}
}
svc_inner
.handle_target(
ffx::TargetInfo { nodename: Some(nodename.clone()), ..Default::default() },
1,
)
.await;
while let Some(e) = proxy.get_next_event().await.unwrap() {
if matches!(*e, ffx::MdnsEventType::TargetExpired(_)) {
break;
}
}
assert_eq!(proxy.get_targets().await.unwrap().len(), 0);
}
}