blob: d06192e2e63d4802e234d01755948ce46b230128 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::apply::{apply_system_update, Initiator};
use crate::channel::{CurrentChannelManager, TargetChannelManager};
use crate::check::{check_for_system_update, SystemUpdateStatus};
use crate::connect::ServiceConnect;
use crate::update_monitor::{State, StateChangeCallback, UpdateMonitor};
use failure::{Error, ResultExt};
use fidl_fuchsia_update::{CheckStartedResult, ManagerState};
use fuchsia_async as fasync;
use fuchsia_inspect as finspect;
use fuchsia_merkle::Hash;
use fuchsia_syslog::{fx_log_err, fx_log_info};
use futures::future::BoxFuture;
use futures::lock::Mutex as AsyncMutex;
use futures::prelude::*;
use parking_lot::Mutex;
use std::sync::Arc;
/// Manages the lifecycle of an update attempt and notifies interested clients.
///
/// The type is generic over the target/current channel updaters (`T`, `Ch`), the update
/// checker (`C`), the update applier (`A`) and the state-change callback type (`S`).
//
// # Lock Order
//
// `updater` is locked for the duration of an update attempt, and an update attempt will
// periodically lock `monitor` to send status updates. Before an async task or thread can lock
// `updater`, it must release any locks on `monitor`.
//
pub struct UpdateManager<T, Ch, C, A, S>
where
T: TargetChannelUpdater,
Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
S: StateChangeCallback,
{
// Holds the manager state and the registered callbacks; guarded by a synchronous
// (`parking_lot`) mutex.
monitor: Arc<Mutex<UpdateMonitor<S>>>,
// The collaborators that carry out an update attempt; guarded by an async mutex so the
// lock can be held across `.await` points for the whole attempt.
updater: Arc<AsyncMutex<SystemInterface<T, Ch, C, A>>>,
}
// Bundles the four collaborators used while performing a single update attempt. An
// `UpdateManager` keeps one instance behind an async mutex.
struct SystemInterface<T, Ch, C, A>
where
T: TargetChannelUpdater,
Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
{
// Invoked at the start of every update check.
target_channel_updater: T,
// Invoked when the check reports that the system is already up to date.
current_channel_updater: Ch,
// Determines whether a new system image is available.
update_checker: C,
// Applies the update when one is available.
update_applier: A,
}
impl<T, Ch, S> UpdateManager<T, Ch, RealUpdateChecker, RealUpdateApplier, S>
where
    T: TargetChannelUpdater,
    Ch: CurrentChannelUpdater,
    S: StateChangeCallback,
{
    /// Creates a manager wired to the real update checker and applier.
    ///
    /// The monitor is built from `node`; the two channel updaters are supplied by the caller.
    pub fn new(
        target_channel_updater: T,
        current_channel_updater: Ch,
        node: finspect::Node,
    ) -> Self {
        let monitor = Arc::new(Mutex::new(UpdateMonitor::from_inspect_node(node)));
        let system_interface = SystemInterface::new(
            target_channel_updater,
            current_channel_updater,
            RealUpdateChecker,
            RealUpdateApplier,
        );
        let updater = Arc::new(AsyncMutex::new(system_interface));
        Self { monitor, updater }
    }
}
impl<T, Ch, C, A, S> UpdateManager<T, Ch, C, A, S>
where
    T: TargetChannelUpdater,
    Ch: CurrentChannelUpdater,
    C: UpdateChecker,
    A: UpdateApplier,
    S: StateChangeCallback,
{
    /// Test-only constructor that accepts every collaborator explicitly and uses a monitor
    /// created with `UpdateMonitor::new()`.
    #[cfg(test)]
    pub fn from_checker_and_applier(
        target_channel_updater: T,
        current_channel_updater: Ch,
        update_checker: C,
        update_applier: A,
    ) -> Self {
        Self {
            monitor: Arc::new(Mutex::new(UpdateMonitor::new())),
            updater: Arc::new(AsyncMutex::new(SystemInterface::new(
                target_channel_updater,
                current_channel_updater,
                update_checker,
                update_applier,
            ))),
        }
    }

    /// Starts an update attempt unless the manager is already busy.
    ///
    /// If `callback` is provided it is registered as a temporary callback before the manager
    /// state is inspected, i.e. regardless of whether a new attempt is started.
    ///
    /// Returns `CheckStartedResult::Started` when the manager was `Idle` and a new attempt was
    /// spawned, and `CheckStartedResult::InProgress` for every other manager state.
    ///
    /// A Fuchsia Executor must be active when this method is called, b/c it uses
    /// fuchsia_async::spawn
    pub fn try_start_update(
        &self,
        initiator: Initiator,
        callback: Option<S>,
    ) -> CheckStartedResult {
        let mut monitor = self.monitor.lock();
        // Registration is a side effect only, so use `if let` rather than `Option::map`.
        if let Some(cb) = callback {
            monitor.add_temporary_callback(cb);
        }
        match monitor.manager_state() {
            ManagerState::Idle => {
                // Leave `Idle` before returning so a second caller observes the attempt.
                monitor.advance_manager_state(ManagerState::CheckingForUpdates);
                let updater = Arc::clone(&self.updater);
                let monitor = Arc::clone(&self.monitor);
                // Spawn so that callers of this method are not blocked
                fasync::spawn(async move {
                    // Lock the updater for the duration of the update attempt. Contention is not
                    // expected except in the case that a previous update attempt failed, has set
                    // manager_state back to Idle, but has not yet returned.
                    let mut updater = updater.lock().await;
                    updater.do_system_update_check_and_return_to_idle(monitor, initiator).await
                });
                CheckStartedResult::Started
            }
            _ => CheckStartedResult::InProgress,
        }
    }

    /// Returns the state currently reported by the monitor.
    pub fn get_state(&self) -> State {
        let monitor = self.monitor.lock();
        monitor.state()
    }

    /// Registers `callback` with the monitor as a permanent callback.
    pub fn add_permanent_callback(&self, callback: S) {
        self.monitor.lock().add_permanent_callback(callback);
    }
}
impl<T, Ch, C, A> SystemInterface<T, Ch, C, A>
where
    T: TargetChannelUpdater,
    Ch: CurrentChannelUpdater,
    C: UpdateChecker,
    A: UpdateApplier,
{
    /// Bundles the given collaborators into a `SystemInterface`.
    pub fn new(
        target_channel_updater: T,
        current_channel_updater: Ch,
        update_checker: C,
        update_applier: A,
    ) -> Self {
        Self { target_channel_updater, current_channel_updater, update_checker, update_applier }
    }

    /// Runs one update attempt and then moves the manager state back to `Idle`.
    ///
    /// A failed attempt is logged and the state is advanced to `EncounteredError` first. If the
    /// state is `WaitingForReboot` after the attempt, an error is logged and the state is left
    /// unchanged.
    async fn do_system_update_check_and_return_to_idle<S: StateChangeCallback>(
        &mut self,
        monitor: Arc<Mutex<UpdateMonitor<S>>>,
        initiator: Initiator,
    ) {
        if let Err(e) = self.do_system_update_check(Arc::clone(&monitor), initiator).await {
            fx_log_err!("update attempt failed: {:?}", e);
            monitor.lock().advance_manager_state(ManagerState::EncounteredError);
        }
        let mut monitor = monitor.lock();
        match monitor.manager_state() {
            ManagerState::WaitingForReboot => fx_log_err!(
                "system-update-checker is in the WaitingForReboot state. \
                 This should not have happened, because the system-updater should \
                 have rebooted the device before it returned."
            ),
            _ => {
                monitor.advance_manager_state(ManagerState::Idle);
            }
        }
    }

    /// Performs the update check and, when an update is available, applies it.
    ///
    /// The target channel updater runs first. When the system is up to date the current
    /// channel updater runs and nothing else happens. When an update is available the monitor
    /// records the available version, the state advances to `PerformingUpdate`, the applier is
    /// invoked, and on success the state advances to `WaitingForReboot`.
    ///
    /// # Errors
    ///
    /// Returns an error, with added context, when the check or the apply step fails.
    async fn do_system_update_check<S: StateChangeCallback>(
        &mut self,
        monitor: Arc<Mutex<UpdateMonitor<S>>>,
        initiator: Initiator,
    ) -> Result<(), Error> {
        fx_log_info!(
            "starting update check (requested by {})",
            match initiator {
                Initiator::Automatic => "service",
                Initiator::Manual => "user",
            }
        );
        self.target_channel_updater.update().await;
        match self.update_checker.check().await.context("check_for_system_update failed")? {
            SystemUpdateStatus::UpToDate { system_image } => {
                fx_log_info!("current system_image merkle: {}", system_image);
                fx_log_info!("system_image is already up-to-date");
                self.current_channel_updater.update().await;
            }
            SystemUpdateStatus::UpdateAvailable { current_system_image, latest_system_image } => {
                fx_log_info!("current system_image merkle: {}", current_system_image);
                fx_log_info!("new system_image available: {}", latest_system_image);
                {
                    // Scope the guard so the monitor lock is released before awaiting below.
                    let mut monitor = monitor.lock();
                    monitor.set_version_available(latest_system_image.to_string());
                    monitor.advance_manager_state(ManagerState::PerformingUpdate);
                }
                self.update_applier
                    .apply(current_system_image, latest_system_image, initiator)
                    .await
                    .context("apply_system_update failed")?;
                // On success, system-updater reboots the system before returning, so this code
                // should never run. The only way to leave WaitingForReboot state is to restart
                // the component
                monitor.lock().advance_manager_state(ManagerState::WaitingForReboot);
            }
        }
        Ok(())
    }
}
// Abstraction over the update check so that tests can substitute a fake (for mocking).
pub trait UpdateChecker: Send + Sync + 'static {
    /// Returns a future resolving to the current update status, or to an error.
    fn check(&self) -> BoxFuture<'_, Result<SystemUpdateStatus, crate::errors::Error>>;
}

