// blob: 07484b260839514dc64a675242413674010c2850
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{anyhow, format_err, Context as _, Error, Result};
use async_trait::async_trait;
use fidl_fuchsia_component::{
self as fcomponent, ChildIteratorMarker, CreateChildArgs, RealmMarker, RealmProxy,
};
use fidl_fuchsia_component_decl::{Child, ChildRef, CollectionRef, StartupMode};
use fidl_fuchsia_io::DirectoryProxy;
use fidl_fuchsia_time_external::{
self as ftexternal, PushSourceProxy, Status, TimeSample, Urgency,
};
use fuchsia_component::client;
use fuchsia_zircon as zx;
use futures::stream::Stream;
use futures::{FutureExt, TryFutureExt};
use std::fmt::Debug;
use std::sync::Arc;
use tracing::debug;
/// Name of the component collection used for timesource children: it is the collection passed
/// when creating, listing, opening and destroying the child component.
const TIMESOURCE_COLLECTION_NAME: &str = "timesource";
/// A time sample received from a source of time.
///
/// Groups a UTC reading, the monotonic time it corresponds to, and an estimate of the UTC
/// error. A `Sample` can be built from a FIDL `TimeSample` through `TryFrom`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct Sample {
/// The UTC time.
pub utc: zx::Time,
/// The monotonic time at which the UTC was most valid.
pub monotonic: zx::Time,
/// The standard deviation of the UTC error.
pub std_dev: zx::Duration,
}
impl TryFrom<TimeSample> for Sample {
    type Error = anyhow::Error;

