| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| mod faketime_integration; |
| mod timekeeper_integration; |
| |
| use chrono::{Datelike, TimeZone, Timelike}; |
| use fidl::endpoints::{create_endpoints, ServerEnd}; |
| use fidl_fuchsia_cobalt::{CobaltEvent, LoggerFactoryMarker}; |
| use fidl_fuchsia_cobalt_test::{LogMethod, LoggerQuerierMarker, LoggerQuerierProxy}; |
| use fidl_fuchsia_hardware_rtc::{DeviceRequest, DeviceRequestStream}; |
| use fidl_fuchsia_io::{NodeMarker, MODE_TYPE_DIRECTORY, OPEN_RIGHT_READABLE, OPEN_RIGHT_WRITABLE}; |
| use fidl_fuchsia_logger::LogSinkMarker; |
| use fidl_fuchsia_testing::{ |
| FakeClockControlMarker, FakeClockControlProxy, FakeClockMarker, FakeClockProxy, |
| }; |
| use fidl_fuchsia_time::{MaintenanceRequest, MaintenanceRequestStream}; |
| use fidl_fuchsia_time_external::{PushSourceMarker, Status, TimeSample}; |
| use fidl_test_time::{TimeSourceControlRequest, TimeSourceControlRequestStream}; |
| use fuchsia_async as fasync; |
| use fuchsia_component::{ |
| client::{launcher, App, AppBuilder}, |
| server::{NestedEnvironment, ServiceFs}, |
| }; |
| use fuchsia_zircon::{self as zx, HandleBased, Rights}; |
| use futures::{ |
| channel::mpsc::{channel, Receiver, Sender}, |
| stream::{Stream, StreamExt, TryStreamExt}, |
| FutureExt, SinkExt, |
| }; |
| use lazy_static::lazy_static; |
| use log::{debug, info}; |
| use parking_lot::Mutex; |
| use push_source::{PushSource, TestUpdateAlgorithm, Update}; |
| use std::{ops::Deref, sync::Arc}; |
| use time_metrics_registry::PROJECT_ID; |
| use vfs::{directory::entry::DirectoryEntry, pseudo_directory}; |
| |
| /// URL for timekeeper. |
| const TIMEKEEPER_URL: &str = |
| "fuchsia-pkg://fuchsia.com/timekeeper-integration#meta/timekeeper_for_integration.cmx"; |
| /// URL for timekeeper with fake time. |
| const TIMEKEEPER_FAKE_TIME_URL: &str = |
| "fuchsia-pkg://fuchsia.com/timekeeper-integration#meta/timekeeper_with_fake_time.cmx"; |
| /// URL for the fake time component. |
| const FAKE_TIME_URL: &str = "fuchsia-pkg://fuchsia.com/timekeeper-integration#meta/fake_clock.cmx"; |
| /// URL for fake cobalt. |
| const COBALT_URL: &str = "fuchsia-pkg://fuchsia.com/mock_cobalt#meta/mock_cobalt.cmx"; |
| |
/// A reference to a timekeeper running inside a nested environment that runs fake
/// versions of the services timekeeper requires.
| pub struct NestedTimekeeper { |
| /// Application object for timekeeper. Kept in memory to keep the component alive. |
| _timekeeper_app: App, |
| |
| /// Application objects for additional launched components. Kept in memory to keep components |
| /// alive. |
| _launched_apps: Vec<App>, |
| |
| /// The nested environment timekeeper is running in. Needs to be kept |
| /// in scope to keep the nested environment alive. |
    _nested_environment: NestedEnvironment,
| |
| /// Task running fake services injected into the nested environment. |
| _task: fasync::Task<()>, |
| } |
| |
| /// Services injected and implemented by `NestedTimekeeper`. |
| enum InjectedServices { |
| TimeSourceControl(TimeSourceControlRequestStream), |
| Maintenance(MaintenanceRequestStream), |
| } |
| |
/// A `PushSource` that serves a single client at a time and can be controlled by a test.
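///
/// A minimal usage sketch (illustrative only; the exact `TimeSample` fields and
/// table constructor depend on the generated FIDL bindings, and `monotonic_time`
/// is a hypothetical value supplied by the test):
///
/// ```ignore
/// // `push_source` is the PushSourcePuppet returned by NestedTimekeeper::new.
/// push_source
///     .set_sample(TimeSample {
///         utc: Some(VALID_TIME.into_nanos()),
///         monotonic: Some(monotonic_time.into_nanos()), // hypothetical
///         ..TimeSample::EMPTY
///     })
///     .await;
/// ```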
| pub struct PushSourcePuppet { |
| /// Channel through which connection requests are received. |
| client_recv: Receiver<ServerEnd<PushSourceMarker>>, |
| /// Push source implementation. |
| push_source: Arc<PushSource<TestUpdateAlgorithm>>, |
| /// Sender to push updates to `push_source`. |
| update_sink: Sender<Update>, |
| /// Task for retrieving updates in `push_source`. |
| /// Kept in memory to ensure that the push source serves requests. |
| _update_task: fasync::Task<()>, |
| /// Task serving the client. |
| client_task: Option<fasync::Task<()>>, |
| } |
| |
| impl PushSourcePuppet { |
| /// Create a new `PushSourcePuppet` that receives new client channels through `client_recv`. |
| fn new(client_recv: Receiver<ServerEnd<PushSourceMarker>>) -> Self { |
| let (update_algorithm, update_sink) = TestUpdateAlgorithm::new(); |
| let push_source = Arc::new(PushSource::new(update_algorithm, Status::Ok).unwrap()); |
| let push_source_clone = Arc::clone(&push_source); |
| let update_task = fasync::Task::spawn(async move { |
| push_source_clone.poll_updates().await.unwrap(); |
| }); |
| Self { client_recv, push_source, update_sink, _update_task: update_task, client_task: None } |
| } |
| |
| /// Set the next sample reported by the time source. |
| pub async fn set_sample(&mut self, sample: TimeSample) { |
| self.ensure_client().await; |
| self.update_sink.send(sample.into()).await.unwrap(); |
| } |
| |
| /// Set the next status reported by the time source. |
| #[allow(dead_code)] |
| pub async fn set_status(&mut self, status: Status) { |
| self.ensure_client().await; |
| self.update_sink.send(status.into()).await.unwrap(); |
| } |
| |
| /// Wait for a client to connect if there's no existing client. |
| pub async fn ensure_client(&mut self) { |
| if self.client_task.is_none() { |
| let server_end = self.client_recv.next().await.unwrap(); |
| let push_source_clone = Arc::clone(&self.push_source); |
| self.client_task.replace(fasync::Task::spawn(async move { |
| push_source_clone |
| .handle_requests_for_stream(server_end.into_stream().unwrap()) |
| .await |
| .unwrap(); |
| })); |
| } |
| } |
| |
| /// Simulate a crash by closing client channels and wiping state. |
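    ///
    /// After a simulated crash the puppet waits for timekeeper to reconnect; the
    /// next `set_sample` or `set_status` call serves the new connection (sketch):
    ///
    /// ```ignore
    /// push_source.simulate_crash().await;
    /// push_source.set_status(Status::Ok).await; // blocks until timekeeper reconnects
    /// ```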
| pub async fn simulate_crash(&mut self) { |
| let (update_algorithm, update_sink) = TestUpdateAlgorithm::new(); |
| self.update_sink = update_sink; |
| self.push_source = Arc::new(PushSource::new(update_algorithm, Status::Ok).unwrap()); |
| self.client_task.take(); |
| let push_source_clone = Arc::clone(&self.push_source); |
| self._update_task = fasync::Task::spawn(async move { |
| push_source_clone.poll_updates().await.unwrap(); |
| }); |
| } |
| } |
| |
/// The list of RTC update requests received by a `NestedTimekeeper`.
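///
/// For example, a test might assert that timekeeper wrote a single update back to
/// the fake RTC (sketch; the expected contents depend on the test):
///
/// ```ignore
/// let updates = rtc_updates.to_vec();
/// assert_eq!(updates.len(), 1);
/// ```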
| #[derive(Clone, Debug)] |
| pub struct RtcUpdates(Arc<Mutex<Vec<fidl_fuchsia_hardware_rtc::Time>>>); |
| |
| impl RtcUpdates { |
| /// Get all received RTC times as a vec. |
| pub fn to_vec(&self) -> Vec<fidl_fuchsia_hardware_rtc::Time> { |
| self.0.lock().clone() |
| } |
| } |
| |
| /// A wrapper around a `FakeClockControlProxy` that also allows a client to read the current fake |
| /// time. |
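///
/// Because of the `Deref` impl, `FakeClockControlProxy` methods can be called
/// directly on the controller, while `get_monotonic` reads the fake time (sketch):
///
/// ```ignore
/// // `fake_clock` is the FakeClockController returned by NestedTimekeeper::new.
/// let fake_monotonic_nanos = fake_clock.get_monotonic().await.unwrap();
/// ```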
| pub struct FakeClockController { |
| control_proxy: FakeClockControlProxy, |
| clock_proxy: FakeClockProxy, |
| } |
| |
| impl Deref for FakeClockController { |
| type Target = FakeClockControlProxy; |
| |
| fn deref(&self) -> &Self::Target { |
| &self.control_proxy |
| } |
| } |
| |
| impl FakeClockController { |
    /// Returns the current fake monotonic time, in nanoseconds.
    pub async fn get_monotonic(&self) -> Result<i64, fidl::Error> {
| self.clock_proxy.get().await |
| } |
| } |
| |
| impl NestedTimekeeper { |
    /// Launches an instance of timekeeper maintaining the provided `clock` in a nested
    /// environment. If `initial_rtc_time` is provided, the environment contains a fake RTC
    /// device that reports the time as `initial_rtc_time`. If `use_fake_clock` is true, a
    /// fake clock service is launched as well.
    ///
    /// Returns the `NestedTimekeeper`, a puppet for the push source it obtains updates
    /// from, the list of updates sent to the fake RTC, a Cobalt debug querier, and a fake
    /// clock control handle if `use_fake_clock` is true.
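    ///
    /// A typical test sets up the environment, drives the fakes, and tears down
    /// explicitly (sketch):
    ///
    /// ```ignore
    /// let clock = new_clock();
    /// let (timekeeper, mut push_source, rtc_updates, cobalt_querier, _) =
    ///     NestedTimekeeper::new(Arc::clone(&clock), Some(*VALID_RTC_TIME), false);
    /// // ... drive `push_source` and assert against `clock`, `rtc_updates`, ...
    /// timekeeper.teardown().await;
    /// ```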
| pub fn new( |
| clock: Arc<zx::Clock>, |
| initial_rtc_time: Option<zx::Time>, |
| use_fake_clock: bool, |
| ) -> (Self, PushSourcePuppet, RtcUpdates, LoggerQuerierProxy, Option<FakeClockController>) { |
| let mut service_fs = ServiceFs::new(); |
        // Route logs for components in the nested env to the same logsink as the test.
| service_fs.add_proxy_service::<LogSinkMarker, _>(); |
| |
| let mut launched_apps = vec![]; |
| // Launch a new instance of cobalt for each environment. This allows verifying |
| // the events cobalt receives for each test case. |
| let cobalt_app = AppBuilder::new(COBALT_URL).spawn(&launcher().unwrap()).unwrap(); |
| service_fs.add_proxy_service_to::<LoggerFactoryMarker, _>(Arc::clone( |
| cobalt_app.directory_request(), |
| )); |
| let cobalt_querier = cobalt_app.connect_to_service::<LoggerQuerierMarker>().unwrap(); |
| launched_apps.push(cobalt_app); |
| |
        // Launch the fake clock service if needed; again, a new instance is
        // launched for each environment.
| let fake_clock_control = if use_fake_clock { |
| let fake_clock_app = |
| AppBuilder::new(FAKE_TIME_URL).spawn(&launcher().unwrap()).unwrap(); |
| service_fs.add_proxy_service_to::<FakeClockMarker, _>(Arc::clone( |
| fake_clock_app.directory_request(), |
| )); |
| let control_proxy = |
| fake_clock_app.connect_to_service::<FakeClockControlMarker>().unwrap(); |
| let clock_proxy = fake_clock_app.connect_to_service::<FakeClockMarker>().unwrap(); |
| launched_apps.push(fake_clock_app); |
| Some(FakeClockController { control_proxy, clock_proxy }) |
| } else { |
| None |
| }; |
| |
        // Inject test control and maintenance services.
| service_fs.add_fidl_service(InjectedServices::TimeSourceControl); |
| service_fs.add_fidl_service(InjectedServices::Maintenance); |
| // Inject fake devfs. |
| let rtc_updates = RtcUpdates(Arc::new(Mutex::new(vec![]))); |
| let rtc_update_clone = rtc_updates.clone(); |
| let (devmgr_client, devmgr_server) = create_endpoints::<NodeMarker>().unwrap(); |
| let fake_devfs = match initial_rtc_time { |
| Some(initial_time) => pseudo_directory! { |
| "class" => pseudo_directory! { |
| "rtc" => pseudo_directory! { |
| "000" => vfs::service::host(move |stream| { |
| debug!("Fake RTC connected."); |
| Self::serve_fake_rtc(initial_time, rtc_update_clone.clone(), stream) |
| }) |
| } |
| } |
| }, |
| None => pseudo_directory! { |
| "class" => pseudo_directory! { |
| "rtc" => pseudo_directory! { |
| } |
| } |
| }, |
| }; |
| fake_devfs.open( |
| vfs::execution_scope::ExecutionScope::new(), |
| OPEN_RIGHT_READABLE | OPEN_RIGHT_WRITABLE, |
| MODE_TYPE_DIRECTORY, |
| vfs::path::Path::empty(), |
| devmgr_server, |
| ); |
| |
| let timekeeper_url = if use_fake_clock { TIMEKEEPER_FAKE_TIME_URL } else { TIMEKEEPER_URL }; |
| |
| let nested_environment = |
| service_fs.create_salted_nested_environment("timekeeper_test").unwrap(); |
| let timekeeper_app = AppBuilder::new(timekeeper_url) |
| .add_handle_to_namespace("/dev".to_string(), devmgr_client.into_handle()) |
| .spawn(nested_environment.launcher()) |
| .unwrap(); |
| |
| let (server_end_send, server_end_recv) = channel(0); |
| |
| let injected_service_fut = async move { |
| service_fs |
| .for_each_concurrent(None, |conn_req| async { |
| match conn_req { |
| InjectedServices::TimeSourceControl(stream) => { |
| debug!("Time source control service connected."); |
| Self::serve_test_control(server_end_send.clone(), stream).await; |
| } |
| InjectedServices::Maintenance(stream) => { |
| debug!("Maintenance service connected."); |
| Self::serve_maintenance(Arc::clone(&clock), stream).await; |
| } |
| } |
| }) |
| .await; |
| }; |
| |
| let nested_timekeeper = NestedTimekeeper { |
| _timekeeper_app: timekeeper_app, |
| _launched_apps: launched_apps, |
            _nested_environment: nested_environment,
| _task: fasync::Task::spawn(injected_service_fut), |
| }; |
| |
| ( |
| nested_timekeeper, |
| PushSourcePuppet::new(server_end_recv), |
| rtc_updates, |
| cobalt_querier, |
| fake_clock_control, |
| ) |
| } |
| |
| async fn serve_test_control( |
| server_end_sender: Sender<ServerEnd<PushSourceMarker>>, |
| stream: TimeSourceControlRequestStream, |
| ) { |
| stream |
| .try_for_each_concurrent(None, |req| async { |
| let TimeSourceControlRequest::ConnectPushSource { push_source, .. } = req; |
| server_end_sender.clone().send(push_source).await.unwrap(); |
| Ok(()) |
| }) |
| .await |
| .unwrap(); |
| } |
| |
| async fn serve_fake_rtc( |
| initial_time: zx::Time, |
| rtc_updates: RtcUpdates, |
| mut stream: DeviceRequestStream, |
| ) { |
| while let Some(req) = stream.try_next().await.unwrap() { |
| match req { |
| DeviceRequest::Get { responder } => { |
                    // Since timekeeper only pulls a time from the RTC device once at
                    // startup, we don't attempt to update the time sent in responses.
| responder.send(&mut zx_time_to_rtc_time(initial_time)).unwrap(); |
| info!("Sent response from fake RTC."); |
| } |
| DeviceRequest::Set { rtc, responder } => { |
| rtc_updates.0.lock().push(rtc); |
| responder.send(zx::Status::OK.into_raw()).unwrap(); |
| } |
| } |
| } |
| } |
| |
| async fn serve_maintenance(clock_handle: Arc<zx::Clock>, mut stream: MaintenanceRequestStream) { |
| while let Some(req) = stream.try_next().await.unwrap() { |
| let MaintenanceRequest::GetWritableUtcClock { responder } = req; |
| responder.send(clock_handle.duplicate_handle(Rights::SAME_RIGHTS).unwrap()).unwrap(); |
| } |
| } |
| |
    /// Cleanly tear down the timekeeper and fakes. This is done manually so that timekeeper is
    /// always torn down first, avoiding the situation where timekeeper sees a dependency close
    /// its channel and logs an error in response.
| pub async fn teardown(self) { |
| let mut app = self._timekeeper_app; |
| app.kill().unwrap(); |
| app.wait().await.unwrap(); |
| let _ = self._task.cancel(); |
| } |
| } |
| |
/// Converts an RFC 2822 date string to a `zx::Time`.
fn from_rfc2822(date: &str) -> zx::Time {
| zx::Time::from_nanos(chrono::DateTime::parse_from_rfc2822(date).unwrap().timestamp_nanos()) |
| } |
| |
// Well-known times used by the integration tests.
lazy_static! {
| pub static ref BACKSTOP_TIME: zx::Time = from_rfc2822("Sun, 20 Sep 2020 01:01:01 GMT"); |
| pub static ref VALID_RTC_TIME: zx::Time = from_rfc2822("Sun, 20 Sep 2020 02:02:02 GMT"); |
| pub static ref BEFORE_BACKSTOP_TIME: zx::Time = from_rfc2822("Fri, 06 Mar 2020 04:04:04 GMT"); |
| pub static ref VALID_TIME: zx::Time = from_rfc2822("Tue, 29 Sep 2020 02:19:01 GMT"); |
| pub static ref VALID_TIME_2: zx::Time = from_rfc2822("Wed, 30 Sep 2020 14:59:59 GMT"); |
| } |
| |
| /// Time between each reported sample. |
| pub const BETWEEN_SAMPLES: zx::Duration = zx::Duration::from_seconds(5); |
| |
| /// The standard deviation to report on valid time samples. |
| pub const STD_DEV: zx::Duration = zx::Duration::from_millis(50); |
| |
| /// Create a new clock with backstop time set to `BACKSTOP_TIME`. |
| pub fn new_clock() -> Arc<zx::Clock> { |
| Arc::new(zx::Clock::create(zx::ClockOpts::empty(), Some(*BACKSTOP_TIME)).unwrap()) |
| } |
| |
/// Converts a `zx::Time` to an RTC time FIDL struct.
fn zx_time_to_rtc_time(zx_time: zx::Time) -> fidl_fuchsia_hardware_rtc::Time {
| let date = chrono::Utc.timestamp_nanos(zx_time.into_nanos()); |
| fidl_fuchsia_hardware_rtc::Time { |
| seconds: date.second() as u8, |
| minutes: date.minute() as u8, |
| hours: date.hour() as u8, |
| day: date.day() as u8, |
| month: date.month() as u8, |
| year: date.year() as u16, |
| } |
| } |
| |
/// Converts an RTC time FIDL struct to a `zx::Time`.
pub fn rtc_time_to_zx_time(rtc_time: fidl_fuchsia_hardware_rtc::Time) -> zx::Time {
| let date = chrono::Utc |
| .ymd(rtc_time.year as i32, rtc_time.month as u32, rtc_time.day as u32) |
| .and_hms(rtc_time.hours as u32, rtc_time.minutes as u32, rtc_time.seconds as u32); |
| zx::Time::from_nanos(date.timestamp_nanos()) |
| } |
| |
| /// Create a stream of CobaltEvents from a proxy. |
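///
/// Sketch of reading events (the metric id and matching logic are hypothetical):
///
/// ```ignore
/// let mut events = create_cobalt_event_stream(Arc::new(cobalt_querier), LogMethod::LogCobaltEvent);
/// let event = events.next().await.unwrap();
/// assert_eq!(event.metric_id, EXPECTED_METRIC_ID); // EXPECTED_METRIC_ID is hypothetical.
/// ```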
| pub fn create_cobalt_event_stream( |
| proxy: Arc<LoggerQuerierProxy>, |
| log_method: LogMethod, |
| ) -> std::pin::Pin<Box<dyn Stream<Item = CobaltEvent>>> { |
| async_utils::hanging_get::client::GeneratedFutureStream::new(Box::new(move || { |
| Some(proxy.watch_logs2(PROJECT_ID, log_method).map(|res| res.unwrap().0)) |
| })) |
| .map(futures::stream::iter) |
| .flatten() |
| .boxed() |
| } |