/// `UpdateChecker` that delegates to `check_for_system_update`.
pub struct RealUpdateChecker;

impl UpdateChecker for RealUpdateChecker {
    fn check(&self) -> BoxFuture<'_, Result<SystemUpdateStatus, crate::errors::Error>> {
        let pending_check = check_for_system_update();
        Box::pin(pending_check)
    }
}
// Abstraction over the target channel update so that tests can substitute a fake (for mocking).
pub trait TargetChannelUpdater: Send + Sync + 'static {
    /// Returns a future that performs the update; failures are not reported to the caller.
    fn update(&mut self) -> BoxFuture<'_, ()>;
}

impl<S: ServiceConnect + 'static> TargetChannelUpdater for TargetChannelManager<S> {
    fn update(&mut self) -> BoxFuture<'_, ()> {
        // Create the inner future up front, exactly where the combinator chain would have.
        let pending_update = TargetChannelManager::update(self);
        async move {
            // Errors are logged and otherwise swallowed.
            if let Err(e) = pending_update.await {
                fx_log_err!("while updating target channel: {:?}", e)
            }
        }
        .boxed()
    }
}
// Abstraction over the current channel update so that tests can substitute a fake (for mocking).
pub trait CurrentChannelUpdater: Send + Sync + 'static {
    /// Returns a future that performs the update; failures are not reported to the caller.
    fn update(&mut self) -> BoxFuture<'_, ()>;
}