    /// Converts a FIDL `TimeSample` into a `Sample`.
    ///
    /// The `utc`, `monotonic` and `standard_deviation` fields must all be set. They are checked
    /// in that order and the first missing field is reported as the error. Each value is
    /// interpreted as a count of nanoseconds.
    fn try_from(sample: TimeSample) -> Result<Self, Self::Error> {
        let utc = sample.utc.ok_or_else(|| anyhow!("sample missing utc"))?;
        let monotonic = sample.monotonic.ok_or_else(|| anyhow!("sample missing monotonic"))?;
        let std_dev = sample
            .standard_deviation
            .ok_or_else(|| anyhow!("sample missing standard deviation"))?;
        Ok(Sample {
            utc: zx::Time::from_nanos(utc),
            monotonic: zx::Time::from_nanos(monotonic),
            std_dev: zx::Duration::from_nanos(std_dev),
        })
    }
}
#[cfg(test)]
impl Sample {
    /// Test-only constructor that assembles a `Sample` from its three components.
    pub fn new(utc: zx::Time, monotonic: zx::Time, std_dev: zx::Duration) -> Self {
        Self { utc, monotonic, std_dev }
    }
}
/// An event that may be observed from a source of time.
///
/// This is the item type (wrapped in `Result`) carried by `BoxedPushSourceEventStream`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Event {
/// The status of the time source changed.
StatusChange {
/// The current status of the time source.
status: Status,
},
/// The time source produced a new time sample.
Sample(Sample),
}
impl From<Sample> for Event {
fn from(sample: Sample) -> Event {
Event::Sample(sample)
}
}
/// One of the timesource API implementations.
///
/// Each variant owns a boxed trait object, so production and fake implementations can be used
/// interchangeably.
#[derive(Debug)]
// NOTE(review): presumably at least one variant is not constructed in non-test builds; confirm
// whether this allow is still required.
#[allow(dead_code)]
pub enum TimeSource {
/// A source that pushes events to the client (see `PushSource`).
Push(BoxedPushSource),
/// A source that is asked for a sample on demand (see `PullSource`).
Pull(BoxedPullSource),
}
/// A boxed stream of `Event` results, as returned by `PushSource::watch`.
pub type BoxedPushSourceEventStream =
Box<dyn Stream<Item = Result<Event, Error>> + Unpin + Send + Sync>;
/// A boxed `PushSource` trait object.
pub type BoxedPushSource = Box<dyn PushSource>;
/// A boxed `PullSource` trait object.
pub type BoxedPullSource = Box<dyn PullSource>;
/// Provides abstraction over `fuchsia.time.external.PushSource`.
/// https://fuchsia.dev/fuchsia-src/concepts/kernel/time/utc/architecture?hl=en#timekeeper
///
/// Implementors must be `Send + Sync + Debug`; the trait is used as a boxed trait object
/// (`BoxedPushSource`).
#[async_trait]
pub trait PushSource: Send + Sync + Debug {
/// Attempts to launch the time source and return a stream of its time output and status
/// change events.
///
/// # Errors
///
/// Returns an error when the event stream could not be obtained.
async fn watch(&self) -> Result<BoxedPushSourceEventStream, Error>;
}
/// Provides abstraction over `fuchsia.time.external.PullSource`.
/// https://fuchsia.dev/fuchsia-src/concepts/kernel/time/utc/architecture?hl=en#timekeeper
///
/// Implementors must be `Send + Sync + Debug`; the trait is used as a boxed trait object
/// (`BoxedPullSource`).
#[async_trait]
pub trait PullSource: Send + Sync + Debug {
/// Attempts to start the timesource component and request a time sample. Component is
/// unloaded after sample is returned in order to free system resources.
///
/// `_urgency` is the urgency to attach to the sample request.
///
/// # Errors
///
/// Returns an error when no sample could be obtained.
async fn sample(&self, _urgency: &Urgency) -> Result<Sample, Error>;
}
/// Starts the component that provides one of the timesource FIDL APIs.
///
/// The component is created as a child named `name` in the timesource collection.
#[derive(Debug)]
pub struct TimeSourceLauncher {
/// URL of the component to launch; used as the `url` of the child declaration.
component_url: String,
/// Name given to the child component inside the timesource collection.
name: String,
}
/// Failure modes of destroying the timesource child component.
///
/// Kept separate from `anyhow::Error` so that callers can tell a missing child apart from any
/// other failure.
enum DestroyChildError {
/// The realm reported that the child instance does not exist.
NotFound,
/// Any other failure, carried as an `anyhow::Error`.
Internal(anyhow::Error),
}
impl From<DestroyChildError> for anyhow::Error {
    /// Flattens a `DestroyChildError` into an `anyhow::Error`: an internal error is passed
    /// through unchanged, a missing child becomes a descriptive message.
    fn from(error: DestroyChildError) -> Self {
        match error {
            DestroyChildError::Internal(inner) => inner,
            DestroyChildError::NotFound => anyhow!("Unable to destroy timesource: not found"),
        }
    }
}
impl From<anyhow::Error> for DestroyChildError {
fn from(value: anyhow::Error) -> Self {
Self::Internal(value)
}
}
impl TimeSourceLauncher {
/// Creates new launcher.
pub fn new<S>(component_url: S, name: S) -> Self
where
S: Into<String>,
{
let component_url = component_url.into();
let name = name.into();
TimeSourceLauncher { component_url, name }
}
/// Launches the timesource.
///
/// Connects to `fuchsia.component.Realm`, removes any child left over from a previous launch,
/// creates the child in the timesource collection and opens its exposed directory.
///
/// # Errors
///
/// Returns an error if the realm connection, the cleanup of a previous child (other than the
/// child simply being absent), the child creation, or opening the exposed directory fails.
async fn launch(&self) -> Result<DirectoryProxy, Error> {
    let realm = client::connect_to_protocol::<RealmMarker>()
        .context("failed to connect to fuchsia.component.Realm")?;
    match self.ensure_timesource_destroyed(&realm).await {
        // The intent is to remove the child if it exists, so disregard the related error.
        Ok(()) | Err(DestroyChildError::NotFound) => {}
        Err(DestroyChildError::Internal(e)) => return Err(e),
    }
    debug!("Launching TimeSource at {}", self.component_url);
    let child_decl = Child {
        name: Some(self.name.clone()),
        url: Some(self.component_url.clone()),
        startup: Some(StartupMode::Lazy),
        ..Default::default()
    };
    let collection_ref = CollectionRef { name: String::from(TIMESOURCE_COLLECTION_NAME) };
    realm
        .create_child(&collection_ref, &child_decl, CreateChildArgs::default())
        .await
        .context("realm.create_child failed")?
        .map_err(|e| anyhow!("failed to create child: {:?}", e))?;
    // The result already has the function's return type, so no `Ok(..?)` re-wrapping is needed.
    client::open_childs_exposed_directory(
        &self.name,
        Some(String::from(TIMESOURCE_COLLECTION_NAME)),
    )
    .await
    .context("failed to open exposed directory")
}
/// Destroys previously launched timesource. Will generate an error if the child was not found.
async fn destroy(&self) -> Result<(), Error> {
let realm = client::connect_to_protocol::<RealmMarker>()
.context("failed to connect to fuchsia.component.Realm")?;
self.ensure_timesource_destroyed(&realm).await.map_err(Into::into)
}
/// Destroys previously launched timesource and returns `RealmProxy` used.
async fn ensure_timesource_destroyed(
&self,
realm: &RealmProxy,
) -> Result<(), DestroyChildError> {
debug!("Ensure TimeSource is not running: {}", self.component_url);
// Destroy the previously launched timesource.
let child_ref = ChildRef {
name: self.name.clone(),
collection: Some(String::from(TIMESOURCE_COLLECTION_NAME)),
};
// Realm::DestroyChild is not quite idempotent: attempting to destroy
// a child that does not exist results in log spam. So we only actually
// call DestroyChild if there was indication that such a child existed.
// It is still possible that such a child stops existing between the
// check and the call, but that should be a rare event.
if self.has_child(realm, &child_ref).await? {
realm
.destroy_child(&child_ref)
.await
.map_err(|e| DestroyChildError::Internal(e.into()))?
.or_else(|err: fcomponent::Error| match err {
fcomponent::Error::InstanceNotFound => Err(DestroyChildError::NotFound),
_ => Err(DestroyChildError::Internal(format_err!(
"Error destroying child {:?}",
err
))),
})
} else {
Ok(())
}
}
/// Returns true if the `realm` contains the child referenced by `child_ref`.
///
/// Lists the children of the timesource collection and scans them batch by batch; an empty
/// batch marks the end of the listing.
///
/// # Errors
///
/// Returns an error if the iterator cannot be created or a FIDL call fails.
async fn has_child(&self, realm: &RealmProxy, child_ref: &ChildRef) -> Result<bool> {
    let (iter_proxy, server_end) = fidl::endpoints::create_proxy::<ChildIteratorMarker>()?;
    let collection = CollectionRef { name: TIMESOURCE_COLLECTION_NAME.into() };
    // NOTE(review): only the transport error is propagated here; the value bound to `_response`
    // is not inspected. Confirm that ignoring it is intended.
    let _response = realm.list_children(&collection, server_end).await?;
    loop {
        let children = iter_proxy.next().await?;
        if children.is_empty() {
            return Ok(false);
        }
        if children.iter().any(|child| child == child_ref) {
            return Ok(true);
        }
    }
}
}
/// Wraps the launcher in the production `PushSource` implementation.
///
/// Implemented as `From` rather than `Into`: the standard blanket impl still provides
/// `launcher.into()`, and `BoxedPushSource::from(launcher)` becomes available as well.
impl From<TimeSourceLauncher> for BoxedPushSource {
    fn from(launcher: TimeSourceLauncher) -> Self {
        Box::new(PushSourceImpl { launcher })
    }
}
/// Wraps the launcher in the production `PullSource` implementation.
///
/// Implemented as `From` rather than `Into`: the standard blanket impl still provides
/// `launcher.into()`, and `BoxedPullSource::from(launcher)` becomes available as well.
impl From<TimeSourceLauncher> for BoxedPullSource {
    fn from(launcher: TimeSourceLauncher) -> Self {
        Box::new(PullSourceImpl { launcher })
    }
}
/// Production implementation of the `PushSource` trait.
///
/// Launches the time source component through the held `TimeSourceLauncher`.
#[derive(Debug)]
pub struct PushSourceImpl {
/// Launcher used to start the component that serves the push source protocol.
launcher: TimeSourceLauncher,
}
impl PushSourceImpl {
/// Returns a stream of time output and status change events received using the supplied
/// `PushSourceProxy`.
///
/// The proxy is shared through an `Arc` by two streams, one repeatedly calling
/// `watch_status` and one repeatedly calling `watch_sample`; the two are merged with
/// `futures::stream::select`. A failed call, or a `TimeSample` that cannot be converted to
/// a `Sample`, surfaces as an `Err` item.
fn events_from_proxy(proxy: PushSourceProxy) -> BoxedPushSourceEventStream {
let proxy = Arc::new(proxy);
// Each unfold step issues one `watch_status` call and hands the proxy back as the state for
// the next step.
let status_stream = futures::stream::try_unfold(Arc::clone(&proxy), |proxy| {
proxy
.watch_status()
.map_ok(move |status| Some((Event::StatusChange { status }, proxy)))
.err_into()
});
// Same pattern for samples, with an extra fallible conversion from the FIDL type.
let sample_stream = futures::stream::try_unfold(proxy, |proxy| {
proxy.watch_sample().map(move |result| {
result
.map_err(Into::into) // convert fidl error to anyhow.
.and_then(TryInto::try_into) // convert TimeSample to Sample.
.map(|sample| Some((Event::Sample(sample), proxy))) // wrap in tuple.
})
});
// Both streams are pinned on the heap before being merged and boxed.
Box::new(futures::stream::select(Box::pin(status_stream), Box::pin(sample_stream)))
}
}
#[async_trait]
impl PushSource for PushSourceImpl {
    /// Launches the time source component, connects to its `PushSource` protocol and returns
    /// the merged stream of samples and status updates.
    async fn watch(&self) -> Result<BoxedPushSourceEventStream, Error> {
        let exposed_dir = self.launcher.launch().await?;
        client::connect_to_protocol_at_dir_root::<ftexternal::PushSourceMarker>(&exposed_dir)
            .context("failed to connect to the fuchsia.time.external.PushSource")
            .map(PushSourceImpl::events_from_proxy)
    }
}
/// Production implementation of the `PullSource` trait.
///
/// Launches the time source component through the held `TimeSourceLauncher` for each sample
/// request and destroys it afterwards.
// NOTE(review): presumably this type is not yet constructed in non-test builds; confirm whether
// this allow is still required.
#[allow(dead_code)]
#[derive(Debug)]
pub struct PullSourceImpl {
/// Launcher used to start and destroy the component that serves the pull source protocol.
launcher: TimeSourceLauncher,
}
impl PullSourceImpl {
async fn sample_from_dir(
&self,
directory: &DirectoryProxy,
urgency: &Urgency,
) -> Result<Sample, Error> {
let proxy =
client::connect_to_protocol_at_dir_root::<ftexternal::PullSourceMarker>(directory)
.context("failed to connect to the fuchsia.time.external.PullSource")?;
proxy
.sample(*urgency)
.await?
.map_err(|e| format_err!("Error obtaining time sample: {:?}", e))?
.try_into()
}
}
#[async_trait]
impl PullSource for PullSourceImpl {
    /// Launches the timesource component, requests a single time sample and destroys the
    /// component again so that it does not keep holding system resources.
    ///
    /// A failure to destroy the component takes precedence over the outcome of the sample
    /// request.
    async fn sample(&self, urgency: &Urgency) -> Result<Sample, Error> {
        let exposed_dir = self.launcher.launch().await?;
        // Keep the outcome unexamined for now so that `destroy()` runs on success and failure.
        let outcome = self.sample_from_dir(&exposed_dir, urgency).await;
        self.launcher.destroy().await?;
        outcome
    }
}
#[cfg(test)]
use {
fuchsia_sync::Mutex,
futures::{stream, StreamExt},
};
/// A time source that immediately produces collections of events supplied at construction.
/// The time source may be launched multiple times and will return a different collection of events
/// on each launch. It will return pending after the last event in the last collection, and will
/// terminate the stream after the last event in all other collections. The time source will return
/// an error if asked to launch after the last collection of events has been returned.
#[cfg(test)]
pub struct FakePushTimeSource {
/// The collections of events to return. The TimeSource will return pending after the last
/// event in the last collection, and will terminate the stream after the last event in all
/// other collections.
///
/// Held in a `Mutex` because `watch` takes `&self` yet removes the front collection on each
/// call.
collections: Mutex<Vec<Vec<Result<Event, Error>>>>,
}
#[cfg(test)]
impl From<FakePushTimeSource> for TimeSource {
    /// Boxes the fake and exposes it as the push flavour of `TimeSource`.
    fn from(s: FakePushTimeSource) -> Self {
        Self::Push(Box::new(s))
    }
}
#[cfg(test)]
impl FakePushTimeSource {
    /// Creates a new `FakePushTimeSource` that produces the supplied single collection of
    /// successful events.
    pub fn events(events: Vec<Event>) -> Self {
        Self::event_collections(vec![events])
    }

