[system-update-checker] Forward progress from installer.
Bug: 55411
Change-Id: I6eb4ce1e453ba1459def575d954a8ddd25ded8bf
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/430538
Commit-Queue: Sen Jiang <senj@google.com>
Reviewed-by: Kevin Wells <kevinwells@google.com>
Testability-Review: Kevin Wells <kevinwells@google.com>
diff --git a/src/sys/pkg/bin/system-update-checker/meta/system-update-checker-for-integration-test.cmx b/src/sys/pkg/bin/system-update-checker/meta/system-update-checker-for-integration-test.cmx
index 043f43e..eb9c44d 100644
--- a/src/sys/pkg/bin/system-update-checker/meta/system-update-checker-for-integration-test.cmx
+++ b/src/sys/pkg/bin/system-update-checker/meta/system-update-checker-for-integration-test.cmx
@@ -4,14 +4,11 @@
},
"sandbox": {
"services": [
- "fuchsia.cobalt.SystemDataUpdater",
"fuchsia.logger.LogSink",
"fuchsia.paver.Paver",
"fuchsia.pkg.PackageResolver",
- "fuchsia.pkg.rewrite.Engine",
- "fuchsia.recovery.FactoryReset",
- "fuchsia.sys.Launcher",
- "fuchsia.update.channel.Provider"
+ "fuchsia.update.channel.Provider",
+ "fuchsia.update.installer.Installer"
]
}
}
diff --git a/src/sys/pkg/bin/system-update-checker/src/apply.rs b/src/sys/pkg/bin/system-update-checker/src/apply.rs
index 37f2b20..8508450 100644
--- a/src/sys/pkg/bin/system-update-checker/src/apply.rs
+++ b/src/sys/pkg/bin/system-update-checker/src/apply.rs
@@ -6,7 +6,9 @@
use anyhow::{anyhow, Context as _};
use fidl::endpoints::ServerEnd;
use fidl_fuchsia_update_ext::Initiator;
-use fidl_fuchsia_update_installer::{InstallerMarker, InstallerProxy, RebootControllerMarker};
+use fidl_fuchsia_update_installer::{
+ InstallerMarker, InstallerProxy, RebootControllerMarker, RebootControllerProxy,
+};
use fidl_fuchsia_update_installer_ext::{
self as installer, start_update, MonitorUpdateAttemptError, Options, State, StateId,
UpdateAttemptError,
@@ -14,41 +16,53 @@
use fuchsia_component::client::connect_to_service;
use fuchsia_syslog::fx_log_info;
use fuchsia_url::pkg_url::PkgUrl;
-use futures::{future::BoxFuture, prelude::*, FutureExt};
+use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
const UPDATE_URL: &str = "fuchsia-pkg://fuchsia.com/update";
-// On success, system may reboot before this function returns
-pub async fn apply_system_update(initiator: Initiator) -> Result<(), anyhow::Error> {
+// On success, system will reboot before this function returns
+pub async fn apply_system_update<'a>(
+ initiator: Initiator,
+) -> Result<BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, anyhow::Error> {
let installer_proxy =
connect_to_service::<InstallerMarker>().context("connecting to component Installer")?;
let mut update_installer = RealUpdateInstaller { installer_proxy };
- apply_system_update_and_reboot(&mut update_installer, initiator).await
+ apply_system_update_impl(&mut update_installer, initiator).await
}
-async fn apply_system_update_and_reboot<'a>(
- update_installer: &'a mut impl UpdateInstaller,
- initiator: Initiator,
-) -> Result<(), anyhow::Error> {
- let (reboot_controller, reboot_controller_server_end) =
- fidl::endpoints::create_proxy::<RebootControllerMarker>()
- .context("creating reboot controller")?;
+#[derive(Clone, Debug, Default, PartialEq)]
+pub struct ApplyProgress {
+ pub download_size: Option<u64>,
+ pub fraction_completed: Option<f32>,
+}
- apply_system_update_impl(update_installer, initiator, Some(reboot_controller_server_end))
- .await?;
+impl ApplyProgress {
+ #[cfg(test)]
+ pub fn new(download_size: u64, fraction_completed: f32) -> Self {
+ ApplyProgress {
+ download_size: Some(download_size),
+ fraction_completed: Some(fraction_completed),
+ }
+ }
- fx_log_info!("Successful update, rebooting...");
+ pub fn none() -> Self {
+ ApplyProgress::default()
+ }
+}
- reboot_controller
- .unblock()
- .map_err(|e| Error::RebootFailed(e))
- .context("notify installer it can reboot when ready")
+#[derive(Clone, Debug, PartialEq)]
+pub enum ApplyState {
+ InstallingUpdate(ApplyProgress),
+ WaitingForReboot(ApplyProgress),
}
// For mocking
trait UpdateInstaller {
- type UpdateAttempt: Stream<Item = Result<State, MonitorUpdateAttemptError>> + Unpin;
+ type UpdateAttempt: Stream<Item = Result<State, MonitorUpdateAttemptError>>
+ + Unpin
+ + Send
+ + 'static;
fn start_update(
&mut self,
@@ -79,11 +93,10 @@
}
}
-async fn apply_system_update_impl<'a>(
- update_installer: &'a mut impl UpdateInstaller,
+async fn apply_system_update_impl(
+ update_installer: &mut impl UpdateInstaller,
initiator: Initiator,
- reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
-) -> Result<(), anyhow::Error> {
+) -> Result<BoxStream<'static, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, anyhow::Error> {
fx_log_info!("starting system updater");
let options = Options {
initiator: match initiator {
@@ -94,26 +107,70 @@
allow_attach_to_existing_attempt: true,
};
let update_url = PkgUrl::parse(UPDATE_URL)?;
- let mut update_attempt = update_installer
- .start_update(update_url, options, reboot_controller_server_end)
+ let (reboot_controller, reboot_controller_server_end) =
+ fidl::endpoints::create_proxy::<RebootControllerMarker>()
+ .context("creating reboot controller")?;
+
+ let update_attempt = update_installer
+ .start_update(update_url, options, Some(reboot_controller_server_end))
.await
.context(Error::SystemUpdaterFailed)?;
- while let Some(state) = update_attempt.try_next().await.context(Error::SystemUpdaterFailed)? {
+
+ Ok(async_generator::generate(|mut co| async move {
+ monitor_update_progress(update_attempt, reboot_controller, &mut co).await
+ })
+ .into_try_stream()
+ .boxed())
+}
+
+async fn monitor_update_progress(
+ mut update_attempt: impl Stream<Item = Result<State, MonitorUpdateAttemptError>> + Unpin,
+ reboot_controller: RebootControllerProxy,
+ co: &mut async_generator::Yield<ApplyState>,
+) -> Result<(), (ApplyProgress, anyhow::Error)> {
+ let mut apply_progress = ApplyProgress::none();
+ while let Some(state) = update_attempt
+ .try_next()
+ .await
+ .context(Error::SystemUpdaterFailed)
+ .map_err(|e| (apply_progress.clone(), e))?
+ {
fx_log_info!("Installer entered state: {}", state.name());
- if state.id() == StateId::WaitToReboot || state.is_success() {
+
+ apply_progress.download_size = state.download_size();
+ if let Some(progress) = state.progress() {
+ apply_progress.fraction_completed = Some(progress.fraction_completed());
+ }
+ if state.is_failure() {
+ return Err((apply_progress, anyhow!(Error::SystemUpdaterFailed)));
+ }
+ if state.id() == StateId::WaitToReboot {
+ co.yield_(ApplyState::WaitingForReboot(apply_progress.clone())).await;
+
+ fx_log_info!("Successful update, rebooting...");
+
+ reboot_controller
+ .unblock()
+ .map_err(|e| Error::RebootFailed(e))
+ .context("notify installer it can reboot when ready")
+ .map_err(|e| (apply_progress, e))?;
+ // On success, wait for reboot to happen.
+ let () = future::pending().await;
+ unreachable!();
+ }
+ co.yield_(ApplyState::InstallingUpdate(apply_progress.clone())).await;
+ if state.is_success() {
return Ok(());
- } else if state.is_failure() {
- return Err(anyhow!(Error::SystemUpdaterFailed));
}
}
- Err(anyhow!(Error::InstallationEndedUnexpectedly))
+ Err((apply_progress, anyhow!(Error::InstallationEndedUnexpectedly)))
}
#[cfg(test)]
mod test_apply_system_update_impl {
use super::*;
use fidl_fuchsia_update_installer::RebootControllerRequest;
- use fidl_fuchsia_update_installer_ext::{UpdateInfo, UpdateInfoAndProgress};
+ use fidl_fuchsia_update_installer_ext::{Progress, UpdateInfo, UpdateInfoAndProgress};
use fuchsia_async as fasync;
use matches::assert_matches;
use proptest::prelude::*;
@@ -130,7 +187,7 @@
_reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
) -> BoxFuture<'_, Result<Self::UpdateAttempt, UpdateAttemptError>> {
let info = UpdateInfo::builder().download_size(0).build();
- let state = State::WaitToReboot(UpdateInfoAndProgress::done(info));
+ let state = State::Complete(UpdateInfoAndProgress::done(info));
future::ok(futures::stream::once(future::ok(state))).boxed()
}
}
@@ -150,7 +207,7 @@
) -> BoxFuture<'_, Result<Self::UpdateAttempt, UpdateAttemptError>> {
self.was_called = true;
let info = UpdateInfo::builder().download_size(0).build();
- let state = State::WaitToReboot(UpdateInfoAndProgress::done(info));
+ let state = State::Complete(UpdateInfoAndProgress::done(info));
future::ok(futures::stream::once(future::ok(state))).boxed()
}
}
@@ -159,8 +216,7 @@
async fn test_call_installer() {
let mut update_installer = WasCalledUpdateInstaller { was_called: false };
- apply_system_update_impl(&mut update_installer, Initiator::User, None).await.unwrap();
-
+ apply_system_update_impl(&mut update_installer, Initiator::User).await.unwrap();
assert!(update_installer.was_called);
}
@@ -169,6 +225,7 @@
update_url: Option<PkgUrl>,
options: Option<Options>,
reboot_controller_server_end: Option<Option<ServerEnd<RebootControllerMarker>>>,
+ state: Option<State>,
}
impl UpdateInstaller for ArgumentCapturingUpdateInstaller {
type UpdateAttempt =
@@ -183,8 +240,9 @@
self.update_url = Some(update_url);
self.options = Some(options);
self.reboot_controller_server_end = Some(reboot_controller_server_end);
- let info = UpdateInfo::builder().download_size(0).build();
- let state = State::WaitToReboot(UpdateInfoAndProgress::done(info));
+ let state = self.state.clone().unwrap_or(State::Complete(UpdateInfoAndProgress::done(
+ UpdateInfo::builder().download_size(0).build(),
+ )));
future::ok(futures::stream::once(future::ok(state))).boxed()
}
}
@@ -193,16 +251,7 @@
async fn test_call_install_with_right_arguments() {
let mut update_installer = ArgumentCapturingUpdateInstaller::default();
- let (_, reboot_controller_server_end) =
- fidl::endpoints::create_proxy::<RebootControllerMarker>()
- .expect("creating reboot controller");
- apply_system_update_impl(
- &mut update_installer,
- Initiator::User,
- Some(reboot_controller_server_end),
- )
- .await
- .unwrap();
+ apply_system_update_impl(&mut update_installer, Initiator::User).await.unwrap();
assert_eq!(update_installer.update_url, Some(PkgUrl::parse(UPDATE_URL).unwrap()));
assert_eq!(
@@ -219,15 +268,20 @@
// Test that if system updater succeeds, system-update-checker calls the reboot service.
#[fasync::run_singlethreaded(test)]
async fn test_reboot_on_success() {
- let mut update_installer = ArgumentCapturingUpdateInstaller::default();
-
- apply_system_update_and_reboot(&mut update_installer, Initiator::User).await.unwrap();
-
+ let info = UpdateInfo::builder().download_size(0).build();
+ let state = State::WaitToReboot(UpdateInfoAndProgress::done(info));
+ let mut update_installer =
+ ArgumentCapturingUpdateInstaller { state: Some(state), ..Default::default() };
let mut stream =
+ apply_system_update_impl(&mut update_installer, Initiator::User).await.unwrap();
+ assert_matches!(stream.next().await, Some(Ok(ApplyState::WaitingForReboot(_))));
+ assert_matches!(stream.next().now_or_never(), None);
+
+ let mut reboot_stream =
update_installer.reboot_controller_server_end.unwrap().unwrap().into_stream().unwrap();
assert_matches!(
- stream.try_next().await.unwrap(),
- Some(RebootControllerRequest::Unblock { .. })
+ reboot_stream.next().await,
+ Some(Ok(RebootControllerRequest::Unblock { .. }))
);
}
@@ -259,60 +313,199 @@
#[fasync::run_singlethreaded(test)]
async fn test_does_not_reboot_on_failure() {
let mut update_installer = FailingUpdateInstaller::default();
- let update_result =
- apply_system_update_and_reboot(&mut update_installer, Initiator::User).await;
+ let (_, error) = apply_system_update_impl(&mut update_installer, Initiator::User)
+ .await
+ .unwrap()
+ .next()
+ .await
+ .unwrap()
+ .unwrap_err();
- assert_matches!(
- update_result.unwrap_err().downcast::<Error>().unwrap(),
- Error::SystemUpdaterFailed
- );
+ assert_matches!(error.downcast::<Error>().unwrap(), Error::SystemUpdaterFailed);
+ }
+
+ struct RebootUpdateInstaller;
+ impl UpdateInstaller for RebootUpdateInstaller {
+ type UpdateAttempt =
+ futures::stream::Once<future::Ready<Result<State, MonitorUpdateAttemptError>>>;
+
+ fn start_update(
+ &mut self,
+ _update_url: PkgUrl,
+ _options: Options,
+ _reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
+ ) -> BoxFuture<'_, Result<Self::UpdateAttempt, UpdateAttemptError>> {
+ let info = UpdateInfo::builder().download_size(0).build();
+ let state = State::WaitToReboot(UpdateInfoAndProgress::done(info));
+ future::ok(futures::stream::once(future::ok(state))).boxed()
+ }
}
// Test that if the reboot controller isn't working, we surface the appropriate error after
// updating. This would be a bad state to be in, but at least a user would get output.
#[fasync::run_singlethreaded(test)]
async fn test_reboot_errors_on_no_service() {
- let mut update_installer = DoNothingUpdateInstaller;
+ let mut update_installer = RebootUpdateInstaller;
- let update_result =
- apply_system_update_and_reboot(&mut update_installer, Initiator::User).await;
+ let mut results: Vec<_> = apply_system_update_impl(&mut update_installer, Initiator::User)
+ .await
+ .unwrap()
+ .collect()
+ .await;
+ assert_eq!(results.len(), 2);
// We should have errored out on calling reboot_controller.
assert_matches!(
- update_result.err().expect("system update should fail").downcast::<Error>().unwrap(),
+ results
+ .remove(1)
+ .err()
+ .expect("system update should fail")
+ .1
+ .downcast::<Error>()
+ .unwrap(),
Error::RebootFailed(_)
);
}
proptest! {
#[test]
- fn test_options_passed_to_installer(
- initiator: Initiator)
- {
+ fn test_options_passed_to_installer(initiator: Initiator) {
let mut update_installer = ArgumentCapturingUpdateInstaller::default();
let mut executor =
fasync::Executor::new().expect("create executor in test");
- let (_, reboot_controller_server_end) =
- fidl::endpoints::create_proxy::<RebootControllerMarker>()
- .expect("creating reboot controller");
- let result = executor.run_singlethreaded(apply_system_update_impl(
- &mut update_installer,
- initiator,Some(reboot_controller_server_end)
- ));
+ executor.run_singlethreaded(async move{
+ let result = apply_system_update_impl(&mut update_installer, initiator
+ ).await;
- prop_assert!(result.is_ok(), "apply_system_update_impl failed: {:?}", result);
- prop_assert_eq!(
- update_installer.options,
- Some(Options {
- initiator: match initiator {
- Initiator::Service => installer::Initiator::Service,
- Initiator::User => installer::Initiator::User,
- },
- should_write_recovery: true,
- allow_attach_to_existing_attempt: true,
- })
- );
+ prop_assert!(result.is_ok(), "apply_system_update_impl failed: {:?}", result.err());
+ prop_assert_eq!(
+ update_installer.options,
+ Some(Options {
+ initiator: match initiator {
+ Initiator::Service => installer::Initiator::Service,
+ Initiator::User => installer::Initiator::User,
+ },
+ should_write_recovery: true,
+ allow_attach_to_existing_attempt: true,
+ })
+ );
+ Ok(())}
+ ).unwrap();
}
}
+
+ struct ProgressUpdateInstaller {
+ states: Vec<State>,
+ reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
+ }
+ impl ProgressUpdateInstaller {
+ fn new(states: Vec<State>) -> Self {
+ Self { states, reboot_controller_server_end: None }
+ }
+ }
+ impl UpdateInstaller for ProgressUpdateInstaller {
+ type UpdateAttempt =
+ futures::stream::Iter<std::vec::IntoIter<Result<State, MonitorUpdateAttemptError>>>;
+
+ fn start_update(
+ &mut self,
+ _update_url: PkgUrl,
+ _options: Options,
+ reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
+ ) -> BoxFuture<'_, Result<Self::UpdateAttempt, UpdateAttemptError>> {
+ self.reboot_controller_server_end = reboot_controller_server_end;
+ let results: Vec<_> = self.states.clone().into_iter().map(Ok).collect();
+ future::ok(futures::stream::iter(results)).boxed()
+ }
+ }
+
+ #[fasync::run_singlethreaded(test)]
+ async fn test_yield_progress_event() {
+ let info = UpdateInfo::builder().download_size(1000).build();
+ let mut update_installer = ProgressUpdateInstaller::new(vec![
+ State::Prepare,
+ State::Fetch(UpdateInfoAndProgress::new(info, Progress::none()).unwrap()),
+ State::Stage(
+ UpdateInfoAndProgress::new(
+ info,
+ Progress::builder().fraction_completed(0.5).bytes_downloaded(500).build(),
+ )
+ .unwrap(),
+ ),
+ State::Stage(
+ UpdateInfoAndProgress::new(
+ info,
+ Progress::builder().fraction_completed(0.7).bytes_downloaded(1000).build(),
+ )
+ .unwrap(),
+ ),
+ State::WaitToReboot(UpdateInfoAndProgress::done(info)),
+ ]);
+
+ let mut stream =
+ apply_system_update_impl(&mut update_installer, Initiator::User).await.unwrap();
+
+ for state in &[
+ ApplyState::InstallingUpdate(ApplyProgress::none()),
+ ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.0)),
+ ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.5)),
+ ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.7)),
+ ApplyState::WaitingForReboot(ApplyProgress::new(1000, 1.0)),
+ ] {
+ assert_eq!(&stream.next().await.unwrap().unwrap(), state);
+ }
+ assert_matches!(stream.next().now_or_never(), None);
+ }
+
+ #[fasync::run_singlethreaded(test)]
+ async fn test_installer_complete_state() {
+ let info = UpdateInfo::builder().download_size(1000).build();
+ let mut update_installer = ProgressUpdateInstaller::new(vec![
+ State::Prepare,
+ State::Fetch(UpdateInfoAndProgress::new(info, Progress::none()).unwrap()),
+ State::Stage(
+ UpdateInfoAndProgress::new(
+ info,
+ Progress::builder().fraction_completed(0.5).bytes_downloaded(500).build(),
+ )
+ .unwrap(),
+ ),
+ State::Complete(UpdateInfoAndProgress::done(info)),
+ ]);
+
+ let mut stream =
+ apply_system_update_impl(&mut update_installer, Initiator::User).await.unwrap();
+
+ for state in &[
+ ApplyState::InstallingUpdate(ApplyProgress::none()),
+ ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.0)),
+ ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.5)),
+ ApplyState::InstallingUpdate(ApplyProgress::new(1000, 1.0)),
+ ] {
+ assert_eq!(&stream.next().await.unwrap().unwrap(), state);
+ }
+ assert_matches!(stream.next().await, None);
+ }
+
+ #[fasync::run_singlethreaded(test)]
+ async fn test_installer_failure_event() {
+ let mut update_installer =
+ ProgressUpdateInstaller::new(vec![State::Prepare, State::FailPrepare]);
+
+ let mut stream =
+ apply_system_update_impl(&mut update_installer, Initiator::User).await.unwrap();
+
+ assert_matches!(
+ stream.next().await,
+ Some(Ok(ApplyState::InstallingUpdate(ApplyProgress {
+ download_size: None,
+ fraction_completed: None
+ })))
+ );
+ assert_matches!(stream.next().await, Some(Err((ApplyProgress {
+ download_size: None,
+ fraction_completed: None
+ }, _))));
+ }
}
diff --git a/src/sys/pkg/bin/system-update-checker/src/update_manager.rs b/src/sys/pkg/bin/system-update-checker/src/update_manager.rs
index fc61ad8..f53e91c 100644
--- a/src/sys/pkg/bin/system-update-checker/src/update_manager.rs
+++ b/src/sys/pkg/bin/system-update-checker/src/update_manager.rs
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-use crate::apply::apply_system_update;
+use crate::apply::{apply_system_update, ApplyProgress, ApplyState};
use crate::channel::{CurrentChannelManager, TargetChannelManager};
use crate::check::{check_for_system_update, SystemUpdateStatus};
use crate::connect::ServiceConnect;
@@ -14,17 +14,22 @@
use fidl_fuchsia_pkg::{PackageResolverMarker, PackageResolverProxyInterface};
use fidl_fuchsia_update::CheckNotStartedReason;
use fidl_fuchsia_update_ext::{
- CheckOptions, Initiator, InstallationErrorData, InstallingData, State, UpdateInfo,
+ CheckOptions, Initiator, InstallationErrorData, InstallationProgress, InstallingData, State,
+ UpdateInfo,
};
use fuchsia_async as fasync;
use fuchsia_component::client::connect_to_service;
use fuchsia_hash::Hash;
use fuchsia_inspect as finspect;
use fuchsia_syslog::{fx_log_err, fx_log_info, fx_log_warn};
-use futures::channel::{mpsc, oneshot};
-use futures::future::BoxFuture;
-use futures::prelude::*;
-use futures::{pin_mut, select};
+use futures::{
+ channel::{mpsc, oneshot},
+ future::BoxFuture,
+ pin_mut,
+ prelude::*,
+ select,
+ stream::BoxStream,
+};
use std::fs;
use std::path::Path;
use std::sync::Arc;
@@ -442,12 +447,12 @@
}) => {
fx_log_info!("current system_image merkle: {}", current_system_image);
fx_log_info!("new system_image available: {}", latest_system_image);
+ let version_available = latest_system_image.to_string();
{
- co.yield_(StatusEvent::VersionAvailableKnown(latest_system_image.to_string()))
- .await;
+ co.yield_(StatusEvent::VersionAvailableKnown(version_available.clone())).await;
co.yield_(StatusEvent::State(State::InstallingUpdate(InstallingData {
update: Some(UpdateInfo {
- version_available: Some(latest_system_image.to_string()),
+ version_available: Some(version_available.clone()),
download_size: None,
}),
installation_progress: None,
@@ -457,33 +462,86 @@
self.last_update_storage.store(&latest_update_package);
- if let Err(e) =
- self.update_applier.apply(initiator).await.context("apply_system_update failed")
+ match self
+ .update_applier
+ .apply(initiator)
+ .await
+ .context("apply_system_update failed")
{
- co.yield_(StatusEvent::State(State::InstallationError(
- InstallationErrorData {
- update: Some(UpdateInfo {
- version_available: Some(latest_system_image.to_string()),
- download_size: None,
- }),
- installation_progress: None,
- },
- )))
- .await;
- return Err(e);
- };
- // On success, system-updater will reboots the system when ready, so this code may
- // or may not be run. The only way to leave WaitingForReboot state is to restart
- // the component.
- co.yield_(StatusEvent::State(State::WaitingForReboot(InstallingData {
- update: Some(UpdateInfo {
- version_available: Some(latest_system_image.to_string()),
- download_size: None,
- }),
- installation_progress: None,
- })))
- .await;
- let () = future::pending().await;
+ Ok(mut stream) => {
+ let mut waiting_for_reboot = false;
+ while let Some(result) = stream.next().await {
+ match result {
+ Ok(apply_state) => {
+ let state = match apply_state {
+ ApplyState::InstallingUpdate(ApplyProgress {
+ download_size,
+ fraction_completed,
+ }) => State::InstallingUpdate(InstallingData {
+ update: Some(UpdateInfo {
+ version_available: Some(version_available.clone()),
+ download_size,
+ }),
+ installation_progress: Some(InstallationProgress {
+ fraction_completed,
+ }),
+ }),
+ ApplyState::WaitingForReboot(ApplyProgress {
+ download_size,
+ fraction_completed,
+ }) => {
+ waiting_for_reboot = true;
+ State::WaitingForReboot(InstallingData {
+ update: Some(UpdateInfo {
+ version_available: Some(
+ version_available.clone(),
+ ),
+ download_size,
+ }),
+ installation_progress: Some(InstallationProgress {
+ fraction_completed,
+ }),
+ })
+ }
+ };
+ co.yield_(StatusEvent::State(state)).await;
+ }
+ Err((ApplyProgress { download_size, fraction_completed }, e)) => {
+ // If we failed to unblock reboot, it will ends up here and we
+ // should not go back to InstallationError.
+ if !waiting_for_reboot {
+ co.yield_(StatusEvent::State(State::InstallationError(
+ InstallationErrorData {
+ update: Some(UpdateInfo {
+ version_available: Some(version_available),
+ download_size,
+ }),
+ installation_progress: Some(InstallationProgress {
+ fraction_completed,
+ }),
+ },
+ )))
+ .await;
+ }
+ return Err(e);
+ }
+ }
+ }
+ }
+ Err(e) => {
+ co.yield_(StatusEvent::State(State::InstallationError(
+ InstallationErrorData {
+ update: Some(UpdateInfo {
+ version_available: Some(version_available),
+ download_size: None,
+ }),
+ installation_progress: None,
+ },
+ )))
+ .await;
+ return Err(e);
+ }
+ }
}
}
Ok(())
@@ -537,13 +595,25 @@
// For mocking
pub trait UpdateApplier: Send + Sync + 'static {
- fn apply(&self, initiator: Initiator) -> BoxFuture<'_, Result<(), anyhow::Error>>;
+ fn apply<'a>(
+ &self,
+ initiator: Initiator,
+ ) -> BoxFuture<
+ 'a,
+ Result<BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, anyhow::Error>,
+ >;
}
pub struct RealUpdateApplier;
impl UpdateApplier for RealUpdateApplier {
- fn apply(&self, initiator: Initiator) -> BoxFuture<'_, Result<(), anyhow::Error>> {
+ fn apply<'a>(
+ &self,
+ initiator: Initiator,
+ ) -> BoxFuture<
+ 'a,
+ Result<BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, anyhow::Error>,
+ > {
apply_system_update(initiator).boxed()
}
}
@@ -701,12 +771,24 @@
#[derive(Clone)]
pub struct UnreachableUpdateApplier;
impl UpdateApplier for UnreachableUpdateApplier {
- fn apply(&self, _initiator: Initiator) -> BoxFuture<'_, Result<(), anyhow::Error>> {
+ fn apply<'a>(
+ &self,
+ _initiator: Initiator,
+ ) -> BoxFuture<
+ 'a,
+ Result<
+ BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>,
+ anyhow::Error,
+ >,
+ > {
unreachable!();
}
}
- type ApplyResultFactory = fn() -> Result<(), crate::errors::Error>;
+ type ApplyResultFactory = fn() -> Result<
+ BoxStream<'static, Result<ApplyState, (ApplyProgress, anyhow::Error)>>,
+ crate::errors::Error,
+ >;
#[derive(Clone)]
pub struct FakeUpdateApplier {
@@ -715,7 +797,17 @@
}
impl FakeUpdateApplier {
pub fn new_success() -> Self {
- Self { result: || Ok(()), call_count: Arc::new(AtomicU64::new(0)) }
+ Self {
+ result: || {
+ Ok(futures::stream::iter(vec![
+ Ok(ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.42))),
+ Ok(ApplyState::WaitingForReboot(ApplyProgress::new(1000, 1.0))),
+ ])
+ .chain(futures::stream::pending())
+ .boxed())
+ },
+ call_count: Arc::new(AtomicU64::new(0)),
+ }
}
pub fn new_error() -> Self {
Self {
@@ -728,7 +820,16 @@
}
}
impl UpdateApplier for FakeUpdateApplier {
- fn apply(&self, _initiator: Initiator) -> BoxFuture<'_, Result<(), anyhow::Error>> {
+ fn apply<'a>(
+ &self,
+ _initiator: Initiator,
+ ) -> BoxFuture<
+ 'a,
+ Result<
+ BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>,
+ anyhow::Error,
+ >,
+ > {
self.call_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
future::ready((self.result)().map_err(|e| e.into())).boxed()
}
@@ -952,23 +1053,39 @@
.await
.spawn();
let (callback, mut receiver) = FakeStateNotifier::new_callback_and_receiver();
- let expected_installing_data = InstallingData {
- update: Some(UpdateInfo {
- version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
- download_size: None,
- }),
- installation_progress: None,
- };
let options = CheckOptions::builder().initiator(Initiator::User).build();
manager.try_start_update(options, Some(callback)).await.unwrap();
assert_eq!(
- next_n_states(&mut receiver, 3).await,
+ next_n_states(&mut receiver, 4).await,
vec![
State::CheckingForUpdates,
- State::InstallingUpdate(expected_installing_data.clone()),
- State::WaitingForReboot(expected_installing_data),
+ State::InstallingUpdate(InstallingData {
+ update: Some(UpdateInfo {
+ version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
+ download_size: None,
+ }),
+ installation_progress: None
+ }),
+ State::InstallingUpdate(InstallingData {
+ update: Some(UpdateInfo {
+ version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
+ download_size: Some(1000),
+ }),
+ installation_progress: Some(InstallationProgress {
+ fraction_completed: Some(0.42)
+ })
+ }),
+ State::WaitingForReboot(InstallingData {
+ update: Some(UpdateInfo {
+ version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
+ download_size: Some(1000),
+ }),
+ installation_progress: Some(InstallationProgress {
+ fraction_completed: Some(1.0)
+ })
+ }),
]
);
diff --git a/src/sys/pkg/tests/system-update-checker/BUILD.gn b/src/sys/pkg/tests/system-update-checker/BUILD.gn
index b99a2cf..f357518 100644
--- a/src/sys/pkg/tests/system-update-checker/BUILD.gn
+++ b/src/sys/pkg/tests/system-update-checker/BUILD.gn
@@ -14,13 +14,16 @@
"//sdk/fidl/fuchsia.logger:fuchsia.logger-rustc",
"//sdk/fidl/fuchsia.paver:fuchsia.paver-rustc",
"//sdk/fidl/fuchsia.update:fuchsia.update-rustc",
- "//sdk/fidl/fuchsia.update:fuchsia.update-rustc",
"//sdk/fidl/fuchsia.update.channel:fuchsia.update.channel-rustc",
"//src/lib/fidl/rust/fidl",
"//src/lib/fuchsia-async",
"//src/lib/fuchsia-component",
"//src/lib/zircon/rust:fuchsia-zircon",
+ "//src/sys/pkg/lib/fidl-fuchsia-update-installer-ext",
+ "//src/sys/pkg/lib/fuchsia-pkg-testing",
+ "//src/sys/pkg/testing/mock-installer",
"//src/sys/pkg/testing/mock-paver",
+ "//src/sys/pkg/testing/mock-resolver",
"//third_party/rust_crates:futures",
"//third_party/rust_crates:parking_lot",
"//third_party/rust_crates:tempfile",
@@ -55,6 +58,9 @@
name = "system_update_checker_integration_test"
dest = "system-update-checker-integration-test"
environments = basic_envs
+ log_settings = {
+ max_severity = "ERROR"
+ }
},
]
}
diff --git a/src/sys/pkg/tests/system-update-checker/src/lib.rs b/src/sys/pkg/tests/system-update-checker/src/lib.rs
index 20a0025..fd6f8c5f 100644
--- a/src/sys/pkg/tests/system-update-checker/src/lib.rs
+++ b/src/sys/pkg/tests/system-update-checker/src/lib.rs
@@ -5,16 +5,23 @@
#![cfg(test)]
use {
fidl_fuchsia_paver::PaverRequestStream,
- fidl_fuchsia_update::{ManagerMarker, ManagerProxy, MonitorRequest, State},
+ fidl_fuchsia_update::{
+ CheckOptions, CheckingForUpdatesData, Initiator, InstallationProgress, InstallingData,
+ ManagerMarker, ManagerProxy, MonitorMarker, MonitorRequest, MonitorRequestStream, State,
+ UpdateInfo,
+ },
fidl_fuchsia_update_channel::{ProviderMarker, ProviderProxy},
- fuchsia_async as fasync,
+ fidl_fuchsia_update_installer_ext as installer, fuchsia_async as fasync,
fuchsia_component::{
client::{App, AppBuilder},
server::{NestedEnvironment, ServiceFs},
},
+ fuchsia_pkg_testing::make_packages_json,
fuchsia_zircon::Status,
futures::{channel::mpsc, prelude::*},
+ mock_installer::MockUpdateInstallerService,
mock_paver::{MockPaverService, MockPaverServiceBuilder, PaverEvent},
+ mock_resolver::MockResolverService,
std::{fs::File, sync::Arc},
tempfile::TempDir,
};
@@ -23,45 +30,68 @@
struct Mounts {
misc_ota: TempDir,
+ pkgfs_system: TempDir,
}
struct Proxies {
_paver: Arc<MockPaverService>,
paver_events: mpsc::UnboundedReceiver<PaverEvent>,
+ resolver: Arc<MockResolverService>,
channel_provider: ProviderProxy,
update_manager: ManagerProxy,
}
impl Mounts {
fn new() -> Self {
- Self { misc_ota: tempfile::tempdir().expect("/tmp to exist") }
+ Self {
+ misc_ota: tempfile::tempdir().expect("/tmp to exist"),
+ pkgfs_system: tempfile::tempdir().expect("/tmp to exist"),
+ }
}
}
-struct TestEnv {
- _env: NestedEnvironment,
- _mounts: Mounts,
- proxies: Proxies,
- _system_update_checker: App,
+struct TestEnvBuilder {
+ paver_builder: MockPaverServiceBuilder,
+ paver_events: mpsc::UnboundedReceiver<PaverEvent>,
+ installer: MockUpdateInstallerService,
}
-impl TestEnv {
- fn new<F>(paver_init: F) -> Self
- where
- F: FnOnce(MockPaverServiceBuilder) -> MockPaverServiceBuilder,
- {
+impl TestEnvBuilder {
+ fn new() -> Self {
let (events_tx, events_rx) = mpsc::unbounded();
let paver_builder = MockPaverServiceBuilder::new().event_hook(move |event| {
events_tx.unbounded_send(event.to_owned()).expect("to write to events channel")
});
+ Self {
+ paver_builder,
+ paver_events: events_rx,
+ installer: MockUpdateInstallerService::builder().build(),
+ }
+ }
- let paver = paver_init(paver_builder).build();
+ fn paver_init<F>(self, paver_init: F) -> Self
+ where
+ F: FnOnce(MockPaverServiceBuilder) -> MockPaverServiceBuilder,
+ {
+ Self { paver_builder: paver_init(self.paver_builder), ..self }
+ }
+ fn installer(self, installer: MockUpdateInstallerService) -> Self {
+ Self { installer, ..self }
+ }
+
+ fn build(self) -> TestEnv {
let mounts = Mounts::new();
+ std::fs::write(
+ mounts.pkgfs_system.path().join("meta"),
+ "0000000000000000000000000000000000000000000000000000000000000001",
+ )
+ .expect("write pkgfs/system/meta");
let mut fs = ServiceFs::new();
fs.add_proxy_service::<fidl_fuchsia_logger::LogSinkMarker, _>();
+ let paver = self.paver_builder.build();
let paver = Arc::new(paver);
let paver_clone = Arc::clone(&paver);
fs.add_fidl_service(move |stream: PaverRequestStream| {
@@ -73,6 +103,23 @@
.detach();
});
+ let resolver = Arc::new(MockResolverService::new(None));
+ let resolver_clone = Arc::clone(&resolver);
+ fs.add_fidl_service(move |stream| {
+ fasync::Task::spawn(
+ Arc::clone(&resolver_clone)
+ .run_resolver_service(stream)
+ .unwrap_or_else(|e| panic!("error running resolver service {:?}", e)),
+ )
+ .detach()
+ });
+
+ let installer = Arc::new(self.installer);
+ let installer_clone = Arc::clone(&installer);
+ fs.add_fidl_service(move |stream| {
+ fasync::Task::spawn(Arc::clone(&installer_clone).run_service(stream)).detach()
+ });
+
let env = fs
.create_salted_nested_environment("system-update-checker_integration_test_env")
.expect("nested environment to create successfully");
@@ -84,15 +131,21 @@
File::open(mounts.misc_ota.path()).expect("/misc/ota tempdir to open"),
)
.expect("/misc/ota to mount")
+ .add_dir_to_namespace(
+ "/pkgfs/system".to_string(),
+ File::open(mounts.pkgfs_system.path()).expect("/pkgfs/system tempdir to open"),
+ )
+ .expect("/pkgfs/system to mount")
.spawn(env.launcher())
.expect("system_update_checker to launch");
- Self {
+ TestEnv {
_env: env,
_mounts: mounts,
proxies: Proxies {
_paver: paver,
- paver_events: events_rx,
+ paver_events: self.paver_events,
+ resolver,
channel_provider: system_update_checker
.connect_to_service::<ProviderMarker>()
.expect("connect to channel provider"),
@@ -105,10 +158,58 @@
}
}
+struct TestEnv {
+ _env: NestedEnvironment,
+ _mounts: Mounts,
+ proxies: Proxies,
+ _system_update_checker: App,
+}
+
+impl TestEnv {
+ async fn check_now(&self) -> MonitorRequestStream {
+ let options = CheckOptions {
+ initiator: Some(Initiator::User),
+ allow_attaching_to_existing_update_check: Some(false),
+ };
+ let (client_end, stream) =
+ fidl::endpoints::create_request_stream::<MonitorMarker>().unwrap();
+ self.proxies
+ .update_manager
+ .check_now(options, Some(client_end))
+ .await
+ .expect("make check_now call")
+ .expect("check started");
+ stream
+ }
+}
+
+async fn expect_states(stream: &mut MonitorRequestStream, expected_states: &[State]) {
+ for expected_state in expected_states {
+ let MonitorRequest::OnState { state, responder } =
+ stream.try_next().await.unwrap().unwrap();
+ assert_eq!(&state, expected_state);
+ responder.send().unwrap();
+ }
+}
+
+fn update_info(download_size: Option<u64>) -> Option<UpdateInfo> {
+ Some(UpdateInfo {
+ version_available: Some(
+ "beefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdead".to_string(),
+ ),
+ download_size,
+ })
+}
+
+fn progress(fraction_completed: Option<f32>) -> Option<InstallationProgress> {
+ Some(InstallationProgress { fraction_completed })
+}
+
#[fasync::run_singlethreaded(test)]
// Test will hang if system-update-checker does not call paver service
async fn test_calls_paver_service() {
- let mut env = TestEnv::new(|p| p);
+ let mut env = TestEnvBuilder::new().build();
+
assert_eq!(
env.proxies.paver_events.next().await,
Some(PaverEvent::SetActiveConfigurationHealthy)
@@ -118,7 +219,7 @@
#[fasync::run_singlethreaded(test)]
// Test will hang if system-update-checker does not call paver service
async fn test_channel_provider_get_current_works_after_paver_service_fails() {
- let mut env = TestEnv::new(|p| p.call_hook(|_| Status::INTERNAL));
+ let mut env = TestEnvBuilder::new().paver_init(|p| p.call_hook(|_| Status::INTERNAL)).build();
assert_eq!(
env.proxies.paver_events.next().await,
@@ -134,7 +235,7 @@
#[fasync::run_singlethreaded(test)]
// Test will hang if system-update-checker does not call paver service
async fn test_update_manager_check_now_works_after_paver_service_fails() {
- let mut env = TestEnv::new(|p| p.call_hook(|_| Status::INTERNAL));
+ let mut env = TestEnvBuilder::new().paver_init(|p| p.call_hook(|_| Status::INTERNAL)).build();
assert_eq!(
env.proxies.paver_events.next().await,
@@ -171,3 +272,105 @@
]
);
}
+
+#[fasync::run_singlethreaded(test)]
+async fn test_update_manager_progress() {
+ let (mut sender, receiver) = mpsc::channel(0);
+ let installer = MockUpdateInstallerService::builder().states_receiver(receiver).build();
+ let env = TestEnvBuilder::new().installer(installer).build();
+
+ env.proxies.resolver.url("fuchsia-pkg://fuchsia.com/update/0").resolve(
+ &env.proxies
+ .resolver
+ .package("update", "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef")
+ .add_file(
+ "packages.json",
+ make_packages_json(["fuchsia-pkg://fuchsia.com/system_image/0?hash=beefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdead"]),
+ )
+ .add_file("zbi", "fake zbi"),
+ );
+ env.proxies
+ .resolver.url("fuchsia-pkg://fuchsia.com/system_image/0?hash=beefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdead")
+ .resolve(
+ &env.proxies
+ .resolver
+ .package("system_image", "beefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeada")
+ );
+
+ let mut stream = env.check_now().await;
+
+ expect_states(
+ &mut stream,
+ &[
+ State::CheckingForUpdates(CheckingForUpdatesData {}),
+ State::InstallingUpdate(InstallingData {
+ update: update_info(None),
+ installation_progress: None,
+ }),
+ ],
+ )
+ .await;
+ sender.send(installer::State::Prepare).await.unwrap();
+ expect_states(
+ &mut stream,
+ &[State::InstallingUpdate(InstallingData {
+ update: update_info(None),
+ installation_progress: progress(None),
+ })],
+ )
+ .await;
+ let installer_update_info = installer::UpdateInfo::builder().download_size(1000).build();
+ sender
+ .send(installer::State::Fetch(
+ installer::UpdateInfoAndProgress::new(
+ installer_update_info,
+ installer::Progress::none(),
+ )
+ .unwrap(),
+ ))
+ .await
+ .unwrap();
+ expect_states(
+ &mut stream,
+ &[State::InstallingUpdate(InstallingData {
+ update: update_info(Some(1000)),
+ installation_progress: progress(Some(0.0)),
+ })],
+ )
+ .await;
+ sender
+ .send(installer::State::Stage(
+ installer::UpdateInfoAndProgress::new(
+ installer_update_info,
+ installer::Progress::builder()
+ .fraction_completed(0.5)
+ .bytes_downloaded(500)
+ .build(),
+ )
+ .unwrap(),
+ ))
+ .await
+ .unwrap();
+ expect_states(
+ &mut stream,
+ &[State::InstallingUpdate(InstallingData {
+ update: update_info(Some(1000)),
+ installation_progress: progress(Some(0.5)),
+ })],
+ )
+ .await;
+ sender
+ .send(installer::State::WaitToReboot(installer::UpdateInfoAndProgress::done(
+ installer_update_info,
+ )))
+ .await
+ .unwrap();
+ expect_states(
+ &mut stream,
+ &[State::WaitingForReboot(InstallingData {
+ update: update_info(Some(1000)),
+ installation_progress: progress(Some(1.0)),
+ })],
+ )
+ .await;
+}