impl CurrentChannelUpdater for CurrentChannelManager {
    fn update(&mut self) -> BoxFuture<'_, ()> {
        // Create the inner future up front, exactly where the combinator chain would have.
        let pending_update = CurrentChannelManager::update(self);
        async move {
            // Errors are logged and otherwise swallowed.
            match pending_update.await {
                Ok(done) => done,
                Err(e) => fx_log_err!("while updating current channel: {:?}", e),
            }
        }
        .boxed()
    }
}
// Abstraction over applying an update so that tests can substitute a fake (for mocking).
pub trait UpdateApplier: Send + Sync + 'static {
    /// Returns a future that applies the update from `current_system_image` to
    /// `latest_system_image` on behalf of `initiator`.
    fn apply(
        &self,
        current_system_image: Hash,
        latest_system_image: Hash,
        initiator: Initiator,
    ) -> BoxFuture<'_, Result<(), crate::errors::Error>>;
}

/// `UpdateApplier` that delegates to `apply_system_update`.
pub struct RealUpdateApplier;

impl UpdateApplier for RealUpdateApplier {
    fn apply(
        &self,
        current_system_image: Hash,
        latest_system_image: Hash,
        initiator: Initiator,
    ) -> BoxFuture<'_, Result<(), crate::errors::Error>> {
        let pending_apply =
            apply_system_update(current_system_image, latest_system_image, initiator);
        Box::pin(pending_apply)
    }
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::channel::oneshot;
use matches::assert_matches;
use std::sync::atomic::{AtomicU64, Ordering};
// Capacity passed to `channel()` when creating the channel behind `FakeStateChangeCallback`.
pub const CALLBACK_CHANNEL_SIZE: usize = 20;
// Merkle string the fake checkers parse and report as the current system image.
pub const CURRENT_SYSTEM_IMAGE: &str =
"0000000000000000000000000000000000000000000000000000000000000000";
// Merkle string the fake checkers parse and report as the latest available system image.
pub const LATEST_SYSTEM_IMAGE: &str =
"1111111111111111111111111111111111111111111111111111111111111111";
// `UpdateChecker` fake that returns a canned result, counts calls, and can be blocked.
#[derive(Clone)]
pub struct FakeUpdateChecker {
    result: Result<SystemUpdateStatus, crate::errors::ErrorKind>,
    call_count: Arc<AtomicU64>,
    // Taking this mutex blocks update checker.
    check_blocked: Arc<AsyncMutex<()>>,
}

