blob: fc6e61110d78b60e41ddc588457fecbce79b56bf [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::anyhow,
event_queue::{ControlHandle, EventQueue, Notify},
fidl_fuchsia_update_ext::State,
fuchsia_inspect_contrib::inspectable::InspectableDebugString,
fuchsia_syslog::{fx_log_err, fx_log_warn},
futures::prelude::*,
std::time::Duration,
};
pub trait StateNotifier: Notify<Event = State> + Send + Sync + 'static {}
impl<T> StateNotifier for T where T: Notify<Event = State> + Send + Sync + 'static {}
#[derive(Debug)]
pub struct UpdateMonitor<N>
where
N: StateNotifier,
{
temporary_queue: ControlHandle<N>,
update_state: InspectableDebugString<Option<State>>,
version_available: InspectableDebugString<Option<String>>,
inspect_node: fuchsia_inspect::Node,
}
impl<N> UpdateMonitor<N>
where
N: StateNotifier,
{
pub fn from_inspect_node(node: fuchsia_inspect::Node) -> (impl Future<Output = ()>, Self) {
let (temporary_fut, temporary_queue) = EventQueue::new();
(
temporary_fut,
UpdateMonitor {
temporary_queue,
update_state: InspectableDebugString::new(None, &node, "update-state"),
version_available: InspectableDebugString::new(None, &node, "version-available"),
inspect_node: node,
},
)
}
#[cfg(test)]
pub fn new() -> (impl Future<Output = ()>, Self) {
Self::from_inspect_node(
fuchsia_inspect::Inspector::new().root().create_child("test-update-monitor-root-node"),
)
}
pub async fn add_temporary_callback(&mut self, callback: N) {
if let Err(e) = self.temporary_queue.add_client(callback).await {
fx_log_err!("error adding client to temporary queue: {:#}", anyhow!(e))
}
}
pub async fn advance_update_state(&mut self, next_update_state: State) {
*self.update_state.get_mut() = Some(next_update_state.clone());
if let Err(e) = self.temporary_queue.queue_event(next_update_state).await {
fx_log_warn!("error sending state to temporary queue: {:#}", anyhow!(e))
}
}
pub async fn clear(&mut self) {
*self.version_available.get_mut() = None;
if let Err(e) = self.temporary_queue.clear().await {
fx_log_warn!("error clearing clients of temporary queue: {:#}", anyhow!(e))
}
}
pub async fn try_flush(&mut self) {
match self.temporary_queue.try_flush(Duration::from_secs(5)).await {
Ok(flush_future) => {
if let Err(e) = flush_future.await {
fx_log_warn!("Timed out flushing temporary queue: {:#}", anyhow!(e));
}
}
Err(e) => {
fx_log_warn!("error trying to flush temporary queue: {:#}", anyhow!(e));
}
}
}
pub fn set_version_available(&mut self, version_available: String) {
*self.version_available.get_mut() = Some(version_available);
}
#[cfg(test)]
pub fn get_version_available(&self) -> Option<String> {
self.version_available.clone()
}
}
#[cfg(test)]
mod test {
use super::*;
use event_queue::{ClosedClient, Event, Notify};
use fidl_fuchsia_update_ext::random_version_available;
use fuchsia_async as fasync;
use fuchsia_zircon as zx;
use futures::{channel::mpsc, future::BoxFuture, pin_mut, task::Poll};
use parking_lot::Mutex;
use proptest::prelude::*;
use std::sync::Arc;
const VERSION_AVAILABLE: &str = "fake-version-available";
#[derive(Clone, Debug)]
struct FakeStateNotifier {
states: Arc<Mutex<Vec<State>>>,
}
impl FakeStateNotifier {
fn new() -> Self {
Self { states: Arc::new(Mutex::new(vec![])) }
}
}
impl Notify for FakeStateNotifier {
type Event = State;
type NotifyFuture = future::Ready<Result<(), ClosedClient>>;
fn notify(&self, state: State) -> Self::NotifyFuture {
self.states.lock().push(state);
future::ready(Ok(()))
}
}
struct MpscNotifier<T> {
sender: mpsc::Sender<T>,
}
impl<T> Notify for MpscNotifier<T>
where
T: Event + Send + 'static,
{
type Event = T;
type NotifyFuture = BoxFuture<'static, Result<(), ClosedClient>>;
fn notify(&self, event: T) -> BoxFuture<'static, Result<(), ClosedClient>> {
let mut sender = self.sender.clone();
async move { sender.send(event).map(|result| result.map_err(|_| ClosedClient)).await }
.boxed()
}
}
async fn random_update_monitor<N: StateNotifier>(
update_state: Option<State>,
version_available: Option<String>,
) -> UpdateMonitor<N> {
let (fut, mut mms) = UpdateMonitor::<N>::new();
fasync::Task::spawn(fut).detach();
version_available.map(|s| mms.set_version_available(s));
if let Some(update_state) = update_state {
mms.advance_update_state(update_state).await;
}
mms
}
async fn wait_for_states(callback: &FakeStateNotifier, len: usize) {
while callback.states.lock().len() != len {
fasync::Timer::new(fasync::Time::after(zx::Duration::from_millis(10))).await;
}
}
proptest! {
#[test]
fn test_adding_callback_sends_current_state(
update_state: Option<State>,
version_available in random_version_available(),
) {
fasync::Executor::new().unwrap().run_singlethreaded(async {
let mut update_monitor = random_update_monitor(update_state.clone(), version_available).await;
let expected_states: Vec<_> = update_state.into_iter().collect();
let temporary_callback = FakeStateNotifier::new();
update_monitor.add_temporary_callback(temporary_callback.clone()).await;
wait_for_states(&temporary_callback, expected_states.len()).await;
prop_assert_eq!(
&temporary_callback.states.lock().clone(),
&expected_states
);
Ok(())
}).unwrap();
}
#[test]
fn test_advance_update_state_calls_callbacks(
initial_state: Option<State>,
version_available in random_version_available(),
next_states in prop::collection::vec(any::<State>(), 0..4),
) {
fasync::Executor::new().unwrap().run_singlethreaded(async {
let mut update_monitor = random_update_monitor(initial_state.clone(), version_available).await;
let temporary_callback = FakeStateNotifier::new();
let expected_states: Vec<_> = initial_state.clone().into_iter().chain(next_states.clone().into_iter()).collect();
update_monitor.add_temporary_callback(temporary_callback.clone()).await;
for state in next_states {
update_monitor.advance_update_state(state).await;
}
wait_for_states(&temporary_callback, expected_states.len()).await;
prop_assert_eq!(
&temporary_callback.states.lock().clone(),
&expected_states
);
Ok(())
}).unwrap();
}
#[test]
fn test_clear_clears_version_available(
update_state: Option<State>,
version_available in random_version_available(),
) {
fasync::Executor::new().unwrap().run_singlethreaded(async {
let mut update_monitor = random_update_monitor::<FakeStateNotifier>(update_state, version_available).await;
update_monitor.set_version_available(VERSION_AVAILABLE.to_string());
update_monitor.clear().await;
prop_assert_eq!(
update_monitor.get_version_available(),
None
);
Ok(())
}).unwrap();
}
#[test]
fn test_clear_clears_temporary_callbacks(
update_state: Option<State>,
version_available in random_version_available(),
) {
fasync::Executor::new().unwrap().run_singlethreaded(async {
let mut update_monitor = random_update_monitor(update_state, version_available).await;
let temporary_callback = FakeStateNotifier::new();
update_monitor.add_temporary_callback(temporary_callback.clone()).await;
update_monitor.clear().await;
temporary_callback.states.lock().clear();
update_monitor.advance_update_state(State::CheckingForUpdates).await;
prop_assert_eq!(temporary_callback.states.lock().clone(), vec![]);
Ok(())
}).unwrap();
}
#[test]
fn test_try_flush_flushes_temporary_callbacks(
update_state: State,
version_available in random_version_available(),
) {
let mut executor = fasync::Executor::new().unwrap();
let mut update_monitor = executor.run_singlethreaded(random_update_monitor(Some(update_state.clone()), version_available));
let (sender, mut receiver) = mpsc::channel(0);
let temporary_callback = MpscNotifier { sender };
executor.run_singlethreaded(update_monitor.add_temporary_callback(temporary_callback));
let flush = update_monitor.try_flush();
pin_mut!(flush);
prop_assert_eq!(executor.run_until_stalled(&mut flush), Poll::Pending);
prop_assert_eq!(executor.run_until_stalled(&mut receiver.next()), Poll::Ready(Some(update_state)));
prop_assert_eq!(executor.run_until_stalled(&mut flush), Poll::Ready(()));
}
#[test]
fn test_try_flush_timeout(
update_state: State,
version_available in random_version_available(),
) {
let mut executor = fasync::Executor::new_with_fake_time().unwrap();
// Can't use run_singlethreaded on executor with fake time.
let update_monitor_fut = random_update_monitor(Some(update_state), version_available);
pin_mut!(update_monitor_fut);
let mut update_monitor = match executor.run_until_stalled(&mut update_monitor_fut) {
Poll::Ready(monitor) => monitor,
Poll::Pending => panic!("random_update_monitor blocked"),
};
let (sender, _receiver) = mpsc::channel(0);
{
let temporary_callback = MpscNotifier { sender };
let add_temporary_callback = update_monitor.add_temporary_callback(temporary_callback);
pin_mut!(add_temporary_callback);
prop_assert_eq!(executor.run_until_stalled(&mut add_temporary_callback), Poll::Ready(()));
}
let flush = update_monitor.try_flush();
pin_mut!(flush);
prop_assert_eq!(executor.run_until_stalled(&mut flush), Poll::Pending);
let expected_deadline = executor.now() + zx::Duration::from_seconds(5);
prop_assert_eq!(executor.wake_next_timer(), Some(expected_deadline));
prop_assert_eq!(executor.run_until_stalled(&mut flush), Poll::Ready(()));
}
}
}
#[cfg(test)]
mod test_inspect {
use super::*;
use event_queue::{ClosedClient, Notify};
use fuchsia_async as fasync;
use fuchsia_inspect::assert_inspect_tree;
#[derive(Clone, Debug)]
struct FakeStateNotifier;
impl Notify for FakeStateNotifier {
type Event = State;
type NotifyFuture = future::Ready<Result<(), ClosedClient>>;
fn notify(&self, _state: State) -> Self::NotifyFuture {
future::ready(Ok(()))
}
}
#[test]
fn test_inspect_initial_state() {
let inspector = fuchsia_inspect::Inspector::new();
let _update_monitor = UpdateMonitor::<FakeStateNotifier>::from_inspect_node(
inspector.root().create_child("update-monitor"),
);
assert_inspect_tree!(
inspector,
root: {
"update-monitor": {
"update-state": "None",
"version-available": "None",
}
}
);
}
#[fasync::run_singlethreaded(test)]
async fn test_inspect_update_state() {
let inspector = fuchsia_inspect::Inspector::new();
let (fut, mut update_monitor) = UpdateMonitor::<FakeStateNotifier>::from_inspect_node(
inspector.root().create_child("update-monitor"),
);
fasync::Task::spawn(fut).detach();
update_monitor.advance_update_state(State::CheckingForUpdates).await;
assert_inspect_tree!(
inspector,
root: {
"update-monitor": {
"update-state": format!("{:?}", Some(State::CheckingForUpdates)),
"version-available": "None",
}
}
);
}
#[test]
fn test_inspect_version_available() {
let inspector = fuchsia_inspect::Inspector::new();
let (_fut, mut update_monitor) = UpdateMonitor::<FakeStateNotifier>::from_inspect_node(
inspector.root().create_child("update-monitor"),
);
let version_available = "fake-version-available";
update_monitor.set_version_available(version_available.to_string());
assert_inspect_tree!(
inspector,
root: {
"update-monitor": {
"update-state": "None",
"version-available": format!("{:?}", Some(version_available)),
}
}
);
}
}