| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use crate::future_help::{Observable, Observer}; |
| use crate::{NodeId, NodeLinkId}; |
| use anyhow::Error; |
| use fidl_fuchsia_overnet_protocol::RouteMetrics; |
| use fuchsia_async::Task; |
| use std::collections::BTreeMap; |
| use std::convert::TryInto; |
| use std::sync::Arc; |
| use std::time::Duration; |
| |
| #[derive(Clone, Debug)] |
| pub(crate) struct ForwardingTable { |
| table: Arc<BTreeMap<NodeId, Metrics>>, |
| } |
| |
| impl ForwardingTable { |
| pub(crate) fn empty() -> ForwardingTable { |
| ForwardingTable { table: Arc::new(BTreeMap::new()) } |
| } |
| |
| pub(crate) fn iter(&self) -> impl Iterator<Item = (NodeId, &Metrics)> { |
| self.table.iter().map(|(n, m)| (*n, m)) |
| } |
| |
| pub(crate) fn route_for(&self, peer: NodeId) -> Option<NodeLinkId> { |
| self.table.get(&peer).map(|peer| peer.node_link_id) |
| } |
| |
| pub(crate) fn filter_out_via(self, node_id: NodeId) -> ForwardingTable { |
| ForwardingTable { |
| table: Arc::new( |
| self.table |
| .iter() |
| .filter(|(&destination, metrics)| { |
| destination != node_id && !metrics.is_via(node_id) |
| }) |
| .map(|(destination, metrics)| (*destination, metrics.clone())) |
| .collect(), |
| ), |
| } |
| } |
| |
| pub(crate) fn is_significantly_different_to(&self, other: &Self) -> bool { |
| if !self.table.keys().eq(other.table.keys()) { |
| return true; |
| } |
| |
| for (a, b) in self.table.values().zip(other.table.values()) { |
| if a.is_significantly_different_to(b) { |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| } |
| |
| #[derive(Clone, Debug)] |
| struct ReceivedMetrics { |
| round_trip_time: Option<Duration>, |
| intermediate_hops: Vec<NodeId>, |
| } |
| |
| impl From<RouteMetrics> for ReceivedMetrics { |
| fn from(m: RouteMetrics) -> Self { |
| Self { |
| round_trip_time: m.round_trip_time_us.map(Duration::from_micros), |
| intermediate_hops: m |
| .intermediate_hops |
| .map(|hops| hops.into_iter().map(Into::into).collect()) |
| .unwrap_or_else(Vec::new), |
| } |
| } |
| } |
| |
| fn score_rtt(rtt: Option<Duration>) -> impl PartialOrd { |
| rtt.map(|d| -d.as_secs_f32()) |
| } |
| |
| #[derive(Clone, Debug)] |
| pub(crate) struct LinkMetrics { |
| pub round_trip_time: Option<Duration>, |
| pub node_link_id: NodeLinkId, |
| } |
| |
| impl LinkMetrics { |
| pub(crate) fn score(&self) -> impl PartialOrd { |
| score_rtt(self.round_trip_time) |
| } |
| } |
| |
| #[derive(Clone, Debug)] |
| pub(crate) struct Metrics { |
| round_trip_time: Option<Duration>, |
| intermediate_hops: Vec<NodeId>, |
| node_link_id: NodeLinkId, |
| } |
| |
| impl From<&Metrics> for RouteMetrics { |
| fn from(metrics: &Metrics) -> Self { |
| Self { |
| round_trip_time_us: metrics |
| .round_trip_time |
| .and_then(|rtt| rtt.as_micros().try_into().ok()), |
| intermediate_hops: Some(metrics.intermediate_hops.iter().map(Into::into).collect()), |
| ..Self::EMPTY |
| } |
| } |
| } |
| |
| impl From<&LinkMetrics> for Metrics { |
| fn from(metrics: &LinkMetrics) -> Self { |
| Self { |
| round_trip_time: metrics.round_trip_time, |
| intermediate_hops: Vec::new(), |
| node_link_id: metrics.node_link_id, |
| } |
| } |
| } |
| |
| impl Metrics { |
| fn join(own_node_id: NodeId, received: ReceivedMetrics, link: &LinkMetrics) -> Self { |
| let mut intermediate_hops = received.intermediate_hops; |
| intermediate_hops.push(own_node_id); |
| Metrics { |
| round_trip_time: received.round_trip_time.zip(link.round_trip_time).map(|(a, b)| a + b), |
| node_link_id: link.node_link_id, |
| intermediate_hops, |
| } |
| } |
| |
| pub(crate) fn is_via(&self, node_id: NodeId) -> bool { |
| self.intermediate_hops.iter().find(|&&n| n == node_id).is_some() |
| } |
| |
| pub(crate) fn score(&self) -> impl PartialOrd { |
| score_rtt(self.round_trip_time) |
| } |
| |
| fn is_significantly_different_to(&self, other: &Self) -> bool { |
| if self.node_link_id != other.node_link_id { |
| return true; |
| } |
| |
| if self.intermediate_hops != other.intermediate_hops { |
| return true; |
| } |
| |
| if let Some(a) = self.round_trip_time { |
| if let Some(b) = other.round_trip_time { |
| let big_rtt = std::cmp::max(a, b); |
| let small_rtt = std::cmp::min(a, b); |
| let rtt_diff = big_rtt - small_rtt; |
| if rtt_diff > std::cmp::max(Duration::from_millis(5), big_rtt / 5) { |
| return true; |
| } |
| } else { |
| return true; |
| } |
| } else if other.round_trip_time.is_some() { |
| return true; |
| } |
| |
| false |
| } |
| } |
| |
| #[derive(Clone, Debug)] |
| struct Route { |
| destination: NodeId, |
| via: NodeId, |
| received_metrics: ReceivedMetrics, |
| } |
| |
| #[derive(Default, Debug, Clone)] |
| struct DB { |
| routes: Vec<Route>, |
| links: BTreeMap<NodeId, LinkMetrics>, |
| } |
| |
| pub(crate) struct Routes { |
| db: Observable<DB>, |
| forwarding_table: Observable<ForwardingTable>, |
| } |
| |
| impl Routes { |
| pub(crate) fn new() -> Routes { |
| let db = Observable::new(DB::default()); |
| let forwarding_table = |
| Observable::new(ForwardingTable { table: Arc::new(BTreeMap::new()) }); |
| Routes { db, forwarding_table } |
| } |
| |
| pub(crate) async fn update( |
| &self, |
| via: NodeId, |
| routes: impl Iterator<Item = (NodeId, RouteMetrics)>, |
| ) { |
| self.db |
| .edit(|db| { |
| db.routes.retain(|r| r.via != via); |
| db.routes.extend(routes.map(|(destination, route_metrics)| Route { |
| destination, |
| via, |
| received_metrics: route_metrics.into(), |
| })); |
| }) |
| .await; |
| } |
| |
| pub(crate) fn new_forwarding_table_observer(&self) -> Observer<ForwardingTable> { |
| self.forwarding_table.new_observer() |
| } |
| |
| pub(crate) async fn run_planner( |
| self: &Arc<Self>, |
| own_node_id: NodeId, |
| link_state: Observer<BTreeMap<NodeId, LinkMetrics>>, |
| ) -> Result<(), Error> { |
| let _merger = Task::spawn(self.clone().merge_links(link_state)); |
| |
| let mut db = self.db.new_observer(); |
| let mut last_emitted = ForwardingTable::empty(); |
| while let Some(db) = db.next().await { |
| log::trace!("[{:?}] Update with new route database {:#?}", own_node_id, db); |
| let mut wip: BTreeMap<NodeId, Metrics> = db |
| .links |
| .iter() |
| .map(|(destination, link_metrics)| (*destination, link_metrics.into())) |
| .collect(); |
| for Route { destination, via, received_metrics } in db.routes.into_iter() { |
| if let Some(link_metrics) = db.links.get(&via) { |
| let metrics = Metrics::join(own_node_id, received_metrics, link_metrics); |
| wip.entry(destination) |
| .and_modify(|existing| { |
| if metrics.score() > existing.score() { |
| *existing = metrics.clone(); |
| } |
| }) |
| .or_insert(metrics); |
| } |
| } |
| let table = ForwardingTable { table: Arc::new(wip) }; |
| if last_emitted.is_significantly_different_to(&table) { |
| log::trace!("[{:?}] New forwarding table: {:#?}", own_node_id, table); |
| self.forwarding_table.push(table.clone()).await; |
| last_emitted = table; |
| } |
| } |
| Ok(()) |
| } |
| |
| async fn merge_links(self: Arc<Self>, mut link_state: Observer<BTreeMap<NodeId, LinkMetrics>>) { |
| while let Some(links) = link_state.next().await { |
| self.db |
| .edit(|db| { |
| // Remove routes that depend on links which are no longer present. |
| db.routes.retain(|r| links.contains_key(&r.via)); |
| |
| db.links = links; |
| }) |
| .await; |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod test { |
| use super::*; |
| use futures::FutureExt; |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_merge_links_removes_routes() { |
| const NODE_ID: u64 = 42; |
| const FOREIGN_NODE_ID: u64 = 23; |
| const UNRELATED_NODE_ID: u64 = 82; |
| const NODE_LINK_ID: u64 = 99; |
| |
| let routes = Arc::new(Routes::new()); |
| routes |
| .db |
| .edit(|db| { |
| db.links.insert( |
| NodeId(NODE_ID), |
| LinkMetrics { round_trip_time: None, node_link_id: NodeLinkId(NODE_LINK_ID) }, |
| ); |
| db.links.insert( |
| NodeId(UNRELATED_NODE_ID), |
| LinkMetrics { round_trip_time: None, node_link_id: NodeLinkId(NODE_LINK_ID) }, |
| ); |
| db.routes.push(Route { |
| destination: NodeId(FOREIGN_NODE_ID), |
| via: NodeId(NODE_ID), |
| received_metrics: ReceivedMetrics { |
| round_trip_time: None, |
| intermediate_hops: Vec::new(), |
| }, |
| }); |
| db.routes.push(Route { |
| destination: NodeId(FOREIGN_NODE_ID), |
| via: NodeId(UNRELATED_NODE_ID), |
| received_metrics: ReceivedMetrics { |
| round_trip_time: None, |
| intermediate_hops: Vec::new(), |
| }, |
| }); |
| }) |
| .await; |
| |
| let mut updated = BTreeMap::new(); |
| updated.insert( |
| NodeId(UNRELATED_NODE_ID), |
| LinkMetrics { round_trip_time: None, node_link_id: NodeLinkId(NODE_LINK_ID) }, |
| ); |
| |
| let link_state = Observable::new(updated); |
| |
| let _merger = Task::spawn(Arc::clone(&routes).merge_links(link_state.new_observer())); |
| |
| let mut observer = routes.db.new_observer(); |
| let mut timeout = fuchsia_async::Timer::new(std::time::Duration::from_secs(5)).fuse(); |
| |
| while let Some(mut db) = futures::select! { |
| res = observer.next().fuse() => res, |
| _ = timeout => None, |
| } { |
| if db.links.len() > 1 { |
| // Assume we've seen an update for the initial values, before the merger's done its |
| // thing. |
| assert!(db.routes.len() > 1); |
| continue; |
| } |
| |
| assert_eq!(1, db.links.len()); |
| assert_eq!(1, db.routes.len()); |
| let route = db.routes.pop().unwrap(); |
| |
| assert!(db.links.contains_key(&NodeId(UNRELATED_NODE_ID))); |
| assert_eq!(NodeId(UNRELATED_NODE_ID), route.via); |
| |
| return; |
| } |
| |
| panic!("Timed out waiting for route db update"); |
| } |
| } |