impl FakeUpdateChecker {
    // Builds a checker that always yields `result`, with a zeroed call counter.
    fn new(result: Result<SystemUpdateStatus, crate::errors::ErrorKind>) -> Self {
        let call_count = Arc::new(AtomicU64::new(0));
        let check_blocked = Arc::new(AsyncMutex::new(()));
        Self { result, call_count, check_blocked }
    }

    // Checker reporting that the system already runs `CURRENT_SYSTEM_IMAGE`.
    pub fn new_up_to_date() -> Self {
        let system_image = CURRENT_SYSTEM_IMAGE.parse().expect("valid merkle");
        Self::new(Ok(SystemUpdateStatus::UpToDate { system_image }))
    }

    // Checker reporting an update from `CURRENT_SYSTEM_IMAGE` to `LATEST_SYSTEM_IMAGE`.
    pub fn new_update_available() -> Self {
        let current_system_image = CURRENT_SYSTEM_IMAGE.parse().expect("valid merkle");
        let latest_system_image = LATEST_SYSTEM_IMAGE.parse().expect("valid merkle");
        let status =
            SystemUpdateStatus::UpdateAvailable { current_system_image, latest_system_image };
        Self::new(Ok(status))
    }

    // Checker whose check always fails with `ErrorKind::ResolveUpdatePackage`.
    pub fn new_error() -> Self {
        let failure = crate::errors::ErrorKind::ResolveUpdatePackage;
        Self::new(Err(failure))
    }

    // Tries to take the blocking mutex; while the returned guard is alive, `check` futures
    // wait on that mutex before resolving.
    pub fn block(&self) -> Option<futures::lock::MutexGuard<'_, ()>> {
        self.check_blocked.try_lock()
    }

    // Number of times `check` has been called.
    pub fn call_count(&self) -> u64 {
        self.call_count.load(Ordering::SeqCst)
    }
}

impl UpdateChecker for FakeUpdateChecker {
    fn check(&self) -> BoxFuture<'_, Result<SystemUpdateStatus, crate::errors::Error>> {
        let gate = Arc::clone(&self.check_blocked);
        let canned = self.result.clone();
        // The call is counted when `check` is invoked, not when the future is polled.
        self.call_count.fetch_add(1, Ordering::SeqCst);
        async move {
            gate.lock().await;
            canned.map_err(|kind| kind.into())
        }
        .boxed()
    }
}
// `TargetChannelUpdater` fake that only counts how often its update future has run.
#[derive(Clone)]
pub struct FakeTargetChannelUpdater {
    call_count: Arc<AtomicU64>,
}

impl FakeTargetChannelUpdater {
    pub fn new() -> Self {
        let call_count = Arc::new(AtomicU64::new(0));
        Self { call_count }
    }

    // Number of update futures that have been polled to completion.
    pub fn call_count(&self) -> u64 {
        self.call_count.load(Ordering::SeqCst)
    }
}

impl TargetChannelUpdater for FakeTargetChannelUpdater {
    fn update(&mut self) -> BoxFuture<'_, ()> {
        let counter = Arc::clone(&self.call_count);
        // The increment happens when the future runs, not when `update` is called.
        Box::pin(async move {
            counter.fetch_add(1, Ordering::SeqCst);
        })
    }
}
// `CurrentChannelUpdater` fake that only counts how often its update future has run.
#[derive(Clone)]
pub struct FakeCurrentChannelUpdater {
    call_count: Arc<AtomicU64>,
}

impl FakeCurrentChannelUpdater {
    pub fn new() -> Self {
        let call_count = Arc::new(AtomicU64::new(0));
        Self { call_count }
    }

    // Number of update futures that have been polled to completion.
    pub fn call_count(&self) -> u64 {
        self.call_count.load(Ordering::SeqCst)
    }
}

