| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use crate::apply::{apply_system_update, ApplyProgress, ApplyState}; |
| use crate::channel::{CurrentChannelManager, TargetChannelManager}; |
| use crate::check::{check_for_system_update, SystemUpdateStatus}; |
| use crate::connect::ServiceConnect; |
| use crate::update_monitor::{StateNotifier, UpdateMonitor}; |
| use crate::update_service::RealStateNotifier; |
| use anyhow::{anyhow, Context as _, Error}; |
| use async_generator::GeneratorState; |
| use fidl_fuchsia_update::{ |
| CheckNotStartedReason, CommitStatusProviderMarker, InstallationDeferralReason, |
| }; |
| use fidl_fuchsia_update_ext::{ |
| query_commit_status, CheckOptions, CommitStatus, Initiator, InstallationDeferredData, |
| InstallationErrorData, InstallationProgress, InstallingData, State, UpdateInfo, |
| }; |
| use fuchsia_async as fasync; |
| use fuchsia_component::client::connect_to_service; |
| use fuchsia_hash::Hash; |
| use fuchsia_inspect as finspect; |
| use fuchsia_syslog::{fx_log_err, fx_log_info}; |
| use futures::{ |
| channel::{mpsc, oneshot}, |
| future::BoxFuture, |
| pin_mut, |
| prelude::*, |
| select, |
| stream::BoxStream, |
| }; |
| use std::sync::Arc; |
| |
| #[derive(Debug)] |
| pub struct UpdateManagerControlHandle<N>(mpsc::Sender<UpdateManagerRequest<N>>); |
| |
| impl<N> UpdateManagerControlHandle<N> |
| where |
| N: StateNotifier, |
| { |
| /// Try to start an update with the given options and optional monitor, returning whether or |
| /// not the attempt was started (or attached to, if the options allow it). |
| pub async fn try_start_update( |
| &mut self, |
| options: CheckOptions, |
| callback: Option<N>, |
| ) -> Result<(), CheckNotStartedReason> { |
| let (send, recv) = oneshot::channel(); |
| let () = self |
| .0 |
| .send(UpdateManagerRequest::TryStartUpdate { options, callback, responder: send }) |
| .await |
| .map_err(|_| CheckNotStartedReason::Internal)?; |
| recv.await.map_err(|_| CheckNotStartedReason::Internal)? |
| } |
| |
| #[cfg(test)] |
| pub async fn get_state(&mut self) -> Option<State> { |
| let (send, recv) = oneshot::channel(); |
| let () = self.0.send(UpdateManagerRequest::GetState { responder: send }).await.ok()?; |
| recv.await.ok()? |
| } |
| |
| #[cfg(test)] |
| pub async fn get_last_known_update_package_hash(&mut self) -> Option<Hash> { |
| let (send, recv) = oneshot::channel(); |
| let () = self |
| .0 |
| .send(UpdateManagerRequest::GetLastKnownUpdatePackageHash { responder: send }) |
| .await |
| .ok()?; |
| recv.await.ok()? |
| } |
| } |
| |
| // Manually implement Clone as not all N impl Clone, so derive(Clone) won't always impl Clone. |
| // See https://github.com/rust-lang/rust/issues/26925 for more context. |
| impl<N> Clone for UpdateManagerControlHandle<N> { |
| fn clone(&self) -> Self { |
| Self(self.0.clone()) |
| } |
| } |
| |
| #[derive(Debug)] |
| pub(crate) enum UpdateManagerRequest<N> { |
| TryStartUpdate { |
| options: CheckOptions, |
| callback: Option<N>, |
| responder: oneshot::Sender<Result<(), CheckNotStartedReason>>, |
| }, |
| #[cfg_attr(not(test), allow(dead_code))] |
| GetState { responder: oneshot::Sender<Option<State>> }, |
| #[cfg_attr(not(test), allow(dead_code))] |
| GetLastKnownUpdatePackageHash { responder: oneshot::Sender<Option<Hash>> }, |
| } |
| |
| #[derive(Debug)] |
| enum StatusEvent { |
| State(State), |
| VersionAvailableKnown(String), |
| } |
| |
| pub struct UpdateManager<T, Ch, C, A, N, Cq> |
| where |
| T: TargetChannelUpdater, |
| Ch: CurrentChannelUpdater, |
| C: UpdateChecker, |
| A: UpdateApplier, |
| N: StateNotifier, |
| Cq: CommitQuerier, |
| { |
| monitor: UpdateMonitor<N>, |
| updater: SystemInterface<T, Ch, C, A, Cq>, |
| } |
| |
| struct SystemInterface<T, Ch, C, A, Cq> |
| where |
| T: TargetChannelUpdater, |
| Ch: CurrentChannelUpdater, |
| C: UpdateChecker, |
| A: UpdateApplier, |
| Cq: CommitQuerier, |
| { |
| target_channel_updater: Arc<T>, |
| current_channel_updater: Arc<Ch>, |
| update_checker: C, |
| update_applier: A, |
| last_known_update_package: Option<Hash>, |
| commit_status: Option<CommitStatus>, |
| commit_querier: Cq, |
| } |
| |
| impl<T, Ch> |
| UpdateManager<T, Ch, RealUpdateChecker, RealUpdateApplier, RealStateNotifier, RealCommitQuerier> |
| where |
| T: TargetChannelUpdater, |
| Ch: CurrentChannelUpdater, |
| { |
| pub async fn new( |
| target_channel_updater: Arc<T>, |
| current_channel_updater: Arc<Ch>, |
| node: finspect::Node, |
| ) -> Self { |
| let (fut, update_monitor) = UpdateMonitor::from_inspect_node(node); |
| fasync::Task::spawn(fut).detach(); |
| Self { |
| monitor: update_monitor, |
| updater: SystemInterface::new( |
| target_channel_updater, |
| current_channel_updater, |
| RealUpdateChecker, |
| RealUpdateApplier, |
| None, |
| RealCommitQuerier, |
| None, |
| ), |
| } |
| } |
| } |
| |
| impl<T, Ch, C, A, N, Cq> UpdateManager<T, Ch, C, A, N, Cq> |
| where |
| T: TargetChannelUpdater, |
| Ch: CurrentChannelUpdater, |
| C: UpdateChecker, |
| A: UpdateApplier, |
| N: StateNotifier, |
| Cq: CommitQuerier, |
| { |
| #[cfg(test)] |
| pub async fn from_checker_and_applier( |
| target_channel_updater: Arc<T>, |
| current_channel_updater: Arc<Ch>, |
| update_checker: C, |
| update_applier: A, |
| commit_querier: Cq, |
| ) -> Self { |
| let (fut, update_monitor) = UpdateMonitor::new(); |
| fasync::Task::spawn(fut).detach(); |
| Self { |
| monitor: update_monitor, |
| updater: SystemInterface::new( |
| target_channel_updater, |
| current_channel_updater, |
| update_checker, |
| update_applier, |
| None, |
| commit_querier, |
| None, |
| ), |
| } |
| } |
| |
| #[cfg(test)] |
| async fn from_checker_and_applier_with_commit_status( |
| target_channel_updater: Arc<T>, |
| current_channel_updater: Arc<Ch>, |
| update_checker: C, |
| update_applier: A, |
| commit_querier: Cq, |
| commit_status: Option<CommitStatus>, |
| ) -> Self { |
| let last_known_update_package = None; |
| let (fut, update_monitor) = UpdateMonitor::new(); |
| fasync::Task::spawn(fut).detach(); |
| Self { |
| monitor: update_monitor, |
| updater: SystemInterface::new( |
| target_channel_updater, |
| current_channel_updater, |
| update_checker, |
| update_applier, |
| last_known_update_package, |
| commit_querier, |
| commit_status, |
| ), |
| } |
| } |
| |
| /// Builds and returns the update manager async task, along with a control handle to interact |
| /// with the task. The returned future must be polled for the update manager task to make |
| /// forward progress. |
| pub fn start(self) -> (UpdateManagerControlHandle<N>, impl Future<Output = ()>) { |
| let (send, recv) = mpsc::channel(0); |
| (UpdateManagerControlHandle(send), self.run(recv)) |
| } |
| |
| #[cfg(test)] |
| pub fn spawn(self) -> UpdateManagerControlHandle<N> { |
| let (ctl, fut) = self.start(); |
| fasync::Task::spawn(fut).detach(); |
| ctl |
| } |
| |
| async fn run(self, requests: mpsc::Receiver<UpdateManagerRequest<N>>) { |
| let Self { mut monitor, mut updater } = self; |
| pin_mut!(requests); |
| |
| loop { |
| // Get the next request to start an update attempt, responding to other requests with |
| // the appropriate defaults when no update attempt is in progress. |
| let (options, callback) = loop { |
| let request = match requests.next().await { |
| Some(request) => request, |
| None => return, |
| }; |
| |
| match request { |
| UpdateManagerRequest::TryStartUpdate { options, callback, responder } => { |
| let _ = responder.send(Ok(())); |
| break (options, callback); |
| } |
| UpdateManagerRequest::GetState { responder } => { |
| let _ = responder.send(None); |
| } |
| UpdateManagerRequest::GetLastKnownUpdatePackageHash { responder } => { |
| let _ = responder.send(updater.last_known_update_package); |
| } |
| } |
| }; |
| |
| // Start the update check with the requested options, configuring a monitor if |
| // requested. |
| if let Some(callback) = callback { |
| monitor.add_temporary_callback(callback).await; |
| } |
| |
| // Used for testing: it's ok to be slightly stale. |
| let last_known_update_package = updater.last_known_update_package; |
| |
| let update_check = async_generator::generate(|mut co| { |
| let updater = &mut updater; |
| async move { updater.do_system_update_check(&mut co, options.initiator).await } |
| }); |
| pin_mut!(update_check); |
| let mut current_state = None; |
| |
| // Run the update check, forwarding status updates to monitors, responding to requests |
| // to monitor the attempt and blocking requests to start a new update attempt. |
| let update_check_res = loop { |
| enum Op<N> { |
| Request(UpdateManagerRequest<N>), |
| Status(StatusEvent), |
| } |
| let op = select! { |
| request = requests.select_next_some() => Op::Request(request), |
| status = update_check.select_next_some() => match status { |
| GeneratorState::Yielded(status) => Op::Status(status), |
| GeneratorState::Complete(res) => break res, |
| }, |
| }; |
| match op { |
| Op::Request(UpdateManagerRequest::TryStartUpdate { |
| options, |
| callback, |
| responder, |
| }) => { |
| let _ = |
| responder.send(if !options.allow_attaching_to_existing_update_check { |
| Err(CheckNotStartedReason::AlreadyInProgress) |
| } else { |
| if let Some(callback) = callback { |
| monitor.add_temporary_callback(callback).await; |
| } |
| Ok(()) |
| }); |
| } |
| Op::Request(UpdateManagerRequest::GetState { responder }) => { |
| let _ = responder.send(current_state.clone()); |
| } |
| Op::Request(UpdateManagerRequest::GetLastKnownUpdatePackageHash { |
| responder, |
| }) => { |
| let _ = responder.send(last_known_update_package.clone()); |
| } |
| Op::Status(StatusEvent::State(state)) => { |
| current_state = Some(state.clone()); |
| let should_flush = matches!(state, State::WaitingForReboot(_)); |
| monitor.advance_update_state(state).await; |
| if should_flush { |
| monitor.try_flush().await; |
| } |
| } |
| Op::Status(StatusEvent::VersionAvailableKnown(version)) => { |
| monitor.set_version_available(version); |
| } |
| } |
| }; |
| |
| // Log the result of the update check and reset the monitor queue/inspect state for the |
| // attempt. |
| match update_check_res { |
| Ok(()) => {} |
| Err(e) => { |
| fx_log_err!("update attempt failed: {:#}", anyhow!(e)); |
| } |
| } |
| monitor.clear().await; |
| } |
| } |
| } |
| |
| impl<T, Ch, C, A, Cq> SystemInterface<T, Ch, C, A, Cq> |
| where |
| T: TargetChannelUpdater, |
| Ch: CurrentChannelUpdater, |
| C: UpdateChecker, |
| A: UpdateApplier, |
| Cq: CommitQuerier, |
| { |
| fn new( |
| target_channel_updater: Arc<T>, |
| current_channel_updater: Arc<Ch>, |
| update_checker: C, |
| update_applier: A, |
| last_known_update_package: Option<Hash>, |
| commit_querier: Cq, |
| commit_status: Option<CommitStatus>, |
| ) -> Self { |
| Self { |
| target_channel_updater, |
| current_channel_updater, |
| update_checker, |
| update_applier, |
| last_known_update_package, |
| commit_querier, |
| commit_status, |
| } |
| } |
| |
| async fn do_system_update_check( |
| &mut self, |
| co: &mut async_generator::Yield<StatusEvent>, |
| initiator: Initiator, |
| ) -> Result<(), Error> { |
| co.yield_(StatusEvent::State(State::CheckingForUpdates)).await; |
| fx_log_info!( |
| "starting update check (requested by {})", |
| match initiator { |
| Initiator::Service => "service", |
| Initiator::User => "user", |
| } |
| ); |
| |
| self.target_channel_updater.update().await; |
| |
| match self |
| .update_checker |
| .check(self.last_known_update_package.as_ref()) |
| .await |
| .context("check_for_system_update failed") |
| { |
| Err(e) => { |
| co.yield_(StatusEvent::State(State::ErrorCheckingForUpdate)).await; |
| return Err(e); |
| } |
| Ok(SystemUpdateStatus::UpToDate { system_image, update_package }) => { |
| fx_log_info!("current system_image hash: {}", system_image); |
| fx_log_info!("system_image is already up-to-date"); |
| |
| self.last_known_update_package = Some(update_package); |
| |
| self.current_channel_updater.update().await; |
| co.yield_(StatusEvent::State(State::NoUpdateAvailable)).await; |
| |
| return Ok(()); |
| } |
| Ok(SystemUpdateStatus::UpdateAvailable { |
| current_system_image, |
| latest_system_image, |
| }) => { |
| fx_log_info!("current system_image hash: {}", current_system_image); |
| fx_log_info!("new system_image available: {}", latest_system_image); |
| let version_available = latest_system_image.to_string(); |
| |
| let status = match self.commit_status { |
| Some(CommitStatus::Committed) => Ok(CommitStatus::Committed), |
| Some(CommitStatus::Pending) | None => self |
| .commit_querier |
| .query_commit_status() |
| .await |
| .context("while querying commit status"), |
| }; |
| |
| match status { |
| Ok(CommitStatus::Committed) => { |
| self.commit_status = Some(CommitStatus::Committed); |
| } |
| Ok(CommitStatus::Pending) => { |
| self.commit_status = Some(CommitStatus::Pending); |
| co.yield_(StatusEvent::State(State::InstallationDeferredByPolicy( |
| InstallationDeferredData { |
| update: Some(UpdateInfo { |
| version_available: Some(version_available.clone()), |
| download_size: None, |
| }), |
| deferral_reason: Some( |
| InstallationDeferralReason::CurrentSystemNotCommitted, |
| ), |
| }, |
| ))) |
| .await; |
| return Ok(()); |
| } |
| Err(e) => return Err(e), |
| } |
| |
| { |
| co.yield_(StatusEvent::VersionAvailableKnown(version_available.clone())).await; |
| co.yield_(StatusEvent::State(State::InstallingUpdate(InstallingData { |
| update: Some(UpdateInfo { |
| version_available: Some(version_available.clone()), |
| download_size: None, |
| }), |
| installation_progress: None, |
| }))) |
| .await; |
| } |
| |
| match self |
| .update_applier |
| .apply(initiator) |
| .await |
| .context("apply_system_update failed") |
| { |
| Ok(mut stream) => { |
| let mut waiting_for_reboot = false; |
| while let Some(result) = stream.next().await { |
| match result { |
| Ok(apply_state) => { |
| let state = match apply_state { |
| ApplyState::InstallingUpdate(ApplyProgress { |
| download_size, |
| fraction_completed, |
| }) => State::InstallingUpdate(InstallingData { |
| update: Some(UpdateInfo { |
| version_available: Some(version_available.clone()), |
| download_size, |
| }), |
| installation_progress: Some(InstallationProgress { |
| fraction_completed, |
| }), |
| }), |
| ApplyState::WaitingForReboot(ApplyProgress { |
| download_size, |
| fraction_completed, |
| }) => { |
| waiting_for_reboot = true; |
| State::WaitingForReboot(InstallingData { |
| update: Some(UpdateInfo { |
| version_available: Some( |
| version_available.clone(), |
| ), |
| download_size, |
| }), |
| installation_progress: Some(InstallationProgress { |
| fraction_completed, |
| }), |
| }) |
| } |
| }; |
| co.yield_(StatusEvent::State(state)).await; |
| } |
| Err((ApplyProgress { download_size, fraction_completed }, e)) => { |
| // If we failed to unblock reboot, it will ends up here and we |
| // should not go back to InstallationError. |
| if !waiting_for_reboot { |
| co.yield_(StatusEvent::State(State::InstallationError( |
| InstallationErrorData { |
| update: Some(UpdateInfo { |
| version_available: Some(version_available), |
| download_size, |
| }), |
| installation_progress: Some(InstallationProgress { |
| fraction_completed, |
| }), |
| }, |
| ))) |
| .await; |
| } |
| return Err(e); |
| } |
| } |
| } |
| } |
| Err(e) => { |
| co.yield_(StatusEvent::State(State::InstallationError( |
| InstallationErrorData { |
| update: Some(UpdateInfo { |
| version_available: Some(version_available), |
| download_size: None, |
| }), |
| installation_progress: None, |
| }, |
| ))) |
| .await; |
| return Err(e); |
| } |
| } |
| } |
| } |
| Ok(()) |
| } |
| } |
| |
| // For mocking |
| pub trait UpdateChecker: Send + Sync + 'static { |
| fn check<'a>( |
| &self, |
| last_known_update_hash: Option<&'a Hash>, |
| ) -> BoxFuture<'a, Result<SystemUpdateStatus, crate::errors::Error>>; |
| } |
| |
| pub struct RealUpdateChecker; |
| |
| impl UpdateChecker for RealUpdateChecker { |
| fn check<'a>( |
| &self, |
| last_known_update_hash: Option<&'a Hash>, |
| ) -> BoxFuture<'a, Result<SystemUpdateStatus, crate::errors::Error>> { |
| check_for_system_update(last_known_update_hash).boxed() |
| } |
| } |
| |
| // For mocking |
| pub trait TargetChannelUpdater: Send + Sync + 'static { |
| fn update(&self) -> BoxFuture<'_, ()>; |
| } |
| |
| impl<S: ServiceConnect + 'static> TargetChannelUpdater for TargetChannelManager<S> { |
| fn update(&self) -> BoxFuture<'_, ()> { |
| TargetChannelManager::update(self) |
| .unwrap_or_else(|e| fx_log_err!("while updating target channel: {:#}", anyhow!(e))) |
| .boxed() |
| } |
| } |
| |
| // For mocking |
| pub trait CurrentChannelUpdater: Send + Sync + 'static { |
| fn update(&self) -> BoxFuture<'_, ()>; |
| } |
| |
| impl CurrentChannelUpdater for CurrentChannelManager { |
| fn update(&self) -> BoxFuture<'_, ()> { |
| CurrentChannelManager::update(self) |
| .unwrap_or_else(|e| fx_log_err!("while updating current channel: {:#}", anyhow!(e))) |
| .boxed() |
| } |
| } |
| |
| // For mocking |
| pub trait UpdateApplier: Send + Sync + 'static { |
| fn apply<'a>( |
| &self, |
| initiator: Initiator, |
| ) -> BoxFuture< |
| 'a, |
| Result<BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, anyhow::Error>, |
| >; |
| } |
| |
| pub struct RealUpdateApplier; |
| |
| impl UpdateApplier for RealUpdateApplier { |
| fn apply<'a>( |
| &self, |
| initiator: Initiator, |
| ) -> BoxFuture< |
| 'a, |
| Result<BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, anyhow::Error>, |
| > { |
| apply_system_update(initiator).boxed() |
| } |
| } |
| |
| // For mocking. |
| pub trait CommitQuerier: Send + Sync + 'static { |
| fn query_commit_status<'a>(&self) -> BoxFuture<'a, Result<CommitStatus, anyhow::Error>>; |
| } |
| |
| pub struct RealCommitQuerier; |
| |
| impl CommitQuerier for RealCommitQuerier { |
| fn query_commit_status<'a>(&self) -> BoxFuture<'a, Result<CommitStatus, anyhow::Error>> { |
| async { |
| let provider = connect_to_service::<CommitStatusProviderMarker>() |
| .context("while connecting to commit status provider")?; |
| query_commit_status(&provider).await |
| } |
| .boxed() |
| } |
| } |
| |
| #[cfg(test)] |
| pub(crate) mod tests { |
| use super::*; |
| use crate::errors; |
| use event_queue::{ClosedClient, Notify}; |
| use fuchsia_async::{DurationExt, TimeoutExt}; |
| use fuchsia_zircon::prelude::*; |
| use fuchsia_zircon::{self as zx}; |
| use futures::channel::mpsc::{channel, Receiver, Sender}; |
| use futures::channel::oneshot; |
| use futures::future::BoxFuture; |
| use futures::lock::Mutex as AsyncMutex; |
| use matches::assert_matches; |
| use parking_lot::Mutex; |
| use std::sync::atomic::{AtomicU64, Ordering}; |
| |
/// Capacity of the channel backing `FakeStateNotifier`.
pub const CALLBACK_CHANNEL_SIZE: usize = 20;
/// Hash string the fake update checker reports as the current system image.
pub const CURRENT_SYSTEM_IMAGE: &str =
    "0000000000000000000000000000000000000000000000000000000000000000";
/// Hash string the fake update checker reports as the latest available system image.
pub const LATEST_SYSTEM_IMAGE: &str =
    "1111111111111111111111111111111111111111111111111111111111111111";
/// Hash string the fake update checker reports as the current update package.
pub const CURRENT_UPDATE_PACKAGE: &str =
    "2222222222222222222222222222222222222222222222222222222222222222";
| |
| pub(crate) struct FakeUpdateManagerControlHandle<N> { |
| requests: mpsc::Receiver<UpdateManagerRequest<N>>, |
| } |
| |
| impl<N> FakeUpdateManagerControlHandle<N> { |
| pub(crate) fn new() -> (UpdateManagerControlHandle<N>, Self) { |
| let (send, recv) = mpsc::channel(0); |
| |
| (UpdateManagerControlHandle(send), Self { requests: recv }) |
| } |
| |
| pub(crate) fn next(&mut self) -> Option<UpdateManagerRequest<N>> { |
| self.requests.next().now_or_never().flatten() |
| } |
| } |
| |
| type CheckResultFactory = fn() -> Result<SystemUpdateStatus, crate::errors::Error>; |
| |
| #[derive(Clone)] |
| pub struct FakeUpdateChecker { |
| result: CheckResultFactory, |
| call_count: Arc<AtomicU64>, |
| // Taking this mutex blocks update checker. |
| check_blocked: Arc<AsyncMutex<()>>, |
| } |
| impl FakeUpdateChecker { |
| fn new(result: CheckResultFactory) -> Self { |
| Self { |
| result, |
| call_count: Arc::new(AtomicU64::new(0)), |
| check_blocked: Arc::new(AsyncMutex::new(())), |
| } |
| } |
| pub fn new_up_to_date() -> Self { |
| Self::new(|| { |
| Ok(SystemUpdateStatus::UpToDate { |
| system_image: CURRENT_SYSTEM_IMAGE.parse().expect("valid hash"), |
| update_package: CURRENT_UPDATE_PACKAGE.parse().expect("valid hash"), |
| }) |
| }) |
| } |
| pub fn new_update_available() -> Self { |
| Self::new(|| { |
| Ok(SystemUpdateStatus::UpdateAvailable { |
| current_system_image: CURRENT_SYSTEM_IMAGE.parse().expect("valid hash"), |
| latest_system_image: LATEST_SYSTEM_IMAGE.parse().expect("valid hash"), |
| }) |
| }) |
| } |
| pub fn new_error() -> Self { |
| Self::new(|| { |
| Err(errors::Error::UpdatePackage(errors::UpdatePackage::Resolve( |
| zx::Status::INTERNAL, |
| ))) |
| }) |
| } |
| pub fn block(&self) -> Option<futures::lock::MutexGuard<'_, ()>> { |
| self.check_blocked.try_lock() |
| } |
| pub fn call_count(&self) -> u64 { |
| self.call_count.load(Ordering::SeqCst) |
| } |
| } |
| impl UpdateChecker for FakeUpdateChecker { |
| fn check<'a>( |
| &self, |
| _last_known_update_hash: Option<&'a Hash>, |
| ) -> BoxFuture<'a, Result<SystemUpdateStatus, crate::errors::Error>> { |
| let check_blocked = Arc::clone(&self.check_blocked); |
| let result = (self.result)(); |
| self.call_count.fetch_add(1, Ordering::SeqCst); |
| |
| async move { |
| check_blocked.lock().await; |
| result |
| } |
| .boxed() |
| } |
| } |
| |
| #[derive(Clone)] |
| pub struct FakeTargetChannelUpdater { |
| call_count: Arc<AtomicU64>, |
| } |
| impl FakeTargetChannelUpdater { |
| pub fn new() -> Self { |
| Self { call_count: Arc::new(AtomicU64::new(0)) } |
| } |
| pub fn call_count(&self) -> u64 { |
| self.call_count.load(Ordering::SeqCst) |
| } |
| } |
| impl TargetChannelUpdater for FakeTargetChannelUpdater { |
| fn update(&self) -> BoxFuture<'_, ()> { |
| let call_count = self.call_count.clone(); |
| async move { |
| call_count.fetch_add(1, Ordering::SeqCst); |
| } |
| .boxed() |
| } |
| } |
| |
| #[derive(Clone)] |
| pub struct FakeCurrentChannelUpdater { |
| call_count: Arc<AtomicU64>, |
| } |
| impl FakeCurrentChannelUpdater { |
| pub fn new() -> Self { |
| Self { call_count: Arc::new(AtomicU64::new(0)) } |
| } |
| pub fn call_count(&self) -> u64 { |
| self.call_count.load(Ordering::SeqCst) |
| } |
| } |
| impl CurrentChannelUpdater for FakeCurrentChannelUpdater { |
| fn update(&self) -> BoxFuture<'_, ()> { |
| let call_count = self.call_count.clone(); |
| async move { |
| call_count.fetch_add(1, Ordering::SeqCst); |
| } |
| .boxed() |
| } |
| } |
| |
| #[derive(Clone)] |
| pub struct UnreachableUpdateApplier; |
| impl UpdateApplier for UnreachableUpdateApplier { |
| fn apply<'a>( |
| &self, |
| _initiator: Initiator, |
| ) -> BoxFuture< |
| 'a, |
| Result< |
| BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, |
| anyhow::Error, |
| >, |
| > { |
| unreachable!(); |
| } |
| } |
| |
| type ApplyResultFactory = fn() -> Result< |
| BoxStream<'static, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, |
| crate::errors::Error, |
| >; |
| |
| #[derive(Clone)] |
| pub struct FakeUpdateApplier { |
| result: ApplyResultFactory, |
| call_count: Arc<AtomicU64>, |
| } |
| impl FakeUpdateApplier { |
| pub fn new_success() -> Self { |
| Self { |
| result: || { |
| Ok(futures::stream::iter(vec![ |
| Ok(ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.42))), |
| Ok(ApplyState::WaitingForReboot(ApplyProgress::new(1000, 1.0))), |
| ]) |
| .chain(futures::stream::pending()) |
| .boxed()) |
| }, |
| call_count: Arc::new(AtomicU64::new(0)), |
| } |
| } |
| pub fn new_error() -> Self { |
| Self { |
| result: || Err(crate::errors::Error::SystemUpdaterFailed), |
| call_count: Arc::new(AtomicU64::new(0)), |
| } |
| } |
| pub fn call_count(&self) -> u64 { |
| self.call_count.load(std::sync::atomic::Ordering::Relaxed) |
| } |
| } |
| impl UpdateApplier for FakeUpdateApplier { |
| fn apply<'a>( |
| &self, |
| _initiator: Initiator, |
| ) -> BoxFuture< |
| 'a, |
| Result< |
| BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, |
| anyhow::Error, |
| >, |
| > { |
| self.call_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); |
| future::ready((self.result)().map_err(|e| e.into())).boxed() |
| } |
| } |
| |
| #[derive(Clone)] |
| pub struct FakeCommitQuerier { |
| call_count: Arc<AtomicU64>, |
| committed: bool, |
| } |
| |
| impl FakeCommitQuerier { |
| pub fn new() -> Self { |
| Self { call_count: Arc::new(AtomicU64::new(0)), committed: true } |
| } |
| |
| pub fn new_pending() -> Self { |
| Self { call_count: Arc::new(AtomicU64::new(0)), committed: false } |
| } |
| } |
| |
| impl CommitQuerier for FakeCommitQuerier { |
| fn query_commit_status<'a>(&self) -> BoxFuture<'a, Result<CommitStatus, anyhow::Error>> { |
| self.call_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); |
| if self.committed { |
| future::ready(Ok(CommitStatus::Committed)).boxed() |
| } else { |
| future::ready(Ok(CommitStatus::Pending)).boxed() |
| } |
| } |
| } |
| |
| #[derive(Clone)] |
| pub struct UnreachableNotifier; |
| impl Notify for UnreachableNotifier { |
| type Event = State; |
| type NotifyFuture = BoxFuture<'static, Result<(), ClosedClient>>; |
| fn notify(&self, _state: State) -> Self::NotifyFuture { |
| unreachable!(); |
| } |
| } |
| |
| #[derive(Clone, Debug)] |
| pub struct StateChangeCollector { |
| states: Arc<Mutex<Vec<State>>>, |
| } |
| impl StateChangeCollector { |
| pub fn new() -> Self { |
| Self { states: Arc::new(Mutex::new(vec![])) } |
| } |
| pub fn take_states(&self) -> Vec<State> { |
| std::mem::replace(&mut self.states.lock(), vec![]) |
| } |
| } |
| impl Notify for StateChangeCollector { |
| type Event = State; |
| type NotifyFuture = future::Ready<Result<(), ClosedClient>>; |
| fn notify(&self, state: State) -> Self::NotifyFuture { |
| self.states.lock().push(state); |
| future::ready(Ok(())) |
| } |
| } |
| |
| #[derive(Clone)] |
| struct FakeStateNotifier { |
| sender: Arc<Mutex<Sender<State>>>, |
| } |
| impl FakeStateNotifier { |
| fn new_callback_and_receiver() -> (Self, Receiver<State>) { |
| let (sender, receiver) = channel(CALLBACK_CHANNEL_SIZE); |
| (Self { sender: Arc::new(Mutex::new(sender)) }, receiver) |
| } |
| } |
| impl Notify for FakeStateNotifier { |
| type Event = State; |
| type NotifyFuture = future::Ready<Result<(), ClosedClient>>; |
| fn notify(&self, state: State) -> Self::NotifyFuture { |
| self.sender.lock().try_send(state).expect("FakeStateNotifier failed to send state"); |
| future::ready(Ok(())) |
| } |
| } |
| |
| type FakeUpdateManager = UpdateManager< |
| FakeTargetChannelUpdater, |
| FakeCurrentChannelUpdater, |
| FakeUpdateChecker, |
| FakeUpdateApplier, |
| FakeStateNotifier, |
| FakeCommitQuerier, |
| >; |
| |
| type BlockingManagerManager = UpdateManager< |
| FakeTargetChannelUpdater, |
| FakeCurrentChannelUpdater, |
| BlockingUpdateChecker, |
| FakeUpdateApplier, |
| FakeStateNotifier, |
| FakeCommitQuerier, |
| >; |
| |
| async fn next_n_states(receiver: &mut Receiver<State>, n: usize) -> Vec<State> { |
| let mut v = Vec::with_capacity(n); |
| for _ in 0..n { |
| v.push(receiver.next().await.expect("next_n_states stream empty")); |
| } |
| v |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_correct_initial_state() { |
| let mut manager = FakeUpdateManager::from_checker_and_applier( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| FakeUpdateChecker::new_up_to_date(), |
| FakeUpdateApplier::new_success(), |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| |
| assert_eq!(manager.get_state().await, None); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_last_update_package_changed_when_no_update_available() { |
| let fake_update_checker = FakeUpdateChecker::new_up_to_date(); |
| |
| let mut manager = FakeUpdateManager::from_checker_and_applier_with_commit_status( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| fake_update_checker, |
| FakeUpdateApplier::new_success(), |
| FakeCommitQuerier::new(), |
| None, |
| ) |
| .await |
| .spawn(); |
| let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| |
| let options = CheckOptions::builder().initiator(Initiator::User).build(); |
| assert_eq!(manager.try_start_update(options, Some(callback)).await, Ok(())); |
| |
| assert_eq!( |
| receiver.collect::<Vec<State>>().await, |
| vec![State::CheckingForUpdates, State::NoUpdateAvailable] |
| ); |
| |
| assert_eq!( |
| manager.get_last_known_update_package_hash().await, |
| Some(CURRENT_UPDATE_PACKAGE.parse().unwrap()) |
| ); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_last_update_package_unchanged_when_update_available() { |
| let fake_update_checker = FakeUpdateChecker::new_update_available(); |
| |
| let mut manager = FakeUpdateManager::from_checker_and_applier_with_commit_status( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| fake_update_checker, |
| FakeUpdateApplier::new_success(), |
| FakeCommitQuerier::new(), |
| None, |
| ) |
| .await |
| .spawn(); |
| let (callback, mut receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| |
| let options = CheckOptions::builder().initiator(Initiator::User).build(); |
| assert_eq!(manager.try_start_update(options, Some(callback)).await, Ok(())); |
| |
| assert_eq!( |
| next_n_states(&mut receiver, 4).await, |
| vec![ |
| State::CheckingForUpdates, |
| State::InstallingUpdate(InstallingData { |
| update: Some(UpdateInfo { |
| version_available: Some(LATEST_SYSTEM_IMAGE.to_string()), |
| download_size: None, |
| }), |
| installation_progress: None, |
| }), |
| State::InstallingUpdate(InstallingData { |
| update: Some(UpdateInfo { |
| version_available: Some(LATEST_SYSTEM_IMAGE.to_string()), |
| download_size: Some(1000), |
| }), |
| installation_progress: Some(InstallationProgress { |
| fraction_completed: Some(0.42) |
| }) |
| }), |
| State::WaitingForReboot(InstallingData { |
| update: Some(UpdateInfo { |
| version_available: Some(LATEST_SYSTEM_IMAGE.to_string()), |
| download_size: Some(1000), |
| }), |
| installation_progress: Some(InstallationProgress { |
| fraction_completed: Some(1.0) |
| }) |
| }), |
| ] |
| ); |
| |
| assert_eq!(manager.get_last_known_update_package_hash().await, None); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_is_current_status_committed_called_when_none() { |
| let fake_commit_querier = FakeCommitQuerier::new(); |
| let fidl_call_count = Arc::clone(&fake_commit_querier.call_count); |
| |
| let mut manager = FakeUpdateManager::from_checker_and_applier_with_commit_status( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| FakeUpdateChecker::new_update_available(), |
| FakeUpdateApplier::new_success(), |
| fake_commit_querier, |
| None, |
| ) |
| .await |
| .spawn(); |
| |
| let options = CheckOptions::builder().initiator(Initiator::User).build(); |
| assert_eq!(manager.try_start_update(options, None).await, Ok(())); |
| |
| assert_eq!(fidl_call_count.load(Ordering::SeqCst), 1); |
| } |
| |
| /// Builds a manager with `Some(CommitStatus::Pending)` as the commit-status argument and checks |
| /// that the fake commit querier's call counter reads 1 after an update attempt has been started. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_is_current_status_committed_called_when_pending() { |
| let commit_querier = FakeCommitQuerier::new(); |
| // Keep a handle on the counter, since the querier itself is moved into the manager. |
| let query_count = Arc::clone(&commit_querier.call_count); |
| let initial_status = Some(CommitStatus::Pending); |
| |
| let mut manager = FakeUpdateManager::from_checker_and_applier_with_commit_status( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| FakeUpdateChecker::new_update_available(), |
| FakeUpdateApplier::new_success(), |
| commit_querier, |
| initial_status, |
| ) |
| .await |
| .spawn(); |
| |
| let user_check = CheckOptions::builder().initiator(Initiator::User).build(); |
| let started = manager.try_start_update(user_check, None).await; |
| assert_eq!(started, Ok(())); |
| |
| let observed_calls = query_count.load(Ordering::SeqCst); |
| assert_eq!(observed_calls, 1); |
| } |
| |
| /// Builds a manager with `Some(CommitStatus::Committed)` as the commit-status argument and checks |
| /// that the fake commit querier's call counter is still 0 after an update attempt has been |
| /// started. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_is_current_status_committed_not_called_when_committed() { |
| let commit_querier = FakeCommitQuerier::new(); |
| // Keep a handle on the counter, since the querier itself is moved into the manager. |
| let query_count = Arc::clone(&commit_querier.call_count); |
| let initial_status = Some(CommitStatus::Committed); |
| |
| let mut manager = FakeUpdateManager::from_checker_and_applier_with_commit_status( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| FakeUpdateChecker::new_update_available(), |
| FakeUpdateApplier::new_success(), |
| commit_querier, |
| initial_status, |
| ) |
| .await |
| .spawn(); |
| |
| let user_check = CheckOptions::builder().initiator(Initiator::User).build(); |
| let started = manager.try_start_update(user_check, None).await; |
| assert_eq!(started, Ok(())); |
| |
| let observed_calls = query_count.load(Ordering::SeqCst); |
| assert_eq!(observed_calls, 0); |
| } |
| |
| /// A user-initiated `try_start_update` without a monitor returns `Ok(())` when the manager is |
| /// built from an up-to-date fake checker. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_try_start_update_returns_started() { |
| let target_channel = Arc::new(FakeTargetChannelUpdater::new()); |
| let current_channel = Arc::new(FakeCurrentChannelUpdater::new()); |
| let mut manager = FakeUpdateManager::from_checker_and_applier( |
| target_channel, |
| current_channel, |
| FakeUpdateChecker::new_up_to_date(), |
| FakeUpdateApplier::new_success(), |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| |
| let started = manager |
| .try_start_update(CheckOptions::builder().initiator(Initiator::User).build(), None) |
| .await; |
| assert_eq!(started, Ok(())); |
| } |
| |
| /// Attaches a callback to a single update attempt and checks that the callback's state stream |
| /// terminates, yielding exactly `CheckingForUpdates` followed by `NoUpdateAvailable`. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_temporary_callbacks_dropped_after_update_attempt() { |
| let mut manager = FakeUpdateManager::from_checker_and_applier( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| FakeUpdateChecker::new_up_to_date(), |
| FakeUpdateApplier::new_success(), |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| |
| // `options` is used exactly once, so it is moved into the call rather than cloned. |
| let options = CheckOptions::builder().initiator(Initiator::User).build(); |
| manager.try_start_update(options, Some(callback)).await.unwrap(); |
| |
| // Drain the stream of status updates, which is only closed when the update attempt |
| // completes and the callbacks are dropped, so this would hang if the callback is not |
| // dropped after the update attempt. |
| assert_eq!( |
| receiver.collect::<Vec<State>>().await, |
| vec![State::CheckingForUpdates, State::NoUpdateAvailable] |
| ); |
| } |
| |
| /// With an up-to-date fake checker, the attached callback observes `CheckingForUpdates` and then |
| /// `NoUpdateAvailable`, after which its stream ends. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_try_start_update_callback_when_up_to_date() { |
| let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| let mut manager = FakeUpdateManager::from_checker_and_applier( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| FakeUpdateChecker::new_up_to_date(), |
| FakeUpdateApplier::new_success(), |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| |
| let user_check = CheckOptions::builder().initiator(Initiator::User).build(); |
| manager.try_start_update(user_check, Some(callback)).await.unwrap(); |
| |
| let observed: Vec<State> = receiver.collect().await; |
| let expected = vec![State::CheckingForUpdates, State::NoUpdateAvailable]; |
| assert_eq!(observed, expected); |
| } |
| |
| /// With an update available and a failing fake applier, the attached callback observes |
| /// `CheckingForUpdates`, `InstallingUpdate`, and finally `InstallationError`, all carrying the |
| /// latest system image version and no download size or progress. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_try_start_update_callback_when_update_available_and_apply_errors() { |
| let mut manager = FakeUpdateManager::from_checker_and_applier( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| FakeUpdateChecker::new_update_available(), |
| FakeUpdateApplier::new_error(), |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| |
| // Both post-check states are expected to report the same update info. |
| let update_info = UpdateInfo { |
| version_available: Some(LATEST_SYSTEM_IMAGE.to_string()), |
| download_size: None, |
| }; |
| let expected_states = vec![ |
| State::CheckingForUpdates, |
| State::InstallingUpdate(InstallingData { |
| update: Some(update_info.clone()), |
| installation_progress: None, |
| }), |
| State::InstallationError(InstallationErrorData { |
| update: Some(update_info), |
| installation_progress: None, |
| }), |
| ]; |
| |
| let user_check = CheckOptions::builder().initiator(Initiator::User).build(); |
| manager.try_start_update(user_check, Some(callback)).await.unwrap(); |
| |
| let observed: Vec<State> = receiver.collect().await; |
| assert_eq!(observed, expected_states); |
| } |
| |
| /// With an update available and a succeeding fake applier, the attached callback observes |
| /// `CheckingForUpdates`, two `InstallingUpdate` states, and `WaitingForReboot`; no further state |
| /// arrives within the following 100 ms. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_try_start_update_callback_when_update_available_and_apply_succeeds() { |
| let mut manager = FakeUpdateManager::from_checker_and_applier( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| FakeUpdateChecker::new_update_available(), |
| FakeUpdateApplier::new_success(), |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| let (callback, mut receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| |
| let user_check = CheckOptions::builder().initiator(Initiator::User).build(); |
| manager.try_start_update(user_check, Some(callback)).await.unwrap(); |
| |
| let observed = next_n_states(&mut receiver, 4).await; |
| |
| // The first InstallingUpdate state has no download size; later states report 1000. |
| let version = LATEST_SYSTEM_IMAGE.to_string(); |
| let info_without_size = |
| UpdateInfo { version_available: Some(version.clone()), download_size: None }; |
| let info_with_size = |
| UpdateInfo { version_available: Some(version), download_size: Some(1000) }; |
| let expected_states = vec![ |
| State::CheckingForUpdates, |
| State::InstallingUpdate(InstallingData { |
| update: Some(info_without_size), |
| installation_progress: None, |
| }), |
| State::InstallingUpdate(InstallingData { |
| update: Some(info_with_size.clone()), |
| installation_progress: Some(InstallationProgress { fraction_completed: Some(0.42) }), |
| }), |
| State::WaitingForReboot(InstallingData { |
| update: Some(info_with_size), |
| installation_progress: Some(InstallationProgress { fraction_completed: Some(1.0) }), |
| }), |
| ]; |
| assert_eq!(observed, expected_states); |
| |
| // The update attempt will never leave the WaitingForReboot state. |
| let state_after_wait = |
| receiver.next().map(Some).on_timeout(100.millis().after_now(), || None).await; |
| assert_eq!(state_after_wait, None); |
| } |
| |
| /// With an update available and a commit querier built via `new_pending`, the first two states |
| /// seen by the callback are `CheckingForUpdates` and `InstallationDeferredByPolicy` with reason |
| /// `CurrentSystemNotCommitted`. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_check_start_update_callback_when_update_available_and_pending() { |
| let mut manager = FakeUpdateManager::from_checker_and_applier( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| FakeUpdateChecker::new_update_available(), |
| FakeUpdateApplier::new_success(), |
| FakeCommitQuerier::new_pending(), |
| ) |
| .await |
| .spawn(); |
| let (callback, mut receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| |
| let user_check = CheckOptions::builder().initiator(Initiator::User).build(); |
| manager.try_start_update(user_check, Some(callback)).await.unwrap(); |
| |
| let observed = next_n_states(&mut receiver, 2).await; |
| |
| let deferred = InstallationDeferredData { |
| update: Some(UpdateInfo { |
| version_available: Some(LATEST_SYSTEM_IMAGE.to_string()), |
| download_size: None, |
| }), |
| deferral_reason: Some(InstallationDeferralReason::CurrentSystemNotCommitted), |
| }; |
| let expected_states = |
| vec![State::CheckingForUpdates, State::InstallationDeferredByPolicy(deferred)]; |
| assert_eq!(observed, expected_states); |
| } |
| |
| /// After one completed update attempt, the fake target channel updater reports a call count of |
| /// 1. The applier is `UnreachableUpdateApplier`, and the checker reports up-to-date. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_channel_updater_called() { |
| let target_channel = Arc::new(FakeTargetChannelUpdater::new()); |
| let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| let mut manager = UpdateManager::from_checker_and_applier( |
| Arc::clone(&target_channel), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| FakeUpdateChecker::new_up_to_date(), |
| UnreachableUpdateApplier, |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| |
| let user_check = CheckOptions::builder().initiator(Initiator::User).build(); |
| manager.try_start_update(user_check, Some(callback)).await.unwrap(); |
| // Drain the state stream so the attempt has finished before the counter is inspected. |
| let _: Vec<State> = receiver.collect().await; |
| |
| let target_channel_calls = target_channel.call_count(); |
| assert_eq!(target_channel_calls, 1); |
| } |
| |
| /// When the fake checker reports an available update, the fake applier's call count is 1 after |
| /// the update attempt's state stream has ended. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_update_applier_called_if_update_available() { |
| let applier = FakeUpdateApplier::new_error(); |
| let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| let mut manager = FakeUpdateManager::from_checker_and_applier( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| FakeUpdateChecker::new_update_available(), |
| applier.clone(), |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| |
| let user_check = CheckOptions::builder().initiator(Initiator::User).build(); |
| manager.try_start_update(user_check, Some(callback)).await.unwrap(); |
| // Drain the state stream so the attempt has finished before the counter is inspected. |
| let _: Vec<State> = receiver.collect().await; |
| |
| let applier_calls = applier.call_count(); |
| assert_eq!(applier_calls, 1); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_update_applier_not_called_if_up_to_date() { |
| let update_applier = FakeUpdateApplier::new_error(); |
| let current_channel_updater = Arc::new(FakeCurrentChannelUpdater::new()); |
| let mut manager = FakeUpdateManager::from_checker_and_applier( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::clone(¤t_channel_updater), |
| FakeUpdateChecker::new_up_to_date(), |
| update_applier.clone(), |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| |
| let options = CheckOptions::builder().initiator(Initiator::User).build(); |
| manager.try_start_update(options, Some(callback)).await.unwrap(); |
| let _ = receiver.collect::<Vec<State>>().await; |
| |
| assert_eq!(update_applier.call_count(), 0); |
| assert_eq!(current_channel_updater.call_count(), 1); |
| } |
| |
| /// After an update attempt whose fake checker errors, `get_state` returns the default value. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_return_to_initial_state_on_update_check_error() { |
| let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| let mut manager = FakeUpdateManager::from_checker_and_applier( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| FakeUpdateChecker::new_error(), |
| FakeUpdateApplier::new_error(), |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| |
| let user_check = CheckOptions::builder().initiator(Initiator::User).build(); |
| manager.try_start_update(user_check, Some(callback)).await.unwrap(); |
| // Drain the state stream so the attempt has finished before the state is queried. |
| let _: Vec<State> = receiver.collect().await; |
| |
| let state_after_attempt = manager.get_state().await; |
| assert_eq!(state_after_attempt, Default::default()); |
| } |
| |
| /// After an update attempt whose fake applier errors, `get_state` returns the default value. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_return_to_initial_state_on_update_apply_error() { |
| let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| let mut manager = FakeUpdateManager::from_checker_and_applier( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| FakeUpdateChecker::new_update_available(), |
| FakeUpdateApplier::new_error(), |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| |
| let user_check = CheckOptions::builder().initiator(Initiator::User).build(); |
| manager.try_start_update(user_check, Some(callback)).await.unwrap(); |
| // Drain the state stream so the attempt has finished before the state is queried. |
| let _: Vec<State> = receiver.collect().await; |
| |
| let state_after_attempt = manager.get_state().await; |
| assert_eq!(state_after_attempt, Default::default()); |
| } |
| |
| /// Update checker whose `check` future waits on a shared oneshot receiver before it reports an |
| /// available update, letting a test hold an update attempt in its checking phase. |
| #[derive(Clone)] |
| pub struct BlockingUpdateChecker { |
| // Shared handle to the oneshot receiver; it is cloned for every `check` call. |
| blocker: future::Shared<oneshot::Receiver<()>>, |
| } |
| impl BlockingUpdateChecker { |
| /// Creates a checker together with the oneshot sender whose receiving half the checker |
| /// holds (in shared form) as its `blocker`. |
| pub fn new_checker_and_sender() -> (Self, oneshot::Sender<()>) { |
| let (unblock, blocked_on) = oneshot::channel(); |
| (Self { blocker: blocked_on.shared() }, unblock) |
| } |
| } |
| impl UpdateChecker for BlockingUpdateChecker { |
| /// Returns a future that first awaits the shared `blocker` and then resolves to |
| /// `SystemUpdateStatus::UpdateAvailable` built from `CURRENT_SYSTEM_IMAGE` and |
| /// `LATEST_SYSTEM_IMAGE`. The last known update hash is ignored. |
| /// |
| /// The future panics with "blocking future cancelled" if awaiting the blocker yields an |
| /// error, and with "valid hash" if either image constant fails to parse. |
| fn check<'a>( |
| &self, |
| _last_known_update_hash: Option<&'a Hash>, |
| ) -> BoxFuture<'a, Result<SystemUpdateStatus, crate::errors::Error>> { |
| // Clone the shared receiver so the returned future does not borrow `self`. |
| let gate = self.blocker.clone(); |
| let blocked_check = async move { |
| let unblocked = gate.await; |
| assert!(unblocked.is_ok(), "blocking future cancelled"); |
| let current_system_image = CURRENT_SYSTEM_IMAGE.parse().expect("valid hash"); |
| let latest_system_image = LATEST_SYSTEM_IMAGE.parse().expect("valid hash"); |
| Ok(SystemUpdateStatus::UpdateAvailable { current_system_image, latest_system_image }) |
| }; |
| blocked_check.boxed() |
| } |
| } |
| |
| /// Holds an update attempt in its checking phase with a `BlockingUpdateChecker`, verifies that |
| /// `get_state` reports `CheckingForUpdates`, then unblocks the checker and polls until |
| /// `get_state` returns `None`. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_get_state_in_checking_for_updates() { |
| let (blocking_update_checker, sender) = BlockingUpdateChecker::new_checker_and_sender(); |
| let mut manager = BlockingManagerManager::from_checker_and_applier( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| blocking_update_checker, |
| FakeUpdateApplier::new_error(), |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| |
| let options = CheckOptions::builder().initiator(Initiator::User).build(); |
| manager.try_start_update(options, None).await.unwrap(); |
| |
| // Wait for the update attempt to enter the CheckingForUpdates state, panicking if it |
| // completes prematurely (the `unwrap` fails once `get_state` returns `None`). |
| while manager.get_state().await.unwrap() != State::CheckingForUpdates {} |
| |
| // Unblock the update attempt and verify that it eventually enters the idle state. |
| sender.send(()).unwrap(); |
| while manager.get_state().await.is_some() {} |
| } |
| |
| /// While a first update attempt is blocked in its check, a second `try_start_update` without |
| /// attach permission is rejected with `AlreadyInProgress`, and the fake applier ends up with a |
| /// call count of 1. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_no_concurrent_update_attempts_if_attach_not_requested() { |
| let (checker, unblock) = BlockingUpdateChecker::new_checker_and_sender(); |
| let applier = FakeUpdateApplier::new_error(); |
| let mut manager = BlockingManagerManager::from_checker_and_applier( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| checker, |
| applier.clone(), |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| |
| let user_check = CheckOptions::builder().initiator(Initiator::User).build(); |
| let first = manager.try_start_update(user_check.clone(), Some(callback)).await; |
| let second = manager.try_start_update(user_check, None).await; |
| // Release the blocked check only after both requests have been answered. |
| assert_matches!(unblock.send(()), Ok(())); |
| let _: Vec<State> = receiver.collect().await; |
| |
| assert_eq!(first, Ok(())); |
| assert_eq!(second, Err(CheckNotStartedReason::AlreadyInProgress)); |
| let applier_calls = applier.call_count(); |
| assert_eq!(applier_calls, 1); |
| } |
| |
| /// While a first update attempt is blocked in its check, a second `try_start_update` that allows |
| /// attaching to an existing check also returns `Ok(())`; both callbacks then observe the same |
| /// sequence of states and the fake applier ends up with a call count of 1. |
| #[fasync::run_singlethreaded(test)] |
| async fn test_merge_update_attempt_monitors_if_attach_requested() { |
| let (checker, unblock) = BlockingUpdateChecker::new_checker_and_sender(); |
| let applier = FakeUpdateApplier::new_error(); |
| let mut manager = BlockingManagerManager::from_checker_and_applier( |
| Arc::new(FakeTargetChannelUpdater::new()), |
| Arc::new(FakeCurrentChannelUpdater::new()), |
| checker, |
| applier.clone(), |
| FakeCommitQuerier::new(), |
| ) |
| .await |
| .spawn(); |
| let (first_callback, first_receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| let (second_callback, second_receiver) = FakeStateNotifier::new_callback_and_receiver(); |
| |
| // Only the second request opts in to attaching to an in-progress check. |
| let base_options = CheckOptions::builder().initiator(Initiator::User); |
| let plain_check = base_options.clone().build(); |
| let attaching_check = base_options.allow_attaching_to_existing_update_check(true).build(); |
| |
| let first = manager.try_start_update(plain_check, Some(first_callback)).await; |
| let second = manager.try_start_update(attaching_check, Some(second_callback)).await; |
| // Release the blocked check only after both requests have been answered. |
| assert_matches!(unblock.send(()), Ok(())); |
| let first_states: Vec<State> = first_receiver.collect().await; |
| let second_states: Vec<State> = second_receiver.collect().await; |
| |
| assert_eq!(first, Ok(())); |
| assert_eq!(second, Ok(())); |
| let applier_calls = applier.call_count(); |
| assert_eq!(applier_calls, 1); |
| assert_eq!(first_states, second_states); |
| } |
| } |