    /// Creates a new `FakePushTimeSource` that produces the supplied collections of successful
    /// events, one collection per launch.
    pub fn event_collections(event_collections: Vec<Vec<Event>>) -> Self {
        let wrapped: Vec<Vec<Result<Event, Error>>> = event_collections
            .into_iter()
            .map(|collection| collection.into_iter().map(Ok).collect())
            .collect();
        Self::result_collections(wrapped)
    }

    /// Creates a new `FakePushTimeSource` that produces the supplied collections of results,
    /// one collection per launch.
    pub fn result_collections(result_collections: Vec<Vec<Result<Event, Error>>>) -> Self {
        Self { collections: Mutex::new(result_collections) }
    }

    /// Creates a new `FakePushTimeSource` that always fails to launch, because it holds no
    /// collections at all.
    pub fn failing() -> Self {
        Self::result_collections(Vec::new())
    }
}
#[cfg(test)]
impl Debug for FakePushTimeSource {
    /// Prints only the type name; the scripted events are not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "FakePushTimeSource")
    }
}
#[cfg(test)]
#[async_trait]
impl PushSource for FakePushTimeSource {
    /// Hands out the next scripted collection as a stream.
    ///
    /// Fails once every collection has been handed out. The stream for the final collection
    /// stays pending after its last event; streams for earlier collections terminate.
    async fn watch(&self) -> Result<BoxedPushSourceEventStream, Error> {
        let mut remaining = self.collections.lock();
        if remaining.is_empty() {
            return Err(anyhow!("FakePushTimeSource sent all supplied event collections"));
        }
        let events = stream::iter(remaining.remove(0));
        // Return a pending after the last event if this was the last collection.
        let event_stream: BoxedPushSourceEventStream = if remaining.is_empty() {
            Box::new(Box::pin(events.chain(stream::pending())))
        } else {
            Box::new(Box::pin(events))
        };
        Ok(event_stream)
    }
}
/// A time source that upon request produces an event from the collection supplied at construction.
/// The time source may be launched multiple times and will return an event from the collection
/// on each launch. It will return error after the last event. The time source will return an error
/// if asked to launch after the last event from the collection has been returned.
#[cfg(test)]
pub struct FakePullTimeSource {
/// The collection of events to return. The TimeSource will return error after the last
/// event in the collection.
///
/// Each entry pairs the urgency the request is expected to carry with the result to return.
/// Held in a `Mutex` because `sample` takes `&self` yet removes the front entry on each call.
collection: Mutex<Vec<(Urgency, Result<Sample, Error>)>>,
}
#[cfg(test)]
impl From<FakePullTimeSource> for TimeSource {
    /// Boxes the fake and exposes it as the pull flavour of `TimeSource`.
    fn from(s: FakePullTimeSource) -> Self {
        Self::Pull(Box::new(s))
    }
}
#[cfg(test)]
impl Debug for FakePullTimeSource {
    /// Prints only the type name; the scripted results are not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "FakePullTimeSource")
    }
}
#[cfg(test)]
impl FakePullTimeSource {
    /// Creates a new `FakePullTimeSource` that produces the supplied collection of successful
    /// samples, each paired with the urgency its request is expected to carry.
    pub fn samples(events: Vec<(Urgency, Sample)>) -> Self {
        let scripted: Vec<(Urgency, Result<Sample, Error>)> =
            events.into_iter().map(|(urgency, sample)| (urgency, Ok(sample))).collect();
        Self::results(scripted)
    }

