blob: 811cc70dd75f51d341251efa43f57a39b3af4c04 [file] [log] [blame]
use std::os::unix::prelude::AsRawFd;
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::MdnsServiceInner,
anyhow::{Context as _, Result},
async_io::Async,
async_lock::Mutex,
async_net::UdpSocket,
fidl_fuchsia_developer_bridge as bridge,
fidl_fuchsia_net::{IpAddress, Ipv4Address, Ipv6Address},
fuchsia_async::{Task, Timer},
futures::FutureExt,
mdns::protocol as dns,
netext::{get_mcast_interfaces, IsLocalAddr},
packet::{InnerPacketBuilder, ParseBuffer},
std::collections::HashMap,
std::collections::HashSet,
std::fmt::Write,
std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
std::rc::{Rc, Weak},
std::time::Duration,
zerocopy::ByteSlice,
};
const MDNS_MCAST_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
const MDNS_MCAST_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0x00fb);
#[cfg(not(test))]
const MDNS_PORT: u16 = 5353;
#[cfg(test)]
const MDNS_PORT: u16 = 0;
pub(crate) struct DiscoveryConfig {
pub socket_tasks: Rc<Mutex<HashMap<IpAddr, Task<()>>>>,
pub mdns_service: Weak<MdnsServiceInner>,
pub discovery_interval: Duration,
pub query_interval: Duration,
pub ttl: u32,
}
async fn propagate_bind_event(sock: &UdpSocket, svc: &Weak<MdnsServiceInner>) -> u16 {
let port = match sock.local_addr().unwrap() {
SocketAddr::V4(s) => s.port(),
SocketAddr::V6(s) => s.port(),
};
if let Some(svc) = svc.upgrade() {
svc.publish_event(bridge::MdnsEventType::SocketBound(bridge::MdnsBindEvent {
port: Some(port),
..bridge::MdnsBindEvent::EMPTY
}))
.await;
}
port
}
// discovery_loop iterates over all multicast interfaces and adds them to
// the socket_tasks if there is not already a task for that interface.
pub(crate) async fn discovery_loop(config: DiscoveryConfig) {
let DiscoveryConfig { socket_tasks, mdns_service, discovery_interval, query_interval, ttl } =
config;
// See fxbug.dev/62617#c10 for details. A macOS system can end up in
// a situation where the default routes for protocols are on
// non-functional interfaces, and under such conditions the wildcard
// listen socket binds will fail. We will repeat attempting to bind
// them, as newly added interfaces later may unstick the issue, if
// they introduce new routes. These boolean flags are used to
// suppress the production of a log output in every interface
// iteration.
// In order to manually reproduce these conditions on a macOS
// system, open Network.prefpane, and for each connection in the
// list select Advanced... > TCP/IP > Configure IPv6 > Link-local
// only. Click apply, then restart the ffx daemon.
let mut should_log_v4_listen_error = true;
let mut should_log_v6_listen_error = true;
let mut v4_listen_socket: Weak<UdpSocket> = Weak::new();
let mut v6_listen_socket: Weak<UdpSocket> = Weak::new();
loop {
if v4_listen_socket.upgrade().is_none() {
match make_listen_socket((MDNS_MCAST_V4, MDNS_PORT).into())
.context("make_listen_socket for IPv4")
{
Ok(sock) => {
// TODO(awdavies): Networking tests appear to fail when
// using IPv6. Only propagates the port binding event for
// IPv4.
let _ = propagate_bind_event(&sock, &mdns_service).await;
let sock = Rc::new(sock);
v4_listen_socket = Rc::downgrade(&sock);
Task::local(recv_loop(sock, mdns_service.clone())).detach();
should_log_v4_listen_error = true;
}
Err(err) => {
if should_log_v4_listen_error {
log::error!(
"unable to bind IPv4 listen socket: {}. Discovery may fail.",
err
);
should_log_v4_listen_error = false;
}
}
}
}
if v6_listen_socket.upgrade().is_none() {
match make_listen_socket((MDNS_MCAST_V6, MDNS_PORT).into())
.context("make_listen_socket for IPv6")
{
Ok(sock) => {
let sock = Rc::new(sock);
v6_listen_socket = Rc::downgrade(&sock);
Task::local(recv_loop(sock, mdns_service.clone())).detach();
should_log_v6_listen_error = true;
}
Err(err) => {
if should_log_v6_listen_error {
log::error!(
"unable to bind IPv6 listen socket: {}. Discovery may fail.",
err
);
should_log_v6_listen_error = false;
}
}
}
}
// As some operating systems will not error sendmsg/recvmsg for UDP
// sockets bound to addresses that no longer exist, they must be removed
// by ensuring that they still exist, otherwise we may be sending out
// unanswerable queries.
let mut to_delete = HashSet::<IpAddr>::new();
for ip in socket_tasks.lock().await.keys() {
to_delete.insert(ip.clone());
}
for iface in get_mcast_interfaces().unwrap_or(Vec::new()) {
match iface.id() {
Ok(id) => {
if let Some(sock) = v6_listen_socket.upgrade() {
let _ = sock.join_multicast_v6(&MDNS_MCAST_V6, id);
}
}
Err(err) => {
log::warn!("{}", err);
}
}
for addr in iface.addrs.iter() {
to_delete.remove(&addr.ip());
let mut addr = addr.clone();
addr.set_port(0);
// TODO(raggi): remove duplicate joins, log unexpected errors
if let SocketAddr::V4(addr) = addr {
if let Some(sock) = v4_listen_socket.upgrade() {
let _ = sock.join_multicast_v4(MDNS_MCAST_V4, *addr.ip());
}
}
if socket_tasks.lock().await.get(&addr.ip()).is_some() {
continue;
}
let sock = iface
.id()
.map(|id| match make_sender_socket(id, addr.clone(), ttl) {
Ok(sock) => Some(sock),
Err(err) => {
log::info!("mdns: failed to bind {}: {}", &addr, err);
None
}
})
.ok()
.flatten();
if sock.is_some() {
socket_tasks.lock().await.insert(
addr.ip().clone(),
Task::local(query_recv_loop(
Rc::new(sock.unwrap()),
mdns_service.clone(),
query_interval,
socket_tasks.clone(),
)),
);
}
}
}
// Drop tasks for IP addresses no longer found on the system.
{
let mut tasks = socket_tasks.lock().await;
for ip in to_delete {
if let Some(handle) = tasks.remove(&ip) {
handle.cancel().await;
}
}
}
Timer::new(discovery_interval).await;
}
}
fn make_target<B: ByteSlice + Clone>(
src: SocketAddr,
msg: dns::Message<B>,
) -> Option<(bridge::Target, u32)> {
let mut nodename = String::new();
let mut ttl = 0u32;
let src = bridge::TargetAddrInfo::Ip(bridge::TargetIp {
ip: match &src {
SocketAddr::V6(s) => IpAddress::Ipv6(Ipv6Address { addr: s.ip().octets().into() }),
SocketAddr::V4(s) => IpAddress::Ipv4(Ipv4Address { addr: s.ip().octets().into() }),
},
scope_id: if let SocketAddr::V6(s) = &src { s.scope_id() } else { 0 },
});
let is_fastboot = is_fastboot_response(&msg);
for record in msg.additional.iter() {
if record.rtype != dns::Type::A && record.rtype != dns::Type::Aaaa {
continue;
}
if nodename.len() == 0 {
write!(nodename, "{}", record.domain).unwrap();
nodename = nodename.trim_end_matches(".local").into();
}
if ttl == 0 {
ttl = record.ttl;
}
// The records here also have the IP addresses of
// the machine, however these could be different if behind a NAT
// (as with QEMU). Later it might be useful to store them in the
// Target struct.
}
if nodename.len() == 0 || ttl == 0 {
return None;
}
Some((
bridge::Target {
nodename: Some(nodename),
addresses: Some(vec![src]),
target_state: if is_fastboot { Some(bridge::TargetState::Fastboot) } else { None },
..bridge::Target::EMPTY
},
ttl,
))
}
// recv_loop reads packets from sock. If the packet is a Fuchsia mdns packet, a
// corresponding mdns event is published to the queue. All other packets are
// silently discarded.
async fn recv_loop(sock: Rc<UdpSocket>, mdns_service: Weak<MdnsServiceInner>) {
loop {
let mut buf = &mut [0u8; 1500][..];
let addr = match sock.recv_from(&mut buf).await {
Ok((sz, addr)) => {
buf = &mut buf[..sz];
addr
}
Err(err) => {
log::info!("listen socket recv error: {}, mdns listener closed", err);
return;
}
};
// Note: important, otherwise non-local responders could add themselves.
if !addr.ip().is_local_addr() {
continue;
}
let msg = match buf.parse::<dns::Message<_>>() {
Ok(msg) => msg,
Err(e) => {
log::trace!(
"unable to parse message received on {} from {}: {:?}",
sock.local_addr()
.map(|s| format!("{}", s))
.unwrap_or(format!("fd:{}", sock.as_raw_fd())),
addr,
e
);
continue;
}
};
if !is_fuchsia_response(&msg) && !is_fastboot_response(&msg) {
continue;
}
if let Some(mdns_service) = mdns_service.upgrade() {
if let Some((t, ttl)) = make_target(addr, msg) {
log::trace!(
"packet from {} ({}) on {}",
addr,
t.nodename.as_ref().unwrap_or(&"<unknown>".to_string()),
sock.local_addr().unwrap()
);
mdns_service.handle_target(t, ttl).await;
}
} else {
return;
}
}
}
lazy_static::lazy_static! {
static ref QUERY_BUF: &'static [u8] = {
let question = dns::QuestionBuilder::new(
dns::DomainBuilder::from_str("_fuchsia._udp.local").unwrap(),
dns::Type::Ptr,
dns::Class::In,
true,
);
let mut message = dns::MessageBuilder::new(0, true);
message.add_question(question);
let mut buf = vec![0; message.bytes_len()];
message.serialize(buf.as_mut_slice());
Box::leak(buf.into_boxed_slice())
};
}
// query_loop broadcasts an mdns query on sock every interval.
async fn query_loop(sock: Rc<UdpSocket>, interval: Duration) {
let to_addr: SocketAddr = match sock.local_addr() {
Ok(SocketAddr::V4(_)) => (MDNS_MCAST_V4, MDNS_PORT).into(),
Ok(SocketAddr::V6(_)) => (MDNS_MCAST_V6, MDNS_PORT).into(),
Err(err) => {
log::info!("resolving local socket addr failed with: {}", err);
return;
}
};
loop {
if let Err(err) = sock.send_to(&QUERY_BUF, to_addr).await {
log::info!(
"mdns query failed from {}: {}",
sock.local_addr().map(|a| a.to_string()).unwrap_or("unknown".to_string()),
err
);
return;
}
Timer::new(interval).await;
}
}
// sock is dispatched with a recv_loop, as well as broadcasting an
// mdns query to discover Fuchsia devices every interval.
async fn query_recv_loop(
sock: Rc<UdpSocket>,
mdns_service: Weak<MdnsServiceInner>,
interval: Duration,
tasks: Rc<Mutex<HashMap<IpAddr, Task<()>>>>,
) {
let mut recv = recv_loop(sock.clone(), mdns_service).boxed_local().fuse();
let mut query = query_loop(sock.clone(), interval).boxed_local().fuse();
let addr = match sock.local_addr() {
Ok(addr) => addr,
Err(err) => {
log::error!("mdns: failed to shutdown: {:?}", err);
return;
}
};
log::info!("mdns: started query socket {}", &addr);
let _ = futures::select!(
_ = recv => {},
_ = query => {},
);
drop(recv);
drop(query);
tasks.lock().await.remove(&addr.ip()).map(drop);
log::info!("mdns: shut down query socket {}", &addr);
}
fn is_fuchsia_response<B: zerocopy::ByteSlice + Clone>(m: &dns::Message<B>) -> bool {
m.answers.len() >= 1 && m.answers[0].domain == "_fuchsia._udp.local"
}
fn is_fastboot_response<B: zerocopy::ByteSlice + Clone>(m: &dns::Message<B>) -> bool {
m.answers.len() >= 1 && m.answers[0].domain == "_fastboot._udp.local"
}
fn make_listen_socket(listen_addr: SocketAddr) -> Result<UdpSocket> {
let socket: std::net::UdpSocket = match listen_addr {
SocketAddr::V4(_) => {
let socket = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)
.context("construct datagram socket")?;
socket.set_multicast_loop_v4(false).context("set_multicast_loop_v4")?;
socket.set_reuse_address(true).context("set_reuse_address")?;
socket.set_reuse_port(true).context("set_reuse_port")?;
socket
.bind(
&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), listen_addr.port()).into(),
)
.context("bind")?;
socket
.join_multicast_v4(&MDNS_MCAST_V4, &Ipv4Addr::UNSPECIFIED)
.context("join_multicast_v4")?;
socket
}
SocketAddr::V6(_) => {
let socket = socket2::Socket::new(
socket2::Domain::IPV6,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)
.context("construct datagram socket")?;
socket.set_only_v6(true).context("set_only_v6")?;
socket.set_multicast_loop_v6(false).context("set_multicast_loop_v6")?;
socket.set_reuse_address(true).context("set_reuse_address")?;
socket.set_reuse_port(true).context("set_reuse_port")?;
socket
.bind(
&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), listen_addr.port()).into(),
)
.context("bind")?;
// For some reason this often fails to bind on Mac, so avoid it and
// use the interface binding loop to get multicast group joining to
// work.
#[cfg(not(target_os = "macos"))]
socket.join_multicast_v6(&MDNS_MCAST_V6, 0).context("join_multicast_v6")?;
socket
}
}
.into();
Ok(Async::new(socket)?.into())
}
fn make_sender_socket(interface_id: u32, addr: SocketAddr, ttl: u32) -> Result<UdpSocket> {
let socket: std::net::UdpSocket = match addr {
SocketAddr::V4(ref saddr) => {
let socket = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)
.context("construct datagram socket")?;
socket.set_ttl(ttl).context("set_ttl")?;
socket.set_multicast_if_v4(&saddr.ip()).context("set_multicast_if_v4")?;
socket.set_multicast_ttl_v4(ttl).context("set_multicast_ttl_v4")?;
socket.bind(&addr.into()).context("bind")?;
socket
}
SocketAddr::V6(ref _saddr) => {
let socket = socket2::Socket::new(
socket2::Domain::IPV6,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)
.context("construct datagram socket")?;
socket.set_only_v6(true).context("set_only_v6")?;
socket.set_multicast_if_v6(interface_id).context("set_multicast_if_v6")?;
socket.set_unicast_hops_v6(ttl).context("set_unicast_hops_v6")?;
socket.set_multicast_hops_v6(ttl).context("set_multicast_hops_v6")?;
socket.bind(&addr.into()).context("bind")?;
socket
}
}
.into();
Ok(Async::new(socket)?.into())
}
#[cfg(test)]
mod tests {
use super::*;
use ::mdns::protocol::{Class, DomainBuilder, Message, MessageBuilder, RecordBuilder, Type};
use packet::{InnerPacketBuilder, ParseBuffer, Serializer};
#[test]
fn test_make_target() {
let nodename = DomainBuilder::from_str("foo._fuchsia._udp.local").unwrap();
let record = RecordBuilder::new(nodename, Type::A, Class::Any, true, 4500, &[8, 8, 8, 8]);
let mut message = MessageBuilder::new(0, true);
message.add_additional(record);
let mut msg_bytes = message
.into_serializer()
.serialize_vec_outer()
.unwrap_or_else(|_| panic!("failed to serialize"));
let parsed = msg_bytes.parse::<Message<_>>().expect("failed to parse");
let addr: SocketAddr = (MDNS_MCAST_V4, 12).into();
let (t, ttl) = make_target(addr.clone(), parsed).unwrap();
assert_eq!(ttl, 4500);
assert_eq!(
t.addresses.as_ref().unwrap()[0],
bridge::TargetAddrInfo::Ip(bridge::TargetIp {
ip: IpAddress::Ipv4(Ipv4Address { addr: MDNS_MCAST_V4.octets().into() }),
scope_id: 0
})
);
assert_eq!(t.nodename.unwrap(), "foo._fuchsia._udp");
}
#[test]
fn test_make_target_no_valid_record() {
let nodename = DomainBuilder::from_str("foo._fuchsia._udp.local").unwrap();
let record = RecordBuilder::new(
nodename,
Type::Ptr,
Class::Any,
true,
4500,
&[0x03, 'f' as u8, 'o' as u8, 'o' as u8, 0],
);
let mut message = MessageBuilder::new(0, true);
message.add_additional(record);
let mut msg_bytes = message
.into_serializer()
.serialize_vec_outer()
.unwrap_or_else(|_| panic!("failed to serialize"));
let parsed = msg_bytes.parse::<Message<_>>().expect("failed to parse");
let addr: SocketAddr = (MDNS_MCAST_V4, 12).into();
assert!(make_target(addr.clone(), parsed).is_none());
}
}