blob: 5b8a20b3492ca28352dc50b65fc6162ed4aee83c [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Netstack3 bindings.
//!
//! This module provides Fuchsia bindings for the [`netstack3_core`] crate.
#![deny(clippy::redundant_clone)]
#[cfg(test)]
mod integration_tests;
mod debug_fidl_worker;
mod devices;
mod filter;
mod inspect;
mod interfaces_admin;
mod interfaces_watcher;
mod name_worker;
mod neighbor_worker;
mod netdevice_worker;
mod root_fidl_worker;
mod routes;
mod socket;
mod stack_fidl_worker;
mod timers;
mod util;
mod verifier_worker;
use std::{
collections::HashMap,
convert::{Infallible as Never, TryFrom as _},
ffi::CStr,
future::Future,
num::NonZeroU16,
ops::Deref,
pin::pin,
// TODO(https://fxbug.dev/42076296): Use RC types exported from Core, after
// we make sockets reference-backed.
sync::Arc,
time::Duration,
};
use assert_matches::assert_matches;
use fidl::endpoints::{DiscoverableProtocolMarker, ProtocolMarker as _, RequestStream};
use fidl_fuchsia_hardware_network as fhardware_network;
use fidl_fuchsia_net_interfaces_admin as fnet_interfaces_admin;
use fuchsia_async as fasync;
use fuchsia_inspect::health::Reporter as _;
use fuchsia_zircon as zx;
use futures::{channel::mpsc, select, FutureExt as _, StreamExt as _};
use packet::{Buf, BufferMut};
use rand::{rngs::OsRng, CryptoRng, RngCore};
use tracing::{error, info};
use util::{ConversionContext, IntoFidl as _};
use devices::{
BindingId, DeviceIdAndName, DeviceSpecificInfo, Devices, DynamicCommonInfo,
DynamicEthernetInfo, DynamicNetdeviceInfo, EthernetInfo, LoopbackInfo, PureIpDeviceInfo,
StaticCommonInfo, StaticNetdeviceInfo,
};
use interfaces_watcher::{InterfaceEventProducer, InterfaceProperties, InterfaceUpdate};
use net_types::{
ethernet::Mac,
ip::{AddrSubnet, AddrSubnetEither, Ip, IpAddr, IpAddress, IpVersion, Ipv4, Ipv6, Mtu},
SpecifiedAddr,
};
use netstack3_core::{
device::{
DeviceId, DeviceLayerEventDispatcher, DeviceLayerStateTypes, DeviceSendFrameError,
EthernetDeviceId, LoopbackCreationProperties, LoopbackDevice, LoopbackDeviceId,
PureIpDeviceId, WeakDeviceId,
},
error::NetstackError,
filter::FilterBindingsTypes,
icmp::{IcmpEchoBindingsContext, IcmpEchoBindingsTypes, IcmpSocketId},
inspect::{InspectableValue, Inspector},
ip::{
AddIpAddrSubnetError, AddressRemovedReason, IpDeviceConfigurationUpdate, IpDeviceEvent,
Ipv4DeviceConfigurationUpdate, Ipv6DeviceConfiguration, Ipv6DeviceConfigurationUpdate,
Lifetime, SlaacConfiguration,
},
neighbor,
routes::RawMetric,
sync::RwLock as CoreRwLock,
udp::{UdpBindingsTypes, UdpReceiveBindingsContext, UdpSocketId},
EventContext, InstantBindingsTypes, InstantContext, IpExt, RngContext, StackState,
TimerBindingsTypes, TimerContext, TimerId, TracingContext,
};
mod ctx {
use super::*;
/// Provides an implementation of [`BindingsContext`].
pub(crate) struct BindingsCtx(Arc<BindingsCtxInner>);
impl Deref for BindingsCtx {
type Target = BindingsCtxInner;
fn deref(&self) -> &BindingsCtxInner {
let Self(this) = self;
this.deref()
}
}
pub(crate) struct Ctx {
// `bindings_ctx` is the first member so all strongly-held references are
// dropped before primary references held in `core_ctx` are dropped. Note
// that dropping a primary reference while holding onto strong references
// will cause a panic. See `netstack3_core::sync::PrimaryRc` for more
// details.
bindings_ctx: BindingsCtx,
core_ctx: Arc<StackState<BindingsCtx>>,
}
#[derive(Debug)]
/// Error observed while attempting to destroy the last remaining clone of `Ctx`.
pub enum DestructionError {
/// Another clone of `BindingsCtx` still exists.
BindingsCtxStillCloned(
// TODO(https://fxbug.dev/42148629): Remove or explain #[allow(dead_code)].
#[allow(dead_code)] usize,
),
/// Another clone of `CoreCtx` still exists.
CoreCtxStillCloned(
// TODO(https://fxbug.dev/42148629): Remove or explain #[allow(dead_code)].
#[allow(dead_code)] usize,
),
}
impl Ctx {
fn new(routes_change_sink: routes::ChangeSink) -> Self {
let mut bindings_ctx = BindingsCtx(Arc::new(BindingsCtxInner::new(routes_change_sink)));
let core_ctx = Arc::new(StackState::new(&mut bindings_ctx));
Self { bindings_ctx, core_ctx }
}
pub(crate) fn bindings_ctx(&self) -> &BindingsCtx {
&self.bindings_ctx
}
// Gets a new `RngImpl` as if we have an implementation of `RngContext`,
// but without needing `&mut self`.
pub(crate) fn rng(&self) -> RngImpl {
RngImpl::new()
}
/// Destroys the last standing clone of [`Ctx`].
pub(crate) fn try_destroy_last(self) -> Result<(), DestructionError> {
let Self { bindings_ctx: BindingsCtx(bindings_ctx), core_ctx } = self;
fn unwrap_and_drop_or_get_count<T>(arc: Arc<T>) -> Result<(), usize> {
match Arc::try_unwrap(arc) {
Ok(t) => Ok(std::mem::drop(t)),
Err(arc) => Err(Arc::strong_count(&arc)),
}
}
// Always destroy bindings ctx first.
unwrap_and_drop_or_get_count(bindings_ctx)
.map_err(DestructionError::BindingsCtxStillCloned)?;
unwrap_and_drop_or_get_count(core_ctx).map_err(DestructionError::CoreCtxStillCloned)
}
pub(crate) fn api(&mut self) -> netstack3_core::CoreApi<'_, &mut BindingsCtx> {
let Ctx { bindings_ctx, core_ctx } = self;
core_ctx.api(bindings_ctx)
}
}
impl Clone for Ctx {
fn clone(&self) -> Self {
let Self { bindings_ctx: BindingsCtx(inner), core_ctx } = self;
Self { bindings_ctx: BindingsCtx(inner.clone()), core_ctx: core_ctx.clone() }
}
}
/// Contains the information needed to start serving a network stack over FIDL.
pub(crate) struct NetstackSeed {
pub(crate) netstack: Netstack,
pub(crate) interfaces_worker: interfaces_watcher::Worker,
pub(crate) interfaces_watcher_sink: interfaces_watcher::WorkerWatcherSink,
pub(crate) routes_change_runner: routes::ChangeRunner,
pub(crate) neighbor_worker: neighbor_worker::Worker,
pub(crate) neighbor_watcher_sink: mpsc::Sender<neighbor_worker::NewWatcher>,
}
impl Default for NetstackSeed {
fn default() -> Self {
let (interfaces_worker, interfaces_watcher_sink, interfaces_event_sink) =
interfaces_watcher::Worker::new();
let (routes_change_sink, routes_change_runner) = routes::create_sink_and_runner();
let ctx = Ctx::new(routes_change_sink);
let (neighbor_worker, neighbor_watcher_sink, neighbor_event_sink) =
neighbor_worker::new_worker();
Self {
netstack: Netstack { ctx, interfaces_event_sink, neighbor_event_sink },
interfaces_worker,
interfaces_watcher_sink,
routes_change_runner,
neighbor_worker,
neighbor_watcher_sink,
}
}
}
}
pub(crate) use ctx::{BindingsCtx, Ctx, NetstackSeed};
use crate::bindings::{interfaces_watcher::AddressPropertiesUpdate, util::TaskWaitGroup};
/// Extends the methods available to [`DeviceId`].
trait DeviceIdExt {
/// Returns the state associated with devices.
fn external_state(&self) -> DeviceSpecificInfo<'_>;
}
impl DeviceIdExt for DeviceId<BindingsCtx> {
fn external_state(&self) -> DeviceSpecificInfo<'_> {
match self {
DeviceId::Ethernet(d) => DeviceSpecificInfo::Ethernet(d.external_state()),
DeviceId::Loopback(d) => DeviceSpecificInfo::Loopback(d.external_state()),
DeviceId::PureIp(d) => DeviceSpecificInfo::PureIp(d.external_state()),
}
}
}
/// Extends the methods available to [`Lifetime`].
trait LifetimeExt {
/// Converts `self` to `zx::Time`.
fn into_zx_time(self) -> zx::Time;
}
impl LifetimeExt for Lifetime<StackTime> {
fn into_zx_time(self) -> zx::Time {
match self {
Lifetime::Finite(StackTime(time)) => time.into_zx(),
Lifetime::Infinite => zx::Time::INFINITE,
}
}
}
const LOOPBACK_NAME: &'static str = "lo";
/// Default MTU for loopback.
///
/// This value is also the default value used on Linux. As of writing:
///
/// ```shell
/// $ ip link show dev lo
/// 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000
/// link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
/// ```
const DEFAULT_LOOPBACK_MTU: Mtu = Mtu::new(65536);
/// Default routing metric for newly created interfaces, if unspecified.
///
/// The value is currently kept in sync with the Netstack2 implementation.
const DEFAULT_INTERFACE_METRIC: u32 = 100;
pub(crate) struct BindingsCtxInner {
timers: timers::TimerDispatcher<TimerId<BindingsCtx>>,
devices: Devices<DeviceId<BindingsCtx>>,
routes: routes::ChangeSink,
}
impl BindingsCtxInner {
fn new(routes_change_sink: routes::ChangeSink) -> Self {
Self { timers: Default::default(), devices: Default::default(), routes: routes_change_sink }
}
}
impl AsRef<Devices<DeviceId<BindingsCtx>>> for BindingsCtx {
fn as_ref(&self) -> &Devices<DeviceId<BindingsCtx>> {
&self.devices
}
}
impl<D> ConversionContext for D
where
D: AsRef<Devices<DeviceId<BindingsCtx>>>,
{
fn get_core_id(&self, binding_id: BindingId) -> Option<DeviceId<BindingsCtx>> {
self.as_ref().get_core_id(binding_id)
}
fn get_binding_id(&self, core_id: DeviceId<BindingsCtx>) -> BindingId {
core_id.bindings_id().id
}
}
/// A thin wrapper around `fuchsia_async::Time` that implements `core::Instant`.
#[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone, Debug)]
pub(crate) struct StackTime(fasync::Time);
impl netstack3_core::Instant for StackTime {
fn duration_since(&self, earlier: StackTime) -> Duration {
assert!(self.0 >= earlier.0);
// guaranteed not to panic because the assertion ensures that the
// difference is non-negative, and all non-negative i64 values are also
// valid u64 values
Duration::from_nanos(u64::try_from(self.0.into_nanos() - earlier.0.into_nanos()).unwrap())
}
fn saturating_duration_since(&self, earlier: StackTime) -> Duration {
// Guaranteed not to panic because we are doing a saturating
// subtraction, which means the difference will be >= 0, and all i64
// values >=0 are also valid u64 values.
Duration::from_nanos(
u64::try_from(self.0.into_nanos().saturating_sub(earlier.0.into_nanos())).unwrap(),
)
}
fn checked_add(&self, duration: Duration) -> Option<StackTime> {
Some(StackTime(fasync::Time::from_nanos(
self.0.into_nanos().checked_add(i64::try_from(duration.as_nanos()).ok()?)?,
)))
}
fn checked_sub(&self, duration: Duration) -> Option<StackTime> {
Some(StackTime(fasync::Time::from_nanos(
self.0.into_nanos().checked_sub(i64::try_from(duration.as_nanos()).ok()?)?,
)))
}
}
impl std::fmt::Display for StackTime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self(time) = *self;
write!(f, "{:.6}", time.into_nanos() as f64 / 1_000_000_000f64)
}
}
impl InspectableValue for StackTime {
fn record<I: Inspector>(&self, name: &str, inspector: &mut I) {
let Self(inner) = self;
inspector.record_int(name, inner.into_nanos())
}
}
impl InstantBindingsTypes for BindingsCtx {
type Instant = StackTime;
}
impl InstantContext for BindingsCtx {
fn now(&self) -> StackTime {
StackTime(fasync::Time::now())
}
}
impl TracingContext for BindingsCtx {
type DurationScope = fuchsia_trace::DurationScope<'static>;
fn duration(&self, name: &'static CStr) -> fuchsia_trace::DurationScope<'static> {
fuchsia_trace::duration(c"net", name, &[])
}
}
/// Convenience wrapper around the [`fuchsia_trace::duration`] macro that always
/// uses the "net" tracing category.
///
/// [`fuchsia_trace::duration`] uses RAII to begin and end the duration by tying
/// the scope of the duration to the lifetime of the object it returns. This
/// macro encapsulates that logic such that the trace duration will end when the
/// scope in which the macro is called ends.
macro_rules! trace_duration {
($name:expr) => {
fuchsia_trace::duration!(c"net", $name);
};
}
pub(crate) use trace_duration;
impl FilterBindingsTypes for BindingsCtx {
type DeviceClass = fidl_fuchsia_net_filter::DeviceClass;
}
#[derive(Default)]
pub(crate) struct RngImpl;
impl RngImpl {
fn new() -> Self {
// A change detector in case OsRng is no longer a ZST and we should keep
// state for it inside RngImpl.
let OsRng {} = OsRng::default();
RngImpl {}
}
}
/// [`RngCore`] for `RngImpl` relies entirely on the operating system to
/// generate random numbers and it needs not keep any state itself.
///
/// [`OsRng`] is a zero-sized type that provides randomness from the OS.
impl RngCore for RngImpl {
fn next_u32(&mut self) -> u32 {
OsRng::default().next_u32()
}
fn next_u64(&mut self) -> u64 {
OsRng::default().next_u64()
}
fn fill_bytes(&mut self, dest: &mut [u8]) {
OsRng::default().fill_bytes(dest)
}
fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
OsRng::default().try_fill_bytes(dest)
}
}
impl CryptoRng for RngImpl where OsRng: CryptoRng {}
impl RngContext for BindingsCtx {
type Rng<'a> = RngImpl;
fn rng(&mut self) -> RngImpl {
RngImpl::new()
}
}
impl TimerBindingsTypes for BindingsCtx {
type Timer = timers::Timer<TimerId<BindingsCtx>>;
type DispatchId = TimerId<BindingsCtx>;
}
impl TimerContext for BindingsCtx {
fn new_timer(&mut self, id: Self::DispatchId) -> Self::Timer {
self.timers.new_timer(id)
}
fn schedule_timer_instant(
&mut self,
StackTime(time): Self::Instant,
timer: &mut Self::Timer,
) -> Option<Self::Instant> {
timer.schedule(time).map(Into::into)
}
fn cancel_timer(&mut self, timer: &mut Self::Timer) -> Option<Self::Instant> {
timer.cancel().map(Into::into)
}
fn scheduled_instant(&self, timer: &mut Self::Timer) -> Option<Self::Instant> {
timer.scheduled_time().map(Into::into)
}
}
impl DeviceLayerStateTypes for BindingsCtx {
type LoopbackDeviceState = LoopbackInfo;
type EthernetDeviceState = EthernetInfo;
type PureIpDeviceState = PureIpDeviceInfo;
type DeviceIdentifier = DeviceIdAndName;
}
impl DeviceLayerEventDispatcher for BindingsCtx {
fn wake_rx_task(&mut self, device: &LoopbackDeviceId<Self>) {
let LoopbackInfo { static_common_info: _, dynamic_common_info: _, rx_notifier } =
device.external_state();
rx_notifier.schedule()
}
fn wake_tx_task(&mut self, device: &DeviceId<BindingsCtx>) {
let external_state = device.external_state();
let StaticCommonInfo { tx_notifier, authorization_token: _ } =
external_state.static_common_info();
tx_notifier.schedule()
}
fn send_ethernet_frame(
&mut self,
device: &EthernetDeviceId<Self>,
frame: Buf<Vec<u8>>,
) -> Result<(), DeviceSendFrameError<Buf<Vec<u8>>>> {
let EthernetInfo { mac: _, _mac_proxy: _, common_info: _, netdevice, dynamic_info } =
device.external_state();
let dynamic_info = dynamic_info.read();
send_netdevice_frame(
netdevice,
&dynamic_info.netdevice,
frame,
fhardware_network::FrameType::Ethernet,
)
}
fn send_ip_packet(
&mut self,
device: &PureIpDeviceId<Self>,
packet: Buf<Vec<u8>>,
ip_version: IpVersion,
) -> Result<(), DeviceSendFrameError<Buf<Vec<u8>>>> {
let frame_type = match ip_version {
IpVersion::V4 => fhardware_network::FrameType::Ipv4,
IpVersion::V6 => fhardware_network::FrameType::Ipv6,
};
let PureIpDeviceInfo { common_info: _, netdevice, dynamic_info } = device.external_state();
let dynamic_info = dynamic_info.read();
send_netdevice_frame(netdevice, &dynamic_info, packet, frame_type)
}
}
/// Send a frame on a Netdevice backed device.
fn send_netdevice_frame(
StaticNetdeviceInfo { handler }: &StaticNetdeviceInfo,
DynamicNetdeviceInfo {
phy_up,
common_info:
DynamicCommonInfo { admin_enabled, mtu: _, events: _, control_hook: _, addresses: _ },
}: &DynamicNetdeviceInfo,
frame: Buf<Vec<u8>>,
frame_type: fhardware_network::FrameType,
) -> Result<(), DeviceSendFrameError<Buf<Vec<u8>>>> {
if *phy_up && *admin_enabled {
handler
.send(frame.as_ref(), frame_type)
.unwrap_or_else(|e| tracing::warn!("failed to send frame to {:?}: {:?}", handler, e))
}
Ok(())
}
impl<I: IpExt> IcmpEchoBindingsContext<I, DeviceId<BindingsCtx>> for BindingsCtx {
fn receive_icmp_echo_reply<B: BufferMut>(
&mut self,
conn: &IcmpSocketId<I, WeakDeviceId<BindingsCtx>, BindingsCtx>,
device: &DeviceId<BindingsCtx>,
src_ip: I::Addr,
dst_ip: I::Addr,
id: u16,
data: B,
) {
conn.external_data().receive_icmp_echo_reply(device, src_ip, dst_ip, id, data)
}
}
impl IcmpEchoBindingsTypes for BindingsCtx {
type ExternalData<I: Ip> = socket::datagram::DatagramSocketExternalData<I>;
}
impl<I: IpExt> UdpReceiveBindingsContext<I, DeviceId<BindingsCtx>> for BindingsCtx {
fn receive_udp<B: BufferMut>(
&mut self,
id: &UdpSocketId<I, WeakDeviceId<BindingsCtx>, BindingsCtx>,
device: &DeviceId<BindingsCtx>,
dst_addr: (<I>::Addr, NonZeroU16),
src_addr: (<I>::Addr, Option<NonZeroU16>),
body: &B,
) {
id.external_data().receive_udp(device, dst_addr, src_addr, body)
}
}
impl UdpBindingsTypes for BindingsCtx {
type ExternalData<I: Ip> = socket::datagram::DatagramSocketExternalData<I>;
}
impl<I: Ip> EventContext<IpDeviceEvent<DeviceId<BindingsCtx>, I, StackTime>> for BindingsCtx {
fn on_event(&mut self, event: IpDeviceEvent<DeviceId<BindingsCtx>, I, StackTime>) {
match event {
IpDeviceEvent::AddressAdded { device, addr, state, valid_until } => {
let valid_until = valid_until.into_zx_time();
self.notify_interface_update(
&device,
InterfaceUpdate::AddressAdded {
addr: addr.into(),
assignment_state: state,
valid_until,
},
);
self.notify_address_update(&device, addr.addr().into(), state);
}
IpDeviceEvent::AddressRemoved { device, addr, reason } => {
self.notify_interface_update(
&device,
InterfaceUpdate::AddressRemoved(addr.to_ip_addr()),
);
match reason {
AddressRemovedReason::Manual => (),
AddressRemovedReason::DadFailed => self.notify_dad_failed(&device, addr.into()),
}
}
IpDeviceEvent::AddressStateChanged { device, addr, state } => {
self.notify_interface_update(
&device,
InterfaceUpdate::AddressAssignmentStateChanged {
addr: addr.to_ip_addr(),
new_state: state,
},
);
self.notify_address_update(&device, addr.into(), state);
}
IpDeviceEvent::EnabledChanged { device, ip_enabled } => {
self.notify_interface_update(&device, InterfaceUpdate::OnlineChanged(ip_enabled))
}
IpDeviceEvent::AddressPropertiesChanged { device, addr, valid_until } => self
.notify_interface_update(
&device,
InterfaceUpdate::AddressPropertiesChanged {
addr: addr.to_ip_addr(),
update: AddressPropertiesUpdate { valid_until: valid_until.into_zx_time() },
},
),
};
}
}
impl<I: Ip> EventContext<netstack3_core::ip::IpLayerEvent<DeviceId<BindingsCtx>, I>>
for BindingsCtx
{
fn on_event(&mut self, event: netstack3_core::ip::IpLayerEvent<DeviceId<BindingsCtx>, I>) {
// Core dispatched a routes-change request but has no expectation of
// observing the result, so we just discard the result receiver.
match event {
netstack3_core::ip::IpLayerEvent::AddRoute(entry) => {
self.routes.fire_change_and_forget(routes::Change::RouteOp(
routes::RouteOp::Add(entry.map_device_id(|d| d.downgrade())),
routes::SetMembership::CoreNdp,
))
}
netstack3_core::ip::IpLayerEvent::RemoveRoutes { subnet, device, gateway } => {
self.routes.fire_change_and_forget(routes::Change::RouteOp(
routes::RouteOp::RemoveMatching {
subnet,
device: device.downgrade(),
gateway,
metric: None,
},
routes::SetMembership::CoreNdp,
))
}
};
}
}
impl<I: Ip> EventContext<neighbor::Event<Mac, EthernetDeviceId<Self>, I, StackTime>>
for BindingsCtx
{
fn on_event(
&mut self,
neighbor::Event { device, kind, addr, at }: neighbor::Event<
Mac,
EthernetDeviceId<Self>,
I,
StackTime,
>,
) {
device.external_state().with_dynamic_info(|i| {
i.neighbor_event_sink
.unbounded_send(neighbor_worker::Event {
id: device.downgrade(),
kind,
addr: addr.into(),
at,
})
.expect("should be able to send neighbor event")
})
}
}
/// Implements `RcNotifier` for futures oneshot channels.
///
/// We need a newtype here because of orphan rules.
pub(crate) struct ReferenceNotifier<T>(Option<futures::channel::oneshot::Sender<T>>);
impl<T: Send> netstack3_core::sync::RcNotifier<T> for ReferenceNotifier<T> {
fn notify(&mut self, data: T) {
let Self(inner) = self;
inner.take().expect("notified twice").send(data).unwrap_or_else(|_: T| {
panic!(
"receiver was dropped before notifying for {}",
// Print the type name so we don't need Debug bounds.
core::any::type_name::<T>()
)
})
}
}
impl netstack3_core::ReferenceNotifiers for BindingsCtx {
type ReferenceReceiver<T: 'static> = futures::channel::oneshot::Receiver<T>;
type ReferenceNotifier<T: Send + 'static> = ReferenceNotifier<T>;
fn new_reference_notifier<T: Send + 'static, D: std::fmt::Debug>(
_debug_references: D,
) -> (Self::ReferenceNotifier<T>, Self::ReferenceReceiver<T>) {
let (sender, receiver) = futures::channel::oneshot::channel();
(ReferenceNotifier(Some(sender)), receiver)
}
}
impl BindingsCtx {
fn notify_interface_update(&self, device: &DeviceId<BindingsCtx>, event: InterfaceUpdate) {
device
.external_state()
.with_common_info(|i| i.events.notify(event).expect("interfaces worker closed"));
}
/// Notify `AddressStateProvider.WatchAddressAssignmentState` watchers.
fn notify_address_update(
&self,
device: &DeviceId<BindingsCtx>,
address: SpecifiedAddr<IpAddr>,
state: netstack3_core::ip::IpAddressState,
) {
// Note that not all addresses have an associated watcher (e.g. loopback
// address & autoconfigured SLAAC addresses).
device.external_state().with_common_info(|i| {
if let Some(address_info) = i.addresses.get(&address) {
address_info
.assignment_state_sender
.unbounded_send(state.into_fidl())
.expect("assignment state receiver unexpectedly disconnected");
}
})
}
fn notify_dad_failed(
&mut self,
device: &DeviceId<BindingsCtx>,
address: SpecifiedAddr<IpAddr>,
) {
device.external_state().with_common_info_mut(|i| {
if let Some(address_info) = i.addresses.get_mut(&address) {
let devices::FidlWorkerInfo { worker: _, cancelation_sender } =
&mut address_info.address_state_provider;
if let Some(sender) = cancelation_sender.take() {
sender
.send(interfaces_admin::AddressStateProviderCancellationReason::DadFailed)
.expect("assignment state receiver unexpectedly disconnected");
}
}
})
}
pub(crate) async fn apply_route_change<I: Ip>(
&self,
change: routes::Change<I::Addr>,
) -> Result<routes::ChangeOutcome, routes::Error> {
self.routes.send_change(change).await
}
pub(crate) async fn apply_route_change_either(
&self,
change: routes::ChangeEither,
) -> Result<routes::ChangeOutcome, routes::Error> {
match change {
routes::ChangeEither::V4(change) => self.apply_route_change::<Ipv4>(change).await,
routes::ChangeEither::V6(change) => self.apply_route_change::<Ipv6>(change).await,
}
}
pub(crate) async fn remove_routes_on_device(
&self,
device: &netstack3_core::device::WeakDeviceId<Self>,
) {
match self
.apply_route_change::<Ipv4>(routes::Change::RemoveMatchingDevice(device.clone()))
.await
.expect("deleting routes on device during removal should succeed")
{
routes::ChangeOutcome::Changed | routes::ChangeOutcome::NoChange => {
// We don't care whether there were any routes on the device or not.
}
}
match self
.apply_route_change::<Ipv6>(routes::Change::RemoveMatchingDevice(device.clone()))
.await
.expect("deleting routes on device during removal should succeed")
{
routes::ChangeOutcome::Changed | routes::ChangeOutcome::NoChange => {
// We don't care whether there were any routes on the device or not.
}
}
}
}
fn add_loopback_ip_addrs(
ctx: &mut Ctx,
loopback: &DeviceId<BindingsCtx>,
) -> Result<(), NetstackError> {
for addr_subnet in [
AddrSubnetEither::V4(
AddrSubnet::from_witness(Ipv4::LOOPBACK_ADDRESS, Ipv4::LOOPBACK_SUBNET.prefix())
.expect("error creating IPv4 loopback AddrSub"),
),
AddrSubnetEither::V6(
AddrSubnet::from_witness(Ipv6::LOOPBACK_ADDRESS, Ipv6::LOOPBACK_SUBNET.prefix())
.expect("error creating IPv6 loopback AddrSub"),
),
] {
ctx.api().device_ip_any().add_ip_addr_subnet(loopback, addr_subnet).map_err(
|e| match e {
AddIpAddrSubnetError::Exists => NetstackError::Exists,
AddIpAddrSubnetError::InvalidAddr => {
panic!("loopback address should not be invalid")
}
},
)?
}
Ok(())
}
/// Adds the IPv4 and IPv6 Loopback and multicast subnet routes, and the IPv4
/// limited broadcast subnet route.
async fn add_loopback_routes(bindings_ctx: &BindingsCtx, loopback: &DeviceId<BindingsCtx>) {
use netstack3_core::routes::{AddableEntry, AddableMetric};
let v4_changes = [
AddableEntry::without_gateway(
Ipv4::LOOPBACK_SUBNET,
loopback.downgrade(),
AddableMetric::MetricTracksInterface,
),
AddableEntry::without_gateway(
Ipv4::MULTICAST_SUBNET,
loopback.downgrade(),
AddableMetric::MetricTracksInterface,
),
]
.into_iter()
.map(|entry| {
routes::Change::RouteOp(routes::RouteOp::Add(entry), routes::SetMembership::Loopback)
})
.map(Into::into);
let v6_changes = [
AddableEntry::without_gateway(
Ipv6::LOOPBACK_SUBNET,
loopback.downgrade(),
AddableMetric::MetricTracksInterface,
),
AddableEntry::without_gateway(
Ipv6::MULTICAST_SUBNET,
loopback.downgrade(),
AddableMetric::MetricTracksInterface,
),
]
.into_iter()
.map(|entry| {
routes::Change::RouteOp(routes::RouteOp::Add(entry), routes::SetMembership::Loopback)
})
.map(Into::into);
for change in v4_changes.chain(v6_changes) {
bindings_ctx
.apply_route_change_either(change)
.await
.map(|outcome| assert_matches!(outcome, routes::ChangeOutcome::Changed))
.expect("adding loopback routes should succeed");
}
}
/// The netstack.
///
/// Provides the entry point for creating a netstack to be served as a
/// component.
#[derive(Clone)]
pub(crate) struct Netstack {
ctx: Ctx,
interfaces_event_sink: interfaces_watcher::WorkerInterfaceSink,
neighbor_event_sink: mpsc::UnboundedSender<neighbor_worker::Event>,
}
fn create_interface_event_producer(
interfaces_event_sink: &crate::bindings::interfaces_watcher::WorkerInterfaceSink,
id: BindingId,
properties: InterfaceProperties,
) -> InterfaceEventProducer {
interfaces_event_sink.add_interface(id, properties).expect("interface worker not running")
}
impl Netstack {
fn create_interface_event_producer(
&self,
id: BindingId,
properties: InterfaceProperties,
) -> InterfaceEventProducer {
create_interface_event_producer(&self.interfaces_event_sink, id, properties)
}
async fn add_loopback(
&mut self,
) -> (
futures::channel::oneshot::Sender<fnet_interfaces_admin::InterfaceRemovedReason>,
BindingId,
[NamedTask; 3],
) {
// Add and initialize the loopback interface with the IPv4 and IPv6
// loopback addresses and on-link routes to the loopback subnets.
let devices: &Devices<_> = self.ctx.bindings_ctx().as_ref();
let (control_sender, control_receiver) =
interfaces_admin::OwnedControlHandle::new_channel();
let loopback_rx_notifier = Default::default();
let binding_id = devices.alloc_new_id();
let events = self.create_interface_event_producer(
binding_id,
InterfaceProperties {
name: LOOPBACK_NAME.to_string(),
device_class: fidl_fuchsia_net_interfaces::DeviceClass::Loopback(
fidl_fuchsia_net_interfaces::Empty {},
),
},
);
events.notify(InterfaceUpdate::OnlineChanged(true)).expect("interfaces worker not running");
let loopback_info = LoopbackInfo {
static_common_info: StaticCommonInfo {
tx_notifier: Default::default(),
authorization_token: zx::Event::create(),
},
dynamic_common_info: CoreRwLock::new(DynamicCommonInfo {
mtu: DEFAULT_LOOPBACK_MTU,
admin_enabled: true,
events,
control_hook: control_sender,
addresses: HashMap::new(),
}),
rx_notifier: loopback_rx_notifier,
};
let loopback = self.ctx.api().device::<LoopbackDevice>().add_device(
DeviceIdAndName { id: binding_id, name: LOOPBACK_NAME.to_string() },
LoopbackCreationProperties { mtu: DEFAULT_LOOPBACK_MTU },
RawMetric(DEFAULT_INTERFACE_METRIC),
loopback_info,
);
let LoopbackInfo { static_common_info: _, dynamic_common_info: _, rx_notifier } =
loopback.external_state();
let rx_task =
crate::bindings::devices::spawn_rx_task(rx_notifier, self.ctx.clone(), &loopback);
let binding_id = loopback.bindings_id().id;
let loopback: DeviceId<_> = loopback.into();
let external_state = loopback.external_state();
let StaticCommonInfo { tx_notifier, authorization_token: _ } =
external_state.static_common_info();
self.ctx.bindings_ctx().devices.add_device(binding_id, loopback.clone());
let tx_task = crate::bindings::devices::spawn_tx_task(
tx_notifier,
self.ctx.clone(),
loopback.clone(),
);
// Don't need DAD and IGMP/MLD on loopback.
let ip_config = IpDeviceConfigurationUpdate {
ip_enabled: Some(true),
forwarding_enabled: Some(false),
gmp_enabled: Some(false),
};
let _: Ipv4DeviceConfigurationUpdate = self
.ctx
.api()
.device_ip::<Ipv4>()
.update_configuration(&loopback, Ipv4DeviceConfigurationUpdate { ip_config })
.unwrap();
let _: Ipv6DeviceConfigurationUpdate = self
.ctx
.api()
.device_ip::<Ipv6>()
.update_configuration(
&loopback,
Ipv6DeviceConfigurationUpdate {
dad_transmits: Some(None),
max_router_solicitations: Some(None),
slaac_config: Some(SlaacConfiguration {
enable_stable_addresses: true,
temporary_address_configuration: None,
}),
ip_config,
},
)
.unwrap();
add_loopback_ip_addrs(&mut self.ctx, &loopback).expect("error adding loopback addresses");
add_loopback_routes(self.ctx.bindings_ctx(), &loopback).await;
let (stop_sender, stop_receiver) = futures::channel::oneshot::channel();
// Loopback interface can't be removed.
let removable = false;
// Loopback doesn't have a defined state stream, provide a stream that
// never yields anything.
let state_stream = futures::stream::pending();
let control_task = fuchsia_async::Task::spawn(interfaces_admin::run_interface_control(
self.ctx.clone(),
binding_id,
stop_receiver,
control_receiver,
removable,
state_stream,
));
(
stop_sender,
binding_id,
[
NamedTask::new("loopback control", control_task),
NamedTask::new("loopback tx", tx_task),
NamedTask::new("loopback rx", rx_task),
],
)
}
}
pub(crate) enum Service {
DnsServerWatcher(fidl_fuchsia_net_name::DnsServerWatcherRequestStream),
DebugDiagnostics(fidl::endpoints::ServerEnd<fidl_fuchsia_net_debug::DiagnosticsMarker>),
DebugInterfaces(fidl_fuchsia_net_debug::InterfacesRequestStream),
FilterControl(fidl_fuchsia_net_filter::ControlRequestStream),
FilterState(fidl_fuchsia_net_filter::StateRequestStream),
Interfaces(fidl_fuchsia_net_interfaces::StateRequestStream),
InterfacesAdmin(fidl_fuchsia_net_interfaces_admin::InstallerRequestStream),
NeighborController(fidl_fuchsia_net_neighbor::ControllerRequestStream),
Neighbor(fidl_fuchsia_net_neighbor::ViewRequestStream),
PacketSocket(fidl_fuchsia_posix_socket_packet::ProviderRequestStream),
RawSocket(fidl_fuchsia_posix_socket_raw::ProviderRequestStream),
RootInterfaces(fidl_fuchsia_net_root::InterfacesRequestStream),
RoutesState(fidl_fuchsia_net_routes::StateRequestStream),
RoutesStateV4(fidl_fuchsia_net_routes::StateV4RequestStream),
RoutesStateV6(fidl_fuchsia_net_routes::StateV6RequestStream),
RoutesAdminV4(fidl_fuchsia_net_routes_admin::RouteTableV4RequestStream),
RoutesAdminV6(fidl_fuchsia_net_routes_admin::RouteTableV6RequestStream),
RootRoutesV4(fidl_fuchsia_net_root::RoutesV4RequestStream),
RootRoutesV6(fidl_fuchsia_net_root::RoutesV6RequestStream),
Socket(fidl_fuchsia_posix_socket::ProviderRequestStream),
Stack(fidl_fuchsia_net_stack::StackRequestStream),
Verifier(fidl_fuchsia_update_verify::NetstackVerifierRequestStream),
}
trait RequestStreamExt: RequestStream {
fn serve_with<F, Fut, E>(self, f: F) -> futures::future::Map<Fut, fn(Result<(), E>) -> ()>
where
E: std::error::Error,
F: FnOnce(Self) -> Fut,
Fut: Future<Output = Result<(), E>>;
}
impl<D: DiscoverableProtocolMarker, S: RequestStream<Protocol = D>> RequestStreamExt for S {
fn serve_with<F, Fut, E>(self, f: F) -> futures::future::Map<Fut, fn(Result<(), E>) -> ()>
where
E: std::error::Error,
F: FnOnce(Self) -> Fut,
Fut: Future<Output = Result<(), E>>,
{
f(self).map(|res| res.unwrap_or_else(|err| error!("{} error: {}", D::PROTOCOL_NAME, err)))
}
}
/// A helper struct to have named tasks.
///
/// Tasks are already tracked in the executor by spawn location, but long
/// running tasks are not expected to terminate except during clean shutdown.
/// Naming these helps root cause debug assertions.
#[derive(Debug)]
pub(crate) struct NamedTask {
name: &'static str,
task: fuchsia_async::Task<()>,
}
impl NamedTask {
/// Creates a new named task from `fut` with `name`.
#[track_caller]
fn spawn(name: &'static str, fut: impl futures::Future<Output = ()> + Send + 'static) -> Self {
Self { name, task: fuchsia_async::Task::spawn(fut) }
}
fn new(name: &'static str, task: fuchsia_async::Task<()>) -> Self {
Self { name, task }
}
fn into_future(self) -> impl futures::Future<Output = &'static str> + Send + 'static {
let Self { name, task } = self;
task.map(move |()| name)
}
}
impl NetstackSeed {
/// Consumes the netstack and starts serving all the FIDL services it
/// implements to the outgoing service directory.
pub(crate) async fn serve<S: futures::Stream<Item = Service>>(
self,
services: S,
inspector: &fuchsia_inspect::Inspector,
) {
info!("serving netstack with netstack3");
let Self {
mut netstack,
interfaces_worker,
interfaces_watcher_sink,
mut routes_change_runner,
neighbor_worker,
neighbor_watcher_sink,
} = self;
// Start servicing timers.
let mut timer_handler_ctx = netstack.ctx.clone();
let timers_task = NamedTask::new(
"timers",
netstack.ctx.bindings_ctx().timers.spawn(move |timer| {
timer_handler_ctx.api().handle_timer(timer);
}),
);
let (route_update_dispatcher_v4, route_update_dispatcher_v6) =
routes_change_runner.route_update_dispatchers();
// Start executing routes changes.
let routes_change_task = NamedTask::spawn("routes_changes", {
let ctx = netstack.ctx.clone();
async move { routes_change_runner.run(ctx).await }
});
let routes_change_task_fut = routes_change_task.into_future().fuse();
let mut routes_change_task_fut = pin!(routes_change_task_fut);
let (loopback_stopper, _, loopback_tasks): (
futures::channel::oneshot::Sender<_>,
BindingId,
_,
) = netstack.add_loopback().await;
let interfaces_worker_task = NamedTask::spawn("interfaces worker", async move {
let result = interfaces_worker.run().await;
let watchers = result.expect("interfaces worker ended with an error");
info!("interfaces worker shutting down, waiting for watchers to end");
watchers
.map(|res| match res {
Ok(()) => (),
Err(e) => {
if !e.is_closed() {
tracing::error!("error {e:?} collecting watchers");
}
}
})
.collect::<()>()
.await;
info!("all interface watchers closed, interfaces worker shutdown is complete");
});
let neighbor_worker_task = NamedTask::spawn("neighbor worker", neighbor_worker.run());
let no_finish_tasks = loopback_tasks.into_iter().chain([
interfaces_worker_task,
timers_task,
neighbor_worker_task,
]);
let mut no_finish_tasks = futures::stream::FuturesUnordered::from_iter(
no_finish_tasks.map(NamedTask::into_future),
);
let unexpected_early_finish_fut = async {
let no_finish_tasks_fut = no_finish_tasks.by_ref().next().fuse();
let mut no_finish_tasks_fut = pin!(no_finish_tasks_fut);
let name = select! {
name = no_finish_tasks_fut => name,
name = routes_change_task_fut => Some(name),
};
match name {
Some(name) => panic!("task {name} ended unexpectedly"),
None => panic!("unexpected end of infinite task stream"),
}
}
.fuse();
let inspect_nodes = {
// The presence of the health check node is useful even though the
// status will always be OK because the same node exists
// in NS2 and this helps for test assertions to guard against
// issues such as https://fxbug.dev/326510415.
let mut health = fuchsia_inspect::health::Node::new(inspector.root());
health.set_ok();
let socket_ctx = netstack.ctx.clone();
let sockets = inspector.root().create_lazy_child("Sockets", move || {
futures::future::ok(inspect::sockets(&mut socket_ctx.clone())).boxed()
});
let routes_ctx = netstack.ctx.clone();
let routes = inspector.root().create_lazy_child("Routes", move || {
futures::future::ok(inspect::routes(&mut routes_ctx.clone())).boxed()
});
let devices_ctx = netstack.ctx.clone();
let devices = inspector.root().create_lazy_child("Devices", move || {
futures::future::ok(inspect::devices(&mut devices_ctx.clone())).boxed()
});
let neighbors_ctx = netstack.ctx.clone();
let neighbors = inspector.root().create_lazy_child("Neighbors", move || {
futures::future::ok(inspect::neighbors(neighbors_ctx.clone())).boxed()
});
let counters_ctx = netstack.ctx.clone();
let counters = inspector.root().create_lazy_child("Counters", move || {
futures::future::ok(inspect::counters(&mut counters_ctx.clone())).boxed()
});
let filter_ctx = netstack.ctx.clone();
let filtering_state =
inspector.root().create_lazy_child("Filtering State", move || {
futures::future::ok(inspect::filtering_state(&mut filter_ctx.clone())).boxed()
});
(health, sockets, routes, devices, neighbors, counters, filtering_state)
};
let diagnostics_handler = debug_fidl_worker::DiagnosticsHandler::default();
// Insert a stream after services to get a helpful log line when it
// completes. The future we create from it will still wait for all the
// user-created resources to be joined on before returning.
let services =
services.chain(futures::stream::poll_fn(|_: &mut std::task::Context<'_>| {
info!("services stream ended");
std::task::Poll::Ready(None)
}));
// Keep a clone of Ctx around for teardown before moving it to the
// services future.
let teardown_ctx = netstack.ctx.clone();
// Use a reference to the watcher sink in the services loop.
let interfaces_watcher_sink_ref = &interfaces_watcher_sink;
let neighbor_watcher_sink_ref = &neighbor_watcher_sink;
let (route_set_waitgroup, route_set_spawner) = TaskWaitGroup::new();
let filter_update_dispatcher = filter::UpdateDispatcher::default();
// It is unclear why we need to wrap the `for_each_concurrent` call with
// `async move { ... }` but it seems like we do. Without this, the
// `Future` returned by this function fails to implement `Send` with the
// same issue reported in https://github.com/rust-lang/rust/issues/64552.
//
// TODO(https://github.com/rust-lang/rust/issues/64552): Remove this
// workaround.
let services_fut = async move {
services
.for_each_concurrent(None, |s| async {
match s {
Service::Stack(stack) => {
stack
.serve_with(|rs| {
stack_fidl_worker::StackFidlWorker::serve(netstack.clone(), rs)
})
.await
}
Service::Socket(socket) => {
// Run on a separate task so socket requests are not
// bound to the same thread as the main services
// loop.
let wait_group = fuchsia_async::Task::spawn(socket::serve(
netstack.ctx.clone(),
socket,
))
.await;
// Wait for all socket tasks to finish.
wait_group.await;
}
Service::PacketSocket(socket) => {
// Run on a separate task so socket requests are not
// bound to the same thread as the main services
// loop.
let wait_group = fuchsia_async::Task::spawn(socket::packet::serve(
netstack.ctx.clone(),
socket,
))
.await;
// Wait for all socket tasks to finish.
wait_group.await;
}
Service::RawSocket(socket) => {
// Run on a separate task so socket requests are not
// bound to the same thread as the main services
// loop.
let wait_group = fuchsia_async::Task::spawn(socket::raw::serve(
netstack.ctx.clone(),
socket,
))
.await;
// Wait for all socket tasks to finish.
wait_group.await;
}
Service::RootInterfaces(root_interfaces) => {
root_interfaces
.serve_with(|rs| {
root_fidl_worker::serve_interfaces(netstack.clone(), rs)
})
.await
}
Service::RoutesState(rs) => {
routes::state::serve_state(rs, netstack.ctx.clone()).await
}
Service::RoutesStateV4(rs) => {
routes::state::serve_state_v4(rs, &route_update_dispatcher_v4).await
}
Service::RoutesStateV6(rs) => {
routes::state::serve_state_v6(rs, &route_update_dispatcher_v6).await
}
Service::RoutesAdminV4(rs) => routes::admin::serve_route_table_v4(
rs,
route_set_spawner.clone(),
&netstack.ctx,
)
.await
.unwrap_or_else(|e| {
tracing::error!(
"error serving {}: {e:?}",
fidl_fuchsia_net_routes_admin::RouteTableV4Marker::DEBUG_NAME
);
}),
Service::RoutesAdminV6(rs) => routes::admin::serve_route_table_v6(
rs,
route_set_spawner.clone(),
&netstack.ctx,
)
.await
.unwrap_or_else(|e| {
tracing::error!(
"error serving {}: {e:?}",
fidl_fuchsia_net_routes_admin::RouteTableV6Marker::DEBUG_NAME
);
}),
Service::RootRoutesV4(rs) => root_fidl_worker::serve_routes_v4(
rs,
route_set_spawner.clone(),
&netstack.ctx,
)
.await
.unwrap_or_else(|e| {
tracing::error!(
"error serving {}: {e:?}",
fidl_fuchsia_net_root::RoutesV4Marker::DEBUG_NAME
);
}),
Service::RootRoutesV6(rs) => root_fidl_worker::serve_routes_v6(
rs,
route_set_spawner.clone(),
&netstack.ctx,
)
.await
.unwrap_or_else(|e| {
tracing::error!(
"error serving {}: {e:?}",
fidl_fuchsia_net_root::RoutesV6Marker::DEBUG_NAME
);
}),
Service::Interfaces(interfaces) => {
interfaces
.serve_with(|rs| {
interfaces_watcher::serve(
rs,
interfaces_watcher_sink_ref.clone(),
)
})
.await
}
Service::InterfacesAdmin(installer) => {
tracing::debug!(
"serving {}",
fidl_fuchsia_net_interfaces_admin::InstallerMarker::PROTOCOL_NAME
);
interfaces_admin::serve(netstack.clone(), installer).await;
}
Service::DebugInterfaces(debug_interfaces) => {
debug_interfaces
.serve_with(|rs| {
debug_fidl_worker::serve_interfaces(
netstack.ctx.bindings_ctx(),
rs,
)
})
.await
}
Service::DebugDiagnostics(debug_diagnostics) => {
diagnostics_handler.serve_diagnostics(debug_diagnostics).await
}
Service::DnsServerWatcher(dns) => {
dns.serve_with(|rs| name_worker::serve(netstack.clone(), rs)).await
}
Service::FilterState(filter) => {
filter
.serve_with(|rs| filter::serve_state(rs, &filter_update_dispatcher))
.await
}
Service::FilterControl(filter) => {
filter
.serve_with(|rs| {
filter::serve_control(
rs,
&filter_update_dispatcher,
&netstack.ctx,
)
})
.await
}
Service::Neighbor(neighbor) => {
neighbor
.serve_with(|rs| {
neighbor_worker::serve_view(
rs,
neighbor_watcher_sink_ref.clone(),
)
})
.await
}
Service::NeighborController(neighbor_controller) => {
neighbor_controller
.serve_with(|rs| {
neighbor_worker::serve_controller(netstack.ctx.clone(), rs)
})
.await
}
Service::Verifier(verifier) => {
verifier.serve_with(|rs| verifier_worker::serve(rs)).await
}
}
})
.await
};
{
let services_fut = services_fut.fuse();
// Pin services_fut to this block scope so it's dropped after the
// select.
let mut services_fut = pin!(services_fut);
// Do likewise for unexpected_early_finish_fut.
let mut unexpected_early_finish_fut = pin!(unexpected_early_finish_fut);
let () = futures::select! {
() = services_fut => (),
never = unexpected_early_finish_fut => {
let never: Never = never;
match never {}
},
};
}
info!("all services terminated, starting shutdown");
let ctx = teardown_ctx;
// Stop the loopback interface.
loopback_stopper
.send(fnet_interfaces_admin::InterfaceRemovedReason::PortClosed)
.expect("loopback task must still be running");
// Stop the timer dispatcher.
ctx.bindings_ctx().timers.stop();
// Stop the interfaces watcher worker.
std::mem::drop(interfaces_watcher_sink);
// Stop the neighbor watcher worker.
std::mem::drop(neighbor_watcher_sink);
// Collect the routes admin waitgroup.
route_set_waitgroup.await;
// We've signalled all long running tasks, now we can collect them.
no_finish_tasks.map(|name| info!("{name} finished")).collect::<()>().await;
// Stop the routes change runner.
ctx.bindings_ctx().routes.close_senders();
let _task_name: &str = routes_change_task_fut.await;
// Drop all inspector data, it holds ctx clones.
std::mem::drop(inspect_nodes);
inspector.root().clear_recorded();
// Last thing to happen is dropping the context.
ctx.try_destroy_last().expect("all Ctx references must have been dropped")
}
}