| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| //! Netstack3 bindings. |
| //! |
| //! This module provides Fuchsia bindings for the [`netstack3_core`] crate. |
| |
| #[macro_use] |
| mod macros; |
| |
| #[cfg(test)] |
| mod integration_tests; |
| |
| mod context; |
| mod debug_fidl_worker; |
| mod devices; |
| mod ethernet_worker; |
| mod interfaces_admin; |
| mod interfaces_watcher; |
| mod netdevice_worker; |
| mod socket; |
| mod stack_fidl_worker; |
| mod timers; |
| mod util; |
| |
| use std::convert::TryFrom as _; |
| use std::future::Future; |
| use std::num::NonZeroU16; |
| use std::ops::DerefMut as _; |
| use std::sync::Arc; |
| use std::time::Duration; |
| |
| use fidl::endpoints::{DiscoverableProtocolMarker, RequestStream}; |
| use fidl_fuchsia_net_stack as fidl_net_stack; |
| use fuchsia_async as fasync; |
| use fuchsia_component::server::{ServiceFs, ServiceFsDir}; |
| use fuchsia_zircon as zx; |
| use futures::{ |
| channel::mpsc, lock::Mutex, FutureExt as _, SinkExt as _, StreamExt as _, TryStreamExt as _, |
| }; |
| use log::{debug, error}; |
| use packet::{BufferMut, Serializer}; |
| use packet_formats::icmp::{IcmpEchoReply, IcmpMessage, IcmpUnusedCode}; |
| use rand::rngs::OsRng; |
| use util::ConversionContext; |
| |
| use context::Lockable; |
| use devices::{ |
| BindingId, CommonInfo, DeviceInfo, DeviceSpecificInfo, Devices, EthernetInfo, LoopbackInfo, |
| NetdeviceInfo, |
| }; |
| use interfaces_watcher::{InterfaceEventProducer, InterfaceProperties, InterfaceUpdate}; |
| use timers::TimerDispatcher; |
| |
| use net_types::ip::{AddrSubnet, AddrSubnetEither, Ip, Ipv4, Ipv6}; |
| use netstack3_core::{ |
| add_ip_addr_subnet, add_route, |
| context::{CounterContext, EventContext, InstantContext, RngContext, TimerContext}, |
| handle_timer, icmp, update_ipv4_configuration, update_ipv6_configuration, AddableEntryEither, |
| BufferUdpContext, Ctx, DeviceId, DeviceLayerEventDispatcher, IpDeviceConfiguration, IpExt, |
| Ipv4DeviceConfiguration, Ipv6DeviceConfiguration, NonSyncContext, SlaacConfiguration, TimerId, |
| UdpBoundId, UdpConnId, UdpContext, UdpListenerId, |
| }; |
| |
| /// Default MTU for loopback. |
| /// |
| /// This value is also the default value used on Linux. As of writing: |
| /// |
| /// ```shell |
| /// $ ip link show dev lo |
| /// 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000 |
| /// link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 |
| /// ``` |
| const DEFAULT_LOOPBACK_MTU: u32 = 65536; |
| |
| pub(crate) trait LockableContext: for<'a> Lockable<'a, Ctx<Self::NonSyncCtx>> { |
| type NonSyncCtx: NonSyncContext + Send; |
| } |
| |
| pub(crate) trait DeviceStatusNotifier { |
| /// A notification that the state of the device with binding Id `id` |
| /// changed. |
| /// |
| /// This method is called by workers that observe devices, such as |
| /// [`EthernetWorker`]. This method is called after all the internal |
| /// structures that cache or store device state are already up to date. The |
| /// only side effect should be notifying other workers or external |
| /// applications that are listening for status changes. |
| fn device_status_changed(&mut self, id: u64); |
| } |
| |
| pub(crate) trait InterfaceEventProducerFactory { |
| fn create_interface_event_producer( |
| &self, |
| id: BindingId, |
| properties: InterfaceProperties, |
| ) -> InterfaceEventProducer; |
| } |
| |
| type IcmpEchoSockets = socket::datagram::SocketCollectionPair<socket::datagram::IcmpEcho>; |
| type UdpSockets = socket::datagram::SocketCollectionPair<socket::datagram::Udp>; |
| |
| impl DeviceStatusNotifier for BindingsNonSyncCtxImpl { |
| fn device_status_changed(&mut self, _id: u64) { |
| // NOTE(brunodalbo) we may want to do more things here in the future, |
| // for now this is only intercepted for testing |
| } |
| } |
| |
| /// Provides an implementation of [`NonSyncContext`]. |
| #[derive(Default)] |
| pub(crate) struct BindingsNonSyncCtxImpl { |
| rng: OsRng, |
| timers: timers::TimerDispatcher<TimerId>, |
| devices: Devices, |
| icmp_echo_sockets: IcmpEchoSockets, |
| udp_sockets: UdpSockets, |
| } |
| |
| impl AsRef<timers::TimerDispatcher<TimerId>> for BindingsNonSyncCtxImpl { |
| fn as_ref(&self) -> &TimerDispatcher<TimerId> { |
| &self.timers |
| } |
| } |
| |
| impl AsMut<timers::TimerDispatcher<TimerId>> for BindingsNonSyncCtxImpl { |
| fn as_mut(&mut self) -> &mut TimerDispatcher<TimerId> { |
| &mut self.timers |
| } |
| } |
| |
| impl AsRef<Devices> for BindingsNonSyncCtxImpl { |
| fn as_ref(&self) -> &Devices { |
| &self.devices |
| } |
| } |
| |
| impl AsMut<Devices> for BindingsNonSyncCtxImpl { |
| fn as_mut(&mut self) -> &mut Devices { |
| &mut self.devices |
| } |
| } |
| |
| impl<'a> Lockable<'a, Ctx<BindingsNonSyncCtxImpl>> for Netstack { |
| type Guard = futures::lock::MutexGuard<'a, Ctx<BindingsNonSyncCtxImpl>>; |
| type Fut = futures::lock::MutexLockFuture<'a, Ctx<BindingsNonSyncCtxImpl>>; |
| fn lock(&'a self) -> Self::Fut { |
| self.ctx.lock() |
| } |
| } |
| |
| impl AsRef<IcmpEchoSockets> for BindingsNonSyncCtxImpl { |
| fn as_ref(&self) -> &IcmpEchoSockets { |
| &self.icmp_echo_sockets |
| } |
| } |
| |
| impl AsMut<IcmpEchoSockets> for BindingsNonSyncCtxImpl { |
| fn as_mut(&mut self) -> &mut IcmpEchoSockets { |
| &mut self.icmp_echo_sockets |
| } |
| } |
| |
| impl AsRef<UdpSockets> for BindingsNonSyncCtxImpl { |
| fn as_ref(&self) -> &UdpSockets { |
| &self.udp_sockets |
| } |
| } |
| |
| impl AsMut<UdpSockets> for BindingsNonSyncCtxImpl { |
| fn as_mut(&mut self) -> &mut UdpSockets { |
| &mut self.udp_sockets |
| } |
| } |
| |
| impl<NonSyncCtx> timers::TimerHandler<TimerId> for Ctx<NonSyncCtx> |
| where |
| NonSyncCtx: NonSyncContext + AsMut<timers::TimerDispatcher<TimerId>> + Send + Sync + 'static, |
| { |
| fn handle_expired_timer(&mut self, timer: TimerId) { |
| let Ctx { sync_ctx, non_sync_ctx } = self; |
| handle_timer(sync_ctx, non_sync_ctx, timer) |
| } |
| |
| fn get_timer_dispatcher(&mut self) -> &mut timers::TimerDispatcher<TimerId> { |
| self.non_sync_ctx.as_mut() |
| } |
| } |
| |
| impl<C> timers::TimerContext<TimerId> for C |
| where |
| C: LockableContext + Clone + Send + Sync + 'static, |
| C::NonSyncCtx: AsMut<timers::TimerDispatcher<TimerId>> + Send + Sync + 'static, |
| { |
| type Handler = Ctx<C::NonSyncCtx>; |
| } |
| |
| impl<D> ConversionContext for D |
| where |
| D: AsRef<Devices>, |
| { |
| fn get_core_id(&self, binding_id: u64) -> Option<DeviceId> { |
| self.as_ref().get_core_id(binding_id) |
| } |
| |
| fn get_binding_id(&self, core_id: DeviceId) -> Option<u64> { |
| self.as_ref().get_binding_id(core_id) |
| } |
| } |
| |
| /// A thin wrapper around `fuchsia_async::Time` that implements `core::Instant`. |
| #[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone, Debug)] |
| pub(crate) struct StackTime(fasync::Time); |
| |
| impl netstack3_core::Instant for StackTime { |
| fn duration_since(&self, earlier: StackTime) -> Duration { |
| assert!(self.0 >= earlier.0); |
| // guaranteed not to panic because the assertion ensures that the |
| // difference is non-negative, and all non-negative i64 values are also |
| // valid u64 values |
| Duration::from_nanos(u64::try_from(self.0.into_nanos() - earlier.0.into_nanos()).unwrap()) |
| } |
| |
| fn checked_add(&self, duration: Duration) -> Option<StackTime> { |
| Some(StackTime(fasync::Time::from_nanos( |
| self.0.into_nanos().checked_add(i64::try_from(duration.as_nanos()).ok()?)?, |
| ))) |
| } |
| |
| fn checked_sub(&self, duration: Duration) -> Option<StackTime> { |
| Some(StackTime(fasync::Time::from_nanos( |
| self.0.into_nanos().checked_sub(i64::try_from(duration.as_nanos()).ok()?)?, |
| ))) |
| } |
| } |
| |
| impl InstantContext for BindingsNonSyncCtxImpl { |
| type Instant = StackTime; |
| |
| fn now(&self) -> StackTime { |
| StackTime(fasync::Time::now()) |
| } |
| } |
| |
| impl CounterContext for BindingsNonSyncCtxImpl { |
| fn increment_counter(&mut self, _key: &'static str) {} |
| } |
| |
| impl RngContext for BindingsNonSyncCtxImpl { |
| type Rng = OsRng; |
| |
| fn rng(&self) -> &OsRng { |
| &self.rng |
| } |
| |
| fn rng_mut(&mut self) -> &mut OsRng { |
| &mut self.rng |
| } |
| } |
| |
| impl TimerContext<TimerId> for BindingsNonSyncCtxImpl { |
| fn schedule_timer_instant(&mut self, time: StackTime, id: TimerId) -> Option<StackTime> { |
| self.timers.schedule_timer(id, time) |
| } |
| |
| fn cancel_timer(&mut self, id: TimerId) -> Option<StackTime> { |
| self.timers.cancel_timer(&id) |
| } |
| |
| fn cancel_timers_with<F: FnMut(&TimerId) -> bool>(&mut self, f: F) { |
| self.timers.cancel_timers_with(f); |
| } |
| |
| fn scheduled_instant(&self, id: TimerId) -> Option<StackTime> { |
| self.timers.scheduled_time(&id) |
| } |
| } |
| |
| impl<B> DeviceLayerEventDispatcher<B> for BindingsNonSyncCtxImpl |
| where |
| B: BufferMut, |
| { |
| fn send_frame<S: Serializer<Buffer = B>>( |
| &mut self, |
| device: DeviceId, |
| frame: S, |
| ) -> Result<(), S> { |
| // TODO(wesleyac): Error handling |
| let frame = frame.serialize_vec_outer().map_err(|(_, ser)| ser)?; |
| let dev = match self.devices.get_core_device_mut(device) { |
| Some(dev) => dev, |
| None => { |
| error!("Tried to send frame on device that is not listed: {:?}", device); |
| return Ok(()); |
| } |
| }; |
| |
| match dev.info_mut() { |
| DeviceSpecificInfo::Ethernet(EthernetInfo { |
| common_info: CommonInfo { admin_enabled, mtu: _, events: _, name: _ }, |
| client, |
| mac: _, |
| features: _, |
| phy_up, |
| }) => { |
| if *admin_enabled && *phy_up { |
| client.send(frame.as_ref()) |
| } |
| } |
| DeviceSpecificInfo::Netdevice(NetdeviceInfo { |
| common_info: CommonInfo { admin_enabled, mtu: _, events: _, name: _ }, |
| handler, |
| mac: _, |
| phy_up, |
| }) => { |
| if *admin_enabled && *phy_up { |
| handler.send(frame.as_ref()).unwrap_or_else(|e| { |
| log::warn!("failed to send frame to {:?}: {:?}", handler, e) |
| }) |
| } |
| } |
| DeviceSpecificInfo::Loopback(LoopbackInfo { .. }) => { |
| unreachable!("loopback must not send packets out of the node") |
| } |
| } |
| |
| Ok(()) |
| } |
| } |
| |
| impl<I> icmp::IcmpContext<I> for BindingsNonSyncCtxImpl |
| where |
| I: socket::datagram::SocketCollectionIpExt<socket::datagram::IcmpEcho> + icmp::IcmpIpExt, |
| { |
| fn receive_icmp_error(&mut self, conn: icmp::IcmpConnId<I>, seq_num: u16, err: I::ErrorCode) { |
| I::get_collection_mut(self).receive_icmp_error(conn, seq_num, err) |
| } |
| } |
| |
| impl<I, B: BufferMut> icmp::BufferIcmpContext<I, B> for BindingsNonSyncCtxImpl |
| where |
| I: socket::datagram::SocketCollectionIpExt<socket::datagram::IcmpEcho> + icmp::IcmpIpExt, |
| IcmpEchoReply: for<'a> IcmpMessage<I, &'a [u8], Code = IcmpUnusedCode>, |
| { |
| fn receive_icmp_echo_reply( |
| &mut self, |
| conn: icmp::IcmpConnId<I>, |
| src_ip: I::Addr, |
| dst_ip: I::Addr, |
| id: u16, |
| seq_num: u16, |
| data: B, |
| ) { |
| I::get_collection_mut(self).receive_icmp_echo_reply(conn, src_ip, dst_ip, id, seq_num, data) |
| } |
| } |
| |
| impl<I> UdpContext<I> for BindingsNonSyncCtxImpl |
| where |
| I: socket::datagram::SocketCollectionIpExt<socket::datagram::Udp> + icmp::IcmpIpExt, |
| { |
| fn receive_icmp_error(&mut self, id: UdpBoundId<I>, err: I::ErrorCode) { |
| I::get_collection_mut(self).receive_icmp_error(id, err) |
| } |
| } |
| |
| impl<I, B: BufferMut> BufferUdpContext<I, B> for BindingsNonSyncCtxImpl |
| where |
| I: socket::datagram::SocketCollectionIpExt<socket::datagram::Udp> + IpExt, |
| { |
| fn receive_udp_from_conn( |
| &mut self, |
| conn: UdpConnId<I>, |
| src_ip: I::Addr, |
| src_port: NonZeroU16, |
| body: &B, |
| ) { |
| I::get_collection_mut(self).receive_udp_from_conn(conn, src_ip, src_port, body) |
| } |
| |
| fn receive_udp_from_listen( |
| &mut self, |
| listener: UdpListenerId<I>, |
| src_ip: I::Addr, |
| dst_ip: I::Addr, |
| src_port: Option<NonZeroU16>, |
| body: &B, |
| ) { |
| I::get_collection_mut(self) |
| .receive_udp_from_listen(listener, src_ip, dst_ip, src_port, body) |
| } |
| } |
| |
| impl<I: Ip> EventContext<netstack3_core::IpDeviceEvent<DeviceId, I>> for BindingsNonSyncCtxImpl { |
| fn on_event(&mut self, event: netstack3_core::IpDeviceEvent<DeviceId, I>) { |
| let (device, event) = match event { |
| netstack3_core::IpDeviceEvent::AddressAdded { device, addr, state } => ( |
| device, |
| InterfaceUpdate::AddressAdded { |
| addr: addr.into(), |
| initial_state: interfaces_watcher::AddressState { |
| valid_until: zx::Time::INFINITE, |
| assignment_state: state, |
| }, |
| }, |
| ), |
| netstack3_core::IpDeviceEvent::AddressRemoved { device, addr } => { |
| (device, InterfaceUpdate::AddressRemoved(addr.into())) |
| } |
| netstack3_core::IpDeviceEvent::AddressStateChanged { device, addr, state } => ( |
| device, |
| InterfaceUpdate::AddressAssignmentStateChanged { |
| addr: addr.into(), |
| new_state: state, |
| }, |
| ), |
| }; |
| self.notify_interface_update(device, event); |
| } |
| } |
| |
| impl<I: Ip> EventContext<netstack3_core::IpLayerEvent<DeviceId, I>> for BindingsNonSyncCtxImpl { |
| fn on_event(&mut self, event: netstack3_core::IpLayerEvent<DeviceId, I>) { |
| let (device, subnet, has_default_route) = match event { |
| netstack3_core::IpLayerEvent::DeviceRouteAdded { device, subnet } => { |
| (device, subnet, true) |
| } |
| netstack3_core::IpLayerEvent::DeviceRouteRemoved { device, subnet } => { |
| (device, subnet, false) |
| } |
| }; |
| // We only care about the default route. |
| if subnet.prefix() != 0 || subnet.network() != I::UNSPECIFIED_ADDRESS { |
| return; |
| } |
| self.notify_interface_update( |
| device, |
| InterfaceUpdate::DefaultRouteChanged { version: I::VERSION, has_default_route }, |
| ); |
| } |
| } |
| |
| impl EventContext<netstack3_core::DadEvent<DeviceId>> for BindingsNonSyncCtxImpl { |
| fn on_event(&mut self, event: netstack3_core::DadEvent<DeviceId>) { |
| match event { |
| netstack3_core::DadEvent::AddressAssigned { device, addr } => { |
| self.on_event(netstack3_core::IpDeviceEvent::<_, Ipv6>::AddressStateChanged { |
| device, |
| addr: *addr, |
| state: netstack3_core::IpAddressState::Assigned, |
| }) |
| } |
| } |
| } |
| } |
| |
| impl EventContext<netstack3_core::Ipv6RouteDiscoveryEvent<DeviceId>> for BindingsNonSyncCtxImpl { |
| fn on_event(&mut self, _event: netstack3_core::Ipv6RouteDiscoveryEvent<DeviceId>) { |
| // TODO(https://fxbug.dev/97203): Update forwarding table in response to |
| // the event. |
| } |
| } |
| |
| impl BindingsNonSyncCtxImpl { |
| fn notify_interface_update(&self, device: DeviceId, event: InterfaceUpdate) { |
| self.devices |
| .get_core_device(device) |
| .unwrap_or_else(|| panic!("issued event {:?} for deleted device {:?}", event, device)) |
| .info() |
| .common_info() |
| .events |
| .notify(event) |
| .expect("interfaces worker closed"); |
| } |
| } |
| |
| trait MutableDeviceState { |
| /// Invoke a function on the state associated with the device `id`. |
| fn update_device_state<F: FnOnce(&mut DeviceInfo)>(&mut self, id: u64, f: F); |
| } |
| |
| impl<NonSyncCtx> MutableDeviceState for Ctx<NonSyncCtx> |
| where |
| NonSyncCtx: NonSyncContext + DeviceStatusNotifier + AsRef<Devices> + AsMut<Devices>, |
| { |
| fn update_device_state<F: FnOnce(&mut DeviceInfo)>(&mut self, id: u64, f: F) { |
| if let Some(device_info) = self.non_sync_ctx.as_mut().get_device_mut(id) { |
| f(device_info); |
| self.non_sync_ctx.device_status_changed(id) |
| } |
| } |
| } |
| |
| trait InterfaceControl { |
| /// Enables an interface. |
| /// |
| /// Both `admin_enabled` and `phy_up` must be true for the interface to be |
| /// enabled. |
| fn enable_interface(&mut self, id: u64) -> Result<(), fidl_net_stack::Error>; |
| |
| /// Disables an interface. |
| /// |
| /// Either an Admin (fidl) or Phy change can disable an interface. |
| fn disable_interface(&mut self, id: u64) -> Result<(), fidl_net_stack::Error>; |
| } |
| |
| fn set_interface_enabled<NonSyncCtx: NonSyncContext + AsRef<Devices> + AsMut<Devices>>( |
| Ctx { sync_ctx, non_sync_ctx }: &mut Ctx<NonSyncCtx>, |
| id: u64, |
| should_enable: bool, |
| ) -> Result<(), fidl_net_stack::Error> { |
| let device = non_sync_ctx.as_mut().get_device_mut(id).ok_or(fidl_net_stack::Error::NotFound)?; |
| let core_id = device.core_id(); |
| |
| let (dev_enabled, events) = match device.info_mut() { |
| DeviceSpecificInfo::Ethernet(EthernetInfo { |
| common_info: CommonInfo { admin_enabled, mtu: _, events, name: _ }, |
| client: _, |
| mac: _, |
| features: _, |
| phy_up, |
| }) |
| | DeviceSpecificInfo::Netdevice(NetdeviceInfo { |
| common_info: CommonInfo { admin_enabled, mtu: _, events, name: _ }, |
| handler: _, |
| mac: _, |
| phy_up, |
| }) => (*admin_enabled && *phy_up, events), |
| DeviceSpecificInfo::Loopback(LoopbackInfo { |
| common_info: CommonInfo { admin_enabled, mtu: _, events, name: _ }, |
| }) => (*admin_enabled, events), |
| }; |
| |
| if should_enable { |
| // We want to enable the interface, but its device is considered |
| // disabled so we do nothing further. |
| // |
| // This can happen when the interface was set to be administratively up |
| // but the phy is down. |
| if !dev_enabled { |
| return Ok(()); |
| } |
| } else { |
| assert!(!dev_enabled, "caller attemped to disable an interface that is considered enabled"); |
| } |
| |
| events |
| .notify(InterfaceUpdate::OnlineChanged(should_enable)) |
| .expect("interfaces worker not running"); |
| |
| update_ipv4_configuration(sync_ctx, non_sync_ctx, core_id, |config| { |
| config.ip_config.ip_enabled = should_enable; |
| }); |
| update_ipv6_configuration(sync_ctx, non_sync_ctx, core_id, |config| { |
| config.ip_config.ip_enabled = should_enable; |
| }); |
| |
| Ok(()) |
| } |
| |
| impl<NonSyncCtx> InterfaceControl for Ctx<NonSyncCtx> |
| where |
| NonSyncCtx: NonSyncContext + AsRef<Devices> + AsMut<Devices>, |
| { |
| fn enable_interface(&mut self, id: u64) -> Result<(), fidl_net_stack::Error> { |
| set_interface_enabled(self, id, true /* should_enable */) |
| } |
| |
| fn disable_interface(&mut self, id: u64) -> Result<(), fidl_net_stack::Error> { |
| set_interface_enabled(self, id, false /* should_enable */) |
| } |
| } |
| |
| type NetstackContext = Arc<Mutex<Ctx<BindingsNonSyncCtxImpl>>>; |
| |
| /// The netstack. |
| /// |
| /// Provides the entry point for creating a netstack to be served as a |
| /// component. |
| #[derive(Clone)] |
| pub struct Netstack { |
| ctx: NetstackContext, |
| interfaces_event_sink: interfaces_watcher::WorkerInterfaceSink, |
| } |
| |
| /// Contains the information needed to start serving a network stack over FIDL. |
| pub struct NetstackSeed { |
| netstack: Netstack, |
| interfaces_worker: interfaces_watcher::Worker, |
| interfaces_watcher_sink: interfaces_watcher::WorkerWatcherSink, |
| } |
| |
| impl Default for NetstackSeed { |
| fn default() -> Self { |
| let (interfaces_worker, interfaces_watcher_sink, interfaces_event_sink) = |
| interfaces_watcher::Worker::new(); |
| Self { |
| netstack: Netstack { ctx: Default::default(), interfaces_event_sink }, |
| interfaces_worker, |
| interfaces_watcher_sink, |
| } |
| } |
| } |
| |
| impl LockableContext for Netstack { |
| type NonSyncCtx = BindingsNonSyncCtxImpl; |
| } |
| |
| impl InterfaceEventProducerFactory for Netstack { |
| fn create_interface_event_producer( |
| &self, |
| id: BindingId, |
| properties: InterfaceProperties, |
| ) -> InterfaceEventProducer { |
| self.interfaces_event_sink |
| .add_interface(id, properties) |
| .expect("interface worker not running") |
| } |
| } |
| |
| enum Service { |
| Stack(fidl_fuchsia_net_stack::StackRequestStream), |
| Socket(fidl_fuchsia_posix_socket::ProviderRequestStream), |
| Interfaces(fidl_fuchsia_net_interfaces::StateRequestStream), |
| InterfacesAdmin(fidl_fuchsia_net_interfaces_admin::InstallerRequestStream), |
| Debug(fidl_fuchsia_net_debug::InterfacesRequestStream), |
| } |
| |
| enum WorkItem { |
| Incoming(Service), |
| Task(fasync::Task<()>), |
| } |
| |
| trait RequestStreamExt: RequestStream { |
| fn serve_with<F, Fut, E>(self, f: F) -> futures::future::Map<Fut, fn(Result<(), E>) -> ()> |
| where |
| E: std::error::Error, |
| F: FnOnce(Self) -> Fut, |
| Fut: Future<Output = Result<(), E>>; |
| } |
| |
| impl<D: DiscoverableProtocolMarker, S: RequestStream<Protocol = D>> RequestStreamExt for S { |
| fn serve_with<F, Fut, E>(self, f: F) -> futures::future::Map<Fut, fn(Result<(), E>) -> ()> |
| where |
| E: std::error::Error, |
| F: FnOnce(Self) -> Fut, |
| Fut: Future<Output = Result<(), E>>, |
| { |
| f(self).map(|res| res.unwrap_or_else(|err| error!("{} error: {}", D::PROTOCOL_NAME, err))) |
| } |
| } |
| |
| impl NetstackSeed { |
| /// Consumes the netstack and starts serving all the FIDL services it |
| /// implements to the outgoing service directory. |
| pub async fn serve(self) -> Result<(), anyhow::Error> { |
| use anyhow::Context as _; |
| |
| debug!("Serving netstack"); |
| |
| let Self { netstack, interfaces_worker, interfaces_watcher_sink } = self; |
| |
| { |
| let mut ctx = netstack.lock().await; |
| let Ctx { sync_ctx, non_sync_ctx } = ctx.deref_mut(); |
| |
| // Add and initialize the loopback interface with the IPv4 and IPv6 |
| // loopback addresses and on-link routes to the loopback subnets. |
| let loopback = netstack3_core::add_loopback_device(sync_ctx, DEFAULT_LOOPBACK_MTU) |
| .expect("error adding loopback device"); |
| let devices: &mut Devices = non_sync_ctx.as_mut(); |
| let _binding_id: u64 = devices |
| .add_device(loopback, |id| { |
| const LOOPBACK_NAME: &'static str = "lo"; |
| let events = netstack.create_interface_event_producer( |
| id, |
| InterfaceProperties { |
| name: LOOPBACK_NAME.to_string(), |
| device_class: fidl_fuchsia_net_interfaces::DeviceClass::Loopback( |
| fidl_fuchsia_net_interfaces::Empty {}, |
| ), |
| }, |
| ); |
| events |
| .notify(InterfaceUpdate::OnlineChanged(true)) |
| .expect("interfaces worker not running"); |
| DeviceSpecificInfo::Loopback(LoopbackInfo { |
| common_info: CommonInfo { |
| mtu: DEFAULT_LOOPBACK_MTU, |
| admin_enabled: true, |
| events, |
| name: LOOPBACK_NAME.to_string(), |
| }, |
| }) |
| }) |
| .expect("error adding loopback device"); |
| // Don't need DAD and IGMP/MLD on loopback. |
| update_ipv4_configuration(sync_ctx, non_sync_ctx, loopback, |config| { |
| *config = Ipv4DeviceConfiguration { |
| ip_config: IpDeviceConfiguration { ip_enabled: true, gmp_enabled: false }, |
| }; |
| }); |
| update_ipv6_configuration(sync_ctx, non_sync_ctx, loopback, |config| { |
| *config = Ipv6DeviceConfiguration { |
| dad_transmits: None, |
| max_router_solicitations: None, |
| slaac_config: SlaacConfiguration { |
| enable_stable_addresses: true, |
| temporary_address_configuration: None, |
| }, |
| ip_config: IpDeviceConfiguration { ip_enabled: true, gmp_enabled: false }, |
| }; |
| }); |
| add_ip_addr_subnet( |
| sync_ctx, |
| non_sync_ctx, |
| loopback, |
| AddrSubnetEither::V4( |
| AddrSubnet::from_witness( |
| Ipv4::LOOPBACK_ADDRESS, |
| Ipv4::LOOPBACK_SUBNET.prefix(), |
| ) |
| .expect("error creating IPv4 loopback AddrSub"), |
| ), |
| ) |
| .expect("error adding IPv4 loopback address"); |
| add_route( |
| sync_ctx, |
| non_sync_ctx, |
| AddableEntryEither::new(Ipv4::LOOPBACK_SUBNET.into(), Some(loopback), None) |
| .expect("error creating IPv4 route entry"), |
| ) |
| .expect("error adding IPv4 loopback on-link subnet route"); |
| add_ip_addr_subnet( |
| sync_ctx, |
| non_sync_ctx, |
| loopback, |
| AddrSubnetEither::V6( |
| AddrSubnet::from_witness( |
| Ipv6::LOOPBACK_ADDRESS, |
| Ipv6::LOOPBACK_SUBNET.prefix(), |
| ) |
| .expect("error creating IPv6 loopback AddrSub"), |
| ), |
| ) |
| .expect("error adding IPv6 loopback address"); |
| add_route( |
| sync_ctx, |
| non_sync_ctx, |
| AddableEntryEither::new(Ipv6::LOOPBACK_SUBNET.into(), Some(loopback), None) |
| .expect("error creating IPv6 route entry"), |
| ) |
| .expect("error adding IPv6 loopback on-link subnet route"); |
| |
| // Start servicing timers. |
| let BindingsNonSyncCtxImpl { |
| rng: _, |
| timers, |
| devices: _, |
| icmp_echo_sockets: _, |
| udp_sockets: _, |
| } = non_sync_ctx; |
| timers.spawn(netstack.clone()); |
| } |
| |
| let interfaces_worker_task = fuchsia_async::Task::spawn(async move { |
| let result = interfaces_worker.run().await; |
| // The worker is not expected to end for the lifetime of the stack. |
| panic!("interfaces worker finished unexpectedly {:?}", result); |
| }); |
| |
| let mut fs = ServiceFs::new_local(); |
| let _: &mut ServiceFsDir<'_, _> = fs |
| .dir("svc") |
| .add_fidl_service(Service::Debug) |
| .add_fidl_service(Service::Stack) |
| .add_fidl_service(Service::Socket) |
| .add_fidl_service(Service::Interfaces) |
| .add_fidl_service(Service::InterfacesAdmin); |
| |
| let services = fs.take_and_serve_directory_handle().context("directory handle")?; |
| |
| // Buffer size doesn't matter much, we're just trying to reduce |
| // allocations. |
| const TASK_CHANNEL_BUFFER_SIZE: usize = 16; |
| let (task_sink, task_stream) = mpsc::channel(TASK_CHANNEL_BUFFER_SIZE); |
| let work_items = futures::stream::select( |
| services.map(WorkItem::Incoming), |
| task_stream.map(WorkItem::Task), |
| ); |
| let work_items_fut = work_items.for_each_concurrent(None, |wi| async { |
| match wi { |
| WorkItem::Incoming(Service::Stack(stack)) => { |
| stack |
| .serve_with(|rs| { |
| stack_fidl_worker::StackFidlWorker::serve(netstack.clone(), rs) |
| }) |
| .await |
| } |
| WorkItem::Incoming(Service::Socket(socket)) => { |
| socket.serve_with(|rs| socket::serve(netstack.clone(), rs)).await |
| } |
| WorkItem::Incoming(Service::Interfaces(interfaces)) => { |
| interfaces |
| .serve_with(|rs| { |
| interfaces_watcher::serve(rs, interfaces_watcher_sink.clone()) |
| }) |
| .await |
| } |
| WorkItem::Incoming(Service::InterfacesAdmin(installer)) => { |
| log::debug!( |
| "serving {}", |
| fidl_fuchsia_net_interfaces_admin::InstallerMarker::PROTOCOL_NAME |
| ); |
| interfaces_admin::serve(netstack.clone(), installer) |
| .map_err(anyhow::Error::from) |
| .forward(task_sink.clone().sink_map_err(anyhow::Error::from)) |
| .await |
| .unwrap_or_else(|e| { |
| log::warn!( |
| "error serving {}: {:?}", |
| fidl_fuchsia_net_interfaces_admin::InstallerMarker::PROTOCOL_NAME, |
| e |
| ) |
| }) |
| } |
| WorkItem::Incoming(Service::Debug(debug)) => { |
| debug.serve_with(|rs| debug_fidl_worker::serve(netstack.clone(), rs)).await |
| } |
| WorkItem::Task(task) => task.await, |
| } |
| }); |
| |
| let ((), ()) = futures::future::join(work_items_fut, interfaces_worker_task).await; |
| debug!("Services stream finished"); |
| Ok(()) |
| } |
| } |