| // Copyright 2023 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| //! FIDL Worker for the `fuchsia.net.routes` suite of protocols. |
| |
| use std::collections::HashSet; |
| use std::pin::pin; |
| |
| use async_utils::event::Event; |
| use either::Either; |
| use fidl::endpoints::{DiscoverableProtocolMarker as _, ProtocolMarker}; |
| use futures::channel::{mpsc, oneshot}; |
| use futures::future::FusedFuture as _; |
| use futures::{FutureExt, StreamExt as _, TryStream, TryStreamExt as _}; |
| use itertools::Itertools as _; |
| use log::{debug, error, info, warn}; |
| use net_types::ethernet::Mac; |
| use net_types::ip::{GenericOverIp, Ip, IpAddr, IpAddress, Ipv4, Ipv6}; |
| use net_types::SpecifiedAddr; |
| use netstack3_core::device::{DeviceId, EthernetDeviceId, EthernetLinkDevice}; |
| use netstack3_core::error::AddressResolutionFailed; |
| use netstack3_core::neighbor::{LinkResolutionContext, LinkResolutionResult}; |
| use netstack3_core::routes::{NextHop, ResolvedRoute, WrapBroadcastMarker}; |
| use thiserror::Error; |
| use { |
| fidl_fuchsia_net as fnet, fidl_fuchsia_net_routes as fnet_routes, |
| fidl_fuchsia_net_routes_ext as fnet_routes_ext, fuchsia_zircon as zx, |
| }; |
| |
| use crate::bindings::util::{ConversionContext as _, IntoCore as _, IntoFidl as _, ResultExt as _}; |
| use crate::bindings::{routes, BindingsCtx, Ctx, IpExt}; |
| |
// The maximum number of events a client for the `fuchsia.net.routes/Watcher`
// is allowed to have queued. Clients will be dropped if they exceed this limit.
// Keep this a multiple of `fnet_routes::MAX_EVENTS` (5 is somewhat arbitrary)
// so that we don't artificially truncate the allowed batch size: a client that
// drains in full batches can always fetch a whole multiple of the batch size.
const MAX_PENDING_EVENTS: usize = (fnet_routes::MAX_EVENTS * 5) as usize;
| |
/// Hook core's link-resolution machinery up to the bindings' notifier type,
/// which reports resolution results over a oneshot channel.
impl LinkResolutionContext<EthernetLinkDevice> for BindingsCtx {
    type Notifier = LinkResolutionNotifier;
}
| |
/// The sending half of a oneshot channel over which core reports the outcome
/// of an Ethernet link-layer (neighbor) address resolution.
#[derive(Debug)]
pub(crate) struct LinkResolutionNotifier(oneshot::Sender<Result<Mac, AddressResolutionFailed>>);
| |
| impl netstack3_core::neighbor::LinkResolutionNotifier<EthernetLinkDevice> |
| for LinkResolutionNotifier |
| { |
| type Observer = oneshot::Receiver<Result<Mac, AddressResolutionFailed>>; |
| |
| fn new() -> (Self, Self::Observer) { |
| let (tx, rx) = oneshot::channel(); |
| (Self(tx), rx) |
| } |
| |
| fn notify(self, result: Result<Mac, AddressResolutionFailed>) { |
| let Self(tx) = self; |
| tx.send(result).unwrap_or_else(|_| { |
| error!("link address observer was dropped before resolution completed") |
| }); |
| } |
| } |
| |
| /// Serve the `fuchsia.net.routes/State` protocol. |
| pub(crate) async fn serve_state(rs: fnet_routes::StateRequestStream, ctx: Ctx) { |
| rs.try_for_each_concurrent(None, |req| async { |
| match req { |
| fnet_routes::StateRequest::Resolve { destination, responder } => { |
| let result = resolve(destination, ctx.clone()).await; |
| responder |
| .send(result.as_ref().map_err(|e| e.into_raw())) |
| .unwrap_or_log("failed to respond"); |
| Ok(()) |
| } |
| fnet_routes::StateRequest::GetRouteTableName { table_id: _, responder: _ } => { |
| todo!("TODO(https://fxbug.dev/336205291): Implement for main table"); |
| } |
| } |
| }) |
| .await |
| .unwrap_or_else(|e| warn!("error serving {}: {:?}", fnet_routes::StateMarker::PROTOCOL_NAME, e)) |
| } |
| |
| /// Resolves the route to the given destination address. |
| /// |
| /// Returns `Err` if the destination can't be resolved. |
| async fn resolve( |
| destination: fnet::IpAddress, |
| ctx: Ctx, |
| ) -> Result<fnet_routes::Resolved, zx::Status> { |
| let addr: IpAddr = destination.into_core(); |
| match addr { |
| IpAddr::V4(addr) => resolve_inner(addr, ctx).await, |
| IpAddr::V6(addr) => resolve_inner(addr, ctx).await, |
| } |
| } |
| |
/// The inner implementation of [`resolve`] that's generic over `Ip`.
///
/// Resolves the route for `destination` (an unspecified address selects the
/// default route), performing neighbor resolution when the outgoing device is
/// Ethernet. Failures are collapsed into `zx::Status::ADDRESS_UNREACHABLE`.
#[netstack3_core::context_ip_bounds(A::Version, BindingsCtx)]
async fn resolve_inner<A: IpAddress>(
    destination: A,
    mut ctx: Ctx,
) -> Result<fnet_routes::Resolved, zx::Status>
where
    A::Version: IpExt,
{
    // An unspecified destination maps to `None`; a specified address that is
    // mapped (per `AddrIsMappedError`) is rejected up front as unroutable.
    let sanitized_dst = SpecifiedAddr::new(destination)
        .map(|dst| {
            netstack3_core::routes::RoutableIpAddr::try_from(dst).map_err(
                |netstack3_core::socket::AddrIsMappedError {}| zx::Status::ADDRESS_UNREACHABLE,
            )
        })
        .transpose()?;
    let ResolvedRoute { device, src_addr, local_delivery_device: _, next_hop } =
        match ctx.api().routes::<A::Version>().resolve_route(sanitized_dst) {
            Err(e) => {
                info!("Resolve failed for {}, {:?}", destination, e);
                return Err(zx::Status::ADDRESS_UNREACHABLE);
            }
            Ok(resolved_route) => resolved_route,
        };
    // Pick the address to resolve at the link layer, and whether the response
    // is the `Direct` or `Gateway` variant of `fnet_routes::Resolved`.
    let (next_hop_addr, next_hop_type) = match next_hop {
        NextHop::RemoteAsNeighbor => {
            (SpecifiedAddr::new(destination), Either::Left(fnet_routes::Resolved::Direct))
        }
        NextHop::Broadcast(marker) => {
            // Statically prove broadcast next hops only occur for IPv4: the
            // marker's payload is uninhabited for IPv6, so the second closure
            // can never run.
            <A::Version as Ip>::map_ip::<_, ()>(
                WrapBroadcastMarker(marker),
                |WrapBroadcastMarker(())| (),
                |WrapBroadcastMarker(never)| match never {},
            );
            (SpecifiedAddr::new(destination), Either::Left(fnet_routes::Resolved::Direct))
        }
        NextHop::Gateway(gateway) => (Some(gateway), Either::Right(fnet_routes::Resolved::Gateway)),
    };
    // Only Ethernet devices need link-layer (MAC) resolution; loopback and
    // pure-IP devices carry no MAC.
    let remote_mac = match &device {
        DeviceId::Loopback(_device) => None,
        DeviceId::Ethernet(device) => {
            if let Some(addr) = next_hop_addr {
                Some(resolve_ethernet_link_addr(&mut ctx, device, &addr).await?)
            } else {
                warn!("Cannot attempt Ethernet link resolution for the unspecified address.");
                return Err(zx::Status::ADDRESS_UNREACHABLE);
            }
        }
        DeviceId::PureIp(_device) => None,
    };

    // Assemble the FIDL response table.
    let destination = {
        let address =
            next_hop_addr.map_or(A::Version::UNSPECIFIED_ADDRESS, |a| *a).to_ip_addr().into_fidl();
        let source_address = src_addr.addr().to_ip_addr().into_fidl();
        let mac = remote_mac.map(|mac| mac.into_fidl());
        let interface_id = ctx.bindings_ctx().get_binding_id(device);
        fnet_routes::Destination {
            address: Some(address),
            mac,
            interface_id: Some(interface_id.get()),
            source_address: Some(source_address),
            ..Default::default()
        }
    };

    // Wrap the destination in the `Direct`/`Gateway` constructor chosen above.
    Ok(either::for_both!(next_hop_type, f => f(destination)))
}
| |
| /// Performs link-layer resolution of the remote IP Address on the given device. |
| #[netstack3_core::context_ip_bounds(A::Version, BindingsCtx)] |
| async fn resolve_ethernet_link_addr<A: IpAddress>( |
| ctx: &mut Ctx, |
| device: &EthernetDeviceId<BindingsCtx>, |
| remote: &SpecifiedAddr<A>, |
| ) -> Result<Mac, zx::Status> |
| where |
| A::Version: IpExt, |
| { |
| match ctx.api().neighbor::<A::Version, EthernetLinkDevice>().resolve_link_addr(device, remote) { |
| LinkResolutionResult::Resolved(mac) => Ok(mac), |
| LinkResolutionResult::Pending(observer) => observer |
| .await |
| .expect("core must send link resolution result before dropping notifier") |
| .map_err(|AddressResolutionFailed| zx::Status::ADDRESS_UNREACHABLE), |
| } |
| } |
| |
| /// Serve the `fuchsia.net.routes/StateV4` protocol. |
| pub(crate) async fn serve_state_v4( |
| rs: fnet_routes::StateV4RequestStream, |
| dispatcher: &RouteUpdateDispatcher<Ipv4>, |
| ) { |
| rs.try_for_each_concurrent(None, |req| match req { |
| fnet_routes::StateV4Request::GetWatcherV4 { options, watcher, control_handle: _ } => { |
| serve_watcher::<Ipv4>(watcher, options.into(), dispatcher).map(|result| { |
| Ok(result.unwrap_or_else(|e| { |
| warn!("error serving {}: {:?}", fnet_routes::WatcherV4Marker::DEBUG_NAME, e) |
| })) |
| }) |
| } |
| fnet_routes::StateV4Request::GetRuleWatcherV4 { |
| options: _, |
| watcher: _, |
| control_handle: _, |
| } => { |
| todo!("TODO(https://fxbug.dev/336204757): Implement rules watcher"); |
| } |
| }) |
| .await |
| .unwrap_or_else(|e| { |
| warn!("error serving {}: {:?}", fnet_routes::StateV4Marker::PROTOCOL_NAME, e) |
| }) |
| } |
| |
| /// Serve the `fuchsia.net.routes/StateV6` protocol. |
| pub(crate) async fn serve_state_v6( |
| rs: fnet_routes::StateV6RequestStream, |
| dispatcher: &RouteUpdateDispatcher<Ipv6>, |
| ) { |
| rs.try_for_each_concurrent(None, |req| match req { |
| fnet_routes::StateV6Request::GetWatcherV6 { options, watcher, control_handle: _ } => { |
| serve_watcher::<Ipv6>(watcher, options.into(), dispatcher).map(|result| { |
| Ok(result.unwrap_or_else(|e| { |
| warn!("error serving {}: {:?}", fnet_routes::WatcherV6Marker::DEBUG_NAME, e) |
| })) |
| }) |
| } |
| fnet_routes::StateV6Request::GetRuleWatcherV6 { |
| options: _, |
| watcher: _, |
| control_handle: _, |
| } => { |
| todo!("TODO(https://fxbug.dev/336204757): Implement rules watcher"); |
| } |
| }) |
| .await |
| .unwrap_or_else(|e| { |
| warn!("error serving {}: {:?}", fnet_routes::StateV6Marker::PROTOCOL_NAME, e) |
| }) |
| } |
| |
/// Ways in which serving a single `WatcherV{4,6}` client can terminate
/// abnormally (see [`serve_watcher`]).
#[derive(Debug, Error)]
enum ServeWatcherError {
    #[error("the request stream contained a FIDL error")]
    ErrorInStream(fidl::Error),
    #[error("a FIDL error was encountered while sending the response")]
    FailedToRespond(fidl::Error),
    #[error("the client called `Watch` while a previous call was already pending")]
    PreviousPendingWatch,
    #[error("the client was canceled")]
    Canceled,
}
| |
// Serve a single client of the `WatcherV4` or `WatcherV6` protocol.
//
// Registers the client with the dispatcher, then loops serving `Watch` calls
// until the stream ends, a FIDL error occurs, a second `Watch` arrives while
// one is pending, or the dispatcher cancels the client (queue overflow). The
// client is always unregistered from the dispatcher before returning.
async fn serve_watcher<I: fnet_routes_ext::FidlRouteIpExt>(
    server_end: fidl::endpoints::ServerEnd<I::WatcherMarker>,
    fnet_routes_ext::WatcherOptions { table_interest }: fnet_routes_ext::WatcherOptions,
    RouteUpdateDispatcher(dispatcher): &RouteUpdateDispatcher<I>,
) -> Result<(), ServeWatcherError> {
    // Translate the FIDL table-interest option into the internal filter.
    // `Main` is collapsed into `Only` with the main table's ID.
    let client_interest = match table_interest {
        Some(fnet_routes::TableInterest::Main(fnet_routes::Main)) => {
            ClientInterest::Only { table_id: routes::main_table_id::<I>().into() }
        }
        Some(fnet_routes::TableInterest::Only(table_id)) => ClientInterest::Only { table_id },
        Some(fnet_routes::TableInterest::All(fnet_routes::All)) | None => ClientInterest::All,
        Some(fnet_routes::TableInterest::__SourceBreaking { unknown_ordinal }) => {
            return Err(ServeWatcherError::ErrorInStream(fidl::Error::UnknownOrdinal {
                ordinal: unknown_ordinal,
                protocol_name: <I::WatcherMarker as ProtocolMarker>::DEBUG_NAME,
            }))
        }
    };
    // Hold the dispatcher lock only long enough to register the client.
    let mut watcher = {
        let mut dispatcher = dispatcher.lock().await;
        dispatcher.connect_new_client(client_interest)
    };

    let request_stream =
        server_end.into_stream().expect("failed to acquire request_stream from server_end");

    let canceled_fut = watcher.canceled.wait();

    let result = {
        let mut request_stream = request_stream.map_err(ServeWatcherError::ErrorInStream).fuse();
        let mut canceled_fut = pin!(canceled_fut);
        // At most one `Watch` call may be outstanding at a time; the pending
        // one (if any) lives in this `OptionFuture`.
        let mut pending_watch_request = futures::future::OptionFuture::default();
        loop {
            pending_watch_request = futures::select! {
                request = request_stream.try_next() => match request {
                    Ok(Some(req)) => if pending_watch_request.is_terminated() {
                        // Convince the compiler that we're not holding on to a
                        // borrow of watcher.
                        std::mem::drop(pending_watch_request);
                        // Old request is terminated, accept this new one.
                        Some(watcher.watch().map(move |events| (req, events))).into()
                    } else {
                        // Protocol violation: `Watch` while a `Watch` was
                        // already pending.
                        break Err(ServeWatcherError::PreviousPendingWatch);
                    },
                    Ok(None) => break Ok(()),
                    Err(e) => break Err(e),
                },
                r = pending_watch_request => {
                    let (request, events) = r.expect("OptionFuture is not selected if empty");
                    match respond_to_watch_request(request, events) {
                        Ok(()) => None.into(),
                        Err(e) => break Err(ServeWatcherError::FailedToRespond(e)),
                    }
                },
                () = canceled_fut => break Err(ServeWatcherError::Canceled),
            };
        }
    };
    // Unconditionally unregister so the dispatcher stops queuing events for
    // this client, regardless of how the loop exited.
    {
        let mut dispatcher = dispatcher.lock().await;
        dispatcher.disconnect_client(watcher);
    }

    result
}
| |
| // Responds to a single `Watch` request with the given batch of events. |
| fn respond_to_watch_request<I: fnet_routes_ext::FidlRouteIpExt>( |
| req: <<I::WatcherMarker as fidl::endpoints::ProtocolMarker>::RequestStream as TryStream>::Ok, |
| events: Vec<fnet_routes_ext::Event<I>>, |
| ) -> Result<(), fidl::Error> { |
| #[derive(GenericOverIp)] |
| #[generic_over_ip(I, Ip)] |
| struct Inputs<I: fnet_routes_ext::FidlRouteIpExt> { |
| req: |
| <<I::WatcherMarker as fidl::endpoints::ProtocolMarker>::RequestStream as TryStream>::Ok, |
| events: Vec<fnet_routes_ext::Event<I>>, |
| } |
| let result = I::map_ip_in::<Inputs<I>, _>( |
| Inputs { req, events }, |
| |Inputs { req, events }| match req { |
| fnet_routes::WatcherV4Request::Watch { responder } => { |
| let events = events |
| .into_iter() |
| .map(|event| { |
| event.try_into().unwrap_or_else(|e| match e { |
| fnet_routes_ext::NetTypeConversionError::UnknownUnionVariant(msg) => { |
| panic!("tried to send an event with Unknown enum variant: {}", msg) |
| } |
| }) |
| }) |
| .collect::<Vec<_>>(); |
| responder.send(&events) |
| } |
| }, |
| |Inputs { req, events }| match req { |
| fnet_routes::WatcherV6Request::Watch { responder } => { |
| let events = events |
| .into_iter() |
| .map(|event| { |
| event.try_into().unwrap_or_else(|e| match e { |
| fnet_routes_ext::NetTypeConversionError::UnknownUnionVariant(msg) => { |
| panic!("tried to send an event with Unknown enum variant: {}", msg) |
| } |
| }) |
| }) |
| .collect::<Vec<_>>(); |
| responder.send(&events) |
| } |
| }, |
| ); |
| result |
| } |
| |
// An update to the routing table.
pub(crate) enum RoutingTableUpdate<I: Ip> {
    // A route was newly installed.
    RouteAdded(fnet_routes_ext::InstalledRoute<I>),
    // A previously installed route was removed.
    RouteRemoved(fnet_routes_ext::InstalledRoute<I>),
}
| |
// Consumes updates to the system routing table and dispatches them to clients
// of the `fuchsia.net.routes/WatcherV{4,6}` protocols.
//
// Cheaply cloneable: clones share the same inner state via `Arc`.
#[derive(Default, Clone)]
pub(crate) struct RouteUpdateDispatcher<I: Ip>(
    std::sync::Arc<futures::lock::Mutex<RouteUpdateDispatcherInner<I>>>,
);
| |
// The inner representation of a `RouteUpdateDispatcher` holding state for the
// given IP protocol.
#[derive(Default)]
struct RouteUpdateDispatcherInner<I: Ip> {
    // The set of currently installed routes; used to synthesize `Existing`
    // events for newly connected clients.
    routes: HashSet<fnet_routes_ext::InstalledRoute<I>>,
    // The list of currently connected clients.
    clients: Vec<RoutesWatcherSink<I>>,
}
| |
// The error type returned by `RouteUpdateDispatcher.notify()`.
#[derive(Debug, PartialEq)]
pub(crate) enum RouteUpdateNotifyError<I: Ip> {
    // `notify` was called with `RoutingTableUpdate::RouteAdded` for a route
    // that already exists.
    AlreadyExists(fnet_routes_ext::InstalledRoute<I>),
    // `notify` was called with `RoutingTableUpdate::RouteRemoved` for a route
    // that does not exist.
    NotFound(fnet_routes_ext::InstalledRoute<I>),
}
| |
| impl<I: Ip> RouteUpdateDispatcherInner<I> { |
| // Notify this `RouteUpdateDispatcher` of an update to the routing table. |
| // The update will be dispatched to all active watcher clients. |
| fn notify(&mut self, update: RoutingTableUpdate<I>) -> Result<(), RouteUpdateNotifyError<I>> { |
| let RouteUpdateDispatcherInner { routes, clients } = self; |
| let event = match update { |
| RoutingTableUpdate::RouteAdded(route) => { |
| if routes.insert(route.clone()) { |
| fnet_routes_ext::Event::Added(route) |
| } else { |
| return Err(RouteUpdateNotifyError::AlreadyExists(route)); |
| } |
| } |
| RoutingTableUpdate::RouteRemoved(route) => { |
| if routes.remove(&route) { |
| fnet_routes_ext::Event::Removed(route) |
| } else { |
| return Err(RouteUpdateNotifyError::NotFound(route)); |
| } |
| } |
| }; |
| for client in clients { |
| client.send(event) |
| } |
| Ok(()) |
| } |
| |
| // Register a new client with this `RouteUpdateDispatcher`. |
| fn connect_new_client(&mut self, client_interest: ClientInterest) -> RoutesWatcher<I> { |
| let RouteUpdateDispatcherInner { routes, clients } = self; |
| let (watcher, sink) = |
| RoutesWatcher::new_with_existing_routes(routes.iter().cloned(), client_interest); |
| clients.push(sink); |
| watcher |
| } |
| |
| // Disconnect the given watcher from this `RouteUpdateDispatcher`. |
| fn disconnect_client(&mut self, watcher: RoutesWatcher<I>) { |
| let RouteUpdateDispatcherInner { routes: _, clients } = self; |
| let (idx, _): (usize, &RoutesWatcherSink<I>) = clients |
| .iter() |
| .enumerate() |
| .filter(|(_idx, client)| client.is_connected_to(&watcher)) |
| .exactly_one() |
| .expect("expected exactly one sink"); |
| let _: RoutesWatcherSink<I> = clients.swap_remove(idx); |
| } |
| } |
| |
| impl<I: Ip> RouteUpdateDispatcher<I> { |
| pub(crate) async fn notify( |
| &self, |
| update: RoutingTableUpdate<I>, |
| ) -> Result<(), RouteUpdateNotifyError<I>> { |
| let Self(inner) = self; |
| inner.lock().await.notify(update) |
| } |
| } |
| |
/// The tables the client is interested in.
#[derive(Debug)]
enum ClientInterest {
    /// The client is interested in updates across all tables.
    All,
    /// The client only want updates on the specific table.
    ///
    /// The table ID is a scalar instead of [`TableId`] because we don't perform
    /// validation but only filtering.
    Only { table_id: u32 },
}
| |
| impl ClientInterest { |
| fn has_interest_in<I: Ip>(&self, event: &fnet_routes_ext::Event<I>) -> bool { |
| match event { |
| fnet_routes_ext::Event::Unknown | fnet_routes_ext::Event::Idle => true, |
| fnet_routes_ext::Event::Existing(installed_route) |
| | fnet_routes_ext::Event::Added(installed_route) |
| | fnet_routes_ext::Event::Removed(installed_route) => match self { |
| ClientInterest::All => true, |
| ClientInterest::Only { table_id } => installed_route.table_id == *table_id, |
| }, |
| } |
| } |
| } |
| |
// Consumes events for a single client of the
// `fuchsia.net.routes/WatcherV{4,6}` protocols.
#[derive(Debug)]
struct RoutesWatcherSink<I: Ip> {
    // The sink with which to send routing changes to this client.
    sink: mpsc::Sender<fnet_routes_ext::Event<I>>,
    // The mechanism with which to cancel the client.
    cancel: Event,
    // What table is this client interested in.
    interest: ClientInterest,
}
| |
| impl<I: Ip> RoutesWatcherSink<I> { |
| // Send this [`RoutesWatcherSink`] a new event. |
| fn send(&mut self, event: fnet_routes_ext::Event<I>) { |
| let interested = self.interest.has_interest_in(&event); |
| |
| if !interested { |
| debug!( |
| "The client for sink {:?} is not interested in this event {:?}, skipping", |
| self, event |
| ); |
| return; |
| } |
| |
| self.sink.try_send(event).unwrap_or_else(|e| { |
| if e.is_full() { |
| if self.cancel.signal() { |
| warn!( |
| "too many unconsumed events (the client may not be \ |
| calling Watch frequently enough): {}", |
| MAX_PENDING_EVENTS |
| ) |
| } |
| } else { |
| panic!("unexpected error trying to send: {:?}", e) |
| } |
| }) |
| } |
| |
| // Returns `true` if this sink forwards events to the given watcher. |
| fn is_connected_to(&self, watcher: &RoutesWatcher<I>) -> bool { |
| self.sink.is_connected_to(&watcher.receiver) |
| } |
| } |
| |
#[derive(Debug)]
// An implementation of the `fuchsia.net.routes.WatcherV{4,6}` protocols for
// a single client.
struct RoutesWatcher<I: Ip> {
    // The `Existing` + `Idle` events for this client, capturing all of the
    // routes that existed at the time it was instantiated.
    // NB: storing this as an `IntoIter` makes `watch` easier to implement.
    existing_events:
        <std::vec::Vec<fnet_routes_ext::Event<I>> as std::iter::IntoIterator>::IntoIter,
    // The receiver of routing changes for this client.
    receiver: mpsc::Receiver<fnet_routes_ext::Event<I>>,
    // The mechanism to observe that this client has been canceled.
    canceled: Event,
}
| |
| impl<I: Ip> RoutesWatcher<I> { |
| // Creates a new `RoutesWatcher` with the given existing routes. |
| fn new_with_existing_routes<R: Iterator<Item = fnet_routes_ext::InstalledRoute<I>>>( |
| routes: R, |
| interest: ClientInterest, |
| ) -> (Self, RoutesWatcherSink<I>) { |
| let (sender, receiver) = mpsc::channel(MAX_PENDING_EVENTS); |
| let cancel = Event::new(); |
| ( |
| RoutesWatcher { |
| existing_events: routes |
| .map(fnet_routes_ext::Event::Existing) |
| .filter(|event| interest.has_interest_in(event)) |
| .chain(std::iter::once(fnet_routes_ext::Event::Idle)) |
| .collect::<Vec<_>>() |
| .into_iter(), |
| receiver: receiver, |
| canceled: cancel.clone(), |
| }, |
| RoutesWatcherSink { sink: sender, cancel, interest }, |
| ) |
| } |
| |
| // Watch returns the currently available events (up to |
| // [`fnet_routes::MAX_EVENTS`]). This call will block if there are no |
| // available events. |
| fn watch( |
| &mut self, |
| ) -> impl futures::Future<Output = Vec<fnet_routes_ext::Event<I>>> + Unpin + '_ { |
| let RoutesWatcher { existing_events, receiver, canceled: _ } = self; |
| futures::stream::iter(existing_events.by_ref()) |
| .chain(receiver) |
| // Note: `ready_chunks` blocks until at least 1 event is ready. |
| .ready_chunks(fnet_routes::MAX_EVENTS.into()) |
| .into_future() |
| .map(|(r, _ready_chunks)| r.expect("underlying event stream unexpectedly ended")) |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use ip_test_macro::ip_test;
    use net_declare::{net_subnet_v4, net_subnet_v6};

    // Returns an arbitrary route (forwarding out `interface`) to use as test
    // input; distinct `interface` values yield distinct routes.
    fn arbitrary_route_on_interface<I: Ip>(interface: u64) -> fnet_routes_ext::InstalledRoute<I> {
        fnet_routes_ext::InstalledRoute {
            route: fnet_routes_ext::Route {
                destination: I::map_ip(
                    (),
                    |()| net_subnet_v4!("192.168.0.0/24"),
                    |()| net_subnet_v6!("fe80::/64"),
                ),
                action: fnet_routes_ext::RouteAction::Forward(fnet_routes_ext::RouteTarget {
                    outbound_interface: interface,
                    next_hop: None,
                }),
                properties: fnet_routes_ext::RouteProperties {
                    specified_properties: fnet_routes_ext::SpecifiedRouteProperties {
                        metric: fnet_routes::SpecifiedMetric::ExplicitMetric(0),
                    },
                },
            },
            effective_properties: fnet_routes_ext::EffectiveRouteProperties { metric: 0 },
            table_id: 0,
        }
    }

    // Tests that `RouteUpdateDispatcher` returns an error when it receives a
    // `RouteRemoved` update for a non-existent route.
    #[ip_test(I)]
    fn dispatcher_fails_to_remove_non_existent<I: Ip>() {
        let route = arbitrary_route_on_interface::<I>(1);
        assert_eq!(
            RouteUpdateDispatcherInner::default()
                .notify(RoutingTableUpdate::RouteRemoved(route.clone())),
            Err(RouteUpdateNotifyError::NotFound(route))
        );
    }

    // Tests that `RouteUpdateDispatcher` returns an error when it receives an
    // `AddRoute` update for an already existing route.
    #[ip_test(I)]
    fn dispatcher_fails_to_add_existing<I: Ip>() {
        let mut dispatcher = RouteUpdateDispatcherInner::default();
        let route = arbitrary_route_on_interface::<I>(1);
        assert_eq!(dispatcher.notify(RoutingTableUpdate::RouteAdded(route)), Ok(()));
        assert_eq!(
            dispatcher.notify(RoutingTableUpdate::RouteAdded(route)),
            Err(RouteUpdateNotifyError::AlreadyExists(route))
        );
    }

    // Tests the basic functionality of the `RouteUpdateDispatcher`,
    // `RouteWatcherSink`, and `RouteWatcher`.
    #[ip_test(I)]
    fn notify_dispatch_watch<I: Ip>() {
        let mut dispatcher = RouteUpdateDispatcherInner::default();

        // Add a new watcher and verify there are no existing routes.
        let mut watcher1 = dispatcher.connect_new_client(ClientInterest::All);
        assert_eq!(watcher1.watch().now_or_never().unwrap(), [fnet_routes_ext::Event::<I>::Idle]);

        // Add a route and verify that the watcher is notified.
        let route = arbitrary_route_on_interface(1);
        dispatcher.notify(RoutingTableUpdate::RouteAdded(route)).expect("failed to notify");
        assert_eq!(
            watcher1.watch().now_or_never().unwrap(),
            [fnet_routes_ext::Event::Added(route)]
        );

        // Connect a second watcher and verify it sees the route as `Existing`.
        let mut watcher2 = dispatcher.connect_new_client(ClientInterest::All);
        assert_eq!(
            watcher2.watch().now_or_never().unwrap(),
            [fnet_routes_ext::Event::Existing(route), fnet_routes_ext::Event::<I>::Idle]
        );

        // Remove the route and verify both watchers are notified.
        dispatcher.notify(RoutingTableUpdate::RouteRemoved(route)).expect("failed to notify");
        assert_eq!(
            watcher1.watch().now_or_never().unwrap(),
            [fnet_routes_ext::Event::Removed(route)]
        );
        assert_eq!(
            watcher2.watch().now_or_never().unwrap(),
            [fnet_routes_ext::Event::Removed(route)]
        );

        // Disconnect the first client, and verify the second client is still
        // able to be notified.
        dispatcher.disconnect_client(watcher1);
        dispatcher.notify(RoutingTableUpdate::RouteAdded(route)).expect("failed to notify");
        assert_eq!(
            watcher2.watch().now_or_never().unwrap(),
            [fnet_routes_ext::Event::Added(route)]
        );
    }

    // Tests that a `RouteWatcher` is canceled if it exceeds
    // `MAX_PENDING_EVENTS` in its queue.
    #[ip_test(I)]
    fn cancel_watcher_with_too_many_pending_events<I: Ip>() {
        // Helper function to drain the watcher of a specific number of events,
        // which may be spread across multiple batches of size
        // `fnet_routes::MAX_EVENTS`.
        fn drain_watcher<I: Ip>(watcher: &mut RoutesWatcher<I>, num_required_events: usize) {
            let mut num_observed_events = 0;
            while num_observed_events < num_required_events {
                num_observed_events += watcher.watch().now_or_never().unwrap().len()
            }
            assert_eq!(num_observed_events, num_required_events);
        }

        let mut dispatcher = RouteUpdateDispatcherInner::default();
        // `Existing` routes shouldn't count against the client's quota.
        // Exceed the quota, and then verify new clients can still connect.
        // Note that `EXCESS` is 2, because mpsc::channel implicitly adds +1 to
        // the buffer size for every connected sender (and the dispatcher holds
        // a sender).
        const EXCESS: usize = 2;
        const TOO_MANY_EVENTS: usize = MAX_PENDING_EVENTS + EXCESS;
        for i in 0..TOO_MANY_EVENTS {
            let route = arbitrary_route_on_interface::<I>(i.try_into().unwrap());
            dispatcher.notify(RoutingTableUpdate::RouteAdded(route)).expect("failed to notify");
        }
        let mut watcher1 = dispatcher.connect_new_client(ClientInterest::All);
        let mut watcher2 = dispatcher.connect_new_client(ClientInterest::All);
        assert_eq!(watcher1.canceled.wait().now_or_never(), None);
        assert_eq!(watcher2.canceled.wait().now_or_never(), None);
        // Drain all of the `Existing` events (and +1 for the `Idle` event).
        drain_watcher(&mut watcher1, TOO_MANY_EVENTS + 1);
        drain_watcher(&mut watcher2, TOO_MANY_EVENTS + 1);

        // Generate `TOO_MANY_EVENTS`, consuming the excess on `watcher1` but
        // not on `watcher2`; `watcher2` should be canceled.
        for i in 0..EXCESS {
            assert_eq!(watcher1.canceled.wait().now_or_never(), None);
            assert_eq!(watcher2.canceled.wait().now_or_never(), None);
            let route = arbitrary_route_on_interface::<I>(i.try_into().unwrap());
            dispatcher.notify(RoutingTableUpdate::RouteRemoved(route)).expect("failed to notify");
        }
        drain_watcher(&mut watcher1, EXCESS);
        // Fill both queues; `watcher1` has headroom from draining, `watcher2`
        // does not and should hit the cancellation threshold.
        for i in EXCESS..TOO_MANY_EVENTS {
            assert_eq!(watcher1.canceled.wait().now_or_never(), None);
            assert_eq!(watcher2.canceled.wait().now_or_never(), None);
            let route = arbitrary_route_on_interface::<I>(i.try_into().unwrap());
            dispatcher.notify(RoutingTableUpdate::RouteRemoved(route)).expect("failed to notify");
        }
        assert_eq!(watcher1.canceled.wait().now_or_never(), None);
        assert_eq!(watcher2.canceled.wait().now_or_never(), Some(()));
    }

    // Tests that a watcher only observes events for tables matching its
    // declared `ClientInterest`.
    #[ip_test(I)]
    fn watcher_respects_interest<I: Ip>() {
        let mut dispatcher = RouteUpdateDispatcherInner::default();
        let main_table_id = routes::main_table_id::<I>();
        let other_table_id = main_table_id.next().expect("no next ID");
        let main_route = fnet_routes_ext::InstalledRoute {
            table_id: main_table_id.into(),
            ..arbitrary_route_on_interface::<I>(0)
        };
        dispatcher.notify(RoutingTableUpdate::RouteAdded(main_route)).expect("failed to notify");
        let other_route = fnet_routes_ext::InstalledRoute {
            table_id: other_table_id.into(),
            ..arbitrary_route_on_interface::<I>(0)
        };
        dispatcher.notify(RoutingTableUpdate::RouteAdded(other_route)).expect("failed to notify");
        let mut all_watcher = dispatcher.connect_new_client(ClientInterest::All);
        let mut main_watcher = dispatcher.connect_new_client(ClientInterest::Only {
            table_id: routes::main_table_id::<I>().into(),
        });
        let mut other_watcher =
            dispatcher.connect_new_client(ClientInterest::Only { table_id: other_table_id.into() });
        // They can get out of order because installed routes are stored in
        // HashSet.
        assert_matches!(
            &all_watcher.watch().now_or_never().unwrap()[..],
            [
                fnet_routes_ext::Event::Existing(installed_1),
                fnet_routes_ext::Event::Existing(installed_2),
                fnet_routes_ext::Event::<I>::Idle
            ] => {
                assert_eq!(
                    HashSet::<fnet_routes_ext::InstalledRoute<I>>::from_iter(
                        [*installed_1, *installed_2]
                    ), HashSet::from_iter([main_route, other_route]));
            }
        );
        assert_eq!(
            main_watcher.watch().now_or_never().unwrap(),
            [fnet_routes_ext::Event::Existing(main_route), fnet_routes_ext::Event::<I>::Idle]
        );
        assert_eq!(
            other_watcher.watch().now_or_never().unwrap(),
            [fnet_routes_ext::Event::Existing(other_route), fnet_routes_ext::Event::<I>::Idle]
        );
        dispatcher.notify(RoutingTableUpdate::RouteRemoved(main_route)).expect("failed to notify");
        dispatcher.notify(RoutingTableUpdate::RouteRemoved(other_route)).expect("failed to notify");
        // For a watcher interested in all tables, it should observe all route
        // changes in all tables.
        assert_eq!(
            all_watcher.watch().now_or_never().unwrap(),
            [
                fnet_routes_ext::Event::Removed(main_route),
                fnet_routes_ext::Event::Removed(other_route),
            ]
        );
        // For a watcher interested in only the main table, it should observe
        // route changes in only the main table.
        assert_eq!(
            main_watcher.watch().now_or_never().unwrap(),
            [fnet_routes_ext::Event::Removed(main_route)]
        );
        // For a watcher interested in only the other table, it should observe
        // route changes in only the other table.
        assert_eq!(
            other_watcher.watch().now_or_never().unwrap(),
            [fnet_routes_ext::Event::Removed(other_route)]
        );
    }
}