blob: 38a80642ccdf2e746d9b096c3bc3fbc6d074c429 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{anyhow, Context as _, Error},
fidl_fuchsia_time_external::{self as ftexternal, PushSourceProxy, Status},
fuchsia_component::client::{launch, launcher, App},
fuchsia_zircon as zx,
futures::{
stream::{Select, Stream},
FutureExt, StreamExt, TryFutureExt,
},
std::{fmt::Debug, pin::Pin, sync::Arc},
tracing::info,
};
/// A time sample received from a source of time.
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct Sample {
/// The UTC time.
pub utc: zx::Time,
/// The monotonic time at which the UTC was most valid.
pub monotonic: zx::Time,
/// The standard deviation of the UTC error.
pub std_dev: zx::Duration,
}
#[cfg(test)]
impl Sample {
/// Constructs a new `Sample`.
pub fn new(utc: zx::Time, monotonic: zx::Time, std_dev: zx::Duration) -> Sample {
Sample { utc, monotonic, std_dev }
}
}
/// An event that may be observed from a source of time.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Event {
/// The status of the time source changed.
StatusChange {
/// The current status of the time source.
status: Status,
},
/// The time source produced a new time sample.
Sample(Sample),
}
impl From<Sample> for Event {
fn from(sample: Sample) -> Event {
Event::Sample(sample)
}
}
/// A definition of a time source that may subsequently be launched to create a stream of update
/// events.
pub trait TimeSource: Send + Sync + Debug {
/// The type of `Stream` produced when launching the `TimeSource`.
type EventStream: Stream<Item = Result<Event, Error>> + Unpin + Send;
/// Attempts to launch the time source and return a stream of its time output and status
/// change events.
fn launch(&self) -> Result<Self::EventStream, Error>;
}
/// A time source that communicates using the `fuchsia.time.external.PushSource` protocol.
#[derive(Debug)]
pub struct PushTimeSource {
/// The fully qualified name of the component to launch.
component: String,
}
/// The `Stream` of events produced by a `PushTimeSource`
type PushTimeSourceEventStream = Select<
Pin<Box<dyn Stream<Item = Result<Event, Error>> + Send>>,
Pin<Box<dyn Stream<Item = Result<Event, Error>> + Send>>,
>;
impl PushTimeSource {
/// Creates a new `PushTimeSource` using the supplied component name.
pub fn new(component: String) -> Self {
PushTimeSource { component }
}
/// Returns a stream of time output and status change events received using the supplied
/// `PushSourceProxy`, retaining the optional `App` for the same lifetime.
fn events_from_proxy(app: Option<App>, proxy: PushSourceProxy) -> PushTimeSourceEventStream {
// Store the App in a tuple with the PushSourceProxy to ensure it remains in scope.
let app_and_proxy = Arc::new((app, proxy));
let app_and_proxy_clone = Arc::clone(&app_and_proxy);
let status_stream = futures::stream::try_unfold(app_and_proxy, |app_and_proxy| {
app_and_proxy
.1
.watch_status()
.map_ok(move |status| Some((Event::StatusChange { status }, app_and_proxy)))
.err_into()
});
let sample_stream = futures::stream::try_unfold(app_and_proxy_clone, |app_and_proxy| {
app_and_proxy.1.watch_sample().map(move |result| match result {
Ok(sample) => match (sample.utc, sample.monotonic, sample.standard_deviation) {
(None, _, _) => Err(anyhow!("sample missing utc")),
(_, None, _) => Err(anyhow!("sample missing monotonic")),
(_, _, None) => Err(anyhow!("sample missing standard deviation")),
(Some(utc), Some(monotonic), Some(std_dev)) => Ok(Some((
Event::Sample(Sample {
utc: zx::Time::from_nanos(utc),
monotonic: zx::Time::from_nanos(monotonic),
std_dev: zx::Duration::from_nanos(std_dev),
}),
app_and_proxy,
))),
},
Err(err) => Err(err.into()),
})
});
futures::stream::select(status_stream.boxed(), sample_stream.boxed())
}
}
impl TimeSource for PushTimeSource {
type EventStream = PushTimeSourceEventStream;
fn launch(&self) -> Result<Self::EventStream, Error> {
let launcher = launcher().context("starting launcher")?;
info!("Launching PushTimeSource at {}", self.component);
let app = launch(&launcher, self.component.clone(), None)
.context(format!("launching push source {}", self.component))?;
let proxy = app.connect_to_protocol::<ftexternal::PushSourceMarker>()?;
Ok(PushTimeSource::events_from_proxy(Some(app), proxy))
}
}
#[cfg(test)]
use {futures::stream, parking_lot::Mutex};
/// A time source that immediately produces a collections of events supplied at construction.
/// The time source may be launched multiple times and will return a different collection of events
/// on each launch. It will return pending after the last event in the last collection, and will
/// terminate the stream after the last event in all other collections. The time source will return
/// an error if asked to launch after the last collection of events has been returned.
#[cfg(test)]
pub struct FakeTimeSource {
/// The collections of events to return. The TimeSource will return pending after the last
/// event in the last collection, and will terminate the stream after the last event in all
/// other collections.
collections: Mutex<Vec<Vec<Result<Event, Error>>>>,
}
#[cfg(test)]
impl FakeTimeSource {
/// Creates a new `FakeTimeSource` that produces the supplied single collection of successful
/// events.
pub fn events(events: Vec<Event>) -> Self {
FakeTimeSource {
collections: Mutex::new(vec![events.into_iter().map(|evt| Ok(evt)).collect()]),
}
}
/// Creates a new `FakeTimeSource` that produces the supplied collections of successful events.
pub fn event_collections(event_collections: Vec<Vec<Event>>) -> Self {
FakeTimeSource {
collections: Mutex::new(
event_collections
.into_iter()
.map(|collection| collection.into_iter().map(|evt| Ok(evt)).collect())
.collect(),
),
}
}
/// Creates a new `FakeTimeSource` that produces the supplied collections of results.
pub fn result_collections(result_collections: Vec<Vec<Result<Event, Error>>>) -> Self {
FakeTimeSource { collections: Mutex::new(result_collections) }
}
/// Creates a new `FakeTimeSource` that always fails to launch.
pub fn failing() -> Self {
FakeTimeSource { collections: Mutex::new(vec![]) }
}
}
#[cfg(test)]
impl Debug for FakeTimeSource {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("FakeTimeSource")
}
}
#[cfg(test)]
impl TimeSource for FakeTimeSource {
type EventStream = Pin<Box<dyn Stream<Item = Result<Event, Error>> + Send>>;
fn launch(&self) -> Result<Self::EventStream, Error> {
let mut lock = self.collections.lock();
if lock.is_empty() {
return Err(anyhow!("FakeTimeSource sent all supplied event collections"));
}
let events = lock.remove(0);
// Return a pending after the last event if this was the last collection.
if lock.is_empty() {
Ok(stream::iter(events).chain(stream::pending()).boxed())
} else {
Ok(stream::iter(events).boxed())
}
}
}
#[cfg(test)]
mod test {
use {super::*, fidl::prelude::*, fuchsia_async as fasync, lazy_static::lazy_static};
const STATUS_1: Status = Status::Initializing;
const SAMPLE_1_UTC_NANOS: i64 = 1234567;
const SAMPLE_1_MONO_NANOS: i64 = 222;
const SAMPLE_1_STD_DEV_NANOS: i64 = 8888;
lazy_static! {
static ref STATUS_EVENT_1: Event = Event::StatusChange { status: STATUS_1 };
static ref SAMPLE_EVENT_1: Event = Event::from(Sample {
utc: zx::Time::from_nanos(SAMPLE_1_UTC_NANOS),
monotonic: zx::Time::from_nanos(SAMPLE_1_MONO_NANOS),
std_dev: zx::Duration::from_nanos(SAMPLE_1_STD_DEV_NANOS),
});
static ref SAMPLE_EVENT_2: Event = Event::from(Sample {
utc: zx::Time::from_nanos(12345678),
monotonic: zx::Time::from_nanos(333),
std_dev: zx::Duration::from_nanos(9999),
});
}
#[fuchsia::test(allow_stalls = false)]
async fn single_event_set() -> Result<(), Error> {
let fake = FakeTimeSource::events(vec![*STATUS_EVENT_1, *SAMPLE_EVENT_1, *SAMPLE_EVENT_2]);
let mut events = fake.launch().context("Fake should launch without error")?;
assert_eq!(events.next().await.unwrap().unwrap(), *STATUS_EVENT_1);
assert_eq!(events.next().await.unwrap().unwrap(), *SAMPLE_EVENT_1);
assert_eq!(events.next().await.unwrap().unwrap(), *SAMPLE_EVENT_2);
// Making another call should lead to a stall and hence panic. We don't test this to
// avoid a degenerate test, but do in fake_no_events_then_pending.
assert!(fake.launch().is_err());
Ok(())
}
#[fuchsia::test(allow_stalls = false)]
async fn double_event_set() -> Result<(), Error> {
let fake = FakeTimeSource::event_collections(vec![
vec![*STATUS_EVENT_1, *SAMPLE_EVENT_1],
vec![*SAMPLE_EVENT_2],
]);
let mut events = fake.launch().context("Fake should launch without error")?;
assert_eq!(events.next().await.unwrap().unwrap(), *STATUS_EVENT_1);
assert_eq!(events.next().await.unwrap().unwrap(), *SAMPLE_EVENT_1);
assert!(events.next().await.is_none());
let mut events = fake.launch().context("Fake should relaunch without error")?;
assert_eq!(events.next().await.unwrap().unwrap(), *SAMPLE_EVENT_2);
// Making another call should lead to a stall and hence panic. We don't test this to
// avoid a degenerate test, but do in fake_no_events_then_pending.
assert!(fake.launch().is_err());
Ok(())
}
#[fuchsia::test(allow_stalls = false)]
#[should_panic]
async fn fake_no_events_then_pending() {
let fake = FakeTimeSource::events(vec![]);
let mut events = fake.launch().unwrap();
// Getting an event from the last collection should never complete, leading to a stall.
events.next().await;
}
#[fuchsia::test]
fn fake_failing() {
let fake = FakeTimeSource::failing();
assert!(fake.launch().is_err());
}
#[fuchsia::test]
fn new_push_time_source() {
const COMPONENT_NAME: &str = "alfred";
let time_source = PushTimeSource::new(COMPONENT_NAME.to_string());
assert_eq!(time_source.component, COMPONENT_NAME);
}
#[fuchsia::test]
async fn push_time_source_events() {
let (proxy, mut requests) =
fidl::endpoints::create_proxy_and_stream::<ftexternal::PushSourceMarker>().unwrap();
let _task = fasync::Task::spawn(async move {
while let Some(Ok(request)) = requests.next().await {
match request {
ftexternal::PushSourceRequest::WatchStatus { responder, .. } => {
responder.send(STATUS_1).unwrap();
}
ftexternal::PushSourceRequest::WatchSample { responder, .. } => {
let sample = ftexternal::TimeSample {
utc: Some(SAMPLE_1_UTC_NANOS),
monotonic: Some(SAMPLE_1_MONO_NANOS),
standard_deviation: Some(SAMPLE_1_STD_DEV_NANOS),
..ftexternal::TimeSample::EMPTY
};
responder.send(sample).unwrap();
}
_ => {}
};
}
});
let mut events = PushTimeSource::events_from_proxy(None, proxy);
// We expect to receive both events but the ordering is not deterministic.
let event1 = events.next().await.unwrap().unwrap();
let event2 = events.next().await.unwrap().unwrap();
match event1 {
Event::StatusChange { status: _ } => {
assert_eq!(event1, *STATUS_EVENT_1);
assert_eq!(event2, *SAMPLE_EVENT_1);
}
Event::Sample(_) => {
assert_eq!(event1, *SAMPLE_EVENT_1);
assert_eq!(event2, *STATUS_EVENT_1);
}
}
}
#[fuchsia::test]
async fn push_time_source_failure() {
let (proxy, mut requests) =
fidl::endpoints::create_proxy_and_stream::<ftexternal::PushSourceMarker>().unwrap();
let _task = fasync::Task::spawn(async move {
while let Some(Ok(request)) = requests.next().await {
// Close the channel on the first watch status request.
match request {
ftexternal::PushSourceRequest::WatchStatus { responder, .. } => {
responder.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
}
_ => {}
};
}
});
let mut events = PushTimeSource::events_from_proxy(None, proxy);
assert!(events.next().await.unwrap().is_err());
}
}