blob: 866a9450ac7d8ca5be33d5c65590f6aca77c5879 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::apply::{apply_system_update, Initiator};
use crate::channel::{CurrentChannelManager, TargetChannelManager};
use crate::check::{check_for_system_update, SystemUpdateStatus};
use crate::connect::ServiceConnect;
use crate::last_update_storage::{LastUpdateStorage, LastUpdateStorageFile};
use crate::update_monitor::{StateNotifier, UpdateMonitor};
use crate::update_service::RealStateNotifier;
use anyhow::{anyhow, Context as _, Error};
use fidl_fuchsia_pkg::{PackageResolverMarker, PackageResolverProxyInterface};
use fidl_fuchsia_update::CheckNotStartedReason;
use fidl_fuchsia_update_ext::{InstallationErrorData, InstallingData, State, UpdateInfo};
use fuchsia_async as fasync;
use fuchsia_component::client::connect_to_service;
use fuchsia_inspect as finspect;
use fuchsia_merkle::Hash;
use fuchsia_syslog::{fx_log_err, fx_log_info, fx_log_warn};
use futures::future::BoxFuture;
use futures::lock::Mutex as AsyncMutex;
use futures::prelude::*;
use std::fs;
use std::path::Path;
use std::sync::Arc;
/// Manages the lifecycle of an update attempt and notifies interested clients.
//
// # Lock Order
//
// `updater` is locked for the duration of an update attempt, and an update attempt will
// periodically lock `monitor` to send status updates. Before an async task or thread can lock
// `updater`, it must release any locks on `monitor`.
//
pub struct UpdateManager<T, Ch, C, A, N>
where
T: TargetChannelUpdater,
Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
N: StateNotifier,
{
monitor: Arc<AsyncMutex<UpdateMonitor<N>>>,
updater: Arc<AsyncMutex<SystemInterface<T, Ch, C, A>>>,
}
struct SystemInterface<T, Ch, C, A>
where
T: TargetChannelUpdater,
Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
{
target_channel_updater: Arc<T>,
current_channel_updater: Arc<Ch>,
update_checker: C,
update_applier: A,
last_update_storage: Arc<dyn LastUpdateStorage + Send + Sync>,
last_known_update_package: Option<Hash>,
}
impl<T, Ch> UpdateManager<T, Ch, RealUpdateChecker, RealUpdateApplier, RealStateNotifier>
where
T: TargetChannelUpdater,
Ch: CurrentChannelUpdater,
{
pub async fn new(
target_channel_updater: Arc<T>,
current_channel_updater: Arc<Ch>,
node: finspect::Node,
) -> Self {
let (fut, update_monitor) = UpdateMonitor::from_inspect_node(node);
fasync::spawn(fut);
Self {
monitor: Arc::new(AsyncMutex::new(update_monitor)),
updater: Arc::new(AsyncMutex::new(
SystemInterface::load(
target_channel_updater,
current_channel_updater,
RealUpdateChecker,
RealUpdateApplier,
Arc::new(LastUpdateStorageFile { data_dir: "/data".into() }),
)
.await,
)),
}
}
}
impl<T, Ch, C, A, N> UpdateManager<T, Ch, C, A, N>
where
T: TargetChannelUpdater,
Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
N: StateNotifier,
{
#[cfg(test)]
pub async fn from_checker_and_applier(
target_channel_updater: Arc<T>,
current_channel_updater: Arc<Ch>,
update_checker: C,
update_applier: A,
last_update_storage: Arc<impl LastUpdateStorage + Send + Sync + 'static>,
) -> Self {
let (fut, update_monitor) = UpdateMonitor::new();
fasync::spawn(fut);
Self {
monitor: Arc::new(AsyncMutex::new(update_monitor)),
updater: Arc::new(AsyncMutex::new(SystemInterface::new(
target_channel_updater,
current_channel_updater,
update_checker,
update_applier,
last_update_storage,
None,
))),
}
}
#[cfg(test)]
pub async fn from_checker_and_applier_and_last_known_update_package(
target_channel_updater: Arc<T>,
current_channel_updater: Arc<Ch>,
update_checker: C,
update_applier: A,
last_update_storage: Arc<impl LastUpdateStorage + Send + Sync + 'static>,
last_known_update_package: Option<Hash>,
) -> Self {
let (fut, update_monitor) = UpdateMonitor::new();
fasync::spawn(fut);
Self {
monitor: Arc::new(AsyncMutex::new(update_monitor)),
updater: Arc::new(AsyncMutex::new(SystemInterface::new(
target_channel_updater,
current_channel_updater,
update_checker,
update_applier,
last_update_storage,
last_known_update_package,
))),
}
}
/// A Fuchsia Executor must be active when this method is called, b/c it uses fuchsia_async::spawn
pub async fn try_start_update(
&self,
initiator: Initiator,
callback: Option<N>,
allow_attaching_to_existing_update_check: Option<bool>,
) -> Result<(), CheckNotStartedReason> {
let mut monitor = self.monitor.lock().await;
match monitor.update_state() {
None => {
if let Some(cb) = callback {
monitor.add_temporary_callback(cb).await;
}
monitor.advance_update_state(Some(State::CheckingForUpdates)).await;
drop(monitor);
let updater = Arc::clone(&self.updater);
let monitor = Arc::clone(&self.monitor);
// Spawn so that callers of this method are not blocked
fasync::spawn(async move {
// Lock the updater for the duration of the update attempt. Contention is not
// expected except in the case that a previous update attempt failed, has set
// update_state back to None, but has not yet returned.
let mut updater = updater.lock().await;
updater.do_system_update_check_and_return_to_idle(monitor, initiator).await
});
Ok(())
}
_ => {
if allow_attaching_to_existing_update_check == Some(true) {
if let Some(cb) = callback {
monitor.add_temporary_callback(cb).await;
}
Ok(())
} else {
Err(CheckNotStartedReason::AlreadyInProgress)
}
}
}
}
pub async fn get_state(&self) -> Option<State> {
let monitor = self.monitor.lock().await;
monitor.update_state()
}
#[cfg(test)]
pub async fn add_temporary_callback(&self, callback: N) {
self.monitor.lock().await.add_temporary_callback(callback).await;
}
}
async fn load_last_update_package(
last_update_storage: &(dyn LastUpdateStorage + Send + Sync),
pkgfs_path: &Path,
package_resolver: Result<impl PackageResolverProxyInterface, anyhow::Error>,
) -> Option<Hash> {
if let Some(update) = last_update_storage.load() {
return Some(update);
}
if let Some(update) = discover_last_update_package(pkgfs_path, package_resolver).await {
last_update_storage.store(&update);
return Some(update);
}
None
}
async fn discover_last_update_package(
pkgfs_path: &Path,
package_resolver: Result<impl PackageResolverProxyInterface, anyhow::Error>,
) -> Option<Hash> {
fn check_dynamic_index(pkgfs_path: &Path) -> Result<Hash, anyhow::Error> {
let bytes = fs::read(pkgfs_path.join("packages/update/0/meta"))?;
let hex_str = std::str::from_utf8(&bytes)?;
Ok(hex_str.parse()?)
}
match check_dynamic_index(pkgfs_path) {
Ok(hash) => return Some(hash),
Err(err) => {
fx_log_warn!("error finding update package in dynamic index: {:#}", anyhow!(err))
}
}
async fn fetch_update_merkle(
package_resolver: Result<impl PackageResolverProxyInterface, anyhow::Error>,
) -> Result<Hash, anyhow::Error> {
let package_resolver = package_resolver?;
Ok(crate::check::latest_update_merkle(&package_resolver).await?)
}
match fetch_update_merkle(package_resolver).await {
Ok(hash) => return Some(hash),
Err(err) => fx_log_warn!("error resolving update package: {:#}", anyhow!(err)),
}
None
}
impl<T, Ch, C, A> SystemInterface<T, Ch, C, A>
where
T: TargetChannelUpdater,
Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
{
fn new(
target_channel_updater: Arc<T>,
current_channel_updater: Arc<Ch>,
update_checker: C,
update_applier: A,
last_update_storage: Arc<dyn LastUpdateStorage + Send + Sync>,
last_known_update_package: Option<Hash>,
) -> Self {
Self {
target_channel_updater,
current_channel_updater,
update_checker,
update_applier,
last_known_update_package,
last_update_storage,
}
}
pub async fn load(
target_channel_updater: Arc<T>,
current_channel_updater: Arc<Ch>,
update_checker: C,
update_applier: A,
last_update_storage: Arc<dyn LastUpdateStorage + Send + Sync>,
) -> Self {
let package_resolver = connect_to_service::<PackageResolverMarker>();
let last_known_update_package = load_last_update_package(
last_update_storage.as_ref(),
Path::new("/pkgfs"),
package_resolver,
)
.await;
Self::new(
target_channel_updater,
current_channel_updater,
update_checker,
update_applier,
last_update_storage,
last_known_update_package,
)
}
async fn do_system_update_check_and_return_to_idle<N: StateNotifier>(
&mut self,
monitor: Arc<AsyncMutex<UpdateMonitor<N>>>,
initiator: Initiator,
) {
if let Err(e) = self.do_system_update_check(monitor.clone(), initiator).await {
fx_log_err!("update attempt failed: {:#}", anyhow!(e));
}
let mut monitor = monitor.lock().await;
match monitor.update_state() {
Some(State::WaitingForReboot(_)) => fx_log_err!(
"system-update-checker is in the WaitingForReboot state. \
This should not have happened, because the sytem-updater should \
have rebooted the device before it returned."
),
_ => {
monitor.advance_update_state(None).await;
}
}
}
async fn do_system_update_check<N: StateNotifier>(
&mut self,
monitor: Arc<AsyncMutex<UpdateMonitor<N>>>,
initiator: Initiator,
) -> Result<(), Error> {
fx_log_info!(
"starting update check (requested by {})",
match initiator {
Initiator::Automatic => "service",
Initiator::Manual => "user",
}
);
self.target_channel_updater.update().await;
match self
.update_checker
.check(self.last_known_update_package.as_ref())
.await
.context("check_for_system_update failed")
{
Err(e) => {
monitor
.lock()
.await
.advance_update_state(Some(State::ErrorCheckingForUpdate))
.await;
return Err(e);
}
Ok(SystemUpdateStatus::UpToDate { system_image, update_package }) => {
fx_log_info!("current system_image merkle: {}", system_image);
fx_log_info!("system_image is already up-to-date");
if self.last_known_update_package.is_none() {
self.last_known_update_package = Some(update_package);
self.last_update_storage.store(&update_package);
}
self.current_channel_updater.update().await;
monitor.lock().await.advance_update_state(Some(State::NoUpdateAvailable)).await;
return Ok(());
}
Ok(SystemUpdateStatus::UpdateAvailable {
current_system_image,
latest_system_image,
latest_update_package,
}) => {
fx_log_info!("current system_image merkle: {}", current_system_image);
fx_log_info!("new system_image available: {}", latest_system_image);
{
let mut monitor = monitor.lock().await;
monitor.set_version_available(latest_system_image.to_string());
monitor
.advance_update_state(Some(State::InstallingUpdate(InstallingData {
update: Some(UpdateInfo {
version_available: Some(latest_system_image.to_string()),
download_size: None,
}),
installation_progress: None,
})))
.await;
}
self.last_update_storage.store(&latest_update_package);
if let Err(e) = self
.update_applier
.apply(current_system_image, latest_system_image, initiator)
.await
.context("apply_system_update failed")
{
monitor
.lock()
.await
.advance_update_state(Some(State::InstallationError(
InstallationErrorData {
update: Some(UpdateInfo {
version_available: Some(latest_system_image.to_string()),
download_size: None,
}),
installation_progress: None,
},
)))
.await;
return Err(e);
};
// On success, system-updater reboots the system before returning, so this code
// should never run. The only way to leave WaitingForReboot state is to restart
// the component
monitor
.lock()
.await
.advance_update_state(Some(State::WaitingForReboot(InstallingData {
update: Some(UpdateInfo {
version_available: Some(latest_system_image.to_string()),
download_size: None,
}),
installation_progress: None,
})))
.await;
}
}
Ok(())
}
}
// For mocking
pub trait UpdateChecker: Send + Sync + 'static {
fn check<'a>(
&self,
last_known_update_merkle: Option<&'a Hash>,
) -> BoxFuture<'a, Result<SystemUpdateStatus, crate::errors::Error>>;
}
pub struct RealUpdateChecker;
impl UpdateChecker for RealUpdateChecker {
fn check<'a>(
&self,
last_known_update_merkle: Option<&'a Hash>,
) -> BoxFuture<'a, Result<SystemUpdateStatus, crate::errors::Error>> {
check_for_system_update(last_known_update_merkle).boxed()
}
}
// For mocking
pub trait TargetChannelUpdater: Send + Sync + 'static {
fn update(&self) -> BoxFuture<'_, ()>;
}
impl<S: ServiceConnect + 'static> TargetChannelUpdater for TargetChannelManager<S> {
fn update(&self) -> BoxFuture<'_, ()> {
TargetChannelManager::update(self)
.unwrap_or_else(|e| fx_log_err!("while updating target channel: {:#}", anyhow!(e)))
.boxed()
}
}
// For mocking
pub trait CurrentChannelUpdater: Send + Sync + 'static {
fn update(&self) -> BoxFuture<'_, ()>;
}
impl CurrentChannelUpdater for CurrentChannelManager {
fn update(&self) -> BoxFuture<'_, ()> {
CurrentChannelManager::update(self)
.unwrap_or_else(|e| fx_log_err!("while updating current channel: {:#}", anyhow!(e)))
.boxed()
}
}
// For mocking
pub trait UpdateApplier: Send + Sync + 'static {
fn apply(
&self,
current_system_image: Hash,
latest_system_image: Hash,
initiator: Initiator,
) -> BoxFuture<'_, Result<(), anyhow::Error>>;
}
pub struct RealUpdateApplier;
impl UpdateApplier for RealUpdateApplier {
fn apply(
&self,
current_system_image: Hash,
latest_system_image: Hash,
initiator: Initiator,
) -> BoxFuture<'_, Result<(), anyhow::Error>> {
apply_system_update(current_system_image, latest_system_image, initiator).boxed()
}
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use event_queue::{ClosedClient, Notify};
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::channel::oneshot;
use futures::future::BoxFuture;
use matches::assert_matches;
use parking_lot::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
pub const CALLBACK_CHANNEL_SIZE: usize = 20;
pub const CURRENT_SYSTEM_IMAGE: &str =
"0000000000000000000000000000000000000000000000000000000000000000";
pub const LATEST_SYSTEM_IMAGE: &str =
"1111111111111111111111111111111111111111111111111111111111111111";
pub const CURRENT_UPDATE_PACKAGE: &str =
"2222222222222222222222222222222222222222222222222222222222222222";
pub const LATEST_UPDATE_PACKAGE: &str =
"3333333333333333333333333333333333333333333333333333333333333333";
#[derive(Clone)]
pub struct FakeUpdateChecker {
result: Result<SystemUpdateStatus, crate::errors::Error>,
call_count: Arc<AtomicU64>,
// Taking this mutex blocks update checker.
check_blocked: Arc<AsyncMutex<()>>,
}
impl FakeUpdateChecker {
fn new(result: Result<SystemUpdateStatus, crate::errors::Error>) -> Self {
Self {
result,
call_count: Arc::new(AtomicU64::new(0)),
check_blocked: Arc::new(AsyncMutex::new(())),
}
}
pub fn new_up_to_date() -> Self {
Self::new(Ok(SystemUpdateStatus::UpToDate {
system_image: CURRENT_SYSTEM_IMAGE.parse().expect("valid merkle"),
update_package: CURRENT_UPDATE_PACKAGE.parse().expect("valid merkle"),
}))
}
pub fn new_update_available() -> Self {
Self::new(Ok(SystemUpdateStatus::UpdateAvailable {
current_system_image: CURRENT_SYSTEM_IMAGE.parse().expect("valid merkle"),
latest_system_image: LATEST_SYSTEM_IMAGE.parse().expect("valid merkle"),
latest_update_package: LATEST_UPDATE_PACKAGE.parse().expect("valid merkle"),
}))
}
pub fn new_error() -> Self {
Self::new(Err(crate::errors::Error::ResolveUpdatePackage))
}
pub fn block(&self) -> Option<futures::lock::MutexGuard<'_, ()>> {
self.check_blocked.try_lock()
}
pub fn call_count(&self) -> u64 {
self.call_count.load(Ordering::SeqCst)
}
}
impl UpdateChecker for FakeUpdateChecker {
fn check<'a>(
&self,
_last_known_update_merkle: Option<&'a Hash>,
) -> BoxFuture<'a, Result<SystemUpdateStatus, crate::errors::Error>> {
let check_blocked = Arc::clone(&self.check_blocked);
let result = self.result.clone();
self.call_count.fetch_add(1, Ordering::SeqCst);
async move {
check_blocked.lock().await;
result.map_err(|e| e.into())
}
.boxed()
}
}
#[derive(Clone)]
pub struct FakeTargetChannelUpdater {
call_count: Arc<AtomicU64>,
}
impl FakeTargetChannelUpdater {
pub fn new() -> Self {
Self { call_count: Arc::new(AtomicU64::new(0)) }
}
pub fn call_count(&self) -> u64 {
self.call_count.load(Ordering::SeqCst)
}
}
impl TargetChannelUpdater for FakeTargetChannelUpdater {
fn update(&self) -> BoxFuture<'_, ()> {
let call_count = self.call_count.clone();
async move {
call_count.fetch_add(1, Ordering::SeqCst);
}
.boxed()
}
}
#[derive(Clone)]
pub struct FakeCurrentChannelUpdater {
call_count: Arc<AtomicU64>,
}
impl FakeCurrentChannelUpdater {
pub fn new() -> Self {
Self { call_count: Arc::new(AtomicU64::new(0)) }
}
pub fn call_count(&self) -> u64 {
self.call_count.load(Ordering::SeqCst)
}
}
impl CurrentChannelUpdater for FakeCurrentChannelUpdater {
fn update(&self) -> BoxFuture<'_, ()> {
let call_count = self.call_count.clone();
async move {
call_count.fetch_add(1, Ordering::SeqCst);
}
.boxed()
}
}
#[derive(Clone)]
pub struct UnreachableUpdateApplier;
impl UpdateApplier for UnreachableUpdateApplier {
fn apply(
&self,
_current_system_image: Hash,
_latest_system_image: Hash,
_initiator: Initiator,
) -> BoxFuture<'_, Result<(), anyhow::Error>> {
unreachable!();
}
}
#[derive(Clone)]
pub struct FakeUpdateApplier {
result: Result<(), crate::errors::Error>,
call_count: Arc<AtomicU64>,
}
impl FakeUpdateApplier {
pub fn new_success() -> Self {
Self { result: Ok(()), call_count: Arc::new(AtomicU64::new(0)) }
}
pub fn new_error() -> Self {
Self {
result: Err(crate::errors::Error::SystemUpdaterFailed),
call_count: Arc::new(AtomicU64::new(0)),
}
}
pub fn call_count(&self) -> u64 {
self.call_count.load(std::sync::atomic::Ordering::Relaxed)
}
}
impl UpdateApplier for FakeUpdateApplier {
fn apply(
&self,
_current_system_image: Hash,
_latest_system_image: Hash,
_initiator: Initiator,
) -> BoxFuture<'_, Result<(), anyhow::Error>> {
self.call_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
future::ready(self.result.clone().map_err(|e| e.into())).boxed()
}
}
#[derive(Clone)]
pub struct UnreachableNotifier;
impl Notify<State> for UnreachableNotifier {
fn notify(&self, _state: State) -> BoxFuture<'static, Result<(), ClosedClient>> {
unreachable!();
}
}
#[derive(Clone)]
pub struct StateChangeCollector {
states: Arc<Mutex<Vec<State>>>,
}
impl StateChangeCollector {
pub fn new() -> Self {
Self { states: Arc::new(Mutex::new(vec![])) }
}
pub fn take_states(&self) -> Vec<State> {
std::mem::replace(&mut self.states.lock(), vec![])
}
}
impl Notify<State> for StateChangeCollector {
fn notify(&self, state: State) -> BoxFuture<'static, Result<(), ClosedClient>> {
self.states.lock().push(state);
future::ready(Ok(())).boxed()
}
}
#[derive(Clone)]
struct FakeStateNotifier {
sender: Arc<Mutex<Sender<State>>>,
}
impl FakeStateNotifier {
fn new_callback_and_receiver() -> (Self, Receiver<State>) {
let (sender, receiver) = channel(CALLBACK_CHANNEL_SIZE);
(Self { sender: Arc::new(Mutex::new(sender)) }, receiver)
}
}
impl Notify<State> for FakeStateNotifier {
fn notify(&self, state: State) -> BoxFuture<'static, Result<(), ClosedClient>> {
self.sender.lock().try_send(state).expect("FakeStateNotifier failed to send state");
future::ready(Ok(())).boxed()
}
}
#[derive(Default)]
pub struct FakeLastUpdateStorage {
store: Mutex<Option<Hash>>,
}
impl FakeLastUpdateStorage {
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
}
}
impl LastUpdateStorage for FakeLastUpdateStorage {
fn load(&self) -> Option<Hash> {
*self.store.lock()
}
fn store(&self, value: &Hash) {
*self.store.lock() = Some(*value);
}
}
type FakeUpdateManager = UpdateManager<
FakeTargetChannelUpdater,
FakeCurrentChannelUpdater,
FakeUpdateChecker,
FakeUpdateApplier,
FakeStateNotifier,
>;
type BlockingManagerManager = UpdateManager<
FakeTargetChannelUpdater,
FakeCurrentChannelUpdater,
BlockingUpdateChecker,
FakeUpdateApplier,
FakeStateNotifier,
>;
async fn next_n_states(receiver: &mut Receiver<State>, n: usize) -> Vec<State> {
let mut v = Vec::with_capacity(n);
for _ in 0..n {
v.push(receiver.next().await.expect("next_n_states stream empty"));
}
v
}
async fn wait_until_update_state_n(
receiver: &mut Receiver<State>,
update_state: State,
mut seen_count: u64,
) {
if seen_count == 0 {
return;
}
while let Some(new_state) = receiver.next().await {
if new_state == update_state {
seen_count -= 1;
if seen_count == 0 {
return;
}
}
}
panic!("wait_until_state_n emptied stream: {}", seen_count);
}
#[fasync::run_singlethreaded(test)]
async fn test_correct_initial_state() {
let manager = FakeUpdateManager::from_checker_and_applier(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
FakeUpdateChecker::new_up_to_date(),
FakeUpdateApplier::new_success(),
FakeLastUpdateStorage::new(),
)
.await;
assert_eq!(manager.get_state().await, Default::default());
}
#[fasync::run_singlethreaded(test)]
async fn test_try_start_update_returns_started() {
let manager = FakeUpdateManager::from_checker_and_applier(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
FakeUpdateChecker::new_up_to_date(),
FakeUpdateApplier::new_success(),
FakeLastUpdateStorage::new(),
)
.await;
assert_eq!(manager.try_start_update(Initiator::Manual, None, None).await, Ok(()));
}
#[fasync::run_singlethreaded(test)]
async fn test_temporary_callbacks_dropped_after_update_attempt() {
let manager = FakeUpdateManager::from_checker_and_applier(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
FakeUpdateChecker::new_up_to_date(),
FakeUpdateApplier::new_success(),
FakeLastUpdateStorage::new(),
)
.await;
let (callback0, mut receiver0) = FakeStateNotifier::new_callback_and_receiver();
let (callback1, mut receiver1) = FakeStateNotifier::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback0), None).await.unwrap();
// Wait for first update attempt to complete, to guarantee that the second
// try_start_update() call starts a new attempt (and generates more callback calls).
wait_until_update_state_n(&mut receiver0, State::NoUpdateAvailable, 1).await;
manager.try_start_update(Initiator::Manual, Some(callback1), None).await.unwrap();
// Wait for the second update attempt to complete, to guarantee the callbacks
// have been called with more states.
wait_until_update_state_n(&mut receiver1, State::NoUpdateAvailable, 1).await;
// The first callback should have been dropped after the first attempt completed,
// so it should still be empty.
assert_matches!(receiver0.next().await, None);
}
#[fasync::run_singlethreaded(test)]
async fn test_try_start_update_callback_when_up_to_date() {
let manager = FakeUpdateManager::from_checker_and_applier(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
FakeUpdateChecker::new_up_to_date(),
FakeUpdateApplier::new_success(),
FakeLastUpdateStorage::new(),
)
.await;
let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback), None).await.unwrap();
assert_eq!(
receiver.collect::<Vec<State>>().await,
vec![State::CheckingForUpdates, State::NoUpdateAvailable,]
);
}
#[fasync::run_singlethreaded(test)]
async fn test_try_start_update_callback_when_update_available_and_apply_errors() {
let manager = FakeUpdateManager::from_checker_and_applier(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
FakeLastUpdateStorage::new(),
)
.await;
let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();
let expected_update_info = Some(UpdateInfo {
version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
download_size: None,
});
manager.try_start_update(Initiator::Manual, Some(callback), None).await.unwrap();
assert_eq!(
receiver.collect::<Vec<State>>().await,
vec![
State::CheckingForUpdates,
State::InstallingUpdate(InstallingData {
update: expected_update_info.clone(),
installation_progress: None,
}),
State::InstallationError(InstallationErrorData {
update: expected_update_info,
installation_progress: None,
}),
]
);
}
#[fasync::run_singlethreaded(test)]
async fn test_try_start_update_callback_when_update_available_and_apply_succeeds() {
let manager = FakeUpdateManager::from_checker_and_applier(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_success(),
FakeLastUpdateStorage::new(),
)
.await;
let (callback, mut receiver) = FakeStateNotifier::new_callback_and_receiver();
let expected_installing_data = InstallingData {
update: Some(UpdateInfo {
version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
download_size: None,
}),
installation_progress: None,
};
manager.try_start_update(Initiator::Manual, Some(callback), None).await.unwrap();
assert_eq!(
next_n_states(&mut receiver, 3).await,
vec![
State::CheckingForUpdates,
State::InstallingUpdate(expected_installing_data.clone()),
State::WaitingForReboot(expected_installing_data),
]
);
}
#[fasync::run_singlethreaded(test)]
async fn test_channel_updater_called() {
let channel_updater = Arc::new(FakeTargetChannelUpdater::new());
let manager = UpdateManager::from_checker_and_applier(
Arc::clone(&channel_updater),
Arc::new(FakeCurrentChannelUpdater::new()),
FakeUpdateChecker::new_up_to_date(),
UnreachableUpdateApplier,
FakeLastUpdateStorage::new(),
)
.await;
let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback), None).await.unwrap();
receiver.collect::<Vec<State>>().await;
assert_eq!(channel_updater.call_count(), 1);
}
#[fasync::run_singlethreaded(test)]
async fn test_update_applier_called_if_update_available() {
let update_applier = FakeUpdateApplier::new_error();
let manager = FakeUpdateManager::from_checker_and_applier(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
FakeUpdateChecker::new_update_available(),
update_applier.clone(),
FakeLastUpdateStorage::new(),
)
.await;
let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback), None).await.unwrap();
receiver.collect::<Vec<State>>().await;
assert_eq!(update_applier.call_count(), 1);
}
#[fasync::run_singlethreaded(test)]
async fn test_last_update_channel_stored_when_update_applied() {
let last_update_storage = FakeLastUpdateStorage::new();
let manager = FakeUpdateManager::from_checker_and_applier_and_last_known_update_package(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
last_update_storage.clone(),
Some(CURRENT_UPDATE_PACKAGE.parse().expect("valid merkle")),
)
.await;
let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback), None).await.unwrap();
receiver.collect::<Vec<State>>().await;
assert_eq!(
last_update_storage.load(),
Some(LATEST_UPDATE_PACKAGE.parse().expect("valid merkle"))
);
}
#[fasync::run_singlethreaded(test)]
async fn test_update_applier_not_called_if_up_to_date() {
let update_applier = FakeUpdateApplier::new_error();
let current_channel_updater = Arc::new(FakeCurrentChannelUpdater::new());
let manager = FakeUpdateManager::from_checker_and_applier(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::clone(&current_channel_updater),
FakeUpdateChecker::new_up_to_date(),
update_applier.clone(),
FakeLastUpdateStorage::new(),
)
.await;
let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback), None).await.unwrap();
receiver.collect::<Vec<State>>().await;
assert_eq!(update_applier.call_count(), 0);
assert_eq!(current_channel_updater.call_count(), 1);
}
#[fasync::run_singlethreaded(test)]
async fn test_last_update_channel_stored_during_check_when_unknown() {
let last_update_storage = FakeLastUpdateStorage::new();
let manager = FakeUpdateManager::from_checker_and_applier_and_last_known_update_package(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
FakeUpdateChecker::new_up_to_date(),
FakeUpdateApplier::new_error(),
last_update_storage.clone(),
None,
)
.await;
let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback), None).await.unwrap();
receiver.collect::<Vec<State>>().await;
assert_eq!(
last_update_storage.load(),
Some(CURRENT_UPDATE_PACKAGE.parse().expect("valid merkle"))
);
}
#[fasync::run_singlethreaded(test)]
async fn test_last_update_channel_not_stored_during_check_when_known() {
let last_update_storage = FakeLastUpdateStorage::new();
let manager = FakeUpdateManager::from_checker_and_applier_and_last_known_update_package(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
FakeUpdateChecker::new_up_to_date(),
FakeUpdateApplier::new_error(),
last_update_storage.clone(),
Some(CURRENT_UPDATE_PACKAGE.parse().expect("valid merkle")),
)
.await;
let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback), None).await.unwrap();
receiver.collect::<Vec<State>>().await;
assert_eq!(last_update_storage.load(), None);
}
#[fasync::run_singlethreaded(test)]
async fn test_return_to_initial_state_on_update_check_error() {
let manager = FakeUpdateManager::from_checker_and_applier(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
FakeUpdateChecker::new_error(),
FakeUpdateApplier::new_error(),
FakeLastUpdateStorage::new(),
)
.await;
let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback), None).await.unwrap();
receiver.collect::<Vec<State>>().await;
assert_eq!(manager.get_state().await, Default::default());
}
#[fasync::run_singlethreaded(test)]
async fn test_return_to_initial_state_on_update_apply_error() {
let manager = FakeUpdateManager::from_checker_and_applier(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
FakeLastUpdateStorage::new(),
)
.await;
let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();
manager.try_start_update(Initiator::Manual, Some(callback), None).await.unwrap();
receiver.collect::<Vec<State>>().await;
assert_eq!(manager.get_state().await, Default::default());
}
#[derive(Clone)]
pub struct BlockingUpdateChecker {
blocker: future::Shared<oneshot::Receiver<()>>,
}
impl BlockingUpdateChecker {
pub fn new_checker_and_sender() -> (Self, oneshot::Sender<()>) {
let (sender, receiver) = oneshot::channel();
let blocking_update_checker = BlockingUpdateChecker { blocker: receiver.shared() };
(blocking_update_checker, sender)
}
}
impl UpdateChecker for BlockingUpdateChecker {
fn check<'a>(
&self,
_last_known_update_merkle: Option<&'a Hash>,
) -> BoxFuture<'a, Result<SystemUpdateStatus, crate::errors::Error>> {
let blocker = self.blocker.clone();
async move {
assert!(blocker.await.is_ok(), "blocking future cancelled");
Ok(SystemUpdateStatus::UpdateAvailable {
current_system_image: CURRENT_SYSTEM_IMAGE.parse().expect("valid merkle"),
latest_system_image: LATEST_SYSTEM_IMAGE.parse().expect("valid merkle"),
latest_update_package: LATEST_SYSTEM_IMAGE.parse().expect("valid merkle"),
})
}
.boxed()
}
}
#[fasync::run_singlethreaded(test)]
async fn test_get_state_in_checking_for_updates() {
let (blocking_update_checker, _sender) = BlockingUpdateChecker::new_checker_and_sender();
let manager = BlockingManagerManager::from_checker_and_applier(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
blocking_update_checker,
FakeUpdateApplier::new_error(),
FakeLastUpdateStorage::new(),
)
.await;
manager.try_start_update(Initiator::Manual, None, None).await.unwrap();
assert_eq!(manager.get_state().await, Some(State::CheckingForUpdates));
}
#[fasync::run_singlethreaded(test)]
async fn test_no_concurrent_update_attempts() {
let (blocking_update_checker, sender) = BlockingUpdateChecker::new_checker_and_sender();
let update_applier = FakeUpdateApplier::new_error();
let manager = BlockingManagerManager::from_checker_and_applier(
Arc::new(FakeTargetChannelUpdater::new()),
Arc::new(FakeCurrentChannelUpdater::new()),
blocking_update_checker,
update_applier.clone(),
FakeLastUpdateStorage::new(),
)
.await;
let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();
let res0 = manager.try_start_update(Initiator::Manual, Some(callback), None).await;
// try_start_update advances state to CheckingForUpdates before returning
// and the blocking_update_checker keeps it there
let res1 = manager.try_start_update(Initiator::Manual, None, None).await;
assert_matches!(sender.send(()), Ok(()));
receiver.collect::<Vec<State>>().await;
assert_eq!(res0, Ok(()));
assert_eq!(res1, Err(CheckNotStartedReason::AlreadyInProgress));
assert_eq!(update_applier.call_count(), 1);
}
#[fasync::run_singlethreaded(test)]
async fn test_load_last_update_merkle() {
let storage_merkle = Hash::from([0x11; 32]);
let index_merkle = Hash::from([0x22; 32]);
let resolver_merkle = Hash::from([0x33; 32]);
let last_update_storage = FakeLastUpdateStorage::default();
last_update_storage.store(&storage_merkle);
let pkgfs_dir = tempfile::tempdir().expect("create temp dir");
fs::create_dir_all(pkgfs_dir.path().join("packages/update/0")).expect("mkdir");
fs::write(pkgfs_dir.path().join("packages/update/0/meta"), index_merkle.to_string())
.expect("write meta");
let package_resolver = Ok(
crate::check::test_check_for_system_update_impl::PackageResolverProxyTempDir::new_with_merkle(&resolver_merkle));
let result =
load_last_update_package(&last_update_storage, pkgfs_dir.path(), package_resolver)
.await;
assert_eq!(result, Some(storage_merkle))
}
#[fasync::run_singlethreaded(test)]
async fn test_load_last_update_merkle_from_dynamic_index() {
let index_merkle = Hash::from([0x22; 32]);
let resolver_merkle = Hash::from([0x33; 32]);
let last_update_storage = FakeLastUpdateStorage::default();
let pkgfs_dir = tempfile::tempdir().expect("create temp dir");
fs::create_dir_all(pkgfs_dir.path().join("packages/update/0")).expect("mkdir");
fs::write(pkgfs_dir.path().join("packages/update/0/meta"), index_merkle.to_string())
.expect("write meta");
let package_resolver = Ok(
crate::check::test_check_for_system_update_impl::PackageResolverProxyTempDir::new_with_merkle(&resolver_merkle));
let result =
load_last_update_package(&last_update_storage, pkgfs_dir.path(), package_resolver)
.await;
assert_eq!(result, Some(index_merkle))
}
#[fasync::run_singlethreaded(test)]
async fn test_load_last_update_merkle_from_package_resolver() {
let resolver_merkle = Hash::from([0x33; 32]);
let last_update_storage = FakeLastUpdateStorage::default();
let pkgfs_dir = tempfile::tempdir().expect("create temp dir");
let package_resolver = Ok(
crate::check::test_check_for_system_update_impl::PackageResolverProxyTempDir::new_with_merkle(&resolver_merkle));
let result =
load_last_update_package(&last_update_storage, pkgfs_dir.path(), package_resolver)
.await;
assert_eq!(result, Some(resolver_merkle))
}
}