    /// Creates a new `FakePullTimeSource` that produces the supplied collection of results.
    pub fn results(results: Vec<(Urgency, Result<Sample, Error>)>) -> Self {
        Self { collection: Mutex::new(results) }
    }

    /// Creates a new `FakePullTimeSource` that always fails to launch, because it holds no
    /// results at all.
    pub fn failing() -> Self {
        Self::results(Vec::new())
    }
}
#[cfg(test)]
#[async_trait]
impl PullSource for FakePullTimeSource {
    /// Returns the next scripted result.
    ///
    /// Fails when the script is exhausted or when `urgency` differs from the urgency scripted
    /// for this request. The scripted entry is consumed in either case.
    async fn sample(&self, urgency: &Urgency) -> Result<Sample, Error> {
        let mut scripted = self.collection.lock();
        if scripted.is_empty() {
            return Err(anyhow!("FakePullTimeSource sent all supplied events."));
        }
        let (expected_urgency, sample) = scripted.remove(0);
        if *urgency != expected_urgency {
            return Err(anyhow!(
                "Wrong urgency provided: expected {:?}, got {:?}.",
                expected_urgency,
                urgency
            ));
        }
        sample
    }
}
#[cfg(test)]
mod test {
use super::*;
use fidl::prelude::*;
use fuchsia_async as fasync;
use lazy_static::lazy_static;
// Shared fixtures. The raw nanosecond values of the first sample are kept as constants because
// the FIDL fake server in `push_time_source_events` needs them to build a `TimeSample`.
const STATUS_1: Status = Status::Initializing;
const SAMPLE_1_UTC_NANOS: i64 = 1234567;
const SAMPLE_1_MONO_NANOS: i64 = 222;
const SAMPLE_1_STD_DEV_NANOS: i64 = 8888;
lazy_static! {
// Event carrying `STATUS_1`.
static ref STATUS_EVENT_1: Event = Event::StatusChange { status: STATUS_1 };
// First sample, built from the `SAMPLE_1_*` constants above.
static ref SAMPLE_1: Sample = Sample {
utc: zx::Time::from_nanos(SAMPLE_1_UTC_NANOS),
monotonic: zx::Time::from_nanos(SAMPLE_1_MONO_NANOS),
std_dev: zx::Duration::from_nanos(SAMPLE_1_STD_DEV_NANOS),
};
static ref SAMPLE_EVENT_1: Event = Event::from(*SAMPLE_1);
// Second sample, distinct from the first in every field.
static ref SAMPLE_2: Sample = Sample {
utc: zx::Time::from_nanos(12345678),
monotonic: zx::Time::from_nanos(333),
std_dev: zx::Duration::from_nanos(9999),
};
static ref SAMPLE_EVENT_2: Event = Event::from(*SAMPLE_2);
}
#[fuchsia::test(allow_stalls = false)]
async fn single_event_set() -> Result<(), Error> {
    let scripted = vec![*STATUS_EVENT_1, *SAMPLE_EVENT_1, *SAMPLE_EVENT_2];
    let fake = FakePushTimeSource::events(scripted);
    let mut events = fake.watch().await.context("Fake should watch without error")?;
    // The scripted events must come back in the order they were supplied.
    for expected in [*STATUS_EVENT_1, *SAMPLE_EVENT_1, *SAMPLE_EVENT_2] {
        assert_eq!(events.next().await.unwrap().unwrap(), expected);
    }
    // Making another call should lead to a stall and hence panic. We don't test this to
    // avoid a degenerate test, but do in fake_no_events_then_pending.
    // Watching again, however, must fail since the only collection has been handed out.
    let rewatch = fake.watch().await;
    assert!(rewatch.is_err());
    Ok(())
}
#[fuchsia::test(allow_stalls = false)]
async fn double_event_set() -> Result<(), Error> {
    let fake = FakePushTimeSource::event_collections(vec![
        vec![*STATUS_EVENT_1, *SAMPLE_EVENT_1],
        vec![*SAMPLE_EVENT_2],
    ]);

    // First launch: both events, then the stream terminates because another collection follows.
    let mut first_launch = fake.watch().await.context("Fake should watch without error")?;
    for expected in [*STATUS_EVENT_1, *SAMPLE_EVENT_1] {
        assert_eq!(first_launch.next().await.unwrap().unwrap(), expected);
    }
    assert!(first_launch.next().await.is_none());

    // Second launch: the remaining event.
    let mut second_launch = fake.watch().await.context("Fake should watch without error")?;
    assert_eq!(second_launch.next().await.unwrap().unwrap(), *SAMPLE_EVENT_2);
    // Making another call should lead to a stall and hence panic. We don't test this to
    // avoid a degenerate test, but do in fake_no_events_then_pending.
    // Watching a third time must fail since both collections have been handed out.
    let rewatch = fake.watch().await;
    assert!(rewatch.is_err());
    Ok(())
}
// Verifies that the stream for the last (here: only, and empty) collection never completes.
// With `allow_stalls = false` the resulting stall is expected to panic, hence `should_panic`.
#[fuchsia::test(allow_stalls = false)]
#[should_panic]
async fn fake_no_events_then_pending() {
let fake = FakePushTimeSource::events(vec![]);
let mut events = fake.watch().await.unwrap();
// Getting an event from the last collection should never complete, leading to a stall.
events.next().await;
}
// Verifies that a fake built with `failing()` refuses to be watched.
#[fuchsia::test]
async fn fake_failing() -> Result<(), Error> {
    let fake = FakePushTimeSource::failing();
    // No context is attached to the result: the call is expected to fail, so a message claiming
    // it should succeed would contradict the assertion.
    assert!(fake.watch().await.is_err());
    Ok(())
}
#[fuchsia::test]
fn new_primary_time_source() {
const COMPONENT_NAME: &str = "alfred";
const COMPONENT_URL: &str = "pennyworth";
let time_source = TimeSourceLauncher::new(COMPONENT_URL, COMPONENT_NAME);
assert_eq!(time_source.component_url, COMPONENT_URL);
assert_eq!(time_source.name, COMPONENT_NAME);
}
// Drives `PushSourceImpl::events_from_proxy` against an in-process fake `PushSource` server and
// checks that one status event and one sample event are produced.
#[fuchsia::test]
async fn push_time_source_events() {
let (proxy, mut requests) =
fidl::endpoints::create_proxy_and_stream::<ftexternal::PushSourceMarker>().unwrap();
// Fake server: answers every status watch with `STATUS_1` and every sample watch with the
// first sample's raw values. The task handle is kept in `_task` for the rest of the test.
let _task = fasync::Task::spawn(async move {
while let Some(Ok(request)) = requests.next().await {
match request {
ftexternal::PushSourceRequest::WatchStatus { responder, .. } => {
responder.send(STATUS_1).unwrap();
}
ftexternal::PushSourceRequest::WatchSample { responder, .. } => {
let sample = ftexternal::TimeSample {
utc: Some(SAMPLE_1_UTC_NANOS),
monotonic: Some(SAMPLE_1_MONO_NANOS),
standard_deviation: Some(SAMPLE_1_STD_DEV_NANOS),
..Default::default()
};
responder.send(&sample).unwrap();
}
// Other request kinds are ignored by this fake.
_ => {}
};
}
});
let mut events = PushSourceImpl::events_from_proxy(proxy);
// We expect to receive both events but the ordering is not deterministic.
let event1 = events.next().await.unwrap().unwrap();
let event2 = events.next().await.unwrap().unwrap();
// Whichever kind arrived first, the other event must be of the other kind.
match event1 {
Event::StatusChange { status: _ } => {
assert_eq!(event1, *STATUS_EVENT_1);
assert_eq!(event2, *SAMPLE_EVENT_1);
}
Event::Sample(_) => {
assert_eq!(event1, *SAMPLE_EVENT_1);
assert_eq!(event2, *STATUS_EVENT_1);
}
}
}
// Checks that a channel closed by the server shows up as an error item on the event stream.
#[fuchsia::test]
async fn push_time_source_failure() {
let (proxy, mut requests) =
fidl::endpoints::create_proxy_and_stream::<ftexternal::PushSourceMarker>().unwrap();
// Fake server; the task handle is kept in `_task` for the rest of the test.
let _task = fasync::Task::spawn(async move {
while let Some(Ok(request)) = requests.next().await {
// Close the channel on the first watch status request.
match request {
ftexternal::PushSourceRequest::WatchStatus { responder, .. } => {
responder.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
}
// Other request kinds are left unanswered.
_ => {}
};
}
});
let mut events = PushSourceImpl::events_from_proxy(proxy);
// The first item produced by the stream must be an error.
assert!(events.next().await.unwrap().is_err());
}
#[fuchsia::test(allow_stalls = false)]
async fn fake_pull() -> Result<(), Error> {
    // The fake hands out its samples in order, one per request with the matching urgency.
    let scripted = vec![(Urgency::Low, *SAMPLE_1), (Urgency::Medium, *SAMPLE_2)];
    let fake = FakePullTimeSource::samples(scripted);

    let first = fake.sample(&Urgency::Low).await.context("sample with Urgency::Low")?;
    assert_eq!(first, *SAMPLE_1);
    let second = fake.sample(&Urgency::Medium).await.context("sample with Urgency::Medium")?;
    assert_eq!(second, *SAMPLE_2);

    // Once the script is exhausted every further request fails.
    let exhausted = fake.sample(&Urgency::Low).await;
    assert!(exhausted.is_err());
    Ok(())
}
#[fuchsia::test(allow_stalls = false)]
async fn fake_pull_results() -> Result<(), Error> {
let fake = FakePullTimeSource::results(vec![
(Urgency::Low, Err(anyhow!("test error"))),
(Urgency::Low, Ok(*SAMPLE_1)),
]);
assert!(fake.sample(&Urgency::Low).await.is_err());
let sample = fake.sample(&Urgency::Low).await.context("sample with Urgency::Low")?;
assert_eq!(sample, *SAMPLE_1);
Ok(())
}
#[fuchsia::test(allow_stalls = false)]
async fn fake_pull_unexpected_urgency() -> Result<(), Error> {
    // The script expects a medium-urgency request, so a low-urgency one must be rejected.
    let fake = FakePullTimeSource::samples(vec![(Urgency::Medium, *SAMPLE_1)]);
    let mismatched = fake.sample(&Urgency::Low).await;
    assert!(mismatched.is_err());
    Ok(())
}
#[fuchsia::test(allow_stalls = false)]
async fn fake_pull_no_events() -> Result<(), Error> {
    // With an empty script the very first request fails.
    let fake = FakePullTimeSource::samples(Vec::new());
    let outcome = fake.sample(&Urgency::Low).await;
    assert!(outcome.is_err());
    Ok(())
}
#[fuchsia::test(allow_stalls = false)]
async fn fake_pull_failing() -> Result<(), Error> {
    // A fake built with `failing()` rejects every request.
    let fake = FakePullTimeSource::failing();
    let outcome = fake.sample(&Urgency::Low).await;
    assert!(outcome.is_err());
    Ok(())
}
}