blob: 46d2851a18edf8937dee7a544df6ccbc78381f63 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::collections::{HashMap, VecDeque};
use super::{devices::BindingId, util::IntoFidl};
use fidl::prelude::*;
use fidl_fuchsia_net as fnet;
use fidl_fuchsia_net_interfaces::{
self as finterfaces, StateRequest, StateRequestStream, WatcherRequest, WatcherRequestStream,
WatcherWatchResponder,
};
use fidl_fuchsia_net_interfaces_ext as finterfaces_ext;
use fuchsia_zircon as zx;
use futures::{
channel::mpsc, ready, sink::SinkExt as _, task::Poll, Future, FutureExt as _, StreamExt as _,
TryFutureExt as _, TryStreamExt as _,
};
use net_types::ip::{AddrSubnetEither, IpAddr, IpVersion};
use netstack3_core::ip::IpAddressState;
/// Possible errors when serving `fuchsia.net.interfaces/State`.
#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
#[error("failed to send a Watcher task to parent")]
Send(#[from] WorkerClosedError),
#[error(transparent)]
Fidl(#[from] fidl::Error),
}
#[cfg_attr(test, derive(Default))]
pub(crate) struct WatcherOptions {
address_properties_interest: finterfaces::AddressPropertiesInterest,
include_non_assigned_addresses: bool,
}
/// Serves the `fuchsia.net.interfaces/State` protocol.
pub(crate) async fn serve(
stream: StateRequestStream,
sink: WorkerWatcherSink,
) -> Result<(), Error> {
stream
.err_into()
.try_fold(sink, |mut sink, req| async move {
let _ = &req;
let StateRequest::GetWatcher {
options:
finterfaces::WatcherOptions {
address_properties_interest,
include_non_assigned_addresses,
..
},
watcher,
control_handle: _,
} = req;
let watcher = watcher.into_stream()?;
sink.add_watcher(
watcher,
WatcherOptions {
address_properties_interest: address_properties_interest
.unwrap_or(finterfaces::AddressPropertiesInterest::default()),
include_non_assigned_addresses: include_non_assigned_addresses.unwrap_or(false),
},
)
.await?;
Ok(sink)
})
.map_ok(|_: WorkerWatcherSink| ())
.await
}
/// The maximum events to buffer at server side before the client consumes them.
///
/// The value is currently kept in sync with the netstack2 implementation.
const MAX_EVENTS: usize = 128;
#[derive(Debug)]
/// A bounded queue of [`Events`] for `fuchsia.net.interfaces/Watcher` protocol.
struct EventQueue {
events: VecDeque<finterfaces::Event>,
}
impl EventQueue {
/// Creates a new event queue containing all the interfaces in `state`
/// wrapped in a [`finterfaces::Event::Existing`] followed by a
/// [`finterfaces::Event::Idle`].
fn from_state(
state: &HashMap<BindingId, InterfaceState>,
include_non_assigned_addresses: bool,
) -> Result<Self, zx::Status> {
// NB: Leave room for idle event.
if state.len() >= MAX_EVENTS {
return Err(zx::Status::BUFFER_TOO_SMALL);
}
// NB: Compiler can't infer the parameter types.
let state_to_event = |(id, state): (&BindingId, &InterfaceState)| {
let InterfaceState {
properties: InterfaceProperties { name, device_class },
addresses,
has_default_ipv4_route,
has_default_ipv6_route,
online,
} = state;
let mut event = finterfaces::Event::Existing(
finterfaces_ext::Properties {
id: *id,
name: name.clone(),
device_class: device_class.clone(),
online: *online,
addresses: Worker::collect_addresses(addresses),
has_default_ipv4_route: *has_default_ipv4_route,
has_default_ipv6_route: *has_default_ipv6_route,
}
.into(),
);
filter_addresses(&mut event, include_non_assigned_addresses);
event
};
Ok(Self {
events: state
.iter()
.map(state_to_event)
.chain(std::iter::once(finterfaces::Event::Idle(finterfaces::Empty {})))
.collect(),
})
}
/// Adds a [`finterfaces::Event`] to the back of the queue.
fn push(&mut self, event: finterfaces::Event) -> Result<(), finterfaces::Event> {
let Self { events } = self;
if events.len() >= MAX_EVENTS {
return Err(event);
}
// NB: We could perform event consolidation here, but that's not
// implemented in NS2 at the moment of writing so it's easier to match
// behavior.
events.push_back(event);
Ok(())
}
/// Removes an [`Event`] from the front of the queue.
fn pop_front(&mut self) -> Option<finterfaces::Event> {
let Self { events } = self;
events.pop_front()
}
}
/// The task that serves `fuchsia.net.interfaces/Watcher`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub(crate) struct Watcher {
stream: WatcherRequestStream,
options: WatcherOptions,
events: EventQueue,
responder: Option<WatcherWatchResponder>,
}
impl Future for Watcher {
type Output = Result<(), fidl::Error>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
loop {
let next_request = self.as_mut().stream.poll_next_unpin(cx)?;
match ready!(next_request) {
Some(WatcherRequest::Watch { responder }) => match self.events.pop_front() {
Some(e) => responder
.send(&e)
.unwrap_or_else(|e| tracing::error!("failed to respond: {e:?}")),
None => match &self.responder {
Some(existing) => {
existing
.control_handle()
.shutdown_with_epitaph(zx::Status::ALREADY_EXISTS);
return Poll::Ready(Ok(()));
}
None => {
self.responder = Some(responder);
}
},
},
None => return Poll::Ready(Ok(())),
}
}
}
}
fn filter_addresses(event: &mut finterfaces::Event, include_non_assigned_addresses: bool) {
let addresses = match event {
finterfaces::Event::Existing(finterfaces::Properties { addresses, .. })
| finterfaces::Event::Added(finterfaces::Properties { addresses, .. })
| finterfaces::Event::Changed(finterfaces::Properties { addresses, .. }) => {
addresses.as_mut()
}
finterfaces::Event::Idle(finterfaces::Empty {}) => None,
finterfaces::Event::Removed(id) => {
let _: &mut u64 = id;
None
}
};
if let Some(addresses) = addresses {
addresses.retain(|finterfaces::Address { assignment_state, .. }| {
match assignment_state.expect("required field") {
finterfaces::AddressAssignmentState::Assigned => true,
finterfaces::AddressAssignmentState::Unavailable
| finterfaces::AddressAssignmentState::Tentative => include_non_assigned_addresses,
}
})
}
}
impl Watcher {
fn push(&mut self, mut event: finterfaces::Event) {
let Self {
stream,
events,
responder,
options:
WatcherOptions { address_properties_interest: _, include_non_assigned_addresses },
} = self;
filter_addresses(&mut event, *include_non_assigned_addresses);
if let Some(responder) = responder.take() {
match responder.send(&event) {
Ok(()) => (),
Err(e) => tracing::error!("error sending event {:?} to watcher: {:?}", event, e),
}
return;
}
match events.push(event) {
Ok(()) => (),
Err(event) => {
tracing::warn!("failed to enqueue event {:?} on watcher, closing channel", event);
stream.control_handle().shutdown();
}
}
}
}
/// Interface specific events.
#[derive(Debug)]
#[cfg_attr(test, derive(Clone, Eq, PartialEq))]
pub(crate) enum InterfaceUpdate {
AddressAdded { addr: AddrSubnetEither, assignment_state: IpAddressState, valid_until: zx::Time },
AddressAssignmentStateChanged { addr: IpAddr, new_state: IpAddressState },
AddressPropertiesChanged { addr: IpAddr, update: AddressPropertiesUpdate },
AddressRemoved(IpAddr),
DefaultRouteChanged { version: IpVersion, has_default_route: bool },
OnlineChanged(bool),
}
/// Changes to address properties (e.g. via interfaces-admin).
#[derive(Debug)]
#[cfg_attr(test, derive(Clone, Eq, PartialEq))]
pub(crate) struct AddressPropertiesUpdate {
/// The new value for `valid_until`.
pub(crate) valid_until: zx::Time,
}
/// Immutable interface properties.
#[derive(Debug)]
#[cfg_attr(test, derive(Clone, Eq, PartialEq))]
pub(crate) struct InterfaceProperties {
pub(crate) name: String,
pub(crate) device_class: finterfaces::DeviceClass,
}
/// Cached interface state by the worker.
#[derive(Debug)]
#[cfg_attr(test, derive(Clone, Eq, PartialEq))]
pub(crate) struct InterfaceState {
properties: InterfaceProperties,
online: bool,
addresses: HashMap<IpAddr, AddressProperties>,
has_default_ipv4_route: bool,
has_default_ipv6_route: bool,
}
#[derive(Debug)]
#[cfg_attr(test, derive(Clone, Eq, PartialEq))]
struct AddressProperties {
prefix_len: u8,
state: AddressState,
}
/// Cached address state by the worker.
#[derive(Debug)]
#[cfg_attr(test, derive(Clone, Eq, PartialEq))]
pub(crate) struct AddressState {
pub(crate) valid_until: zx::Time,
pub(crate) assignment_state: IpAddressState,
}
#[derive(Debug)]
#[cfg_attr(test, derive(Clone))]
pub(crate) enum InterfaceEvent {
Added { id: BindingId, properties: InterfaceProperties },
Changed { id: BindingId, event: InterfaceUpdate },
Removed(BindingId),
}
#[derive(Debug)]
pub(crate) struct InterfaceEventProducer {
id: BindingId,
channel: mpsc::UnboundedSender<InterfaceEvent>,
}
impl InterfaceEventProducer {
/// Notifies the interface state [`Worker`] of [`event`] on this
/// [`InterfaceEventProducer`]'s interface.
pub(crate) fn notify(&self, event: InterfaceUpdate) -> Result<(), InterfaceUpdate> {
let Self { id, channel } = self;
channel.unbounded_send(InterfaceEvent::Changed { id: *id, event }).map_err(|e| {
match e.into_inner() {
InterfaceEvent::Changed { id: _, event } => event,
// All other patterns are unreachable, this is only so we can get
// back the event we just created above.
e => unreachable!("{:?}", e),
}
})
}
}
impl Drop for InterfaceEventProducer {
fn drop(&mut self) {
let Self { id, channel } = self;
channel.unbounded_send(InterfaceEvent::Removed(*id)).unwrap_or_else(
|_: mpsc::TrySendError<_>| {
// If the worker was closed before its producers we can assume
// it's no longer interested in events, so we simply drop these
// errors.
},
)
}
}
#[derive(thiserror::Error, Debug)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub(crate) enum WorkerError {
#[error("attempted to reinsert interface {interface} over old {old:?}")]
AddedDuplicateInterface { interface: BindingId, old: InterfaceState },
#[error("attempted to remove nonexisting interface with id {0}")]
RemoveNonexistentInterface(BindingId),
#[error("attempted to update nonexisting interface with id {0}")]
UpdateNonexistentInterface(BindingId),
#[error("attempted to assign already assigned address {addr} on interface {interface}")]
AssignExistingAddr { interface: BindingId, addr: IpAddr },
#[error("attempted to unassign nonexisting interface address {addr} on interface {interface}")]
UnassignNonexistentAddr { interface: BindingId, addr: IpAddr },
#[error("attempted to update assignment state to {state:?} on non existing interface address {addr} on interface {interface}")]
UpdateStateOnNonexistentAddr { interface: BindingId, addr: IpAddr, state: IpAddressState },
#[error("attempted to update properties with {update:?} on non existing interface address {addr} on interface {interface}")]
UpdatePropertiesOnNonexistentAddr {
interface: BindingId,
addr: IpAddr,
update: AddressPropertiesUpdate,
},
}
#[derive(Clone, Debug, Eq, PartialEq)]
enum ChangedAddressProperties {
InterestNotApplicable,
PropertiesChanged {
address_properties: finterfaces::AddressPropertiesInterest,
is_assigned: bool,
},
AssignmentStateChanged {
involves_assigned: bool,
},
}
#[derive(Default)]
pub(crate) struct RunOptions {
#[cfg(test)]
pub(crate) spy_interface_events:
Option<futures::channel::mpsc::UnboundedSender<InterfaceEvent>>,
}
pub(crate) struct Worker {
events: mpsc::UnboundedReceiver<InterfaceEvent>,
watchers: mpsc::Receiver<NewWatcher>,
pub(crate) run_options: RunOptions,
}
/// Arbitrarily picked constant to force backpressure on FIDL requests.
const WATCHER_CHANNEL_CAPACITY: usize = 32;
fn is_assigned(state: IpAddressState) -> bool {
match state {
IpAddressState::Assigned => true,
IpAddressState::Unavailable | IpAddressState::Tentative => false,
}
}
impl Worker {
pub(crate) fn new() -> (Worker, WorkerWatcherSink, WorkerInterfaceSink) {
let (events_sender, events_receiver) = mpsc::unbounded();
let (watchers_sender, watchers_receiver) = mpsc::channel(WATCHER_CHANNEL_CAPACITY);
(
Worker {
events: events_receiver,
watchers: watchers_receiver,
run_options: RunOptions::default(),
},
WorkerWatcherSink { sender: watchers_sender },
WorkerInterfaceSink { sender: events_sender },
)
}
/// Runs the worker until all [`WorkerWatcherSink`]s and
/// [`WorkerInterfaceSink`]s are closed.
///
/// On success, returns the set of currently opened [`Watcher`]s that the
/// `Worker` was polling on when all its sinks were closed.
pub(crate) async fn run(
self,
) -> Result<futures::stream::FuturesUnordered<Watcher>, WorkerError> {
let Self { events, watchers: watchers_stream, run_options } = self;
let mut current_watchers = futures::stream::FuturesUnordered::<Watcher>::new();
let mut interface_state = HashMap::new();
enum SinkAction {
NewWatcher(NewWatcher),
Event(InterfaceEvent),
}
let mut sink_actions = futures::stream::select_with_strategy(
watchers_stream.map(SinkAction::NewWatcher),
events.map(SinkAction::Event),
// Always consume events before watchers. That allows external
// observers to assume all side effects of a call are already
// applied before a watcher observes its initial existing set of
// properties.
|_: &mut ()| futures::stream::PollNext::Right,
);
loop {
let mut poll_watchers = if current_watchers.is_empty() {
futures::future::pending().left_future()
} else {
current_watchers.by_ref().next().right_future()
};
// NB: Declare an enumeration with actions to prevent too much logic
// in select macro.
enum Action {
WatcherEnded(Option<Result<(), fidl::Error>>),
Sink(Option<SinkAction>),
}
let action = futures::select! {
r = poll_watchers => Action::WatcherEnded(r),
a = sink_actions.next() => Action::Sink(a),
};
match action {
Action::WatcherEnded(r) => match r {
Some(Ok(())) => {}
Some(Err(e)) => {
if !e.is_closed() {
tracing::error!("error operating interface watcher {:?}", e);
}
}
// This should not be observable since we check if our
// watcher collection is empty above and replace it with a
// pending future.
None => unreachable!("should not observe end of FuturesUnordered"),
},
Action::Sink(Some(SinkAction::NewWatcher(NewWatcher {
watcher: stream,
options:
options @ WatcherOptions {
address_properties_interest: _,
include_non_assigned_addresses,
},
}))) => {
match EventQueue::from_state(&interface_state, include_non_assigned_addresses) {
Ok(events) => current_watchers.push(Watcher {
stream,
options,
events,
responder: None,
}),
Err(status) => {
tracing::warn!("failed to construct events for watcher: {}", status);
stream.control_handle().shutdown_with_epitaph(status);
}
}
}
Action::Sink(Some(SinkAction::Event(e))) => {
tracing::debug!("consuming event {:?}", e);
#[cfg(test)]
{
let RunOptions { spy_interface_events } = &run_options;
match spy_interface_events {
Some(sink) => sink.unbounded_send(e.clone()).unwrap_or(
// To simplify teardown, we allow tests to
// drop the spy receiver.
(),
),
None => (),
};
}
#[cfg(not(test))]
let RunOptions {} = &run_options;
let is_address_visible = |involves_assigned, include_non_assigned_addresses| {
// The address is visible if the change involves an
// assigned address since all watchers receive updates
// involving an assigned address. If the address change
// involves state(s) that are not considered assigned,
// then the change is only visible to watchers that
// request addresses that are not assigned.
involves_assigned || include_non_assigned_addresses
};
match Self::consume_event(&mut interface_state, e)? {
None => tracing::debug!("not publishing No-Op event."),
Some((event, changed_address_properties)) => {
let num_published = current_watchers
.iter_mut()
.filter_map(|watcher| {
let WatcherOptions {
address_properties_interest,
include_non_assigned_addresses,
} = &watcher.options;
let should_push = match &changed_address_properties {
ChangedAddressProperties::InterestNotApplicable => true,
ChangedAddressProperties::PropertiesChanged {
address_properties,
is_assigned,
} => {
address_properties_interest
.intersects(address_properties.clone())
&& is_address_visible(
*is_assigned,
*include_non_assigned_addresses,
)
}
ChangedAddressProperties::AssignmentStateChanged {
involves_assigned,
} => is_address_visible(
*involves_assigned,
*include_non_assigned_addresses,
),
};
// TODO(https://fxbug.dev/42061967): Mask address properties fields
// from address-added events according to watcher interest.
if should_push {
watcher.push(event.clone());
}
// Filter out watchers that didn't receive
// the event, so that calling `count()`
// returns the number of published events.
should_push.then_some(())
})
.count();
tracing::debug!(
"published event to {} of {} watchers",
num_published,
current_watchers.len()
);
}
}
}
// If all of the sinks close, shutdown the worker.
Action::Sink(None) => {
return Ok(current_watchers);
}
}
}
}
/// Consumes a single worker event, mutating state.
///
/// On `Err`, the worker must be stopped and `state` can't be considered
/// valid anymore.
fn consume_event(
state: &mut HashMap<BindingId, InterfaceState>,
event: InterfaceEvent,
) -> Result<Option<(finterfaces::Event, ChangedAddressProperties)>, WorkerError> {
match event {
InterfaceEvent::Added {
id,
properties: InterfaceProperties { name, device_class },
} => {
let online = false;
let has_default_ipv4_route = false;
let has_default_ipv6_route = false;
match state.insert(
id,
InterfaceState {
properties: InterfaceProperties { name: name.clone(), device_class },
online,
addresses: HashMap::new(),
has_default_ipv4_route,
has_default_ipv6_route,
},
) {
Some(old) => Err(WorkerError::AddedDuplicateInterface { interface: id, old }),
None => Ok(Some((
finterfaces::Event::Added(
finterfaces_ext::Properties {
id,
name,
device_class,
online,
addresses: Vec::new(),
has_default_ipv4_route,
has_default_ipv6_route,
}
.into(),
),
ChangedAddressProperties::InterestNotApplicable,
))),
}
}
InterfaceEvent::Removed(rm) => match state.remove(&rm) {
Some(InterfaceState { .. }) => Ok(Some((
finterfaces::Event::Removed(rm.get()),
ChangedAddressProperties::InterestNotApplicable,
))),
None => Err(WorkerError::RemoveNonexistentInterface(rm)),
},
InterfaceEvent::Changed { id, event } => {
let InterfaceState {
properties: _,
online,
addresses,
has_default_ipv4_route,
has_default_ipv6_route,
} = state
.get_mut(&id)
.ok_or_else(|| WorkerError::UpdateNonexistentInterface(id))?;
match event {
InterfaceUpdate::AddressAdded { addr, assignment_state, valid_until } => {
let (addr, prefix_len) = addr.addr_prefix();
let addr = *addr;
match addresses.insert(
addr,
AddressProperties {
prefix_len,
state: AddressState { assignment_state, valid_until },
},
) {
Some(AddressProperties { .. }) => {
Err(WorkerError::AssignExistingAddr { interface: id, addr })
}
None => Ok(Some((
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
addresses: Some(Self::collect_addresses(addresses)),
..Default::default()
}),
ChangedAddressProperties::AssignmentStateChanged {
involves_assigned: is_assigned(assignment_state),
},
))),
}
}
InterfaceUpdate::AddressAssignmentStateChanged { addr, new_state } => {
let AddressProperties {
prefix_len: _,
state: AddressState { assignment_state, valid_until: _ },
} = addresses.get_mut(&addr).ok_or_else(|| {
WorkerError::UpdateStateOnNonexistentAddr {
interface: id,
addr,
state: new_state,
}
})?;
if *assignment_state == new_state {
return Ok(None);
}
let involves_assigned =
is_assigned(*assignment_state) || is_assigned(new_state);
*assignment_state = new_state;
Ok(Some((
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
addresses: Some(Self::collect_addresses(addresses)),
..Default::default()
}),
ChangedAddressProperties::AssignmentStateChanged { involves_assigned },
)))
}
InterfaceUpdate::AddressRemoved(addr) => match addresses.remove(&addr) {
Some(AddressProperties {
prefix_len: _,
state: AddressState { assignment_state, valid_until: _ },
}) => Ok(Some((
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
addresses: (Some(Self::collect_addresses(addresses))),
..Default::default()
}),
ChangedAddressProperties::AssignmentStateChanged {
involves_assigned: is_assigned(assignment_state),
},
))),
None => Err(WorkerError::UnassignNonexistentAddr { interface: id, addr }),
},
InterfaceUpdate::DefaultRouteChanged {
version,
has_default_route: new_value,
} => {
let mut table =
finterfaces::Properties { id: Some(id.get()), ..Default::default() };
let (state, prop) = match version {
IpVersion::V4 => {
(has_default_ipv4_route, &mut table.has_default_ipv4_route)
}
IpVersion::V6 => {
(has_default_ipv6_route, &mut table.has_default_ipv6_route)
}
};
Ok((*state != new_value)
.then(|| {
*state = new_value;
*prop = Some(new_value);
})
.map(move |()| {
(
finterfaces::Event::Changed(table),
ChangedAddressProperties::InterestNotApplicable,
)
}))
}
InterfaceUpdate::OnlineChanged(new_online) => {
Ok((*online != new_online).then(|| {
*online = new_online;
(
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
online: Some(new_online),
..Default::default()
}),
ChangedAddressProperties::InterestNotApplicable,
)
}))
}
InterfaceUpdate::AddressPropertiesChanged {
addr,
update: update @ AddressPropertiesUpdate { valid_until: new_valid_until },
} => {
let AddressState { assignment_state, valid_until } =
match addresses.get_mut(&addr) {
Some(AddressProperties { prefix_len: _, state }) => state,
None => {
return Err(WorkerError::UpdatePropertiesOnNonexistentAddr {
interface: id,
addr,
update,
})
}
};
if new_valid_until == core::mem::replace(valid_until, new_valid_until) {
return Ok(None);
}
let is_assigned = is_assigned(*assignment_state);
Ok(Some((
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
addresses: Some(Self::collect_addresses(addresses)),
..Default::default()
}),
// TODO(https://fxbug.dev/42056818): once preferred lifetimes
// are supported, we need to respect (dis)interest in them
// too.
ChangedAddressProperties::PropertiesChanged {
address_properties:
finterfaces::AddressPropertiesInterest::VALID_UNTIL,
is_assigned,
},
)))
}
}
}
}
}
fn collect_addresses<T: SortableInterfaceAddress>(
addrs: &HashMap<IpAddr, AddressProperties>,
) -> Vec<T> {
let mut addrs = addrs
.iter()
.map(
|(
addr,
AddressProperties {
prefix_len,
state: AddressState { valid_until, assignment_state },
},
)| {
finterfaces_ext::Address {
addr: fnet::Subnet { addr: addr.into_fidl(), prefix_len: *prefix_len },
valid_until: valid_until.into_nanos(),
assignment_state: assignment_state.into_fidl(),
}
.into()
},
)
.collect::<Vec<T>>();
// Provide a stably ordered vector of addresses.
addrs.sort_by_key(|addr| addr.get_sort_key());
addrs
}
}
/// This trait enables the implementation of [`Worker::collect_addresses`] to be
/// agnostic to extension and pure FIDL types, it is not meant to be used in
/// other contexts.
trait SortableInterfaceAddress: From<finterfaces_ext::Address> {
type Key: Ord;
fn get_sort_key(&self) -> Self::Key;
}
impl SortableInterfaceAddress for finterfaces_ext::Address {
type Key = fnet::Subnet;
fn get_sort_key(&self) -> fnet::Subnet {
self.addr.clone()
}
}
impl SortableInterfaceAddress for finterfaces::Address {
type Key = Option<fnet::Subnet>;
fn get_sort_key(&self) -> Option<fnet::Subnet> {
self.addr.clone()
}
}
struct NewWatcher {
watcher: finterfaces::WatcherRequestStream,
options: WatcherOptions,
}
#[derive(thiserror::Error, Debug)]
#[error("Connection to interfaces worker closed")]
pub(crate) struct WorkerClosedError {}
#[derive(Clone)]
pub(crate) struct WorkerWatcherSink {
sender: mpsc::Sender<NewWatcher>,
}
impl WorkerWatcherSink {
/// Adds a new interface watcher to be operated on by [`Worker`].
pub(crate) async fn add_watcher(
&mut self,
watcher: finterfaces::WatcherRequestStream,
options: WatcherOptions,
) -> Result<(), WorkerClosedError> {
self.sender
.send(NewWatcher { watcher, options })
.await
.map_err(|_: mpsc::SendError| WorkerClosedError {})
}
}
#[derive(Clone)]
pub(crate) struct WorkerInterfaceSink {
sender: mpsc::UnboundedSender<InterfaceEvent>,
}
impl WorkerInterfaceSink {
/// Adds a new interface `id` with fixed properties `properties`.
///
/// Added interfaces are always assumed to be offline and have no assigned
/// address or default routes.
///
/// The returned [`InterfaceEventProducer`] can be used to feed interface
/// changes to be notified to FIDL watchers. On drop,
/// `InterfaceEventProducer` notifies the [`Worker`] that the interface was
/// removed.
///
/// Note that the [`Worker`] will exit with an error if two interfaces with
/// the same identifier are created at the same time, but that is not
/// observable from `add_interface`. It does not provide guardrails to
/// prevent identifier reuse, however.
pub(crate) fn add_interface(
&self,
id: BindingId,
properties: InterfaceProperties,
) -> Result<InterfaceEventProducer, WorkerClosedError> {
self.sender
.unbounded_send(InterfaceEvent::Added { id, properties })
.map_err(|_: mpsc::TrySendError<_>| WorkerClosedError {})?;
Ok(InterfaceEventProducer { id, channel: self.sender.clone() })
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::bindings::util::TryIntoCore as _;
use assert_matches::assert_matches;
use const_unwrap::const_unwrap_option;
use fidl_fuchsia_hardware_network as fhardware_network;
use fixture::fixture;
use futures::Stream;
use itertools::Itertools as _;
use net_types::{
ip::{AddrSubnet, IpAddress as _, Ipv6, Ipv6Addr},
Witness as _,
};
use std::{
convert::{TryFrom as _, TryInto as _},
num::NonZeroU64,
pin::pin,
};
use test_case::test_case;
impl WorkerWatcherSink {
fn create_watcher(&mut self) -> finterfaces::WatcherProxy {
let (watcher, stream) =
fidl::endpoints::create_proxy_and_stream::<finterfaces::WatcherMarker>()
.expect("create proxy");
self.add_watcher(stream, WatcherOptions::default())
.now_or_never()
.expect("unexpected backpressure on sink")
.expect("failed to send watcher to worker");
watcher
}
fn create_watcher_event_stream(
&mut self,
) -> impl Stream<Item = finterfaces::Event> + Unpin {
futures::stream::unfold(self.create_watcher(), |watcher| {
watcher.watch().map(move |e| match e {
Ok(event) => Some((event, watcher)),
Err(e) => {
if e.is_closed() {
None
} else {
panic!("error fetching next event on watcher {:?}", e);
}
}
})
})
}
}
async fn with_worker<
Fut: Future<Output = ()>,
F: FnOnce(WorkerWatcherSink, WorkerInterfaceSink) -> Fut,
>(
_name: &str,
f: F,
) {
let (worker, watcher_sink, interface_sink) = Worker::new();
let (r, ()) = futures::future::join(worker.run(), f(watcher_sink, interface_sink)).await;
let watchers = r.expect("worker failed");
let () = watchers.try_collect().await.expect("watchers error");
}
const IFACE1_ID: BindingId = const_unwrap_option(NonZeroU64::new(111));
const IFACE1_NAME: &str = "iface1";
const IFACE1_CLASS: finterfaces::DeviceClass =
finterfaces::DeviceClass::Device(fhardware_network::DeviceClass::Ethernet);
const IFACE2_ID: BindingId = const_unwrap_option(NonZeroU64::new(222));
const IFACE2_NAME: &str = "iface2";
const IFACE2_CLASS: finterfaces::DeviceClass =
finterfaces::DeviceClass::Loopback(finterfaces::Empty {});
/// Tests full integration between [`Worker`] and [`Watcher`]s through basic
/// state updates.
#[fixture(with_worker)]
#[fuchsia_async::run_singlethreaded(test)]
async fn basic_state_updates(
mut watcher_sink: WorkerWatcherSink,
interface_sink: WorkerInterfaceSink,
) {
let mut watcher = watcher_sink.create_watcher_event_stream();
assert_eq!(watcher.next().await, Some(finterfaces::Event::Idle(finterfaces::Empty {})));
let producer = interface_sink
.add_interface(
IFACE1_ID,
InterfaceProperties { name: IFACE1_NAME.to_string(), device_class: IFACE1_CLASS },
)
.expect("add interface");
assert_eq!(
watcher.next().await,
Some(finterfaces::Event::Added(
finterfaces_ext::Properties {
id: IFACE1_ID,
addresses: Vec::new(),
online: false,
device_class: IFACE1_CLASS,
has_default_ipv4_route: false,
has_default_ipv6_route: false,
name: IFACE1_NAME.to_string(),
}
.into()
))
);
let addr1 = AddrSubnetEither::V6(
AddrSubnet::new(*Ipv6::LOOPBACK_IPV6_ADDRESS, Ipv6Addr::BYTES * 8).unwrap(),
);
const ADDR_VALID_UNTIL: zx::Time = zx::Time::from_nanos(12345);
let base_properties =
finterfaces::Properties { id: Some(IFACE1_ID.get()), ..Default::default() };
for (event, expect) in [
(
InterfaceUpdate::AddressAdded {
addr: addr1.clone(),
assignment_state: IpAddressState::Assigned,
valid_until: ADDR_VALID_UNTIL,
},
finterfaces::Event::Changed(finterfaces::Properties {
addresses: Some(vec![finterfaces_ext::Address {
addr: addr1.clone().into_fidl(),
valid_until: ADDR_VALID_UNTIL.into_nanos(),
assignment_state: finterfaces::AddressAssignmentState::Assigned,
}
.into()]),
..base_properties.clone()
}),
),
(
InterfaceUpdate::DefaultRouteChanged {
version: IpVersion::V4,
has_default_route: true,
},
finterfaces::Event::Changed(finterfaces::Properties {
has_default_ipv4_route: Some(true),
..base_properties.clone()
}),
),
(
InterfaceUpdate::DefaultRouteChanged {
version: IpVersion::V6,
has_default_route: true,
},
finterfaces::Event::Changed(finterfaces::Properties {
has_default_ipv6_route: Some(true),
..base_properties.clone()
}),
),
(
InterfaceUpdate::DefaultRouteChanged {
version: IpVersion::V6,
has_default_route: false,
},
finterfaces::Event::Changed(finterfaces::Properties {
has_default_ipv6_route: Some(false),
..base_properties.clone()
}),
),
(
InterfaceUpdate::OnlineChanged(true),
finterfaces::Event::Changed(finterfaces::Properties {
online: Some(true),
..base_properties.clone()
}),
),
] {
producer.notify(event).expect("notify event");
assert_eq!(watcher.next().await, Some(expect));
}
// Install a new watcher and observe accumulated interface state.
let mut new_watcher = watcher_sink.create_watcher_event_stream();
assert_eq!(
new_watcher.next().await,
Some(finterfaces::Event::Existing(
finterfaces_ext::Properties {
id: IFACE1_ID,
name: IFACE1_NAME.to_string(),
device_class: IFACE1_CLASS,
online: true,
addresses: vec![finterfaces_ext::Address {
addr: addr1.into_fidl(),
valid_until: ADDR_VALID_UNTIL.into_nanos(),
assignment_state: finterfaces::AddressAssignmentState::Assigned,
}
.into()],
has_default_ipv4_route: true,
has_default_ipv6_route: false,
}
.into()
))
);
assert_eq!(new_watcher.next().await, Some(finterfaces::Event::Idle(finterfaces::Empty {})));
}
/// Tests [`Drop`] implementation for [`InterfaceEventProducer`].
#[fixture(with_worker)]
#[fuchsia_async::run_singlethreaded(test)]
async fn drop_producer_removes_interface(
mut watcher_sink: WorkerWatcherSink,
interface_sink: WorkerInterfaceSink,
) {
let mut watcher = watcher_sink.create_watcher_event_stream();
assert_eq!(watcher.next().await, Some(finterfaces::Event::Idle(finterfaces::Empty {})));
let producer1 = interface_sink
.add_interface(
IFACE1_ID,
InterfaceProperties { name: IFACE1_NAME.to_string(), device_class: IFACE1_CLASS },
)
.expect(" add interface");
let _producer2 = interface_sink.add_interface(
IFACE2_ID,
InterfaceProperties { name: IFACE2_NAME.to_string(), device_class: IFACE2_CLASS },
);
assert_matches!(
watcher.next().await,
Some(finterfaces::Event::Added(finterfaces::Properties {
id: Some(id),
..
})) if id == IFACE1_ID.get()
);
assert_matches!(
watcher.next().await,
Some(finterfaces::Event::Added(finterfaces::Properties {
id: Some(id),
..
})) if id == IFACE2_ID.get()
);
std::mem::drop(producer1);
assert_eq!(watcher.next().await, Some(finterfaces::Event::Removed(IFACE1_ID.get())));
// Create new watcher and enumerate, only interface 2 should be
// around now.
let mut new_watcher = watcher_sink.create_watcher_event_stream();
assert_matches!(
new_watcher.next().await,
Some(finterfaces::Event::Existing(finterfaces::Properties {
id: Some(id),
..
})) if id == IFACE2_ID.get()
);
assert_eq!(new_watcher.next().await, Some(finterfaces::Event::Idle(finterfaces::Empty {})));
}
fn iface1_initial_state() -> (BindingId, InterfaceState) {
(
IFACE1_ID,
InterfaceState {
properties: InterfaceProperties {
name: IFACE1_NAME.to_string(),
device_class: IFACE1_CLASS,
},
online: false,
addresses: Default::default(),
has_default_ipv4_route: false,
has_default_ipv6_route: false,
},
)
}
#[test]
fn consume_interface_added() {
let mut state = HashMap::new();
let (id, initial_state) = iface1_initial_state();
let event = InterfaceEvent::Added { id, properties: initial_state.properties.clone() };
// Add interface.
assert_eq!(
Worker::consume_event(&mut state, event.clone()),
Ok(Some((
finterfaces::Event::Added(
finterfaces_ext::Properties {
id,
name: initial_state.properties.name.clone(),
device_class: initial_state.properties.device_class.clone(),
online: false,
addresses: Vec::new(),
has_default_ipv4_route: false,
has_default_ipv6_route: false,
}
.into()
),
ChangedAddressProperties::InterestNotApplicable
)))
);
// Verify state has been updated.
assert_eq!(state.get(&id), Some(&initial_state));
// Adding again causes error.
assert_eq!(
Worker::consume_event(&mut state, event),
Err(WorkerError::AddedDuplicateInterface { interface: id, old: initial_state })
);
}
#[test]
fn consume_interface_removed() {
let (id, initial_state) = iface1_initial_state();
let mut state = HashMap::from([(id, initial_state)]);
// Remove interface.
assert_eq!(
Worker::consume_event(&mut state, InterfaceEvent::Removed(id)),
Ok(Some((
finterfaces::Event::Removed(id.get()),
ChangedAddressProperties::InterestNotApplicable
)))
);
// State is updated.
assert_eq!(state.get(&id), None);
// Can't remove again.
assert_eq!(
Worker::consume_event(&mut state, InterfaceEvent::Removed(id)),
Err(WorkerError::RemoveNonexistentInterface(id))
);
}
#[test]
fn consume_changed_bad_id() {
let mut state = HashMap::new();
assert_eq!(
Worker::consume_event(
&mut state,
InterfaceEvent::Changed {
id: IFACE1_ID,
event: InterfaceUpdate::OnlineChanged(true)
}
),
Err(WorkerError::UpdateNonexistentInterface(IFACE1_ID))
);
}
#[test_case(IpAddressState::Assigned, true; "assigned")]
#[test_case(IpAddressState::Unavailable, false; "unavailable")]
#[test_case(IpAddressState::Tentative, false; "tentative")]
fn consume_changed_address_add_and_removed(
assignment_state: IpAddressState,
involves_assigned: bool,
) {
let addr = AddrSubnetEither::V6(
AddrSubnet::new(*Ipv6::LOOPBACK_IPV6_ADDRESS, Ipv6Addr::BYTES * 8).unwrap(),
);
let valid_until = zx::Time::from_nanos(1234);
let (id, initial_state) = iface1_initial_state();
let mut state = HashMap::from([(id, initial_state)]);
let (ip_addr, prefix_len) = addr.addr_prefix();
// Add address.
{
let event = InterfaceEvent::Changed {
id,
event: InterfaceUpdate::AddressAdded {
addr: addr.clone(),
assignment_state,
valid_until,
},
};
assert_eq!(
Worker::consume_event(&mut state, event.clone()),
Ok(Some((
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
addresses: Some(vec![finterfaces_ext::Address {
addr: addr.clone().into_fidl(),
valid_until: valid_until.into_nanos(),
assignment_state: assignment_state.into_fidl(),
}
.into()]),
..Default::default()
}),
ChangedAddressProperties::AssignmentStateChanged { involves_assigned },
))),
);
// Check state is updated.
assert_eq!(
state.get(&id).expect("missing interface entry").addresses.get(&*ip_addr),
Some(&AddressProperties {
prefix_len,
state: AddressState { valid_until: valid_until, assignment_state }
})
);
// Can't add again.
assert_eq!(
Worker::consume_event(&mut state, event),
Err(WorkerError::AssignExistingAddr { addr: *ip_addr, interface: id })
);
}
// Remove address.
{
let event = InterfaceEvent::Changed {
id,
event: InterfaceUpdate::AddressRemoved(ip_addr.get()),
};
assert_eq!(
Worker::consume_event(&mut state, event.clone()),
Ok(Some((
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
addresses: Some(Vec::new()),
..Default::default()
}),
ChangedAddressProperties::AssignmentStateChanged { involves_assigned },
)))
);
// Check state is updated.
assert_eq!(
state.get(&id).expect("missing interface entry").addresses.get(&ip_addr),
None
);
// Can't remove again.
assert_eq!(
Worker::consume_event(&mut state, event),
Err(WorkerError::UnassignNonexistentAddr { interface: id, addr: ip_addr.get() })
);
}
}
#[test]
fn adding_and_removing_tentative_addresses_do_not_trigger_events() {
let addr = AddrSubnetEither::V6(
AddrSubnet::new(*Ipv6::LOOPBACK_IPV6_ADDRESS, Ipv6Addr::BYTES * 8).unwrap(),
);
let valid_until = zx::Time::from_nanos(1234);
let (id, initial_state) = iface1_initial_state();
let mut state = HashMap::from([(id, initial_state)]);
let event = InterfaceEvent::Changed {
id,
event: InterfaceUpdate::AddressAdded {
addr: addr.clone(),
assignment_state: IpAddressState::Tentative,
valid_until: valid_until,
},
};
// Add address, no event should be generated.
assert_eq!(
Worker::consume_event(&mut state, event),
Ok(Some((
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
addresses: Some(vec![finterfaces_ext::Address {
addr: addr.into_fidl(),
valid_until: valid_until.into_nanos(),
assignment_state: finterfaces::AddressAssignmentState::Tentative,
}
.into()]),
..Default::default()
}),
ChangedAddressProperties::AssignmentStateChanged { involves_assigned: false },
))),
);
let (ip_addr, prefix_len) = addr.addr_prefix();
// Check state is updated.
assert_eq!(
state.get(&id).expect("missing interface entry").addresses.get(&*ip_addr),
Some(&AddressProperties {
prefix_len,
state: AddressState { valid_until, assignment_state: IpAddressState::Tentative }
})
);
let event =
InterfaceEvent::Changed { id, event: InterfaceUpdate::AddressRemoved(*ip_addr) };
// Remove address, no event should be generated.
assert_eq!(
Worker::consume_event(&mut state, event),
Ok(Some((
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
addresses: Some(Vec::new()),
..Default::default()
}),
ChangedAddressProperties::AssignmentStateChanged { involves_assigned: false },
))),
);
// Check state is updated.
assert_eq!(state.get(&id).expect("missing interface entry").addresses.get(&*ip_addr), None);
}
#[test_case(false; "no_changes_to_unavailable")]
#[test_case(true; "intersperse_changes_to_unavailable")]
fn consume_changed_address_state_change(intersperse_unavailable: bool) {
let subnet = AddrSubnetEither::<net_types::SpecifiedAddr<_>>::V6(
AddrSubnet::new(*Ipv6::LOOPBACK_IPV6_ADDRESS, Ipv6Addr::BYTES * 8).unwrap(),
);
let (addr, prefix_len) = subnet.addr_prefix();
let addr = *addr;
let valid_until = zx::Time::from_nanos(1234);
let address_properties = AddressProperties {
prefix_len,
state: AddressState { valid_until, assignment_state: IpAddressState::Tentative },
};
let (id, initial_state) = iface1_initial_state();
let initial_state = InterfaceState {
addresses: HashMap::from([(addr, address_properties)]),
..initial_state
};
// When `intersperse_unavailable` is `true` a state change to
// Unavailable is injected between all state changes: e.g.
// Tentative -> Assigned becomes Tentative -> Unavailable -> Assigned.
// This allows us to verify that changes to Unavailable do not influence
// the events observed by the client.
let maybe_change_state_to_unavailable =
|state: &mut HashMap<_, InterfaceState>, involves_assigned| {
if !intersperse_unavailable {
return;
}
let event = InterfaceEvent::Changed {
id,
event: InterfaceUpdate::AddressAssignmentStateChanged {
addr,
new_state: IpAddressState::Unavailable,
},
};
// Changing state to Unavailable should never generate an event.
assert_eq!(
Worker::consume_event(state, event),
Ok(Some((
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
addresses: Some(vec![finterfaces_ext::Address {
addr: subnet.into_fidl(),
valid_until: valid_until.into_nanos(),
assignment_state: finterfaces::AddressAssignmentState::Unavailable,
}
.into()]),
..Default::default()
}),
ChangedAddressProperties::AssignmentStateChanged { involves_assigned },
))),
);
// Check state is updated.
assert_eq!(
state.get(&id).expect("missing interface entry").addresses.get(&addr),
Some(&AddressProperties {
prefix_len,
state: AddressState {
valid_until,
assignment_state: IpAddressState::Unavailable,
}
}),
);
};
assert_eq!(
Worker::collect_addresses::<finterfaces::Address>(&initial_state.addresses),
[finterfaces::Address {
addr: Some(subnet.into_fidl()),
valid_until: Some(valid_until.into_nanos()),
assignment_state: Some(finterfaces::AddressAssignmentState::Tentative),
..Default::default()
}],
);
let mut state = HashMap::from([(id, initial_state)]);
maybe_change_state_to_unavailable(&mut state, false);
let event = InterfaceEvent::Changed {
id,
event: InterfaceUpdate::AddressAssignmentStateChanged {
addr,
new_state: IpAddressState::Assigned,
},
};
// State switch causes event.
let expected_event = (
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
addresses: Some(vec![finterfaces_ext::Address {
addr: subnet.into_fidl(),
valid_until: valid_until.into_nanos(),
assignment_state: finterfaces::AddressAssignmentState::Assigned,
}
.into()]),
..Default::default()
}),
ChangedAddressProperties::AssignmentStateChanged { involves_assigned: true },
);
assert_eq!(
Worker::consume_event(&mut state, event.clone()),
Ok(Some(expected_event.clone())),
);
// Check state is updated.
assert_eq!(
state.get(&id).expect("missing interface entry").addresses.get(&addr),
Some(&AddressProperties {
prefix_len,
state: AddressState { valid_until, assignment_state: IpAddressState::Assigned },
})
);
maybe_change_state_to_unavailable(&mut state, true);
assert_eq!(
Worker::consume_event(&mut state, event),
Ok(intersperse_unavailable.then_some(expected_event)),
);
maybe_change_state_to_unavailable(&mut state, true);
// Switch the state back to tentative, which will trigger removal.
let event = InterfaceEvent::Changed {
id,
event: InterfaceUpdate::AddressAssignmentStateChanged {
addr,
new_state: IpAddressState::Tentative,
},
};
let expected_event = (
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
addresses: Some(vec![finterfaces_ext::Address {
addr: subnet.into_fidl(),
valid_until: valid_until.into_nanos(),
assignment_state: finterfaces::AddressAssignmentState::Tentative,
}
.into()]),
..Default::default()
}),
ChangedAddressProperties::AssignmentStateChanged {
involves_assigned: !intersperse_unavailable,
},
);
assert_eq!(
Worker::consume_event(&mut state, event.clone()),
Ok(Some(expected_event.clone())),
);
// Check state is updated.
assert_eq!(
state.get(&id).expect("missing interface entry").addresses.get(&addr),
Some(&AddressProperties {
prefix_len,
state: AddressState { valid_until, assignment_state: IpAddressState::Tentative },
})
);
maybe_change_state_to_unavailable(&mut state, false);
assert_eq!(
Worker::consume_event(&mut state, event),
Ok(intersperse_unavailable.then_some(expected_event)),
);
maybe_change_state_to_unavailable(&mut state, false);
}
#[test_case(IpAddressState::Assigned; "assigned")]
#[test_case(IpAddressState::Unavailable; "unavailable")]
#[test_case(IpAddressState::Tentative; "tentative")]
fn consume_changed_start_unavailable(new_state: IpAddressState) {
let addr = AddrSubnetEither::<net_types::SpecifiedAddr<_>>::V6(
AddrSubnet::new(*Ipv6::LOOPBACK_IPV6_ADDRESS, Ipv6Addr::BYTES * 8).unwrap(),
);
let valid_until = zx::Time::from_nanos(1234);
let (ip_addr, prefix_len) = addr.addr_prefix();
// Set up the initial state.
let (id, mut initial_state) = iface1_initial_state();
assert_eq!(
initial_state.addresses.insert(
*ip_addr,
AddressProperties {
prefix_len,
state: AddressState {
valid_until,
assignment_state: IpAddressState::Unavailable
}
}
),
None
);
let mut state = HashMap::from([(id, initial_state)]);
// Send an event to change the state.
let event = InterfaceEvent::Changed {
id,
event: InterfaceUpdate::AddressAssignmentStateChanged { addr: *addr.addr(), new_state },
};
let expected_event = match new_state {
// Changing from `Unavailable` to `Unavailable` is a No-Op.
IpAddressState::Unavailable => None,
new_state @ (IpAddressState::Assigned | IpAddressState::Tentative) => Some((
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
addresses: Some(vec![finterfaces_ext::Address {
addr: addr.into_fidl(),
valid_until: valid_until.into_nanos(),
assignment_state: new_state.into_fidl(),
}
.into()]),
..Default::default()
}),
ChangedAddressProperties::AssignmentStateChanged {
involves_assigned: is_assigned(new_state),
},
)),
};
assert_eq!(Worker::consume_event(&mut state, event), Ok(expected_event));
assert_eq!(
state.get(&id).expect("missing interface entry").addresses.get(&addr.addr()),
Some(&AddressProperties {
prefix_len,
state: AddressState { valid_until, assignment_state: new_state },
})
);
}
#[test]
fn consume_changed_online() {
let (id, initial_state) = iface1_initial_state();
let mut state = HashMap::from([(id, initial_state)]);
// Change to online.
assert_eq!(
Worker::consume_event(
&mut state,
InterfaceEvent::Changed { id, event: InterfaceUpdate::OnlineChanged(true) }
),
Ok(Some((
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
online: Some(true),
..Default::default()
}),
ChangedAddressProperties::InterestNotApplicable
)))
);
// Check state is updated.
assert_eq!(state.get(&id).expect("missing interface entry").online, true);
// Change again produces no update.
assert_eq!(
Worker::consume_event(
&mut state,
InterfaceEvent::Changed { id, event: InterfaceUpdate::OnlineChanged(true) }
),
Ok(None)
);
}
#[test_case(IpVersion::V4; "ipv4")]
#[test_case(IpVersion::V6; "ipv6")]
fn consume_changed_default_route(version: IpVersion) {
let (id, initial_state) = iface1_initial_state();
let mut state = HashMap::from([(id, initial_state)]);
let expect_set_props = match version {
IpVersion::V4 => {
finterfaces::Properties { has_default_ipv4_route: Some(true), ..Default::default() }
}
IpVersion::V6 => {
finterfaces::Properties { has_default_ipv6_route: Some(true), ..Default::default() }
}
};
// Update default route.
assert_eq!(
Worker::consume_event(
&mut state,
InterfaceEvent::Changed {
id,
event: InterfaceUpdate::DefaultRouteChanged {
version,
has_default_route: true
}
}
),
Ok(Some((
finterfaces::Event::Changed(finterfaces::Properties {
id: Some(id.get()),
..expect_set_props
}),
ChangedAddressProperties::InterestNotApplicable
)))
);
// Check only the proper state is updated.
let InterfaceState { has_default_ipv4_route, has_default_ipv6_route, .. } =
state.get(&id).expect("missing interface entry");
assert_eq!(*has_default_ipv4_route, version == IpVersion::V4);
assert_eq!(*has_default_ipv6_route, version == IpVersion::V6);
// Change again produces no update.
assert_eq!(
Worker::consume_event(
&mut state,
InterfaceEvent::Changed {
id,
event: InterfaceUpdate::DefaultRouteChanged {
version,
has_default_route: true
}
}
),
Ok(None)
);
}
#[fixture(with_worker)]
#[fuchsia_async::run_singlethreaded(test)]
async fn watcher_enqueues_events(
mut watcher_sink: WorkerWatcherSink,
interface_sink: WorkerInterfaceSink,
) {
let mut create_watcher = || {
let mut watcher = watcher_sink.create_watcher_event_stream();
async move {
assert_eq!(
watcher.next().await,
Some(finterfaces::Event::Idle(finterfaces::Empty {}))
);
watcher
}
};
let watcher1 = create_watcher().await;
let watcher2 = create_watcher().await;
let range = 1..=10;
let producers = watcher1
.zip(futures::stream::iter(range.clone().map(|i| {
let i = BindingId::new(i).unwrap();
let producer = interface_sink
.add_interface(
i,
InterfaceProperties {
name: format!("if{}", i),
device_class: IFACE1_CLASS,
},
)
.expect("failed to add interface");
(producer, i)
})))
.map(|(event, (producer, i))| {
assert_matches!(
event,
finterfaces::Event::Added(finterfaces::Properties {
id: Some(id),
..
}) if id == i.get()
);
producer
})
.collect::<Vec<_>>()
.await;
assert_eq!(producers.len(), usize::try_from(*range.end()).unwrap());
let last = watcher2
.zip(futures::stream::iter(range.clone()))
.fold(None, |_, (event, i)| {
assert_matches!(
event,
finterfaces::Event::Added(finterfaces::Properties {
id: Some(id),
..
}) if id == i
);
futures::future::ready(Some(i))
})
.await;
assert_eq!(last, Some(*range.end()));
}
#[fixture(with_worker)]
#[fuchsia_async::run_singlethreaded(test)]
async fn idle_watcher_gets_closed(
mut watcher_sink: WorkerWatcherSink,
interface_sink: WorkerInterfaceSink,
) {
let watcher = watcher_sink.create_watcher();
// Get the idle event to make sure the worker sees the watcher.
assert_matches!(watcher.watch().await, Ok(finterfaces::Event::Idle(finterfaces::Empty {})));
// NB: Every round generates two events, addition and removal because we
// drop the producer.
for i in 1..=(MAX_EVENTS / 2 + 1).try_into().unwrap() {
let _: InterfaceEventProducer = interface_sink
.add_interface(
BindingId::new(i).unwrap(),
InterfaceProperties { name: format!("if{}", i), device_class: IFACE1_CLASS },
)
.expect("failed to add interface");
}
// Watcher gets closed.
assert_eq!(watcher.on_closed().await, Ok(zx::Signals::CHANNEL_PEER_CLOSED));
}
/// Tests that the worker can handle watchers coming and going.
#[test]
fn watcher_turnaround() {
let mut executor = fuchsia_async::TestExecutor::new_with_fake_time();
let (worker, mut watcher_sink, interface_sink) = Worker::new();
let sink_keep = watcher_sink.clone();
let create_watchers = fuchsia_async::Task::spawn(async move {
let mut watcher = watcher_sink.create_watcher_event_stream();
assert_eq!(watcher.next().await, Some(finterfaces::Event::Idle(finterfaces::Empty {})));
});
// NB: Map the output of the worker future so we can assert equality on
// the return, since its return is not Debug otherwise.
let worker_fut = worker.run().map(|result| {
let pending_watchers = result.expect("worker finished with error");
pending_watchers.len()
});
let mut worker_fut = pin!(worker_fut);
assert_eq!(executor.run_until_stalled(&mut worker_fut), std::task::Poll::Pending);
// If executor stalled then the task must've finished.
assert_eq!(create_watchers.now_or_never(), Some(()));
// Drop the sinks, should cause the worker to return.
std::mem::drop((sink_keep, interface_sink));
assert_eq!(executor.run_until_stalled(&mut worker_fut), std::task::Poll::Ready(0));
}
#[fixture(with_worker)]
#[fuchsia_async::run_singlethreaded(test)]
async fn address_sorting(
mut watcher_sink: WorkerWatcherSink,
interface_sink: WorkerInterfaceSink,
) {
let mut watcher = watcher_sink.create_watcher_event_stream();
assert_eq!(watcher.next().await, Some(finterfaces::Event::Idle(finterfaces::Empty {})));
const ADDR1: fnet::Subnet = net_declare::fidl_subnet!("2000::1/64");
const ADDR2: fnet::Subnet = net_declare::fidl_subnet!("192.168.1.1/24");
const ADDR3: fnet::Subnet = net_declare::fidl_subnet!("192.168.1.2/24");
for addrs in [ADDR1, ADDR2, ADDR3].into_iter().permutations(3) {
let producer = interface_sink
.add_interface(
IFACE1_ID,
InterfaceProperties {
name: IFACE1_NAME.to_string(),
device_class: IFACE1_CLASS,
},
)
.expect("failed to add interface");
assert_matches!(
watcher.next().await,
Some(finterfaces::Event::Added(finterfaces::Properties {
id: Some(id), .. }
)) if id == IFACE1_ID.get()
);
let mut expect = vec![];
for addr in addrs {
producer
.notify(InterfaceUpdate::AddressAdded {
addr: addr.try_into_core().expect("invalid address"),
assignment_state: IpAddressState::Assigned,
valid_until: zx::Time::INFINITE,
})
.expect("failed to notify");
expect.push(addr);
expect.sort();
let addresses = assert_matches!(
watcher.next().await,
Some(finterfaces::Event::Changed(finterfaces::Properties{
id: Some(id),
addresses: Some(addresses),
..
})) if id == IFACE1_ID.get() => addresses
);
let addresses = addresses
.into_iter()
.map(|finterfaces::Address { addr, .. }| addr.expect("missing address"))
.collect::<Vec<_>>();
assert_eq!(addresses, expect);
}
std::mem::drop(producer);
assert_eq!(watcher.next().await, Some(finterfaces::Event::Removed(IFACE1_ID.get())));
}
}
#[fixture(with_worker)]
#[fuchsia_async::run_singlethreaded(test)]
async fn watcher_disallows_double_get(
mut watcher_sink: WorkerWatcherSink,
_interface_sink: WorkerInterfaceSink,
) {
let watcher = watcher_sink.create_watcher();
assert_matches!(watcher.watch().await, Ok(finterfaces::Event::Idle(finterfaces::Empty {})));
let (r1, r2) = futures::future::join(watcher.watch(), watcher.watch()).await;
for r in [r1, r2] {
assert_matches!(
r,
Err(fidl::Error::ClientChannelClosed { status: zx::Status::ALREADY_EXISTS, .. })
);
}
}
#[test]
fn watcher_blocking_push() {
let mut executor = fuchsia_async::TestExecutor::new_with_fake_time();
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<finterfaces::WatcherMarker>()
.expect("failed to create watcher");
let mut watcher = Watcher {
stream,
events: EventQueue { events: Default::default() },
responder: None,
options: WatcherOptions::default(),
};
let mut watch_fut = proxy.watch();
assert_matches!(executor.run_until_stalled(&mut watch_fut), std::task::Poll::Pending);
assert_matches!(executor.run_until_stalled(&mut watcher), std::task::Poll::Pending);
// Got a responder, we're pending.
assert_matches!(watcher.responder, Some(_));
watcher.push(finterfaces::Event::Idle(finterfaces::Empty {}));
// Responder is executed.
assert_matches!(watcher.responder, None);
assert_matches!(
executor.run_until_stalled(&mut watch_fut),
std::task::Poll::Ready(Ok(finterfaces::Event::Idle(finterfaces::Empty {})))
);
}
}