| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #![warn(missing_docs)] |
| |
| //! `timekeeper` is responsible for external time synchronization in Fuchsia. |
| |
| use { |
| anyhow::{Context as _, Error}, |
| chrono::prelude::*, |
| fidl_fuchsia_deprecatedtimezone as ftz, fidl_fuchsia_net as fnet, fidl_fuchsia_time as ftime, |
| fuchsia_async::{self as fasync, DurationExt}, |
| fuchsia_component::server::ServiceFs, |
| fuchsia_zircon as zx, |
| futures::{StreamExt, TryStreamExt}, |
| log::{debug, error, info, warn}, |
| parking_lot::Mutex, |
| std::{path::Path, sync::Arc}, |
| }; |
| |
| mod diagnostics; |
| |
| #[fasync::run_singlethreaded] |
| async fn main() -> Result<(), Error> { |
| diagnostics::init(); |
| let mut fs = ServiceFs::new(); |
| |
| info!("diagnostics initialized, connecting notifier to servicefs."); |
| diagnostics::INSPECTOR.serve(&mut fs)?; |
| |
| let source = initial_utc_source("/config/build-info/minimum-utc-stamp".as_ref())?; |
| let notifier = Notifier::new(source); |
| |
| info!("connecting to external update service"); |
| let time_service = |
| fuchsia_component::client::connect_to_service::<ftz::TimeServiceMarker>().unwrap(); |
| let connectivity_service = |
| fuchsia_component::client::connect_to_service::<fnet::ConnectivityMarker>().unwrap(); |
| |
| fasync::spawn(maintain_utc(notifier.clone(), time_service, connectivity_service)); |
| |
| fs.dir("svc").add_fidl_service(move |requests: ftime::UtcRequestStream| { |
| notifier.handle_request_stream(requests); |
| }); |
| |
| fs.take_and_serve_directory_handle()?; |
| Ok(fs.collect().await) |
| } |
| |
| fn backstop_time(path: &Path) -> Result<DateTime<Utc>, Error> { |
| let file_contents = std::fs::read_to_string(path).context("reading backstop time from disk")?; |
| let parsed_offset = NaiveDateTime::parse_from_str(file_contents.trim(), "%s")?; |
| let utc = DateTime::from_utc(parsed_offset, Utc); |
| Ok(utc) |
| } |
| |
| fn initial_utc_source(backstop_path: &Path) -> Result<Option<ftime::UtcSource>, Error> { |
| let expected_minimum = backstop_time(backstop_path)?; |
| let current_utc = Utc::now(); |
| Ok(if current_utc > expected_minimum { |
| Some(ftime::UtcSource::Backstop) |
| } else { |
| warn!( |
| "latest known-past UTC time ({}) should be earlier than current system time ({})", |
| expected_minimum, current_utc, |
| ); |
| None |
| }) |
| } |
| |
| /// The top-level control loop for time synchronization. |
| /// |
| /// Checks for network connectivity before attempting any time updates. |
| /// |
| /// Actual updates are performed by calls to `fuchsia.deprecatedtimezone.TimeService` which we |
| /// plan to deprecate. |
| async fn maintain_utc( |
| notifs: Notifier, |
| time_service: ftz::TimeServiceProxy, |
| connectivity: fnet::ConnectivityProxy, |
| ) { |
| info!("waiting for network connectivity before attempting network time sync..."); |
| let mut conn_events = connectivity.take_event_stream(); |
| loop { |
| if let Ok(Some(fnet::ConnectivityEvent::OnNetworkReachable { reachable: true })) = |
| conn_events.try_next().await |
| { |
| break; |
| } |
| } |
| |
| for i in 0.. { |
| let sleep_duration = zx::Duration::from_seconds(2i64.pow(i)); // exponential backoff |
| info!("requesting roughtime service update the system time..."); |
| match time_service.update(1).await { |
| Ok(true) => { |
| let monotonic_before = zx::Time::get(zx::ClockId::Monotonic).into_nanos(); |
| let utc_now = Utc::now().timestamp_nanos(); |
| let monotonic_after = zx::Time::get(zx::ClockId::Monotonic).into_nanos(); |
| info!( |
| "CF-884:monotonic_before={}:utc={}:monotonic_after={}", |
| monotonic_before, utc_now, monotonic_after, |
| ); |
| notifs.0.lock().set_source(ftime::UtcSource::External, monotonic_before); |
| break; |
| } |
| Ok(false) => { |
| debug!( |
| "failed to update time, probably a network error. retrying in {}s.", |
| sleep_duration.into_seconds() |
| ); |
| } |
| Err(why) => { |
| error!("couldn't make request to update time: {:?}", why); |
| } |
| } |
| fasync::Timer::new(sleep_duration.after_now()).await; |
| } |
| } |
| |
| /// Notifies waiting clients when the clock has been updated, wrapped in a lock to allow |
| /// sharing between tasks. |
| #[derive(Clone, Debug)] |
| struct Notifier(Arc<Mutex<NotifyInner>>); |
| |
| impl Notifier { |
| fn new(source: Option<ftime::UtcSource>) -> Self { |
| Notifier(Arc::new(Mutex::new(NotifyInner { source, clients: Vec::new() }))) |
| } |
| |
| /// Spawns an async task to handle requests on this channel. |
| fn handle_request_stream(&self, requests: ftime::UtcRequestStream) { |
| let notifier = self.clone(); |
| fasync::spawn(async move { |
| let mut counted_requests = requests.enumerate(); |
| let mut last_seen_state = notifier.0.lock().source; |
| while let Some((request_count, Ok(ftime::UtcRequest::WatchState { responder }))) = |
| counted_requests.next().await |
| { |
| let mut n = notifier.0.lock(); |
| // we return immediately if this is the first request on this channel, but if |
| // the backstop time hasn't been set yet then we can't say anything |
| if n.source.is_some() && (request_count == 0 || last_seen_state != n.source) { |
| n.reply(responder, zx::Time::get(zx::ClockId::Monotonic).into_nanos()); |
| } else { |
| n.register(responder); |
| } |
| last_seen_state = n.source; |
| } |
| }); |
| } |
| } |
| |
| /// Notifies waiting clients when the clock has been updated. |
| #[derive(Debug)] |
| struct NotifyInner { |
| /// The current source for our UTC approximation. |
| source: Option<ftime::UtcSource>, |
| /// All clients waiting for an update to UTC's time. |
| clients: Vec<ftime::UtcWatchStateResponder>, |
| } |
| |
| impl NotifyInner { |
| /// Reply to a client with the current UtcState. |
| fn reply(&self, responder: ftime::UtcWatchStateResponder, update_time: i64) { |
| if let Err(why) = |
| responder.send(ftime::UtcState { timestamp: Some(update_time), source: self.source }) |
| { |
| warn!("failed to notify a client of an update: {:?}", why); |
| } |
| } |
| |
| /// Registers a client to be later notified that a clock update has occurred. |
| fn register(&mut self, responder: ftime::UtcWatchStateResponder) { |
| info!("registering a client for notifications"); |
| self.clients.push(responder); |
| } |
| |
| /// Increases the revision counter by 1 and notifies any clients waiting on updates from |
| /// previous revisions. |
| fn set_source(&mut self, source: ftime::UtcSource, update_time: i64) { |
| if self.source != Some(source) { |
| self.source = Some(source); |
| let clients = std::mem::replace(&mut self.clients, vec![]); |
| info!("UTC source changed to {:?}, notifying {} clients", source, clients.len()); |
| for responder in clients { |
| self.reply(responder, update_time); |
| } |
| } else { |
| info!("received UTC source update but the actual source didn't change."); |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| #[allow(unused)] |
| use { |
| super::*, |
| chrono::{offset::TimeZone, NaiveDate}, |
| fuchsia_inspect::{assert_inspect_tree, testing::AnyProperty}, |
| fuchsia_zircon as zx, |
| std::{ |
| future::Future, |
| pin::Pin, |
| task::{Context, Poll, Waker}, |
| }, |
| }; |
| |
| #[test] |
| fn fixed_backstop_check() { |
| let y2k_backstop = "/pkg/data/y2k"; |
| let test_backstop = backstop_time(y2k_backstop.as_ref()).unwrap(); |
| let test_source = initial_utc_source(y2k_backstop.as_ref()).unwrap(); |
| let before_test_backstop = |
| Utc.from_utc_datetime(&NaiveDate::from_ymd(1999, 1, 1).and_hms(0, 0, 0)); |
| let after_test_backstop = |
| Utc.from_utc_datetime(&NaiveDate::from_ymd(2001, 1, 1).and_hms(0, 0, 0)); |
| |
| assert!(test_backstop > before_test_backstop); |
| assert!(test_backstop < after_test_backstop); |
| assert_eq!(test_source, Some(ftime::UtcSource::Backstop)); |
| } |
| |
| #[test] |
| fn fallible_backstop_check() { |
| assert_eq!(initial_utc_source("/pkg/data/end-of-unix-time".as_ref()).unwrap(), None); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn single_client() { |
| diagnostics::init(); |
| info!("starting single notification test"); |
| |
| let (utc, utc_requests) = |
| fidl::endpoints::create_proxy_and_stream::<ftime::UtcMarker>().unwrap(); |
| let (time_service, mut time_requests) = |
| fidl::endpoints::create_proxy_and_stream::<ftz::TimeServiceMarker>().unwrap(); |
| let (reachability, reachability_server) = |
| fidl::endpoints::create_proxy::<fnet::ConnectivityMarker>().unwrap(); |
| |
| // the "network" the time sync server uses is de facto reachable here |
| let (_, reachability_control) = |
| reachability_server.into_stream_and_control_handle().unwrap(); |
| reachability_control.send_on_network_reachable(true).unwrap(); |
| |
| let notifier = Notifier::new(Some(ftime::UtcSource::Backstop)); |
| let (mut allow_update, mut wait_for_update) = futures::channel::mpsc::channel(1); |
| info!("spawning test notifier"); |
| notifier.handle_request_stream(utc_requests); |
| fasync::spawn(maintain_utc(notifier.clone(), time_service, reachability)); |
| |
| fasync::spawn(async move { |
| while let Some(Ok(ftz::TimeServiceRequest::Update { responder, .. })) = |
| time_requests.next().await |
| { |
| let () = wait_for_update.next().await.unwrap(); |
| responder.send(true).unwrap(); |
| } |
| }); |
| |
| info!("checking that the time source has not been externally initialized yet"); |
| assert_eq!(utc.watch_state().await.unwrap().source.unwrap(), ftime::UtcSource::Backstop); |
| |
| let task_waker = futures::future::poll_fn(|cx| Poll::Ready(cx.waker().clone())).await; |
| let mut cx = Context::from_waker(&task_waker); |
| |
| let mut hanging = Box::pin(utc.watch_state()); |
| assert!( |
| hanging.as_mut().poll(&mut cx).is_pending(), |
| "hanging get should not return before time updated event has been emitted" |
| ); |
| |
| info!("sending network update event"); |
| allow_update.try_send(()).unwrap(); |
| |
| info!("waiting for time source update"); |
| assert_eq!(hanging.await.unwrap().source.unwrap(), ftime::UtcSource::External); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn inspect_values_are_present() -> Result<(), Error> { |
| diagnostics::init(); |
| assert_inspect_tree!(diagnostics::INSPECTOR, |
| root: contains { |
| start_time_monotonic_nanos: AnyProperty, |
| current: contains { |
| system_uptime_monotonic_nanos: AnyProperty, |
| utc_nanos: AnyProperty, |
| } |
| }); |
| Ok(()) |
| } |
| } |