blob: 1d8e3cac6716a1bec1d6be020ccdb47ee7d4d1c7 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use super::routes::LinkMetrics;
use crate::{
future_help::{Observable, Observer},
labels::{NodeId, NodeLinkId},
};
use anyhow::Error;
use futures::prelude::*;
use std::{
collections::{BTreeMap, HashMap},
time::Duration,
};
pub(crate) type LinkStatePublisher =
futures::channel::mpsc::Sender<(NodeLinkId, NodeId, Observer<Option<Duration>>)>;
pub(crate) type LinkStateReceiver =
futures::channel::mpsc::Receiver<(NodeLinkId, NodeId, Observer<Option<Duration>>)>;
type LinkStatusMap = HashMap<NodeLinkId, (NodeId, Option<Duration>)>;
/// The link status updater handles changes to the status of links. "Status" here means whether the
/// link is up or down, as well as the current ping time of the link as used for routing.
///
/// The [`LinkStateReceiver`] is a channel that will give us an [`Observer`] for each new link as it
/// is connected. The `Observer` contains an `Option<Duration>` where the duration given is the RTT
/// of that link. The option becomes `None` if the link becomes disconnected.
///
/// The link state is updated into the `observable` argument, which contains a `BTreeMap`
/// associating node IDs to information about the link to that node.
pub(crate) async fn run_link_status_updater(
observable: Observable<BTreeMap<NodeId, LinkMetrics>>,
receiver: LinkStateReceiver,
) -> Result<(), Error> {
let link_status = Observable::new(LinkStatusMap::new());
let link_status_observer = link_status.new_observer();
futures::future::join(collate(receiver, link_status), reduce(link_status_observer, observable))
.await;
Ok(())
}
/// Continually condenses updates about the status of active links into an
/// `Observable<LinkStatusMap>`.
async fn collate(receiver: LinkStateReceiver, link_status: Observable<LinkStatusMap>) {
let link_status = &link_status;
receiver
.for_each_concurrent(None, |(node_link_id, node_id, mut ping_time_observer)| async move {
while let Some(duration) = ping_time_observer.next().await {
link_status
.edit(|link_status| {
link_status.insert(node_link_id, (node_id, duration));
})
.await;
}
link_status
.edit(|link_status| {
link_status.remove(&node_link_id);
})
.await;
})
.await
}
/// Continually condenses a map of the status of all links into a map giving the status of the best
/// available link by which to reach each node.
async fn reduce(
mut link_status_observer: Observer<LinkStatusMap>,
observable: Observable<BTreeMap<NodeId, LinkMetrics>>,
) {
while let Some(link_status) = link_status_observer.next().await {
let mut new_status: BTreeMap<NodeId, LinkMetrics> = Default::default();
for (&node_link_id, &(node_id, round_trip_time)) in link_status.iter() {
let metrics = LinkMetrics { node_link_id, round_trip_time };
new_status
.entry(node_id)
.and_modify(|link_metrics| {
if metrics.score() > link_metrics.score() {
*link_metrics = metrics.clone();
}
})
.or_insert(metrics);
}
observable.push(new_status).await;
}
}