blob: 2301c049ed16598815cf815e65b4cffae0a2a8c2 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! The `push_source` library defines an implementation of the `PushSource` API and traits to hook
//! in an algorithm that produces time updates.
use anyhow::Error;
use async_trait::async_trait;
use fidl_fuchsia_time_external::{
Properties, PushSourceRequest, PushSourceRequestStream, PushSourceWatchSampleResponder,
PushSourceWatchStatusResponder, Status, TimeSample,
};
use fuchsia_zircon as zx;
use futures::{
channel::mpsc::{channel, Receiver, Sender},
lock::Mutex,
StreamExt, TryStreamExt,
};
use log::warn;
use std::sync::{Arc, Weak};
use watch_handler::{Sender as WatchSender, WatchHandler};
/// A time update generated by an |UpdateAlgorithm|.
#[derive(Clone, PartialEq, Debug)]
pub enum Update {
/// A new TimeSample. The Arc may be removed once fidl tables support clone.
Sample(Arc<TimeSample>),
/// A new Status.
Status(Status),
}
impl From<Status> for Update {
fn from(status: Status) -> Self {
Update::Status(status)
}
}
impl From<TimeSample> for Update {
fn from(sample: TimeSample) -> Self {
Update::Sample(Arc::new(sample))
}
}
impl Update {
/// Returns true iff the update contained is a status.
pub fn is_status(&self) -> bool {
match self {
Update::Sample(_) => false,
Update::Status(_) => true,
}
}
}
/// An |UpdateAlgorithm| asynchronously produces Updates.
#[async_trait]
pub trait UpdateAlgorithm {
/// Update the algorithm's knowledge of device properties.
async fn update_device_properties(&self, properties: Properties);
/// Generate updates asynchronously and push them to |sink|. This method may run
/// indefinitely. This method may generate duplicate updates.
// TODO(satsukiu) - use a generator library instead once one is available
async fn generate_updates(&self, sink: Sender<Update>) -> Result<(), Error>;
}
/// An implementation of |fuchsia.time.external.PushSource| that routes time updates from an
/// |UpdateAlgorithm| to clients of the fidl protocol and routes device property updates from fidl
/// clients to the |UpdateAlgorithm|.
pub struct PushSource<UA: UpdateAlgorithm> {
/// Internal state of the push source.
internal: Mutex<PushSourceInternal>,
/// The algorithm used to obtain new updates.
update_algorithm: UA,
}
impl<UA: UpdateAlgorithm> PushSource<UA> {
/// Create a new |PushSource| that polls |update_algorithm| for time updates and starts in the
/// |initial_status| status.
pub fn new(update_algorithm: UA, initial_status: Status) -> Result<Self, Error> {
Ok(Self { internal: Mutex::new(PushSourceInternal::new(initial_status)), update_algorithm })
}
/// Polls updates received on |update_algorithm| and pushes them to bound clients.
pub async fn poll_updates(&self) -> Result<(), Error> {
// Updates should be processed immediately so add no extra buffer space.
let (sender, mut receiver) = channel(0);
let updater_fut = self.update_algorithm.generate_updates(sender);
let consumer_fut = async move {
while let Some(update) = receiver.next().await {
self.internal.lock().await.push_update(update).await;
}
};
let (update_res, _) = futures::future::join(updater_fut, consumer_fut).await;
update_res
}
/// Handle a single client's requests received on the given |request_stream|.
pub async fn handle_requests_for_stream(
&self,
mut request_stream: PushSourceRequestStream,
) -> Result<(), Error> {
let client_context = self.internal.lock().await.register_client();
while let Some(request) = request_stream.try_next().await? {
client_context.lock().await.handle_request(request, &self.update_algorithm).await?;
}
Ok(())
}
}
/// Contains internal state for |PushSource| that must be updated atomically.
struct PushSourceInternal {
/// A set of weak pointers to registered clients.
clients: Vec<Weak<Mutex<PushSourceClientHandler>>>,
/// The last known sample.
latest_sample: Option<Arc<TimeSample>>,
/// The last known status.
latest_status: Status,
}
impl PushSourceInternal {
/// Create a new |PushSourceInternal|.
pub fn new(initial_status: Status) -> Self {
PushSourceInternal { clients: vec![], latest_sample: None, latest_status: initial_status }
}
/// Create a new client handler registered to receive asynchonous updates
/// for the duration of its lifetime.
pub fn register_client(&mut self) -> Arc<Mutex<PushSourceClientHandler>> {
let client = Arc::new(Mutex::new(PushSourceClientHandler {
sample_watcher: WatchHandler::create(self.latest_sample.clone()),
status_watcher: WatchHandler::create(Some(self.latest_status)),
}));
self.clients.push(Arc::downgrade(&client));
client
}
/// Push a new update to all existing clients.
pub async fn push_update(&mut self, update: Update) {
match &update {
Update::Sample(sample) => self.latest_sample = Some(Arc::clone(&sample)),
Update::Status(status) => self.latest_status = *status,
}
// Discard any references to clients that no longer exist.
let mut client_arcs = vec![];
self.clients.retain(|client_weak| match client_weak.upgrade() {
Some(client_arc) => {
client_arcs.push(client_arc);
true
}
None => false,
});
for client in client_arcs {
client.lock().await.handle_update(update.clone());
}
}
}
/// Per-client state for handling requests.
struct PushSourceClientHandler {
/// Watcher for parking `WatchSample` requests.
sample_watcher: WatchHandler<Arc<TimeSample>, WatchSampleResponder>,
/// Watcher for parking `WatchStatus` requests.
status_watcher: WatchHandler<Status, WatchStatusResponder>,
}
impl PushSourceClientHandler {
/// Handle a fidl request received from the client.
async fn handle_request(
&mut self,
request: PushSourceRequest,
update_algorithm: &impl UpdateAlgorithm,
) -> Result<(), Error> {
match request {
PushSourceRequest::WatchSample { responder } => {
self.sample_watcher.watch(WatchSampleResponder(responder)).map_err(|e| {
e.responder.0.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
e
})?;
}
PushSourceRequest::WatchStatus { responder } => {
self.status_watcher.watch(WatchStatusResponder(responder)).map_err(|e| {
e.responder.0.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
e
})?;
}
PushSourceRequest::UpdateDeviceProperties { properties, .. } => {
update_algorithm.update_device_properties(properties).await;
}
}
Ok(())
}
/// Push an internal update to any hanging gets parked by the client.
fn handle_update(&mut self, update: Update) {
match update {
Update::Sample(sample) => self.sample_watcher.set_value(sample),
Update::Status(status) => self.status_watcher.set_value(status),
}
}
}
struct WatchSampleResponder(PushSourceWatchSampleResponder);
struct WatchStatusResponder(PushSourceWatchStatusResponder);
impl WatchSender<Arc<TimeSample>> for WatchSampleResponder {
fn send_response(self, data: Arc<TimeSample>) {
let time_sample = TimeSample {
utc: data.utc.clone(),
monotonic: data.monotonic.clone(),
standard_deviation: data.standard_deviation.clone(),
..TimeSample::EMPTY
};
self.0.send(time_sample).unwrap_or_else(|e| warn!("Error sending response: {:?}", e));
}
}
impl WatchSender<Status> for WatchStatusResponder {
fn send_response(self, data: Status) {
self.0.send(data).unwrap_or_else(|e| warn!("Error sending response: {:?}", e));
}
}
/// An UpdateAlgorithm that forwards updates produced by a test.
pub struct TestUpdateAlgorithm {
/// Receiver that accepts updates pushed by a test.
receiver: Mutex<Option<Receiver<Update>>>,
/// List of received device property updates
device_property_updates: Mutex<Vec<Properties>>,
}
impl TestUpdateAlgorithm {
pub fn new() -> (Self, Sender<Update>) {
let (sender, receiver) = channel(0);
(
TestUpdateAlgorithm {
receiver: Mutex::new(Some(receiver)),
device_property_updates: Mutex::new(vec![]),
},
sender,
)
}
}
#[async_trait]
impl UpdateAlgorithm for TestUpdateAlgorithm {
async fn update_device_properties(&self, properties: Properties) {
self.device_property_updates.lock().await.push(properties);
}
async fn generate_updates(&self, sink: Sender<Update>) -> Result<(), Error> {
let receiver = self.receiver.lock().await.take().unwrap();
receiver.map(Ok).forward(sink).await?;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use fidl::{endpoints::create_proxy_and_stream, Error as FidlError};
use fidl_fuchsia_time_external::{PushSourceMarker, PushSourceProxy};
use fuchsia_async as fasync;
use futures::{FutureExt, SinkExt};
use matches::assert_matches;
struct TestHarness {
/// The PushSource under test.
test_source: Arc<PushSource<TestUpdateAlgorithm>>,
/// Tasks spawned for the test.
tasks: Vec<fasync::Task<Result<(), Error>>>,
/// Sender that injects updates to test_source.
update_sender: Sender<Update>,
}
impl TestHarness {
/// Create a new TestHarness.
fn new() -> Self {
let (update_algorithm, update_sender) = TestUpdateAlgorithm::new();
let test_source = Arc::new(PushSource::new(update_algorithm, Status::Ok).unwrap());
let source_clone = Arc::clone(&test_source);
let update_task = fasync::Task::spawn(async move { source_clone.poll_updates().await });
TestHarness { test_source, tasks: vec![update_task], update_sender }
}
/// Return a new proxy connected to the test PushSource.
fn new_proxy(&mut self) -> PushSourceProxy {
let source_clone = Arc::clone(&self.test_source);
let (proxy, stream) = create_proxy_and_stream::<PushSourceMarker>().unwrap();
let server_task = fasync::Task::spawn(async move {
source_clone.handle_requests_for_stream(stream).await
});
self.tasks.push(server_task);
proxy
}
/// Push a new update to the PushSource.
async fn push_update(&mut self, update: Update) {
self.update_sender.send(update).await.unwrap();
}
/// Assert that the TestUpdateAlgorithm received the property updates.
async fn assert_device_properties(&self, properties: &[Properties]) {
assert_eq!(
self.test_source.update_algorithm.device_property_updates.lock().await.as_slice(),
properties
);
}
}
// Since we rely on WatchHandler to achieve most of the hanging get behavior, these tests
// focus primarily on behavior specific to PushSource and ensuring multiple clients are
// supported.
#[fuchsia::test(allow_stalls = false)]
async fn test_watch_sample_closes_on_multiple_watches() {
let mut harness = TestHarness::new();
let proxy = harness.new_proxy();
// Since there aren't any samples yet the first call should hang
let first_watch_fut = proxy.watch_sample();
// Calling again while second watch is active should close the channel.
assert_matches!(
proxy.watch_sample().await.unwrap_err(),
FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
);
assert_matches!(
first_watch_fut.await.unwrap_err(),
FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
);
}
#[fuchsia::test(allow_stalls = false)]
async fn test_watch_status_closes_on_multiple_watches() {
let mut harness = TestHarness::new();
let proxy = harness.new_proxy();
// First watch always immediately returns Ok
assert_eq!(proxy.watch_status().await.unwrap(), Status::Ok);
// In absence of updates second watch does not finish
let second_watch_fut = proxy.watch_status();
// Calling again while second watch is active should close the channel.
assert_matches!(
proxy.watch_status().await.unwrap_err(),
FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
);
assert_matches!(
second_watch_fut.await.unwrap_err(),
FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
);
}
#[fuchsia::test(allow_stalls = false)]
async fn test_watch_sample() {
let mut harness = TestHarness::new();
let proxy = harness.new_proxy();
// The first watch completes only after update is produced.
let sample_fut = proxy.watch_sample();
harness
.push_update(Update::Sample(Arc::new(TimeSample {
monotonic: Some(23),
utc: Some(24),
standard_deviation: None,
..TimeSample::EMPTY
})))
.await;
assert_eq!(
sample_fut.await.unwrap(),
TimeSample {
monotonic: Some(23),
utc: Some(24),
standard_deviation: None,
..TimeSample::EMPTY
}
);
// Subsequent watches complete only after a new update is produced.
let sample_fut = proxy.watch_sample();
harness
.push_update(Update::Sample(Arc::new(TimeSample {
monotonic: Some(25),
utc: Some(26),
standard_deviation: None,
..TimeSample::EMPTY
})))
.await;
assert_eq!(
sample_fut.await.unwrap(),
TimeSample {
monotonic: Some(25),
utc: Some(26),
standard_deviation: None,
..TimeSample::EMPTY
}
);
// Watches hangs in absence of new update.
assert!(proxy.watch_sample().now_or_never().is_none());
}
#[fuchsia::test(allow_stalls = false)]
async fn test_watch_sample_sent_to_all_clients() {
let mut harness = TestHarness::new();
let proxy = harness.new_proxy();
let proxy_2 = harness.new_proxy();
// The first watch completes only after update is produced.
let sample_fut = proxy.watch_sample();
let sample_fut_2 = proxy_2.watch_sample();
harness
.push_update(Update::Sample(Arc::new(TimeSample {
monotonic: Some(23),
utc: Some(24),
standard_deviation: None,
..TimeSample::EMPTY
})))
.await;
assert_eq!(
sample_fut.await.unwrap(),
TimeSample {
monotonic: Some(23),
utc: Some(24),
standard_deviation: None,
..TimeSample::EMPTY
}
);
assert_eq!(
sample_fut_2.await.unwrap(),
TimeSample {
monotonic: Some(23),
utc: Some(24),
standard_deviation: None,
..TimeSample::EMPTY
}
);
// Subsequent watches complete only after a new update is produced.
let sample_fut = proxy.watch_sample();
let sample_fut_2 = proxy_2.watch_sample();
harness
.push_update(Update::Sample(Arc::new(TimeSample {
monotonic: Some(25),
utc: Some(26),
standard_deviation: None,
..TimeSample::EMPTY
})))
.await;
assert_eq!(
sample_fut.await.unwrap(),
TimeSample {
monotonic: Some(25),
utc: Some(26),
standard_deviation: None,
..TimeSample::EMPTY
}
);
assert_eq!(
sample_fut_2.await.unwrap(),
TimeSample {
monotonic: Some(25),
utc: Some(26),
standard_deviation: None,
..TimeSample::EMPTY
}
);
// A client that connects later gets the latest update.
let proxy_3 = harness.new_proxy();
assert_eq!(
proxy_3.watch_sample().await.unwrap(),
TimeSample {
monotonic: Some(25),
utc: Some(26),
standard_deviation: None,
..TimeSample::EMPTY
}
);
}
#[fuchsia::test(allow_stalls = false)]
async fn test_watch_status() {
let mut harness = TestHarness::new();
let proxy = harness.new_proxy();
// The first watch completes immediately.
assert_eq!(proxy.watch_status().await.unwrap(), Status::Ok);
// Subsequent watches complete only after an update is produced.
let status_fut = proxy.watch_status();
harness.push_update(Update::Status(Status::Hardware)).await;
assert_eq!(status_fut.await.unwrap(), Status::Hardware);
// Watches hang in absence of a new update.
assert!(proxy.watch_status().now_or_never().is_none());
}
#[fuchsia::test(allow_stalls = false)]
async fn test_watch_status_sent_to_all_clients() {
let mut harness = TestHarness::new();
let proxy = harness.new_proxy();
let proxy_2 = harness.new_proxy();
// The first watch completes immediately.
assert_eq!(proxy.watch_status().await.unwrap(), Status::Ok);
assert_eq!(proxy_2.watch_status().await.unwrap(), Status::Ok);
// Subsequent watches complete only after an update is produced.
let status_fut = proxy.watch_status();
let status_fut_2 = proxy_2.watch_status();
harness.push_update(Update::Status(Status::Hardware)).await;
assert_eq!(status_fut.await.unwrap(), Status::Hardware);
assert_eq!(status_fut_2.await.unwrap(), Status::Hardware);
// A client that connects later gets the latest update.
let proxy_3 = harness.new_proxy();
assert_eq!(proxy_3.watch_status().await.unwrap(), Status::Hardware);
}
#[fuchsia::test]
async fn test_property_updates_sent_to_update_algorithm() {
let mut harness = TestHarness::new();
let proxy = harness.new_proxy();
let proxy_2 = harness.new_proxy();
proxy.update_device_properties(Properties::EMPTY).unwrap();
proxy_2.update_device_properties(Properties::EMPTY).unwrap();
// Sleep here to allow the executor to run the tasks servicing these requests.
fasync::Timer::new(fasync::Time::after(zx::Duration::from_nanos(1000))).await;
harness.assert_device_properties(&vec![Properties::EMPTY, Properties::EMPTY]).await;
}
}