// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
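//! Implementation of the `fuchsia.net.filter` suite of protocols for
//! Netstack3: `State` (resource watchers), `Control` and `NamespaceController`
//! (filtering controllers), plus a placeholder for
//! `fuchsia.net.filter.deprecated/Filter`.
//!
//! A minimal sketch of how the serve functions are typically driven; the names
//! `state_stream`, `control_stream`, and `ctx` are placeholders and the
//! bindings-side plumbing is elided:
//!
//! ```ignore
//! let dispatcher = UpdateDispatcher::default();
//! // Watcher and controller connections share the same dispatcher so that
//! // every client observes updates in the same order.
//! serve_state(state_stream, &dispatcher).await?;
//! serve_control(control_stream, &dispatcher, &ctx).await?;
//! ```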
mod controller;
mod conversion;
use std::collections::{hash_map::Entry, HashMap};
use std::num::NonZeroUsize;
use std::pin::pin;
use std::sync::Arc;
use fidl::endpoints::{ControlHandle as _, ProtocolMarker as _};
use fidl_fuchsia_net_filter as fnet_filter;
use fidl_fuchsia_net_filter_ext as fnet_filter_ext;
use fuchsia_zircon as zx;
use futures::{
channel::mpsc, future::FusedFuture as _, lock::Mutex, FutureExt as _, StreamExt as _,
TryStreamExt as _,
};
use itertools::Itertools as _;
use thiserror::Error;
use tracing::{error, warn};
use controller::{CommitResult, Controller};
// The maximum number of events a client of `fuchsia.net.filter/Watcher` is
// allowed to have queued. Clients will be dropped if they exceed this limit.
// Keep this a multiple of `fnet_filter::MAX_BATCH_SIZE` (5 is somewhat
// arbitrary) so that we don't artificially truncate the allowed batch size.
const MAX_PENDING_EVENTS: usize = (fnet_filter::MAX_BATCH_SIZE * 5) as usize;
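/// Shared dispatcher of filtering state: it applies committed changes to
/// Netstack3 Core and broadcasts the resulting events to connected
/// `fuchsia.net.filter/Watcher` clients.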
#[derive(Default)]
pub(crate) struct UpdateDispatcher(Arc<Mutex<UpdateDispatcherInner>>);
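/// The state guarded by [`UpdateDispatcher`]'s lock: per-controller filtering
/// resources and the sinks of all connected watcher clients.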
#[derive(Default)]
struct UpdateDispatcherInner {
resources: HashMap<fnet_filter_ext::ControllerId, Controller>,
clients: Vec<WatcherSink>,
}
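/// Errors that can occur while validating and committing a batch of changes
/// pushed through a `NamespaceController`.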
enum CommitError {
RuleWithInvalidMatcher(fnet_filter_ext::RuleId),
RuleWithInvalidAction(fnet_filter_ext::RuleId),
CyclicalRoutineGraph(fnet_filter_ext::RoutineId),
ErrorOnChange { index: usize, error: fnet_filter::CommitError },
}
impl UpdateDispatcherInner {
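/// Reserves a unique controller ID and creates an empty [`Controller`] for it.
///
/// If the requested ID is already in use, a numeric suffix is appended until a
/// free ID is found (e.g. a second request for "netcfg" is assigned
/// "netcfg-1").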
fn new_controller(
&mut self,
id: fnet_filter_ext::ControllerId,
) -> fnet_filter_ext::ControllerId {
let Self { resources, clients: _ } = self;
let mut counter = 0usize;
let fnet_filter_ext::ControllerId(id) = id;
loop {
// Suffix the `id` with `counter` after the first iteration.
let id = fnet_filter_ext::ControllerId(
NonZeroUsize::new(counter)
.map(|counter| format!("{id}-{counter}"))
.unwrap_or(id.clone()),
);
match resources.entry(id.clone()) {
Entry::Vacant(entry) => {
let _ = entry.insert(Controller::default());
return id;
}
Entry::Occupied(_) => {
counter =
counter.checked_add(1).expect("should find a unique ID before overflowing");
}
}
}
}
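/// Removes the controller's resources and notifies all connected watchers of
/// their removal.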
fn remove_controller(&mut self, id: &fnet_filter_ext::ControllerId) {
let Self { resources, clients } = self;
let controller = resources.remove(id).expect("controller should only be removed once");
// Notify all existing watchers of the removed resources.
let events = controller
.existing_ids()
.map(|resource| fnet_filter_ext::Event::Removed(id.clone(), resource))
.chain(std::iter::once(fnet_filter_ext::Event::EndOfUpdate))
.collect::<Vec<_>>()
.into_iter();
for client in clients {
client.send(events.clone());
}
}
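/// Validates and applies `changes` on behalf of `controller_id`, installing
/// the merged filtering state of all controllers in Netstack3 Core and
/// broadcasting the update to all watchers. Nothing is committed unless the
/// entire batch validates.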
fn commit_changes(
&mut self,
controller_id: &fnet_filter_ext::ControllerId,
changes: Vec<fnet_filter_ext::Change>,
idempotent: bool,
ctx: &mut crate::bindings::Ctx,
) -> Result<(), CommitError> {
let Self { resources, clients } = self;
// NB: we update NS3 core and broadcast the updates to clients under a
// single lock to ensure bindings and core see updates in the same
// order.
let CommitResult { events, new_state, core_state_v4, core_state_v6 } = resources
.get_mut(controller_id)
.expect("controller should not be removed while serving")
.validate_and_convert_changes(changes, idempotent)?;
// Merge the new Core state from this controller with existing state
// from all the other controllers. Note that this is an infallible
// operation because the Core state has already been validated for each
// controller.
let (v4, v6) = resources
.iter()
.filter_map(|(id, controller)| (id != controller_id).then_some(controller))
.fold(
(core_state_v4.clone(), core_state_v6.clone()),
|(mut v4, mut v6), controller| {
v4.merge(&controller.core_state_v4);
v6.merge(&controller.core_state_v6);
(v4, v6)
},
);
// Attempt to install the new filtering state in Netstack3 Core.
ctx.api().filter().set_filter_state(v4.into(), v6.into()).map_err(|error| match error {
netstack3_core::filter::ValidationError::RuleWithInvalidMatcher(rule_id) => {
CommitError::RuleWithInvalidMatcher(rule_id)
}
})?;
// Only if validation was successful do we actually commit the changes.
// This ensures that the state will never be only partially updated.
resources
.get_mut(controller_id)
.expect("controller should not be removed while serving")
.apply_new_state(new_state, core_state_v4, core_state_v6);
// Notify all existing watchers of the update, if it was not a no-op.
if !events.is_empty() {
let events = events
.into_iter()
.map(|event| match event {
controller::Event::Added(resource) => {
fnet_filter_ext::Event::Added(controller_id.clone(), resource)
}
controller::Event::Removed(resource) => {
fnet_filter_ext::Event::Removed(controller_id.clone(), resource)
}
})
.chain(std::iter::once(fnet_filter_ext::Event::EndOfUpdate));
for client in clients {
client.send(events.clone());
}
}
Ok(())
}
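/// Registers a new watcher, seeding it with the existing resources of every
/// controller.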
fn connect_new_client(&mut self) -> Watcher {
let Self { resources, clients } = self;
let (watcher, sink) = Watcher::new_with_existing_resources(resources);
clients.push(sink);
watcher
}
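/// Unregisters a watcher, dropping its sink.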
fn disconnect_client(&mut self, watcher: Watcher) {
let Self { resources: _, clients } = self;
let (i, _): (usize, &WatcherSink) = clients
.iter()
.enumerate()
.filter(|(_i, client)| client.is_connected_to(&watcher))
.exactly_one()
.expect("watcher should be connected to exactly one sink");
let _: WatcherSink = clients.swap_remove(i);
}
}
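/// The sending side of a watcher client's event queue, held by the
/// [`UpdateDispatcherInner`] so updates can be broadcast to the client.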
#[derive(Debug)]
struct WatcherSink {
sink: mpsc::Sender<fnet_filter_ext::Event>,
cancel: async_utils::event::Event,
}
impl WatcherSink {
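/// Enqueues `changes` for the client, canceling the watcher if its queue
/// exceeds [`MAX_PENDING_EVENTS`].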
fn send(&mut self, changes: impl Iterator<Item = fnet_filter_ext::Event>) {
for change in changes {
self.sink.try_send(change).unwrap_or_else(|e| {
if e.is_full() {
if self.cancel.signal() {
warn!(
"too many unconsumed events (the client may not be \
calling Watch frequently enough): {}",
MAX_PENDING_EVENTS
)
}
} else {
panic!("unexpected error trying to send: {:?}", e)
}
})
}
}
fn is_connected_to(&self, watcher: &Watcher) -> bool {
self.sink.is_connected_to(&watcher.receiver)
}
}
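/// The receiving side of a watcher client: a snapshot of the resources that
/// existed when the client connected, followed by the stream of subsequent
/// events.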
struct Watcher {
existing_resources: <Vec<fnet_filter_ext::Event> as std::iter::IntoIterator>::IntoIter,
receiver: mpsc::Receiver<fnet_filter_ext::Event>,
canceled: async_utils::event::Event,
}
impl Watcher {
fn new_with_existing_resources(
resources: &HashMap<fnet_filter_ext::ControllerId, Controller>,
) -> (Self, WatcherSink) {
let existing_resources = resources
.into_iter()
.flat_map(|(controller, resources)| {
resources
.existing_resources()
.map(|resource| fnet_filter_ext::Event::Existing(controller.clone(), resource))
})
.chain(std::iter::once(fnet_filter_ext::Event::Idle))
.collect::<Vec<_>>()
.into_iter();
let (sender, receiver) = mpsc::channel(MAX_PENDING_EVENTS);
let cancel = async_utils::event::Event::new();
(
Watcher { existing_resources, receiver, canceled: cancel.clone() },
WatcherSink { sink: sender, cancel },
)
}
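/// Returns a future that resolves with the next batch of at most
/// `MAX_BATCH_SIZE` events, draining the snapshot of existing resources
/// before the live event stream.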
fn watch(&mut self) -> impl futures::Future<Output = Vec<fnet_filter_ext::Event>> + Unpin + '_ {
let Self { existing_resources, receiver, canceled: _ } = self;
futures::stream::iter(existing_resources)
.chain(receiver)
.ready_chunks(fnet_filter::MAX_BATCH_SIZE.into())
.into_future()
.map(|(r, _ready_chunks)| r.expect("underlying event stream unexpectedly ended"))
}
}
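/// Serves the `fuchsia.net.filter/State` protocol, handling each `GetWatcher`
/// request concurrently.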
pub(crate) async fn serve_state(
stream: fnet_filter::StateRequestStream,
dispatcher: &UpdateDispatcher,
) -> Result<(), fidl::Error> {
stream
.try_for_each_concurrent(None, |request| async {
match request {
fnet_filter::StateRequest::GetWatcher {
options: _,
request,
control_handle: _,
} => {
let requests =
request.into_stream().expect("get request stream from server end");
serve_watcher(requests, dispatcher).await.unwrap_or_else(|e| {
warn!("error serving {}: {e:?}", fnet_filter::WatcherMarker::DEBUG_NAME)
});
}
}
Ok(())
})
.await
}
#[derive(Debug, Error)]
enum ServeWatcherError {
#[error("the request stream contained a FIDL error")]
ErrorInStream(fidl::Error),
#[error("a FIDL error was encountered while sending the response")]
FailedToRespond(fidl::Error),
#[error("the client called `Watch` while a previous call was already pending")]
PreviousPendingWatch,
#[error("the client was canceled")]
Canceled,
}
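/// Serves a single `fuchsia.net.filter/Watcher` client until its stream ends,
/// an error occurs, or the client is canceled for falling too far behind.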
async fn serve_watcher(
stream: fnet_filter::WatcherRequestStream,
UpdateDispatcher(dispatcher): &UpdateDispatcher,
) -> Result<(), ServeWatcherError> {
let mut watcher = dispatcher.lock().await.connect_new_client();
let canceled_fut = watcher.canceled.wait();
let result = {
let mut request_stream = stream.map_err(ServeWatcherError::ErrorInStream).fuse();
let mut canceled_fut = pin!(canceled_fut);
let mut hanging_get = futures::future::OptionFuture::default();
loop {
hanging_get = futures::select! {
request = request_stream.try_next() => match request {
Ok(Some(fnet_filter::WatcherRequest::Watch { responder } )) => {
if hanging_get.is_terminated() {
// Convince the compiler that we're not holding on to a
// borrow of `watcher`.
drop(hanging_get);
// Either there is no pending request or the previously
// pending one has completed. We can accept a new
// hanging get.
Some(watcher.watch().map(move |events| (responder, events))).into()
} else {
break Err(ServeWatcherError::PreviousPendingWatch);
}
},
Ok(None) => break Ok(()),
Err(e) => break Err(e),
},
r = hanging_get => {
let (responder, events) = r.expect("OptionFuture is not selected if empty");
let events = events.into_iter().map(Into::into).collect::<Vec<_>>();
match responder.send(&events) {
Ok(()) => None.into(),
Err(e) => break Err(ServeWatcherError::FailedToRespond(e)),
}
},
() = canceled_fut => break Err(ServeWatcherError::Canceled),
};
}
};
dispatcher.lock().await.disconnect_client(watcher);
result
}
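/// Serves the `fuchsia.net.filter/Control` protocol, creating a controller
/// for each `OpenController` request and removing it once the controller's
/// channel closes.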
pub(crate) async fn serve_control(
stream: fnet_filter::ControlRequestStream,
dispatcher: &UpdateDispatcher,
ctx: &crate::bindings::Ctx,
) -> Result<(), fidl::Error> {
use fnet_filter::ControlRequest;
stream
.try_for_each_concurrent(None, |request| async {
match request {
ControlRequest::OpenController { id, request, control_handle: _ } => {
let UpdateDispatcher(inner) = dispatcher;
let final_id =
inner.lock().await.new_controller(fnet_filter_ext::ControllerId(id));
let (stream, control_handle) = request.into_stream_and_control_handle()?;
serve_controller(&final_id, stream, control_handle, dispatcher, ctx.clone())
.await
.unwrap_or_else(|e| warn!("error serving namespace controller: {e:?}"));
inner.lock().await.remove_controller(&final_id);
}
ControlRequest::ReopenDetachedController { key: _, request: _, control_handle } => {
error!("TODO(https://fxbug.dev/42182623): detaching is not implemented");
control_handle.shutdown_with_epitaph(zx::Status::NOT_SUPPORTED);
}
}
Ok(())
})
.await
}
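/// Serves a single `fuchsia.net.filter/NamespaceController` client,
/// validating and accumulating pushed changes and committing them through the
/// dispatcher when the client calls `Commit`.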
async fn serve_controller(
id: &fnet_filter_ext::ControllerId,
mut stream: fnet_filter::NamespaceControllerRequestStream,
control_handle: fnet_filter::NamespaceControllerControlHandle,
UpdateDispatcher(dispatcher): &UpdateDispatcher,
mut ctx: crate::bindings::Ctx,
) -> Result<(), fidl::Error> {
use fnet_filter::NamespaceControllerRequest;
control_handle.send_on_id_assigned(&id.0)?;
let mut pending_changes = Vec::new();
while let Some(request) = stream.try_next().await? {
match request {
NamespaceControllerRequest::PushChanges { changes, responder } => {
let num_changes = changes.len();
if pending_changes.len() + num_changes > fnet_filter::MAX_COMMIT_SIZE.into() {
responder.send(fnet_filter::ChangeValidationResult::TooManyChanges(
fnet_filter::Empty {},
))?;
continue;
}
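// Distinguishes conversion errors that are reported back to the client in the
// `ChangeValidationResult` from fatal errors that close the channel.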
enum Error {
ReturnToClient(fnet_filter::ChangeValidationError),
Fatal,
}
let result = changes
.into_iter()
.enumerate()
.map(|(i, change)| {
change.try_into().map_err(|error| {
use fnet_filter::ChangeValidationError;
use fnet_filter_ext::FidlConversionError;
warn!("encountered an invalid filter change at index {i}: {error:?}");
let error = match error {
FidlConversionError::UnknownUnionVariant(type_name) => {
warn!("client specified unknown variant of {type_name}");
Error::Fatal
}
FidlConversionError::MissingNamespaceId
| FidlConversionError::MissingNamespaceDomain
| FidlConversionError::MissingRoutineId
| FidlConversionError::MissingRoutineType
| FidlConversionError::MissingIpInstallationHook
| FidlConversionError::MissingNatInstallationHook => {
Error::ReturnToClient(
ChangeValidationError::MissingRequiredField,
)
}
FidlConversionError::ZeroInterfaceId => Error::ReturnToClient(
ChangeValidationError::InvalidInterfaceMatcher,
),
FidlConversionError::InvalidAddressRange
| FidlConversionError::AddressRangeFamilyMismatch
| FidlConversionError::SubnetPrefixTooLong
| FidlConversionError::SubnetHostBitsSet => Error::ReturnToClient(
ChangeValidationError::InvalidAddressMatcher,
),
FidlConversionError::InvalidPortRange => {
Error::ReturnToClient(ChangeValidationError::InvalidPortMatcher)
}
FidlConversionError::NotAnError => unreachable!(
"should not get this error when converting a `Change`"
),
};
(i, error)
})
})
.collect::<Result<Vec<_>, _>>()
.map(|changes| {
pending_changes.extend(changes);
});
let result = match result {
Ok(()) => fnet_filter::ChangeValidationResult::Ok(fnet_filter::Empty {}),
Err((valid_change_count, error)) => {
let error = match error {
Error::ReturnToClient(error) => error,
Error::Fatal => {
error!("fatal error serving filtering controller; closing channel");
control_handle.shutdown_with_epitaph(zx::Status::NOT_SUPPORTED);
return Ok(());
}
};
// Return `Ok` for all valid changes, followed by the error we encountered,
// and pad the results with `NotReached` for the remaining changes.
let errors = std::iter::repeat(fnet_filter::ChangeValidationError::Ok)
.take(valid_change_count)
.chain(std::iter::once(error))
.chain(
std::iter::repeat(fnet_filter::ChangeValidationError::NotReached)
.take(num_changes - valid_change_count - 1),
)
.collect::<Vec<_>>();
fnet_filter::ChangeValidationResult::ErrorOnChange(errors)
}
};
responder.send(result)?;
}
NamespaceControllerRequest::Commit { payload: options, responder } => {
let fnet_filter::CommitOptions { idempotent, .. } = options;
let idempotent = idempotent.unwrap_or(false);
let changes = std::mem::take(&mut pending_changes);
let num_changes = changes.len();
let result =
dispatcher.lock().await.commit_changes(id, changes, idempotent, &mut ctx);
let result = match result {
Ok(()) => fnet_filter::CommitResult::Ok(fnet_filter::Empty {}),
Err(error) => match error {
CommitError::RuleWithInvalidMatcher(rule) => {
fnet_filter::CommitResult::RuleWithInvalidMatcher(rule.into())
}
CommitError::RuleWithInvalidAction(rule) => {
fnet_filter::CommitResult::RuleWithInvalidAction(rule.into())
}
CommitError::CyclicalRoutineGraph(routine) => {
fnet_filter::CommitResult::CyclicalRoutineGraph(routine.into())
}
CommitError::ErrorOnChange { index, error } => {
let errors = std::iter::repeat(fnet_filter::CommitError::Ok)
.take(index)
.chain(std::iter::once(error))
.chain(
std::iter::repeat(fnet_filter::CommitError::NotReached)
.take(num_changes - index - 1),
)
.collect::<Vec<_>>();
fnet_filter::CommitResult::ErrorOnChange(errors)
}
},
};
responder.send(result)?;
}
NamespaceControllerRequest::Detach { responder: _ } => {
error!("TODO(https://fxbug.dev/42182623): detaching is not implemented");
control_handle.shutdown_with_epitaph(zx::Status::NOT_SUPPORTED);
}
}
}
Ok(())
}
// TODO(https://fxbug.dev/42182576): remove this once NetCfg interacts with
// fuchsia.net.filter for Netstack3.
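/// Serves a placeholder implementation of
/// `fuchsia.net.filter.deprecated/Filter` that logs and ignores supported
/// requests rather than installing any rules.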
pub(crate) async fn serve_deprecated(
stream: fidl_fuchsia_net_filter_deprecated::FilterRequestStream,
) -> Result<(), fidl::Error> {
use fidl_fuchsia_net_filter_deprecated::FilterRequest;
stream
.try_for_each(|request: FilterRequest| async move {
match request {
FilterRequest::DisableInterface { responder, .. } => {
error!(
"fuchsia.net.filter.deprecated.Filter is not implemented \
(https://fxbug.dev/42182576); ignoring DisableInterface"
);
responder.send(Ok(())).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
FilterRequest::EnableInterface { responder, .. } => {
error!(
"fuchsia.net.filter.deprecated.Filter is not implemented \
(https://fxbug.dev/42182576); ignoring EnableInterface"
);
responder.send(Ok(())).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
FilterRequest::GetRules { responder } => {
error!(
"fuchsia.net.filter.deprecated.Filter is not implemented \
(https://fxbug.dev/42182576); ignoring GetRules"
);
responder
.send(&[], 0)
.unwrap_or_else(|e| error!("failed to respond: {e:?}"))
}
FilterRequest::UpdateRules { rules, generation, responder } => {
error!(
"fuchsia.net.filter.deprecated.Filter is not implemented \
(https://fxbug.dev/42182576); ignoring UpdateRules \
{{ generation: {:?}, rules: {:?} }}",
generation, rules
);
responder.send(Ok(())).unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
FilterRequest::GetNatRules { .. } => {
todo!("https://fxbug.dev/42182576: implement filtering support");
}
FilterRequest::UpdateNatRules { .. } => {
todo!("https://fxbug.dev/42182576: implement filtering support");
}
FilterRequest::GetRdrRules { .. } => {
todo!("https://fxbug.dev/42182576: implement filtering support");
}
FilterRequest::UpdateRdrRules { .. } => {
todo!("https://fxbug.dev/42182576: implement filtering support");
}
FilterRequest::CheckPresence { responder } => {
responder.send().unwrap_or_else(|e| error!("failed to respond: {e:?}"));
}
};
Ok(())
})
.await
}