impl CurrentChannelUpdater for FakeCurrentChannelUpdater {
    fn update(&mut self) -> BoxFuture<'_, ()> {
        let counter = Arc::clone(&self.call_count);
        // The increment happens when the future runs, not when `update` is called.
        Box::pin(async move {
            counter.fetch_add(1, Ordering::SeqCst);
        })
    }
}
// `UpdateApplier` for tests in which the applier must never be invoked: calling `apply`
// panics via `unreachable!`.
#[derive(Clone)]
pub struct UnreachableUpdateApplier;
impl UpdateApplier for UnreachableUpdateApplier {
fn apply(
&self,
_current_system_image: Hash,
_latest_system_image: Hash,
_initiator: Initiator,
) -> BoxFuture<'_, Result<(), crate::errors::Error>> {
// Panics immediately; no future is ever returned.
unreachable!();
}
}
#[derive(Clone)]
pub struct FakeUpdateApplier {
result: Result<(), crate::errors::ErrorKind>,
call_count: Arc<AtomicU64>,
}
impl FakeUpdateApplier {
pub fn new_success() -> Self {
Self { result: Ok(()), call_count: Arc::new(AtomicU64::new(0)) }
}
pub fn new_error() -> Self {
Self {
result: Err(crate::errors::ErrorKind::SystemUpdaterFailed),
call_count: Arc::new(AtomicU64::new(0)),
}
}
pub fn call_count(&self) -> u64 {
self.call_count.load(std::sync::atomic::Ordering::Relaxed)
}
}
impl UpdateApplier for FakeUpdateApplier {
fn apply(
&self,
_current_system_image: Hash,
_latest_system_image: Hash,
_initiator: Initiator,
) -> BoxFuture<'_, Result<(), crate::errors::Error>> {
self.call_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
future::ready(self.result.clone().map_err(|e| e.into())).boxed()
}
}
// `StateChangeCallback` for tests in which no state change must ever be delivered: calling
// `on_state_change` panics via `unreachable!`.
#[derive(Clone)]
pub struct UnreachableStateChangeCallback;
impl StateChangeCallback for UnreachableStateChangeCallback {
fn on_state_change(&self, _new_state: State) -> Result<(), Error> {
// Panics immediately; the `Result` is never produced.
unreachable!();
}
}
// `StateChangeCallback` that appends every delivered state to a shared vector.
#[derive(Clone)]
pub struct StateChangeCollector {
    states: Arc<Mutex<Vec<State>>>,
}

impl StateChangeCollector {
    pub fn new() -> Self {
        let states = Arc::new(Mutex::new(Vec::new()));
        Self { states }
    }

    // Returns every state recorded so far and leaves the collector empty.
    pub fn take_states(&self) -> Vec<State> {
        let mut recorded = self.states.lock();
        std::mem::replace(&mut *recorded, Vec::new())
    }
}

impl StateChangeCallback for StateChangeCollector {
    fn on_state_change(&self, new_state: State) -> Result<(), Error> {
        let mut recorded = self.states.lock();
        recorded.push(new_state);
        Ok(())
    }
}
// `StateChangeCallback` that forwards every delivered state into a bounded channel.
#[derive(Clone)]
struct FakeStateChangeCallback {
    sender: Arc<Mutex<Sender<State>>>,
}

impl FakeStateChangeCallback {
    // Creates a callback together with the receiving end of its channel.
    fn new_callback_and_receiver() -> (Self, Receiver<State>) {
        let (tx, rx) = channel(CALLBACK_CHANNEL_SIZE);
        let callback = Self { sender: Arc::new(Mutex::new(tx)) };
        (callback, rx)
    }
}

