blob: 03a6b1c59192dc5ef1127d9fa1d07a972fe1f3cb [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
mod bound;
mod constants;
mod datatypes;
mod diagnostics;
mod httpsdate;
mod sampler;
use {
crate::diagnostics::{
CobaltDiagnostics, CompositeDiagnostics, Diagnostics, InspectDiagnostics,
},
crate::httpsdate::{HttpsDateUpdateAlgorithm, RetryStrategy},
crate::sampler::{HttpsSampler, HttpsSamplerImpl},
anyhow::{Context, Error},
fidl_fuchsia_net_interfaces::StateMarker,
fidl_fuchsia_time_external::{
PullSourceRequestStream, PushSourceRequestStream, Status, Urgency,
},
fuchsia_component::server::{ServiceFs, ServiceObj},
fuchsia_zircon as zx,
futures::{
future::{join, Future},
FutureExt, StreamExt,
},
pull_source::PullSource,
push_source::PushSource,
std::collections::HashMap,
tracing::warn,
};
/// Retry strategy used while polling for time.
const RETRY_STRATEGY: RetryStrategy = RetryStrategy {
min_between_failures: zx::Duration::from_seconds(1),
max_exponent: 3,
tries_per_exponent: 3,
converge_time_between_samples: zx::Duration::from_minutes(2),
maintain_time_between_samples: zx::Duration::from_minutes(20),
};
/// HttpsDate config, populated from build-time generated structured config.
pub struct Config {
https_timeout: zx::Duration,
standard_deviation_bound_percentage: u8,
first_rtt_time_factor: u16,
use_pull_api: bool,
sample_config_by_urgency: HashMap<Urgency, SampleConfig>,
}
pub struct SampleConfig {
max_attempts: u32,
num_polls: u32,
}
impl From<httpsdate_config::Config> for Config {
fn from(source: httpsdate_config::Config) -> Self {
let sample_config_by_urgency = [
(
Urgency::Low,
SampleConfig {
max_attempts: source.max_attempts_urgency_low,
num_polls: source.num_polls_urgency_low,
},
),
(
Urgency::Medium,
SampleConfig {
max_attempts: source.max_attempts_urgency_medium,
num_polls: source.num_polls_urgency_medium,
},
),
(
Urgency::High,
SampleConfig {
max_attempts: source.max_attempts_urgency_high,
num_polls: source.num_polls_urgency_high,
},
),
]
.into_iter()
.collect();
Config {
https_timeout: zx::Duration::from_seconds(source.https_timeout_sec.into()),
standard_deviation_bound_percentage: source.standard_deviation_bound_percentage,
first_rtt_time_factor: source.first_rtt_time_factor,
use_pull_api: source.use_pull_api,
sample_config_by_urgency,
}
}
}
/// Serves `PushSource` FIDL API.
pub struct PushServer<
'a,
S: HttpsSampler + Send + Sync,
D: Diagnostics,
N: Future<Output = Result<(), Error>> + Send,
> {
push_source: PushSource<HttpsDateUpdateAlgorithm<'a, S, D, N>>,
}
impl<'a, S, D, N> PushServer<'a, S, D, N>
where
S: HttpsSampler + Send + Sync,
D: Diagnostics,
N: Future<Output = Result<(), Error>> + Send,
{
fn new(
diagnostics: D,
sampler: S,
internet_reachable: N,
config: &'a Config,
) -> Result<Self, Error> {
let update_algorithm = HttpsDateUpdateAlgorithm::new(
RETRY_STRATEGY,
diagnostics,
sampler,
internet_reachable,
config,
);
let push_source = PushSource::new(update_algorithm, Status::Initializing)?;
Ok(PushServer { push_source })
}
/// Start serving `PushSource` FIDL API.
fn serve<'b>(
&'b self,
fs: &'b mut ServiceFs<ServiceObj<'static, PushSourceRequestStream>>,
) -> Result<impl 'b + Future<Output = Result<(), anyhow::Error>>, Error> {
let update_fut = self.push_source.poll_updates();
fs.dir("svc").add_fidl_service(|stream: PushSourceRequestStream| stream);
let service_fut = fs.for_each_concurrent(None, |stream| {
handle_push_source_request(stream, &self.push_source)
});
Ok(join(update_fut, service_fut).map(|(update_result, _serve_result)| update_result))
}
}
/// Handle next `PushSource` FIDL API request.
async fn handle_push_source_request<T: push_source::UpdateAlgorithm>(
stream: PushSourceRequestStream,
push_source: &PushSource<T>,
) {
push_source
.handle_requests_for_stream(stream)
.await
.unwrap_or_else(|e| warn!("Error handling PushSource stream: {:?}", e));
}
/// Serves `PullSource` FIDL API.
pub struct PullServer<
'a,
S: HttpsSampler + Send + Sync,
D: Diagnostics,
N: Future<Output = Result<(), Error>> + Send,
> {
pull_source: PullSource<HttpsDateUpdateAlgorithm<'a, S, D, N>>,
}
impl<'a, S, D, N> PullServer<'a, S, D, N>
where
S: HttpsSampler + Send + Sync,
D: Diagnostics,
N: Future<Output = Result<(), Error>> + Send,
{
fn new(
diagnostics: D,
sampler: S,
internet_reachable: N,
config: &'a Config,
) -> Result<Self, Error> {
let update_algorithm = HttpsDateUpdateAlgorithm::new(
RETRY_STRATEGY,
diagnostics,
sampler,
internet_reachable,
config,
);
let pull_source = PullSource::new(update_algorithm)?;
Ok(PullServer { pull_source })
}
/// Start serving `PullSource` FIDL API.
fn serve<'b>(
&'b self,
fs: &'b mut ServiceFs<ServiceObj<'static, PullSourceRequestStream>>,
) -> Result<impl 'b + Future<Output = Result<(), anyhow::Error>>, Error> {
fs.dir("svc").add_fidl_service(|stream: PullSourceRequestStream| stream);
Ok(fs
.for_each_concurrent(None, |stream| {
handle_pull_source_request(stream, &self.pull_source)
})
.map(|_| Ok(())))
}
}
/// Handle next `PullSource` FIDL API request.
async fn handle_pull_source_request<T: pull_source::UpdateAlgorithm>(
stream: PullSourceRequestStream,
pull_source: &PullSource<T>,
) {
pull_source
.handle_requests_for_stream(stream)
.await
.unwrap_or_else(|e| warn!("Error handling PullSource stream: {:?}", e));
}
/// Serves FIDL interfaces provided by the component.
async fn serve<S, D, N>(
config: &'_ Config,
sampler: S,
diagnostics: D,
internet_reachable: N,
) -> Result<(), Error>
where
S: HttpsSampler + Send + Sync,
D: Diagnostics,
N: Future<Output = Result<(), Error>> + Send,
{
let _inspect_server_task = inspect_runtime::publish(
fuchsia_inspect::component::inspector(),
inspect_runtime::PublishOptions::default(),
);
if config.use_pull_api {
let mut fs = ServiceFs::new();
fs.take_and_serve_directory_handle()?;
let server = PullServer::new(diagnostics, sampler, internet_reachable, config)?;
let result = server.serve(&mut fs)?.await;
result
} else {
let mut fs = ServiceFs::new();
fs.take_and_serve_directory_handle()?;
let server = PushServer::new(diagnostics, sampler, internet_reachable, config)?;
let result = server.serve(&mut fs)?.await;
result
}
}
#[fuchsia::main(logging_tags=["time"])]
async fn main() -> Result<(), Error> {
let config = httpsdate_config::Config::take_from_startup_handle();
let time_source_url = config.time_source_endpoint_url.clone();
let inspector = fuchsia_inspect::component::inspector();
// Export structured configuration into diagnostics.
inspector.root().record_child("config", |config_node| config.record_inspect(config_node));
let inspect = InspectDiagnostics::new(inspector.root());
// From here on, use the local type `Config` for configuration bits.
let config: Config = config.into();
let (cobalt, cobalt_sender_fut) = CobaltDiagnostics::new();
let diagnostics = CompositeDiagnostics::new(inspect, cobalt);
let sampler = HttpsSamplerImpl::new(time_source_url.parse()?, &config);
let interface_state_service = fuchsia_component::client::connect_to_protocol::<StateMarker>()
.context("failed to connect to fuchsia.net.interfaces/State")?;
let internet_reachable = fidl_fuchsia_net_interfaces_ext::wait_for_reachability(
fidl_fuchsia_net_interfaces_ext::event_stream_from_state(
&interface_state_service,
fidl_fuchsia_net_interfaces_ext::IncludedAddresses::OnlyAssigned,
)
.context("failed to create network interface event stream")?,
)
.map(|r| r.context("reachability status stream error"));
let serve_fut = serve(&config, sampler, diagnostics, internet_reachable);
let (update_res, _) = join(serve_fut, cobalt_sender_fut).await;
update_res
}