blob: 2f1435fa52e18c98a342091590d82f0d429ed6a1 [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::collections::{HashMap, VecDeque};
use fidl::endpoints::{ControlHandle as _, RequestStream as _, Responder as _};
use fidl_fuchsia_net as fnet;
use fidl_fuchsia_net_ext::IntoExt;
use fidl_fuchsia_net_neighbor::{
self as fnet_neighbor, ControllerRequest, ControllerRequestStream, ViewRequest,
ViewRequestStream,
};
use fidl_fuchsia_net_neighbor_ext as fnet_neighbor_ext;
use fuchsia_zircon as zx;
use assert_matches::assert_matches;
use futures::{
channel::mpsc, task::Poll, Future, SinkExt as _, StreamExt as _, TryFutureExt as _,
TryStreamExt as _,
};
use net_types::{
ethernet::Mac,
ip::{IpAddr, IpAddress, Ipv4, Ipv6},
SpecifiedAddr, Witness as _,
};
use tracing::{error, info, warn};
use crate::bindings::{
devices::{BindingId, DeviceIdAndName},
BindingsCtx, Ctx, StackTime,
};
use netstack3_core::{
device::{DeviceId, EthernetDeviceId, EthernetLinkDevice, EthernetWeakDeviceId},
error::NotFoundError,
neighbor,
neighbor::{NeighborRemovalError, StaticNeighborInsertionError},
IpExt,
};
#[derive(Debug)]
pub(crate) struct Event {
pub(crate) id: EthernetWeakDeviceId<BindingsCtx>,
pub(crate) addr: SpecifiedAddr<IpAddr>,
pub(crate) kind: neighbor::EventKind<Mac>,
pub(crate) at: StackTime,
}
impl std::fmt::Display for Event {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self { id, addr, kind, at } = self;
write!(f, "neighbor event {:?} {} {:?} at {}", id.bindings_id(), addr, kind, at)
}
}
fn new_fidl_entry(
binding_id: BindingId,
addr: SpecifiedAddr<IpAddr>,
state: neighbor::EventState<Mac>,
StackTime(at): StackTime,
) -> fnet_neighbor::Entry {
let (state, mac) = match state {
neighbor::EventState::Dynamic(dynamic_state) => match dynamic_state {
neighbor::EventDynamicState::Incomplete => {
(fnet_neighbor::EntryState::Incomplete, None)
}
neighbor::EventDynamicState::Reachable(mac) => {
(fnet_neighbor::EntryState::Reachable, Some(mac))
}
neighbor::EventDynamicState::Stale(mac) => {
(fnet_neighbor::EntryState::Stale, Some(mac))
}
neighbor::EventDynamicState::Delay(mac) => {
(fnet_neighbor::EntryState::Delay, Some(mac))
}
neighbor::EventDynamicState::Probe(mac) => {
(fnet_neighbor::EntryState::Probe, Some(mac))
}
neighbor::EventDynamicState::Unreachable(mac) => {
(fnet_neighbor::EntryState::Unreachable, Some(mac))
}
},
neighbor::EventState::Static(mac) => (fnet_neighbor::EntryState::Static, Some(mac)),
};
fnet_neighbor_ext::Entry {
interface: binding_id.into(),
neighbor: addr.get().into_ext(),
state,
mac: mac.map(IntoExt::into_ext),
updated_at: at.into_nanos(),
}
.into()
}
pub(crate) struct Worker {
event_receiver: mpsc::UnboundedReceiver<Event>,
watcher_receiver: mpsc::Receiver<NewWatcher>,
}
/// Arbitrarily picked constant to limit memory consumed by queued watcher requests.
const WATCHER_CHANNEL_CAPACITY: usize = 128;
pub(crate) fn new_worker() -> (Worker, mpsc::Sender<NewWatcher>, mpsc::UnboundedSender<Event>) {
let (event_sink, event_receiver) = futures::channel::mpsc::unbounded();
let (watcher_sink, watcher_receiver) =
futures::channel::mpsc::channel(WATCHER_CHANNEL_CAPACITY);
(Worker { event_receiver, watcher_receiver }, watcher_sink, event_sink)
}
fn handle_new_watcher(
neighbor_state: &HashMap<
BindingId,
HashMap<SpecifiedAddr<IpAddr>, (neighbor::EventState<Mac>, StackTime)>,
>,
watchers: &mut futures::stream::FuturesUnordered<Watcher>,
NewWatcher { options, stream }: NewWatcher,
) {
let options = match options.try_into() {
Ok(options) => options,
Err(e) => {
warn!("failed to initialize neighbor watcher: {:?}", e);
stream.control_handle().shutdown_with_epitaph(zx::Status::INVALID_ARGS);
return;
}
};
let event_queue = EventQueue(
neighbor_state
.iter()
.map(|(binding_id, entries)| {
entries.iter().map(|(addr, (state, last_updated))| {
fnet_neighbor::EntryIteratorItem::Existing(new_fidl_entry(
*binding_id,
*addr,
*state,
*last_updated,
))
})
})
.flatten()
.chain(std::iter::once(fnet_neighbor::EntryIteratorItem::Idle(
fnet_neighbor::IdleEvent,
)))
.collect(),
);
watchers.push(Watcher { stream, options, event_queue, responder: None });
}
impl Worker {
pub(crate) async fn run(self) {
let Self { event_receiver: event_stream, watcher_receiver: new_watchers } = self;
let mut watchers = futures::stream::FuturesUnordered::<Watcher>::new();
let mut neighbor_state: HashMap<
_,
HashMap<SpecifiedAddr<IpAddr>, (neighbor::EventState<Mac>, StackTime)>,
> = HashMap::new();
enum SinkItem {
NewWatcher(NewWatcher),
Event(Event),
}
// Always consume events before watchers. That allows external
// observers to assume all side effects of a call are already
// applied before a watcher observes its initial existing set of
// properties.
let mut stream = futures::stream::select_with_strategy(
event_stream.map(SinkItem::Event),
new_watchers.map(SinkItem::NewWatcher),
|_: &mut ()| futures::stream::PollNext::Left,
);
enum Item {
WatcherEnded(Result<(), fidl::Error>),
SinkItem(Option<SinkItem>),
}
loop {
let item = futures::select! {
i = stream.next() => Item::SinkItem(i),
w = watchers.select_next_some() => Item::WatcherEnded(w),
complete => break,
};
match item {
Item::SinkItem(None) => {
info!("neighbor worker shutting down, waiting for watchers to end")
}
Item::WatcherEnded(r) => r.unwrap_or_else(|e| {
if !e.is_closed() {
error!("error operating neighbor watcher {:?}", e);
}
}),
Item::SinkItem(Some(SinkItem::NewWatcher(new_watcher))) => {
handle_new_watcher(&neighbor_state, &mut watchers, new_watcher);
}
Item::SinkItem(Some(SinkItem::Event(
ref event @ Event { ref id, kind, addr, at },
))) => {
// TODO(https://fxbug.dev/42086008): Add flags to the log indicating if
// the neighbor is a default gateway and if it is on-link.
info!(tag = "NUD", "{event}");
let DeviceIdAndName { id: binding_id, name: _ } = *id.bindings_id();
let entry = neighbor_state
.entry(binding_id)
.or_insert_with(|| HashMap::new())
.entry(addr);
let fidl_event = match kind {
neighbor::EventKind::Added(state) => match entry {
std::collections::hash_map::Entry::Occupied(occupied) => {
panic!(
"neighbor added but already exists: entry={:?}, event={:?}",
occupied.get(),
event
);
}
std::collections::hash_map::Entry::Vacant(vacant) => {
let _ = vacant.insert((state, at));
fnet_neighbor::EntryIteratorItem::Added(new_fidl_entry(
binding_id, addr, state, at,
))
}
},
neighbor::EventKind::Removed => match entry {
std::collections::hash_map::Entry::Vacant(_) => {
panic!("neighbor removed but not found: {event:?}");
}
std::collections::hash_map::Entry::Occupied(occupied) => {
let (state, at) = occupied.remove();
let entry = assert_matches!(
neighbor_state.entry(binding_id),
std::collections::hash_map::Entry::Occupied(o) => o
);
if entry.get().is_empty() {
let _ = entry.remove();
}
fnet_neighbor::EntryIteratorItem::Removed(new_fidl_entry(
binding_id, addr, state, at,
))
}
},
neighbor::EventKind::Changed(state) => match entry {
std::collections::hash_map::Entry::Vacant(_) => {
panic!("neighbor changed but not found: {event:?}");
}
std::collections::hash_map::Entry::Occupied(mut occupied) => {
let (current_state, _) = *occupied.get();
// NS3 core guarantees to only emit changed events if state
// actually changed.
assert_ne!(
current_state, state,
"neighbor changed but nothing changed: {event:?}",
);
let _ = occupied.insert((state, at));
fnet_neighbor::EntryIteratorItem::Changed(new_fidl_entry(
binding_id, addr, state, at,
))
}
},
};
watchers.iter_mut().for_each(|watcher| {
watcher.push(fidl_event.clone());
});
}
}
}
info!("all neighbor watchers closed, neighbor worker shutdown is complete");
}
}
#[derive(Debug)]
/// A bounded queue of [`Events`] to be returned to `fuchsia.net.neighbor/EntryIterator`.
struct EventQueue(VecDeque<fnet_neighbor::EntryIteratorItem>);
const MAX_ITEM_BATCH_SIZE: usize = fnet_neighbor::MAX_ITEM_BATCH_SIZE as usize;
// Arbitrarily-chosen maximum number of events to queue per client (4 times the
// maximum number of entries held in core per IP per interface).
const MAX_EVENTS: usize = 4 * neighbor::MAX_ENTRIES;
impl EventQueue {
fn is_empty(&self) -> bool {
let Self(event_queue) = self;
event_queue.is_empty()
}
fn push(
&mut self,
event: fnet_neighbor::EntryIteratorItem,
) -> Result<(), fnet_neighbor::EntryIteratorItem> {
let Self(event_queue) = self;
if event_queue.len() >= MAX_EVENTS {
return Err(event);
}
event_queue.push_back(event);
Ok(())
}
fn pop_max(&mut self) -> impl IntoIterator<Item = fnet_neighbor::EntryIteratorItem> + '_ {
let Self(event_queue) = self;
let count = std::cmp::min(MAX_ITEM_BATCH_SIZE, event_queue.len());
event_queue.drain(0..count)
}
}
/// The task that serves `fuchsia.net.neighbor/EntryIterator`.
///
/// The future implementation drives `stream` and responds to the requests
/// with events from `event_queue`, and completes when `stream` is exhausted
/// (possibly with an error).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub(crate) struct Watcher {
stream: fnet_neighbor::EntryIteratorRequestStream,
options: fnet_neighbor_ext::EntryIteratorOptions,
event_queue: EventQueue,
responder: Option<fnet_neighbor::EntryIteratorGetNextResponder>,
}
fn send_events(
responder: fnet_neighbor::EntryIteratorGetNextResponder,
events: &[fnet_neighbor::EntryIteratorItem],
) {
responder.send(events).unwrap_or_else(|e| {
if e.is_closed() {
warn!("neighbor watcher closed when sending event");
} else {
error!("error sending event to neighbor watcher: {e:?}");
}
})
}
impl Future for Watcher {
type Output = Result<(), fidl::Error>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
loop {
let next_request = self.as_mut().stream.poll_next_unpin(cx)?;
match futures::ready!(next_request) {
Some(fnet_neighbor::EntryIteratorRequest::GetNext { responder }) => {
if !self.event_queue.is_empty() {
let events = self.event_queue.pop_max().into_iter().collect::<Vec<_>>();
send_events(responder, &events);
} else {
match &self.responder {
Some(existing) => {
existing
.control_handle()
.shutdown_with_epitaph(zx::Status::ALREADY_EXISTS);
return Poll::Ready(Ok(()));
}
None => {
self.responder = Some(responder);
}
}
}
}
None => return Poll::Ready(Ok(())),
}
}
}
}
impl Watcher {
fn push(&mut self, event: fnet_neighbor::EntryIteratorItem) {
let Self {
stream,
event_queue,
responder,
options: fnet_neighbor_ext::EntryIteratorOptions {},
} = self;
match responder.take() {
Some(responder) => {
debug_assert!(event_queue.is_empty());
send_events(responder, std::slice::from_ref(&event));
}
None => {
event_queue.push(event).unwrap_or_else(|_: fnet_neighbor::EntryIteratorItem| {
warn!("too many pending events enqueued for neighbor watcher, closing channel");
stream.control_handle().shutdown();
});
}
}
}
}
#[derive(thiserror::Error, Debug)]
#[error("neighbor worker no longer available")]
pub(crate) struct WorkerClosedError;
/// Possible errors when serving `fuchsia.net.neighbor/View`.
#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
#[error("failed to send a Watcher task to parent")]
Send(#[from] WorkerClosedError),
#[error(transparent)]
Fidl(#[from] fidl::Error),
}
pub(crate) struct NewWatcher {
stream: fnet_neighbor::EntryIteratorRequestStream,
options: fnet_neighbor::EntryIteratorOptions,
}
pub(super) async fn serve_view(
stream: ViewRequestStream,
sink: mpsc::Sender<NewWatcher>,
) -> Result<(), Error> {
stream
.err_into()
.try_fold(sink, |mut sink, request| async move {
match request {
ViewRequest::OpenEntryIterator { it, options, control_handle: _ } => sink
.send(NewWatcher { stream: it.into_stream()?, options })
.await
.map_err(|_: mpsc::SendError| Error::Send(WorkerClosedError))?,
}
Ok(sink)
})
.map_ok(|_: mpsc::Sender<NewWatcher>| ())
.await
}
fn get_ethernet_id(ctx: &Ctx, interface: u64) -> Result<EthernetDeviceId<BindingsCtx>, zx::Status> {
match BindingId::new(interface)
.and_then(|id| ctx.bindings_ctx().devices.get_core_id(id))
.ok_or(zx::Status::NOT_FOUND)?
{
DeviceId::Ethernet(e) => Ok(e),
// NUD is not supported for Loopback or pure IP devices.
DeviceId::Loopback(_) | DeviceId::PureIp(_) => Err(zx::Status::NOT_SUPPORTED),
}
}
#[netstack3_core::context_ip_bounds(A::Version, BindingsCtx)]
fn add_static_entry<A: IpAddress>(
ctx: &mut Ctx,
interface: u64,
neighbor: A,
mac: fnet::MacAddress,
) -> Result<(), zx::Status>
where
A::Version: IpExt,
{
let device_id = get_ethernet_id(ctx, interface)?;
let mac = mac.into_ext();
ctx.api()
.neighbor::<A::Version, EthernetLinkDevice>()
.insert_static_entry(&device_id, neighbor, mac)
.map_err(|e| match e {
StaticNeighborInsertionError::MacAddressNotUnicast
| StaticNeighborInsertionError::IpAddressInvalid => zx::Status::INVALID_ARGS,
})
}
#[netstack3_core::context_ip_bounds(A::Version, BindingsCtx)]
fn remove_entry<A: IpAddress>(ctx: &mut Ctx, interface: u64, neighbor: A) -> Result<(), zx::Status>
where
A::Version: IpExt,
{
let device_id = get_ethernet_id(ctx, interface)?;
ctx.api()
.neighbor::<A::Version, EthernetLinkDevice>()
.remove_entry(&device_id, neighbor)
.map_err(|e| match e {
NeighborRemovalError::IpAddressInvalid => zx::Status::INVALID_ARGS,
NeighborRemovalError::NotFound(NotFoundError) => zx::Status::NOT_FOUND,
})
}
#[netstack3_core::context_ip_bounds(I, BindingsCtx)]
fn clear_entries<I: IpExt>(ctx: &mut Ctx, interface: u64) -> Result<(), zx::Status> {
let device_id = get_ethernet_id(ctx, interface)?;
Ok(ctx.api().neighbor::<I, EthernetLinkDevice>().flush_table(&device_id))
}
pub(super) async fn serve_controller(
ctx: Ctx,
stream: ControllerRequestStream,
) -> Result<(), fidl::Error> {
stream
.try_for_each(|request| async {
let mut ctx: Ctx = ctx.clone();
match request {
ControllerRequest::AddEntry { interface, neighbor, mac, responder } => {
let result = match neighbor.into_ext() {
IpAddr::V4(v4) => add_static_entry(&mut ctx, interface, v4, mac),
IpAddr::V6(v6) => add_static_entry(&mut ctx, interface, v6, mac),
};
responder.send(result.map_err(|e| e.into_raw()))
}
ControllerRequest::RemoveEntry { interface, neighbor, responder } => {
let result = match neighbor.into_ext() {
IpAddr::V4(v4) => remove_entry(&mut ctx, interface, v4),
IpAddr::V6(v6) => remove_entry(&mut ctx, interface, v6),
};
responder.send(result.map_err(|e| e.into_raw()))
}
ControllerRequest::ClearEntries { interface, ip_version, responder } => {
let result = match ip_version {
fnet::IpVersion::V4 => clear_entries::<Ipv4>(&mut ctx, interface),
fnet::IpVersion::V6 => clear_entries::<Ipv6>(&mut ctx, interface),
};
responder.send(result.map_err(|e| e.into_raw()))
}
}
})
.await
}