impl StateChangeCallback for FakeStateChangeCallback {
    fn on_state_change(&self, new_state: State) -> Result<(), Error> {
        let mut tx = self.sender.lock();
        // A failed send is a test bug, so panic instead of returning an error.
        tx.try_send(new_state).expect("FakeStateChangeCallback failed to send state");
        Ok(())
    }
}
// `UpdateManager` wired entirely with fakes; the checker returns a canned result.
type FakeUpdateManager = UpdateManager<
FakeTargetChannelUpdater,
FakeCurrentChannelUpdater,
FakeUpdateChecker,
FakeUpdateApplier,
FakeStateChangeCallback,
>;
// Same as `FakeUpdateManager`, but the checker is a `BlockingUpdateChecker`.
type BlockingManagerManager = UpdateManager<
FakeTargetChannelUpdater,
FakeCurrentChannelUpdater,
BlockingUpdateChecker,
FakeUpdateApplier,
FakeStateChangeCallback,
>;
// Reads exactly `n` states from `receiver`, panicking if the stream ends first.
async fn next_n_states(receiver: &mut Receiver<State>, n: usize) -> Vec<State> {
    let mut states = Vec::with_capacity(n);
    while states.len() < n {
        let state = receiver.next().await.expect("next_n_states stream empty");
        states.push(state);
    }
    states
}
// Consumes states from `receiver` until `manager_state` has been seen `seen_count` times.
//
// Returns immediately when `seen_count` is zero. Panics if the stream ends before the
// requested number of matching states has been observed; the panic message reports how many
// were still outstanding.
async fn wait_until_manager_state_n(
    receiver: &mut Receiver<State>,
    manager_state: ManagerState,
    mut seen_count: u64,
) {
    if seen_count == 0 {
        return;
    }
    while let Some(new_state) = receiver.next().await {
        if new_state.manager_state == manager_state {
            seen_count -= 1;
            if seen_count == 0 {
                return;
            }
        }
    }
    panic!("wait_until_manager_state_n emptied stream: {}", seen_count);
}
// Test convenience: builds the `State` the fakes are expected to produce for a manager state.
impl From<ManagerState> for State {
    fn from(manager_state: ManagerState) -> Self {
        // No version is reported for `Idle` and `CheckingForUpdates`; every other state
        // carries `LATEST_SYSTEM_IMAGE`.
        let version_available = match manager_state {
            ManagerState::Idle | ManagerState::CheckingForUpdates => None,
            _ => Some(LATEST_SYSTEM_IMAGE.to_string()),
        };
        State { manager_state, version_available }
    }
}
// A freshly constructed manager reports the default state.
#[test]
fn test_correct_initial_state() {
    let target_channel_updater = FakeTargetChannelUpdater::new();
    let current_channel_updater = FakeCurrentChannelUpdater::new();
    let manager = FakeUpdateManager::from_checker_and_applier(
        target_channel_updater,
        current_channel_updater,
        FakeUpdateChecker::new_up_to_date(),
        FakeUpdateApplier::new_success(),
    );
    let initial_state = manager.get_state();
    assert_eq!(initial_state, State::default());
}
// The first `try_start_update` call on an idle manager reports `Started`.
#[test]
fn test_try_start_update_returns_started() {
    // `try_start_update` spawns a task, so an executor must exist even though it never runs.
    let _executor = fasync::Executor::new().expect("create test executor");
    let target_channel_updater = FakeTargetChannelUpdater::new();
    let current_channel_updater = FakeCurrentChannelUpdater::new();
    let manager = FakeUpdateManager::from_checker_and_applier(
        target_channel_updater,
        current_channel_updater,
        FakeUpdateChecker::new_up_to_date(),
        FakeUpdateApplier::new_success(),
    );
    let outcome = manager.try_start_update(Initiator::Manual, None);
    assert_eq!(outcome, CheckStartedResult::Started);
}
// A temporary callback receives nothing from attempts that start after its own attempt ended.
#[fasync::run_singlethreaded(test)]
async fn test_temporary_callbacks_dropped_after_update_attempt() {
    let manager = FakeUpdateManager::from_checker_and_applier(
        FakeTargetChannelUpdater::new(),
        FakeCurrentChannelUpdater::new(),
        FakeUpdateChecker::new_up_to_date(),
        FakeUpdateApplier::new_success(),
    );
    let (first_callback, mut first_receiver) =
        FakeStateChangeCallback::new_callback_and_receiver();
    let (second_callback, mut second_receiver) =
        FakeStateChangeCallback::new_callback_and_receiver();

    // Wait for first update attempt to complete, to guarantee that the second
    // try_start_update() call starts a new attempt (and generates more callback calls).
    manager.try_start_update(Initiator::Manual, Some(first_callback));
    wait_until_manager_state_n(&mut first_receiver, ManagerState::Idle, 2).await;

    // Wait for the second update attempt to complete, to guarantee the callbacks
    // have been called with more states.
    manager.try_start_update(Initiator::Manual, Some(second_callback));
    wait_until_manager_state_n(&mut second_receiver, ManagerState::Idle, 2).await;

    // The first callback should have been dropped after the first attempt completed,
    // so it should still be empty.
    let leftover = first_receiver.try_next();
    assert_matches!(leftover, Ok(None));
}
#[fasync::run_singlethreaded(test)]
async fn test_try_start_update_callback_when_up_to_date() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_up_to_date(),
FakeUpdateApplier::new_success(),
);
let (callback, receiver) = FakeStateChangeCallback::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback));
assert_eq!(
receiver.collect::<Vec<State>>().await,
vec![
ManagerState::Idle.into(),
ManagerState::CheckingForUpdates.into(),
ManagerState::Idle.into()
]
);
}
#[fasync::run_singlethreaded(test)]
async fn test_try_start_update_callback_when_update_available_and_apply_errors() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
);
let (callback, receiver) = FakeStateChangeCallback::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback));
assert_eq!(
receiver.collect::<Vec<State>>().await,
vec![
ManagerState::Idle.into(),
ManagerState::CheckingForUpdates.into(),
ManagerState::PerformingUpdate.into(),
ManagerState::EncounteredError.into(),
ManagerState::Idle.into()
]
);
}
#[fasync::run_singlethreaded(test)]
async fn test_try_start_update_callback_when_update_available_and_apply_succeeds() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_success(),
);
let (callback, mut receiver) = FakeStateChangeCallback::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback));
assert_eq!(
next_n_states(&mut receiver, 4).await,
vec![
ManagerState::Idle.into(),
ManagerState::CheckingForUpdates.into(),
ManagerState::PerformingUpdate.into(),
ManagerState::WaitingForReboot.into(),
]
);
}
#[fasync::run_singlethreaded(test)]
async fn test_permanent_callback_is_called() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
);
let (callback, mut receiver) = FakeStateChangeCallback::new_callback_and_receiver();
manager.add_permanent_callback(callback);
manager.try_start_update(Initiator::Manual, None);
assert_eq!(
next_n_states(&mut receiver, 5).await,
vec![
ManagerState::Idle.into(),
ManagerState::CheckingForUpdates.into(),
ManagerState::PerformingUpdate.into(),
ManagerState::EncounteredError.into(),
ManagerState::Idle.into(),
]
);
}
#[fasync::run_singlethreaded(test)]
async fn test_permanent_callback_persists_across_attempts() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
);
let (callback, mut receiver) = FakeStateChangeCallback::new_callback_and_receiver();
manager.add_permanent_callback(callback);
manager.try_start_update(Initiator::Manual, None);
// waiting for Idle state guarantees second try_start_update call
// starts a new attempt
assert_eq!(
next_n_states(&mut receiver, 5).await,
vec![
ManagerState::Idle.into(),
ManagerState::CheckingForUpdates.into(),
ManagerState::PerformingUpdate.into(),
ManagerState::EncounteredError.into(),
ManagerState::Idle.into(),
]
);
manager.try_start_update(Initiator::Manual, None);
assert_eq!(
next_n_states(&mut receiver, 4).await,
vec![
ManagerState::CheckingForUpdates.into(),
ManagerState::PerformingUpdate.into(),
ManagerState::EncounteredError.into(),
ManagerState::Idle.into(),
]
);
}
// The target channel updater runs exactly once per update attempt.
#[fasync::run_singlethreaded(test)]
async fn test_channel_updater_called() {
    let channel_updater = FakeTargetChannelUpdater::new();
    // The clone shares its counter with `channel_updater`, which stays behind for the assert.
    let manager = UpdateManager::from_checker_and_applier(
        channel_updater.clone(),
        FakeCurrentChannelUpdater::new(),
        FakeUpdateChecker::new_up_to_date(),
        UnreachableUpdateApplier,
    );
    let (callback, receiver) = FakeStateChangeCallback::new_callback_and_receiver();
    manager.try_start_update(Initiator::Manual, Some(callback));

    // Drain the stream so the attempt has finished before the counter is inspected.
    let _states: Vec<State> = receiver.collect().await;
    let calls = channel_updater.call_count();
    assert_eq!(calls, 1);
}
// The applier is invoked exactly once when the checker reports an available update.
#[fasync::run_singlethreaded(test)]
async fn test_update_applier_called_if_update_available() {
    let update_applier = FakeUpdateApplier::new_error();
    // The clone shares its counter with `update_applier`, which stays behind for the assert.
    let manager = FakeUpdateManager::from_checker_and_applier(
        FakeTargetChannelUpdater::new(),
        FakeCurrentChannelUpdater::new(),
        FakeUpdateChecker::new_update_available(),
        update_applier.clone(),
    );
    let (callback, receiver) = FakeStateChangeCallback::new_callback_and_receiver();
    manager.try_start_update(Initiator::Manual, Some(callback));

    // Drain the stream so the attempt has finished before the counter is inspected.
    let _states: Vec<State> = receiver.collect().await;
    let calls = update_applier.call_count();
    assert_eq!(calls, 1);
}
// When up to date, the applier is skipped and the current channel updater runs once.
#[fasync::run_singlethreaded(test)]
async fn test_update_applier_not_called_if_up_to_date() {
    let update_applier = FakeUpdateApplier::new_error();
    let current_channel_updater = FakeCurrentChannelUpdater::new();
    // The clones share counters with the originals, which stay behind for the asserts.
    let manager = FakeUpdateManager::from_checker_and_applier(
        FakeTargetChannelUpdater::new(),
        current_channel_updater.clone(),
        FakeUpdateChecker::new_up_to_date(),
        update_applier.clone(),
    );
    let (callback, receiver) = FakeStateChangeCallback::new_callback_and_receiver();
    manager.try_start_update(Initiator::Manual, Some(callback));

    // Drain the stream so the attempt has finished before the counters are inspected.
    let _states: Vec<State> = receiver.collect().await;
    let applier_calls = update_applier.call_count();
    assert_eq!(applier_calls, 0);
    let channel_calls = current_channel_updater.call_count();
    assert_eq!(channel_calls, 1);
}
#[fasync::run_singlethreaded(test)]
async fn test_return_to_initial_state_on_update_check_error() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_error(),
FakeUpdateApplier::new_error(),
);
let (callback, receiver) = FakeStateChangeCallback::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback));
receiver.collect::<Vec<State>>().await;
assert_eq!(manager.get_state(), Default::default());
}
#[fasync::run_singlethreaded(test)]
async fn test_return_to_initial_state_on_update_apply_error() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
);
let (callback, receiver) = FakeStateChangeCallback::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback));
receiver.collect::<Vec<State>>().await;
assert_eq!(manager.get_state(), Default::default());
}
// `UpdateChecker` whose checks stay pending until the paired oneshot sender fires, and then
// report an available update.
#[derive(Clone)]
pub struct BlockingUpdateChecker {
    blocker: future::Shared<oneshot::Receiver<()>>,
}

impl BlockingUpdateChecker {
    // Creates the checker together with the sender that releases it.
    pub fn new_checker_and_sender() -> (Self, oneshot::Sender<()>) {
        let (release, gate) = oneshot::channel();
        let checker = BlockingUpdateChecker { blocker: gate.shared() };
        (checker, release)
    }
}

impl UpdateChecker for BlockingUpdateChecker {
    fn check(&self) -> BoxFuture<'_, Result<SystemUpdateStatus, crate::errors::Error>> {
        let gate = self.blocker.clone();
        async move {
            // A dropped sender would cancel the receiver; treat that as a test bug.
            let released = gate.await;
            assert!(released.is_ok(), "blocking future cancelled");
            let current_system_image = CURRENT_SYSTEM_IMAGE.parse().expect("valid merkle");
            let latest_system_image = LATEST_SYSTEM_IMAGE.parse().expect("valid merkle");
            Ok(SystemUpdateStatus::UpdateAvailable { current_system_image, latest_system_image })
        }
        .boxed()
    }
}
// `get_state` reports CheckingForUpdates right after `try_start_update` returns.
#[fasync::run_singlethreaded(test)]
async fn test_get_state_in_checking_for_updates() {
    // `_sender` is bound to a named variable so it stays alive for the whole test.
    let (blocking_update_checker, _sender) = BlockingUpdateChecker::new_checker_and_sender();
    let manager = BlockingManagerManager::from_checker_and_applier(
        FakeTargetChannelUpdater::new(),
        FakeCurrentChannelUpdater::new(),
        blocking_update_checker,
        FakeUpdateApplier::new_error(),
    );
    manager.try_start_update(Initiator::Manual, None);

    let current = manager.get_state();
    assert_eq!(current.manager_state, ManagerState::CheckingForUpdates);
}
// A second `try_start_update` during a running attempt reports InProgress and applies nothing.
#[fasync::run_singlethreaded(test)]
async fn test_no_concurrent_update_attempts() {
    let (blocking_update_checker, sender) = BlockingUpdateChecker::new_checker_and_sender();
    let update_applier = FakeUpdateApplier::new_error();
    let manager = BlockingManagerManager::from_checker_and_applier(
        FakeTargetChannelUpdater::new(),
        FakeCurrentChannelUpdater::new(),
        blocking_update_checker,
        update_applier.clone(),
    );
    let (callback, receiver) = FakeStateChangeCallback::new_callback_and_receiver();

    let first_result = manager.try_start_update(Initiator::Manual, Some(callback));
    // try_start_update advances state to CheckingForUpdates before returning
    // and the blocking_update_checker keeps it there
    let second_result = manager.try_start_update(Initiator::Manual, None);

    // Release the checker and let the single attempt run to completion.
    let released = sender.send(());
    assert_matches!(released, Ok(()));
    let _states: Vec<State> = receiver.collect().await;

    assert_eq!(first_result, CheckStartedResult::Started);
    assert_eq!(second_result, CheckStartedResult::InProgress);
    let applier_calls = update_applier.call_count();
    assert_eq!(applier_calls, 1);
}
}