[system-update-checker] Forward progress from installer.

Bug: 55411

Change-Id: I6eb4ce1e453ba1459def575d954a8ddd25ded8bf
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/430538
Commit-Queue: Sen Jiang <senj@google.com>
Reviewed-by: Kevin Wells <kevinwells@google.com>
Testability-Review: Kevin Wells <kevinwells@google.com>
diff --git a/src/sys/pkg/bin/system-update-checker/meta/system-update-checker-for-integration-test.cmx b/src/sys/pkg/bin/system-update-checker/meta/system-update-checker-for-integration-test.cmx
index 043f43e..eb9c44d 100644
--- a/src/sys/pkg/bin/system-update-checker/meta/system-update-checker-for-integration-test.cmx
+++ b/src/sys/pkg/bin/system-update-checker/meta/system-update-checker-for-integration-test.cmx
@@ -4,14 +4,11 @@
     },
     "sandbox": {
         "services": [
-            "fuchsia.cobalt.SystemDataUpdater",
             "fuchsia.logger.LogSink",
             "fuchsia.paver.Paver",
             "fuchsia.pkg.PackageResolver",
-            "fuchsia.pkg.rewrite.Engine",
-            "fuchsia.recovery.FactoryReset",
-            "fuchsia.sys.Launcher",
-            "fuchsia.update.channel.Provider"
+            "fuchsia.update.channel.Provider",
+            "fuchsia.update.installer.Installer"
         ]
     }
 }
diff --git a/src/sys/pkg/bin/system-update-checker/src/apply.rs b/src/sys/pkg/bin/system-update-checker/src/apply.rs
index 37f2b20..8508450 100644
--- a/src/sys/pkg/bin/system-update-checker/src/apply.rs
+++ b/src/sys/pkg/bin/system-update-checker/src/apply.rs
@@ -6,7 +6,9 @@
 use anyhow::{anyhow, Context as _};
 use fidl::endpoints::ServerEnd;
 use fidl_fuchsia_update_ext::Initiator;
-use fidl_fuchsia_update_installer::{InstallerMarker, InstallerProxy, RebootControllerMarker};
+use fidl_fuchsia_update_installer::{
+    InstallerMarker, InstallerProxy, RebootControllerMarker, RebootControllerProxy,
+};
 use fidl_fuchsia_update_installer_ext::{
     self as installer, start_update, MonitorUpdateAttemptError, Options, State, StateId,
     UpdateAttemptError,
@@ -14,41 +16,53 @@
 use fuchsia_component::client::connect_to_service;
 use fuchsia_syslog::fx_log_info;
 use fuchsia_url::pkg_url::PkgUrl;
-use futures::{future::BoxFuture, prelude::*, FutureExt};
+use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
 
 const UPDATE_URL: &str = "fuchsia-pkg://fuchsia.com/update";
 
-// On success, system may reboot before this function returns
-pub async fn apply_system_update(initiator: Initiator) -> Result<(), anyhow::Error> {
+// On success, system will reboot before this function returns
+pub async fn apply_system_update<'a>(
+    initiator: Initiator,
+) -> Result<BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, anyhow::Error> {
     let installer_proxy =
         connect_to_service::<InstallerMarker>().context("connecting to component Installer")?;
     let mut update_installer = RealUpdateInstaller { installer_proxy };
 
-    apply_system_update_and_reboot(&mut update_installer, initiator).await
+    apply_system_update_impl(&mut update_installer, initiator).await
 }
 
-async fn apply_system_update_and_reboot<'a>(
-    update_installer: &'a mut impl UpdateInstaller,
-    initiator: Initiator,
-) -> Result<(), anyhow::Error> {
-    let (reboot_controller, reboot_controller_server_end) =
-        fidl::endpoints::create_proxy::<RebootControllerMarker>()
-            .context("creating reboot controller")?;
+#[derive(Clone, Debug, Default, PartialEq)]
+pub struct ApplyProgress {
+    pub download_size: Option<u64>,
+    pub fraction_completed: Option<f32>,
+}
 
-    apply_system_update_impl(update_installer, initiator, Some(reboot_controller_server_end))
-        .await?;
+impl ApplyProgress {
+    #[cfg(test)]
+    pub fn new(download_size: u64, fraction_completed: f32) -> Self {
+        ApplyProgress {
+            download_size: Some(download_size),
+            fraction_completed: Some(fraction_completed),
+        }
+    }
 
-    fx_log_info!("Successful update, rebooting...");
+    pub fn none() -> Self {
+        ApplyProgress::default()
+    }
+}
 
-    reboot_controller
-        .unblock()
-        .map_err(|e| Error::RebootFailed(e))
-        .context("notify installer it can reboot when ready")
+#[derive(Clone, Debug, PartialEq)]
+pub enum ApplyState {
+    InstallingUpdate(ApplyProgress),
+    WaitingForReboot(ApplyProgress),
 }
 
 // For mocking
 trait UpdateInstaller {
-    type UpdateAttempt: Stream<Item = Result<State, MonitorUpdateAttemptError>> + Unpin;
+    type UpdateAttempt: Stream<Item = Result<State, MonitorUpdateAttemptError>>
+        + Unpin
+        + Send
+        + 'static;
 
     fn start_update(
         &mut self,
@@ -79,11 +93,10 @@
     }
 }
 
-async fn apply_system_update_impl<'a>(
-    update_installer: &'a mut impl UpdateInstaller,
+async fn apply_system_update_impl(
+    update_installer: &mut impl UpdateInstaller,
     initiator: Initiator,
-    reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
-) -> Result<(), anyhow::Error> {
+) -> Result<BoxStream<'static, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, anyhow::Error> {
     fx_log_info!("starting system updater");
     let options = Options {
         initiator: match initiator {
@@ -94,26 +107,70 @@
         allow_attach_to_existing_attempt: true,
     };
     let update_url = PkgUrl::parse(UPDATE_URL)?;
-    let mut update_attempt = update_installer
-        .start_update(update_url, options, reboot_controller_server_end)
+    let (reboot_controller, reboot_controller_server_end) =
+        fidl::endpoints::create_proxy::<RebootControllerMarker>()
+            .context("creating reboot controller")?;
+
+    let update_attempt = update_installer
+        .start_update(update_url, options, Some(reboot_controller_server_end))
         .await
         .context(Error::SystemUpdaterFailed)?;
-    while let Some(state) = update_attempt.try_next().await.context(Error::SystemUpdaterFailed)? {
+
+    Ok(async_generator::generate(|mut co| async move {
+        monitor_update_progress(update_attempt, reboot_controller, &mut co).await
+    })
+    .into_try_stream()
+    .boxed())
+}
+
+async fn monitor_update_progress(
+    mut update_attempt: impl Stream<Item = Result<State, MonitorUpdateAttemptError>> + Unpin,
+    reboot_controller: RebootControllerProxy,
+    co: &mut async_generator::Yield<ApplyState>,
+) -> Result<(), (ApplyProgress, anyhow::Error)> {
+    let mut apply_progress = ApplyProgress::none();
+    while let Some(state) = update_attempt
+        .try_next()
+        .await
+        .context(Error::SystemUpdaterFailed)
+        .map_err(|e| (apply_progress.clone(), e))?
+    {
         fx_log_info!("Installer entered state: {}", state.name());
-        if state.id() == StateId::WaitToReboot || state.is_success() {
+
+        apply_progress.download_size = state.download_size();
+        if let Some(progress) = state.progress() {
+            apply_progress.fraction_completed = Some(progress.fraction_completed());
+        }
+        if state.is_failure() {
+            return Err((apply_progress, anyhow!(Error::SystemUpdaterFailed)));
+        }
+        if state.id() == StateId::WaitToReboot {
+            co.yield_(ApplyState::WaitingForReboot(apply_progress.clone())).await;
+
+            fx_log_info!("Successful update, rebooting...");
+
+            reboot_controller
+                .unblock()
+                .map_err(|e| Error::RebootFailed(e))
+                .context("notify installer it can reboot when ready")
+                .map_err(|e| (apply_progress, e))?;
+            // On success, wait for reboot to happen.
+            let () = future::pending().await;
+            unreachable!();
+        }
+        co.yield_(ApplyState::InstallingUpdate(apply_progress.clone())).await;
+        if state.is_success() {
             return Ok(());
-        } else if state.is_failure() {
-            return Err(anyhow!(Error::SystemUpdaterFailed));
         }
     }
-    Err(anyhow!(Error::InstallationEndedUnexpectedly))
+    Err((apply_progress, anyhow!(Error::InstallationEndedUnexpectedly)))
 }
 
 #[cfg(test)]
 mod test_apply_system_update_impl {
     use super::*;
     use fidl_fuchsia_update_installer::RebootControllerRequest;
-    use fidl_fuchsia_update_installer_ext::{UpdateInfo, UpdateInfoAndProgress};
+    use fidl_fuchsia_update_installer_ext::{Progress, UpdateInfo, UpdateInfoAndProgress};
     use fuchsia_async as fasync;
     use matches::assert_matches;
     use proptest::prelude::*;
@@ -130,7 +187,7 @@
             _reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
         ) -> BoxFuture<'_, Result<Self::UpdateAttempt, UpdateAttemptError>> {
             let info = UpdateInfo::builder().download_size(0).build();
-            let state = State::WaitToReboot(UpdateInfoAndProgress::done(info));
+            let state = State::Complete(UpdateInfoAndProgress::done(info));
             future::ok(futures::stream::once(future::ok(state))).boxed()
         }
     }
@@ -150,7 +207,7 @@
         ) -> BoxFuture<'_, Result<Self::UpdateAttempt, UpdateAttemptError>> {
             self.was_called = true;
             let info = UpdateInfo::builder().download_size(0).build();
-            let state = State::WaitToReboot(UpdateInfoAndProgress::done(info));
+            let state = State::Complete(UpdateInfoAndProgress::done(info));
             future::ok(futures::stream::once(future::ok(state))).boxed()
         }
     }
@@ -159,8 +216,7 @@
     async fn test_call_installer() {
         let mut update_installer = WasCalledUpdateInstaller { was_called: false };
 
-        apply_system_update_impl(&mut update_installer, Initiator::User, None).await.unwrap();
-
+        apply_system_update_impl(&mut update_installer, Initiator::User).await.unwrap();
         assert!(update_installer.was_called);
     }
 
@@ -169,6 +225,7 @@
         update_url: Option<PkgUrl>,
         options: Option<Options>,
         reboot_controller_server_end: Option<Option<ServerEnd<RebootControllerMarker>>>,
+        state: Option<State>,
     }
     impl UpdateInstaller for ArgumentCapturingUpdateInstaller {
         type UpdateAttempt =
@@ -183,8 +240,9 @@
             self.update_url = Some(update_url);
             self.options = Some(options);
             self.reboot_controller_server_end = Some(reboot_controller_server_end);
-            let info = UpdateInfo::builder().download_size(0).build();
-            let state = State::WaitToReboot(UpdateInfoAndProgress::done(info));
+            let state = self.state.clone().unwrap_or(State::Complete(UpdateInfoAndProgress::done(
+                UpdateInfo::builder().download_size(0).build(),
+            )));
             future::ok(futures::stream::once(future::ok(state))).boxed()
         }
     }
@@ -193,16 +251,7 @@
     async fn test_call_install_with_right_arguments() {
         let mut update_installer = ArgumentCapturingUpdateInstaller::default();
 
-        let (_, reboot_controller_server_end) =
-            fidl::endpoints::create_proxy::<RebootControllerMarker>()
-                .expect("creating reboot controller");
-        apply_system_update_impl(
-            &mut update_installer,
-            Initiator::User,
-            Some(reboot_controller_server_end),
-        )
-        .await
-        .unwrap();
+        apply_system_update_impl(&mut update_installer, Initiator::User).await.unwrap();
 
         assert_eq!(update_installer.update_url, Some(PkgUrl::parse(UPDATE_URL).unwrap()));
         assert_eq!(
@@ -219,15 +268,20 @@
     // Test that if system updater succeeds, system-update-checker calls the reboot service.
     #[fasync::run_singlethreaded(test)]
     async fn test_reboot_on_success() {
-        let mut update_installer = ArgumentCapturingUpdateInstaller::default();
-
-        apply_system_update_and_reboot(&mut update_installer, Initiator::User).await.unwrap();
-
+        let info = UpdateInfo::builder().download_size(0).build();
+        let state = State::WaitToReboot(UpdateInfoAndProgress::done(info));
+        let mut update_installer =
+            ArgumentCapturingUpdateInstaller { state: Some(state), ..Default::default() };
         let mut stream =
+            apply_system_update_impl(&mut update_installer, Initiator::User).await.unwrap();
+        assert_matches!(stream.next().await, Some(Ok(ApplyState::WaitingForReboot(_))));
+        assert_matches!(stream.next().now_or_never(), None);
+
+        let mut reboot_stream =
             update_installer.reboot_controller_server_end.unwrap().unwrap().into_stream().unwrap();
         assert_matches!(
-            stream.try_next().await.unwrap(),
-            Some(RebootControllerRequest::Unblock { .. })
+            reboot_stream.next().await,
+            Some(Ok(RebootControllerRequest::Unblock { .. }))
         );
     }
 
@@ -259,60 +313,199 @@
     #[fasync::run_singlethreaded(test)]
     async fn test_does_not_reboot_on_failure() {
         let mut update_installer = FailingUpdateInstaller::default();
-        let update_result =
-            apply_system_update_and_reboot(&mut update_installer, Initiator::User).await;
+        let (_, error) = apply_system_update_impl(&mut update_installer, Initiator::User)
+            .await
+            .unwrap()
+            .next()
+            .await
+            .unwrap()
+            .unwrap_err();
 
-        assert_matches!(
-            update_result.unwrap_err().downcast::<Error>().unwrap(),
-            Error::SystemUpdaterFailed
-        );
+        assert_matches!(error.downcast::<Error>().unwrap(), Error::SystemUpdaterFailed);
+    }
+
+    struct RebootUpdateInstaller;
+    impl UpdateInstaller for RebootUpdateInstaller {
+        type UpdateAttempt =
+            futures::stream::Once<future::Ready<Result<State, MonitorUpdateAttemptError>>>;
+
+        fn start_update(
+            &mut self,
+            _update_url: PkgUrl,
+            _options: Options,
+            _reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
+        ) -> BoxFuture<'_, Result<Self::UpdateAttempt, UpdateAttemptError>> {
+            let info = UpdateInfo::builder().download_size(0).build();
+            let state = State::WaitToReboot(UpdateInfoAndProgress::done(info));
+            future::ok(futures::stream::once(future::ok(state))).boxed()
+        }
     }
 
     // Test that if the reboot controller isn't working, we surface the appropriate error after
     // updating. This would be a bad state to be in, but at least a user would get output.
     #[fasync::run_singlethreaded(test)]
     async fn test_reboot_errors_on_no_service() {
-        let mut update_installer = DoNothingUpdateInstaller;
+        let mut update_installer = RebootUpdateInstaller;
 
-        let update_result =
-            apply_system_update_and_reboot(&mut update_installer, Initiator::User).await;
+        let mut results: Vec<_> = apply_system_update_impl(&mut update_installer, Initiator::User)
+            .await
+            .unwrap()
+            .collect()
+            .await;
 
+        assert_eq!(results.len(), 2);
         // We should have errored out on calling reboot_controller.
         assert_matches!(
-            update_result.err().expect("system update should fail").downcast::<Error>().unwrap(),
+            results
+                .remove(1)
+                .err()
+                .expect("system update should fail")
+                .1
+                .downcast::<Error>()
+                .unwrap(),
             Error::RebootFailed(_)
         );
     }
 
     proptest! {
         #[test]
-        fn test_options_passed_to_installer(
-            initiator: Initiator)
-        {
+        fn test_options_passed_to_installer(initiator: Initiator) {
             let mut update_installer = ArgumentCapturingUpdateInstaller::default();
 
             let mut executor =
                 fasync::Executor::new().expect("create executor in test");
-                let (_, reboot_controller_server_end) =
-                fidl::endpoints::create_proxy::<RebootControllerMarker>()
-                .expect("creating reboot controller");
-            let result = executor.run_singlethreaded(apply_system_update_impl(
-                &mut update_installer,
-                initiator,Some(reboot_controller_server_end)
-            ));
+            executor.run_singlethreaded(async move{
+                let result = apply_system_update_impl(&mut update_installer, initiator
+                    ).await;
 
-            prop_assert!(result.is_ok(), "apply_system_update_impl failed: {:?}", result);
-            prop_assert_eq!(
-                update_installer.options,
-                Some(Options {
-                    initiator:  match initiator {
-                        Initiator::Service => installer::Initiator::Service,
-                        Initiator::User => installer::Initiator::User,
-                    },
-                    should_write_recovery: true,
-                    allow_attach_to_existing_attempt: true,
-                })
-            );
+                prop_assert!(result.is_ok(), "apply_system_update_impl failed: {:?}", result.err());
+                prop_assert_eq!(
+                    update_installer.options,
+                    Some(Options {
+                        initiator:  match initiator {
+                            Initiator::Service => installer::Initiator::Service,
+                            Initiator::User => installer::Initiator::User,
+                        },
+                        should_write_recovery: true,
+                        allow_attach_to_existing_attempt: true,
+                    })
+                );
+                Ok(())}
+            ).unwrap();
         }
     }
+
+    struct ProgressUpdateInstaller {
+        states: Vec<State>,
+        reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
+    }
+    impl ProgressUpdateInstaller {
+        fn new(states: Vec<State>) -> Self {
+            Self { states, reboot_controller_server_end: None }
+        }
+    }
+    impl UpdateInstaller for ProgressUpdateInstaller {
+        type UpdateAttempt =
+            futures::stream::Iter<std::vec::IntoIter<Result<State, MonitorUpdateAttemptError>>>;
+
+        fn start_update(
+            &mut self,
+            _update_url: PkgUrl,
+            _options: Options,
+            reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
+        ) -> BoxFuture<'_, Result<Self::UpdateAttempt, UpdateAttemptError>> {
+            self.reboot_controller_server_end = reboot_controller_server_end;
+            let results: Vec<_> = self.states.clone().into_iter().map(Ok).collect();
+            future::ok(futures::stream::iter(results)).boxed()
+        }
+    }
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_yield_progress_event() {
+        let info = UpdateInfo::builder().download_size(1000).build();
+        let mut update_installer = ProgressUpdateInstaller::new(vec![
+            State::Prepare,
+            State::Fetch(UpdateInfoAndProgress::new(info, Progress::none()).unwrap()),
+            State::Stage(
+                UpdateInfoAndProgress::new(
+                    info,
+                    Progress::builder().fraction_completed(0.5).bytes_downloaded(500).build(),
+                )
+                .unwrap(),
+            ),
+            State::Stage(
+                UpdateInfoAndProgress::new(
+                    info,
+                    Progress::builder().fraction_completed(0.7).bytes_downloaded(1000).build(),
+                )
+                .unwrap(),
+            ),
+            State::WaitToReboot(UpdateInfoAndProgress::done(info)),
+        ]);
+
+        let mut stream =
+            apply_system_update_impl(&mut update_installer, Initiator::User).await.unwrap();
+
+        for state in &[
+            ApplyState::InstallingUpdate(ApplyProgress::none()),
+            ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.0)),
+            ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.5)),
+            ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.7)),
+            ApplyState::WaitingForReboot(ApplyProgress::new(1000, 1.0)),
+        ] {
+            assert_eq!(&stream.next().await.unwrap().unwrap(), state);
+        }
+        assert_matches!(stream.next().now_or_never(), None);
+    }
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_installer_complete_state() {
+        let info = UpdateInfo::builder().download_size(1000).build();
+        let mut update_installer = ProgressUpdateInstaller::new(vec![
+            State::Prepare,
+            State::Fetch(UpdateInfoAndProgress::new(info, Progress::none()).unwrap()),
+            State::Stage(
+                UpdateInfoAndProgress::new(
+                    info,
+                    Progress::builder().fraction_completed(0.5).bytes_downloaded(500).build(),
+                )
+                .unwrap(),
+            ),
+            State::Complete(UpdateInfoAndProgress::done(info)),
+        ]);
+
+        let mut stream =
+            apply_system_update_impl(&mut update_installer, Initiator::User).await.unwrap();
+
+        for state in &[
+            ApplyState::InstallingUpdate(ApplyProgress::none()),
+            ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.0)),
+            ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.5)),
+            ApplyState::InstallingUpdate(ApplyProgress::new(1000, 1.0)),
+        ] {
+            assert_eq!(&stream.next().await.unwrap().unwrap(), state);
+        }
+        assert_matches!(stream.next().await, None);
+    }
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_installer_failure_event() {
+        let mut update_installer =
+            ProgressUpdateInstaller::new(vec![State::Prepare, State::FailPrepare]);
+
+        let mut stream =
+            apply_system_update_impl(&mut update_installer, Initiator::User).await.unwrap();
+
+        assert_matches!(
+            stream.next().await,
+            Some(Ok(ApplyState::InstallingUpdate(ApplyProgress {
+                download_size: None,
+                fraction_completed: None
+            })))
+        );
+        assert_matches!(stream.next().await, Some(Err((ApplyProgress {
+            download_size: None,
+            fraction_completed: None
+        }, _))));
+    }
 }
diff --git a/src/sys/pkg/bin/system-update-checker/src/update_manager.rs b/src/sys/pkg/bin/system-update-checker/src/update_manager.rs
index fc61ad8..f53e91c 100644
--- a/src/sys/pkg/bin/system-update-checker/src/update_manager.rs
+++ b/src/sys/pkg/bin/system-update-checker/src/update_manager.rs
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-use crate::apply::apply_system_update;
+use crate::apply::{apply_system_update, ApplyProgress, ApplyState};
 use crate::channel::{CurrentChannelManager, TargetChannelManager};
 use crate::check::{check_for_system_update, SystemUpdateStatus};
 use crate::connect::ServiceConnect;
@@ -14,17 +14,22 @@
 use fidl_fuchsia_pkg::{PackageResolverMarker, PackageResolverProxyInterface};
 use fidl_fuchsia_update::CheckNotStartedReason;
 use fidl_fuchsia_update_ext::{
-    CheckOptions, Initiator, InstallationErrorData, InstallingData, State, UpdateInfo,
+    CheckOptions, Initiator, InstallationErrorData, InstallationProgress, InstallingData, State,
+    UpdateInfo,
 };
 use fuchsia_async as fasync;
 use fuchsia_component::client::connect_to_service;
 use fuchsia_hash::Hash;
 use fuchsia_inspect as finspect;
 use fuchsia_syslog::{fx_log_err, fx_log_info, fx_log_warn};
-use futures::channel::{mpsc, oneshot};
-use futures::future::BoxFuture;
-use futures::prelude::*;
-use futures::{pin_mut, select};
+use futures::{
+    channel::{mpsc, oneshot},
+    future::BoxFuture,
+    pin_mut,
+    prelude::*,
+    select,
+    stream::BoxStream,
+};
 use std::fs;
 use std::path::Path;
 use std::sync::Arc;
@@ -442,12 +447,12 @@
             }) => {
                 fx_log_info!("current system_image merkle: {}", current_system_image);
                 fx_log_info!("new system_image available: {}", latest_system_image);
+                let version_available = latest_system_image.to_string();
                 {
-                    co.yield_(StatusEvent::VersionAvailableKnown(latest_system_image.to_string()))
-                        .await;
+                    co.yield_(StatusEvent::VersionAvailableKnown(version_available.clone())).await;
                     co.yield_(StatusEvent::State(State::InstallingUpdate(InstallingData {
                         update: Some(UpdateInfo {
-                            version_available: Some(latest_system_image.to_string()),
+                            version_available: Some(version_available.clone()),
                             download_size: None,
                         }),
                         installation_progress: None,
@@ -457,33 +462,86 @@
 
                 self.last_update_storage.store(&latest_update_package);
 
-                if let Err(e) =
-                    self.update_applier.apply(initiator).await.context("apply_system_update failed")
+                match self
+                    .update_applier
+                    .apply(initiator)
+                    .await
+                    .context("apply_system_update failed")
                 {
-                    co.yield_(StatusEvent::State(State::InstallationError(
-                        InstallationErrorData {
-                            update: Some(UpdateInfo {
-                                version_available: Some(latest_system_image.to_string()),
-                                download_size: None,
-                            }),
-                            installation_progress: None,
-                        },
-                    )))
-                    .await;
-                    return Err(e);
-                };
-                // On success, system-updater will reboots the system when ready, so this code may
-                // or may not be run. The only way to leave WaitingForReboot state is to restart
-                // the component.
-                co.yield_(StatusEvent::State(State::WaitingForReboot(InstallingData {
-                    update: Some(UpdateInfo {
-                        version_available: Some(latest_system_image.to_string()),
-                        download_size: None,
-                    }),
-                    installation_progress: None,
-                })))
-                .await;
-                let () = future::pending().await;
+                    Ok(mut stream) => {
+                        let mut waiting_for_reboot = false;
+                        while let Some(result) = stream.next().await {
+                            match result {
+                                Ok(apply_state) => {
+                                    let state = match apply_state {
+                                        ApplyState::InstallingUpdate(ApplyProgress {
+                                            download_size,
+                                            fraction_completed,
+                                        }) => State::InstallingUpdate(InstallingData {
+                                            update: Some(UpdateInfo {
+                                                version_available: Some(version_available.clone()),
+                                                download_size,
+                                            }),
+                                            installation_progress: Some(InstallationProgress {
+                                                fraction_completed,
+                                            }),
+                                        }),
+                                        ApplyState::WaitingForReboot(ApplyProgress {
+                                            download_size,
+                                            fraction_completed,
+                                        }) => {
+                                            waiting_for_reboot = true;
+                                            State::WaitingForReboot(InstallingData {
+                                                update: Some(UpdateInfo {
+                                                    version_available: Some(
+                                                        version_available.clone(),
+                                                    ),
+                                                    download_size,
+                                                }),
+                                                installation_progress: Some(InstallationProgress {
+                                                    fraction_completed,
+                                                }),
+                                            })
+                                        }
+                                    };
+                                    co.yield_(StatusEvent::State(state)).await;
+                                }
+                                Err((ApplyProgress { download_size, fraction_completed }, e)) => {
+                                    // If we failed to unblock reboot, it will ends up here and we
+                                    // should not go back to InstallationError.
+                                    if !waiting_for_reboot {
+                                        co.yield_(StatusEvent::State(State::InstallationError(
+                                            InstallationErrorData {
+                                                update: Some(UpdateInfo {
+                                                    version_available: Some(version_available),
+                                                    download_size,
+                                                }),
+                                                installation_progress: Some(InstallationProgress {
+                                                    fraction_completed,
+                                                }),
+                                            },
+                                        )))
+                                        .await;
+                                    }
+                                    return Err(e);
+                                }
+                            }
+                        }
+                    }
+                    Err(e) => {
+                        co.yield_(StatusEvent::State(State::InstallationError(
+                            InstallationErrorData {
+                                update: Some(UpdateInfo {
+                                    version_available: Some(version_available),
+                                    download_size: None,
+                                }),
+                                installation_progress: None,
+                            },
+                        )))
+                        .await;
+                        return Err(e);
+                    }
+                }
             }
         }
         Ok(())
@@ -537,13 +595,25 @@
 
 // For mocking
 pub trait UpdateApplier: Send + Sync + 'static {
-    fn apply(&self, initiator: Initiator) -> BoxFuture<'_, Result<(), anyhow::Error>>;
+    fn apply<'a>(
+        &self,
+        initiator: Initiator,
+    ) -> BoxFuture<
+        'a,
+        Result<BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, anyhow::Error>,
+    >;
 }
 
 pub struct RealUpdateApplier;
 
 impl UpdateApplier for RealUpdateApplier {
-    fn apply(&self, initiator: Initiator) -> BoxFuture<'_, Result<(), anyhow::Error>> {
+    fn apply<'a>(
+        &self,
+        initiator: Initiator,
+    ) -> BoxFuture<
+        'a,
+        Result<BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, anyhow::Error>,
+    > {
         apply_system_update(initiator).boxed()
     }
 }
@@ -701,12 +771,24 @@
     #[derive(Clone)]
     pub struct UnreachableUpdateApplier;
     impl UpdateApplier for UnreachableUpdateApplier {
-        fn apply(&self, _initiator: Initiator) -> BoxFuture<'_, Result<(), anyhow::Error>> {
+        fn apply<'a>(
+            &self,
+            _initiator: Initiator,
+        ) -> BoxFuture<
+            'a,
+            Result<
+                BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>,
+                anyhow::Error,
+            >,
+        > {
             unreachable!();
         }
     }
 
-    type ApplyResultFactory = fn() -> Result<(), crate::errors::Error>;
+    type ApplyResultFactory = fn() -> Result<
+        BoxStream<'static, Result<ApplyState, (ApplyProgress, anyhow::Error)>>,
+        crate::errors::Error,
+    >;
 
     #[derive(Clone)]
     pub struct FakeUpdateApplier {
@@ -715,7 +797,17 @@
     }
     impl FakeUpdateApplier {
         pub fn new_success() -> Self {
-            Self { result: || Ok(()), call_count: Arc::new(AtomicU64::new(0)) }
+            Self {
+                result: || {
+                    Ok(futures::stream::iter(vec![
+                        Ok(ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.42))),
+                        Ok(ApplyState::WaitingForReboot(ApplyProgress::new(1000, 1.0))),
+                    ])
+                    .chain(futures::stream::pending())
+                    .boxed())
+                },
+                call_count: Arc::new(AtomicU64::new(0)),
+            }
         }
         pub fn new_error() -> Self {
             Self {
@@ -728,7 +820,16 @@
         }
     }
     impl UpdateApplier for FakeUpdateApplier {
-        fn apply(&self, _initiator: Initiator) -> BoxFuture<'_, Result<(), anyhow::Error>> {
+        fn apply<'a>(
+            &self,
+            _initiator: Initiator,
+        ) -> BoxFuture<
+            'a,
+            Result<
+                BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>,
+                anyhow::Error,
+            >,
+        > {
             self.call_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
             future::ready((self.result)().map_err(|e| e.into())).boxed()
         }
@@ -952,23 +1053,39 @@
         .await
         .spawn();
         let (callback, mut receiver) = FakeStateNotifier::new_callback_and_receiver();
-        let expected_installing_data = InstallingData {
-            update: Some(UpdateInfo {
-                version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
-                download_size: None,
-            }),
-            installation_progress: None,
-        };
 
         let options = CheckOptions::builder().initiator(Initiator::User).build();
         manager.try_start_update(options, Some(callback)).await.unwrap();
 
         assert_eq!(
-            next_n_states(&mut receiver, 3).await,
+            next_n_states(&mut receiver, 4).await,
             vec![
                 State::CheckingForUpdates,
-                State::InstallingUpdate(expected_installing_data.clone()),
-                State::WaitingForReboot(expected_installing_data),
+                State::InstallingUpdate(InstallingData {
+                    update: Some(UpdateInfo {
+                        version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
+                        download_size: None,
+                    }),
+                    installation_progress: None
+                }),
+                State::InstallingUpdate(InstallingData {
+                    update: Some(UpdateInfo {
+                        version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
+                        download_size: Some(1000),
+                    }),
+                    installation_progress: Some(InstallationProgress {
+                        fraction_completed: Some(0.42)
+                    })
+                }),
+                State::WaitingForReboot(InstallingData {
+                    update: Some(UpdateInfo {
+                        version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
+                        download_size: Some(1000),
+                    }),
+                    installation_progress: Some(InstallationProgress {
+                        fraction_completed: Some(1.0)
+                    })
+                }),
             ]
         );
 
diff --git a/src/sys/pkg/tests/system-update-checker/BUILD.gn b/src/sys/pkg/tests/system-update-checker/BUILD.gn
index b99a2cf..f357518 100644
--- a/src/sys/pkg/tests/system-update-checker/BUILD.gn
+++ b/src/sys/pkg/tests/system-update-checker/BUILD.gn
@@ -14,13 +14,16 @@
     "//sdk/fidl/fuchsia.logger:fuchsia.logger-rustc",
     "//sdk/fidl/fuchsia.paver:fuchsia.paver-rustc",
     "//sdk/fidl/fuchsia.update:fuchsia.update-rustc",
-    "//sdk/fidl/fuchsia.update:fuchsia.update-rustc",
     "//sdk/fidl/fuchsia.update.channel:fuchsia.update.channel-rustc",
     "//src/lib/fidl/rust/fidl",
     "//src/lib/fuchsia-async",
     "//src/lib/fuchsia-component",
     "//src/lib/zircon/rust:fuchsia-zircon",
+    "//src/sys/pkg/lib/fidl-fuchsia-update-installer-ext",
+    "//src/sys/pkg/lib/fuchsia-pkg-testing",
+    "//src/sys/pkg/testing/mock-installer",
     "//src/sys/pkg/testing/mock-paver",
+    "//src/sys/pkg/testing/mock-resolver",
     "//third_party/rust_crates:futures",
     "//third_party/rust_crates:parking_lot",
     "//third_party/rust_crates:tempfile",
@@ -55,6 +58,9 @@
       name = "system_update_checker_integration_test"
       dest = "system-update-checker-integration-test"
       environments = basic_envs
+      log_settings = {
+        max_severity = "ERROR"
+      }
     },
   ]
 }
diff --git a/src/sys/pkg/tests/system-update-checker/src/lib.rs b/src/sys/pkg/tests/system-update-checker/src/lib.rs
index 20a0025..fd6f8c5f 100644
--- a/src/sys/pkg/tests/system-update-checker/src/lib.rs
+++ b/src/sys/pkg/tests/system-update-checker/src/lib.rs
@@ -5,16 +5,23 @@
 #![cfg(test)]
 use {
     fidl_fuchsia_paver::PaverRequestStream,
-    fidl_fuchsia_update::{ManagerMarker, ManagerProxy, MonitorRequest, State},
+    fidl_fuchsia_update::{
+        CheckOptions, CheckingForUpdatesData, Initiator, InstallationProgress, InstallingData,
+        ManagerMarker, ManagerProxy, MonitorMarker, MonitorRequest, MonitorRequestStream, State,
+        UpdateInfo,
+    },
     fidl_fuchsia_update_channel::{ProviderMarker, ProviderProxy},
-    fuchsia_async as fasync,
+    fidl_fuchsia_update_installer_ext as installer, fuchsia_async as fasync,
     fuchsia_component::{
         client::{App, AppBuilder},
         server::{NestedEnvironment, ServiceFs},
     },
+    fuchsia_pkg_testing::make_packages_json,
     fuchsia_zircon::Status,
     futures::{channel::mpsc, prelude::*},
+    mock_installer::MockUpdateInstallerService,
     mock_paver::{MockPaverService, MockPaverServiceBuilder, PaverEvent},
+    mock_resolver::MockResolverService,
     std::{fs::File, sync::Arc},
     tempfile::TempDir,
 };
@@ -23,45 +30,68 @@
 
 struct Mounts {
     misc_ota: TempDir,
+    pkgfs_system: TempDir,
 }
 
 struct Proxies {
     _paver: Arc<MockPaverService>,
     paver_events: mpsc::UnboundedReceiver<PaverEvent>,
+    resolver: Arc<MockResolverService>,
     channel_provider: ProviderProxy,
     update_manager: ManagerProxy,
 }
 
 impl Mounts {
     fn new() -> Self {
-        Self { misc_ota: tempfile::tempdir().expect("/tmp to exist") }
+        Self {
+            misc_ota: tempfile::tempdir().expect("/tmp to exist"),
+            pkgfs_system: tempfile::tempdir().expect("/tmp to exist"),
+        }
     }
 }
 
-struct TestEnv {
-    _env: NestedEnvironment,
-    _mounts: Mounts,
-    proxies: Proxies,
-    _system_update_checker: App,
+struct TestEnvBuilder {
+    paver_builder: MockPaverServiceBuilder,
+    paver_events: mpsc::UnboundedReceiver<PaverEvent>,
+    installer: MockUpdateInstallerService,
 }
 
-impl TestEnv {
-    fn new<F>(paver_init: F) -> Self
-    where
-        F: FnOnce(MockPaverServiceBuilder) -> MockPaverServiceBuilder,
-    {
+impl TestEnvBuilder {
+    fn new() -> Self {
         let (events_tx, events_rx) = mpsc::unbounded();
         let paver_builder = MockPaverServiceBuilder::new().event_hook(move |event| {
             events_tx.unbounded_send(event.to_owned()).expect("to write to events channel")
         });
+        Self {
+            paver_builder,
+            paver_events: events_rx,
+            installer: MockUpdateInstallerService::builder().build(),
+        }
+    }
 
-        let paver = paver_init(paver_builder).build();
+    fn paver_init<F>(self, paver_init: F) -> Self
+    where
+        F: FnOnce(MockPaverServiceBuilder) -> MockPaverServiceBuilder,
+    {
+        Self { paver_builder: paver_init(self.paver_builder), ..self }
+    }
 
+    fn installer(self, installer: MockUpdateInstallerService) -> Self {
+        Self { installer, ..self }
+    }
+
+    fn build(self) -> TestEnv {
         let mounts = Mounts::new();
+        std::fs::write(
+            mounts.pkgfs_system.path().join("meta"),
+            "0000000000000000000000000000000000000000000000000000000000000001",
+        )
+        .expect("write pkgfs/system/meta");
 
         let mut fs = ServiceFs::new();
         fs.add_proxy_service::<fidl_fuchsia_logger::LogSinkMarker, _>();
 
+        let paver = self.paver_builder.build();
         let paver = Arc::new(paver);
         let paver_clone = Arc::clone(&paver);
         fs.add_fidl_service(move |stream: PaverRequestStream| {
@@ -73,6 +103,23 @@
             .detach();
         });
 
+        let resolver = Arc::new(MockResolverService::new(None));
+        let resolver_clone = Arc::clone(&resolver);
+        fs.add_fidl_service(move |stream| {
+            fasync::Task::spawn(
+                Arc::clone(&resolver_clone)
+                    .run_resolver_service(stream)
+                    .unwrap_or_else(|e| panic!("error running resolver service {:?}", e)),
+            )
+            .detach()
+        });
+
+        let installer = Arc::new(self.installer);
+        let installer_clone = Arc::clone(&installer);
+        fs.add_fidl_service(move |stream| {
+            fasync::Task::spawn(Arc::clone(&installer_clone).run_service(stream)).detach()
+        });
+
         let env = fs
             .create_salted_nested_environment("system-update-checker_integration_test_env")
             .expect("nested environment to create successfully");
@@ -84,15 +131,21 @@
                 File::open(mounts.misc_ota.path()).expect("/misc/ota tempdir to open"),
             )
             .expect("/misc/ota to mount")
+            .add_dir_to_namespace(
+                "/pkgfs/system".to_string(),
+                File::open(mounts.pkgfs_system.path()).expect("/pkgfs/system tempdir to open"),
+            )
+            .expect("/pkgfs/system to mount")
             .spawn(env.launcher())
             .expect("system_update_checker to launch");
 
-        Self {
+        TestEnv {
             _env: env,
             _mounts: mounts,
             proxies: Proxies {
                 _paver: paver,
-                paver_events: events_rx,
+                paver_events: self.paver_events,
+                resolver,
                 channel_provider: system_update_checker
                     .connect_to_service::<ProviderMarker>()
                     .expect("connect to channel provider"),
@@ -105,10 +158,58 @@
     }
 }
 
+struct TestEnv {
+    _env: NestedEnvironment,
+    _mounts: Mounts,
+    proxies: Proxies,
+    _system_update_checker: App,
+}
+
+impl TestEnv {
+    async fn check_now(&self) -> MonitorRequestStream {
+        let options = CheckOptions {
+            initiator: Some(Initiator::User),
+            allow_attaching_to_existing_update_check: Some(false),
+        };
+        let (client_end, stream) =
+            fidl::endpoints::create_request_stream::<MonitorMarker>().unwrap();
+        self.proxies
+            .update_manager
+            .check_now(options, Some(client_end))
+            .await
+            .expect("make check_now call")
+            .expect("check started");
+        stream
+    }
+}
+
+async fn expect_states(stream: &mut MonitorRequestStream, expected_states: &[State]) {
+    for expected_state in expected_states {
+        let MonitorRequest::OnState { state, responder } =
+            stream.try_next().await.unwrap().unwrap();
+        assert_eq!(&state, expected_state);
+        responder.send().unwrap();
+    }
+}
+
+fn update_info(download_size: Option<u64>) -> Option<UpdateInfo> {
+    Some(UpdateInfo {
+        version_available: Some(
+            "beefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdead".to_string(),
+        ),
+        download_size,
+    })
+}
+
+fn progress(fraction_completed: Option<f32>) -> Option<InstallationProgress> {
+    Some(InstallationProgress { fraction_completed })
+}
+
 #[fasync::run_singlethreaded(test)]
 // Test will hang if system-update-checker does not call paver service
 async fn test_calls_paver_service() {
-    let mut env = TestEnv::new(|p| p);
+    let mut env = TestEnvBuilder::new().build();
+
     assert_eq!(
         env.proxies.paver_events.next().await,
         Some(PaverEvent::SetActiveConfigurationHealthy)
@@ -118,7 +219,7 @@
 #[fasync::run_singlethreaded(test)]
 // Test will hang if system-update-checker does not call paver service
 async fn test_channel_provider_get_current_works_after_paver_service_fails() {
-    let mut env = TestEnv::new(|p| p.call_hook(|_| Status::INTERNAL));
+    let mut env = TestEnvBuilder::new().paver_init(|p| p.call_hook(|_| Status::INTERNAL)).build();
 
     assert_eq!(
         env.proxies.paver_events.next().await,
@@ -134,7 +235,7 @@
 #[fasync::run_singlethreaded(test)]
 // Test will hang if system-update-checker does not call paver service
 async fn test_update_manager_check_now_works_after_paver_service_fails() {
-    let mut env = TestEnv::new(|p| p.call_hook(|_| Status::INTERNAL));
+    let mut env = TestEnvBuilder::new().paver_init(|p| p.call_hook(|_| Status::INTERNAL)).build();
 
     assert_eq!(
         env.proxies.paver_events.next().await,
@@ -171,3 +272,105 @@
         ]
     );
 }
+
+#[fasync::run_singlethreaded(test)]
+async fn test_update_manager_progress() {
+    let (mut sender, receiver) = mpsc::channel(0);
+    let installer = MockUpdateInstallerService::builder().states_receiver(receiver).build();
+    let env = TestEnvBuilder::new().installer(installer).build();
+
+    env.proxies.resolver.url("fuchsia-pkg://fuchsia.com/update/0").resolve(
+        &env.proxies
+            .resolver
+            .package("update", "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef")
+            .add_file(
+                "packages.json",
+                make_packages_json(["fuchsia-pkg://fuchsia.com/system_image/0?hash=beefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdead"]),
+            )
+            .add_file("zbi", "fake zbi"),
+    );
+    env.proxies
+        .resolver.url("fuchsia-pkg://fuchsia.com/system_image/0?hash=beefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdead")
+        .resolve(
+        &env.proxies
+            .resolver
+            .package("system_image", "beefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeada")
+    );
+
+    let mut stream = env.check_now().await;
+
+    expect_states(
+        &mut stream,
+        &[
+            State::CheckingForUpdates(CheckingForUpdatesData {}),
+            State::InstallingUpdate(InstallingData {
+                update: update_info(None),
+                installation_progress: None,
+            }),
+        ],
+    )
+    .await;
+    sender.send(installer::State::Prepare).await.unwrap();
+    expect_states(
+        &mut stream,
+        &[State::InstallingUpdate(InstallingData {
+            update: update_info(None),
+            installation_progress: progress(None),
+        })],
+    )
+    .await;
+    let installer_update_info = installer::UpdateInfo::builder().download_size(1000).build();
+    sender
+        .send(installer::State::Fetch(
+            installer::UpdateInfoAndProgress::new(
+                installer_update_info,
+                installer::Progress::none(),
+            )
+            .unwrap(),
+        ))
+        .await
+        .unwrap();
+    expect_states(
+        &mut stream,
+        &[State::InstallingUpdate(InstallingData {
+            update: update_info(Some(1000)),
+            installation_progress: progress(Some(0.0)),
+        })],
+    )
+    .await;
+    sender
+        .send(installer::State::Stage(
+            installer::UpdateInfoAndProgress::new(
+                installer_update_info,
+                installer::Progress::builder()
+                    .fraction_completed(0.5)
+                    .bytes_downloaded(500)
+                    .build(),
+            )
+            .unwrap(),
+        ))
+        .await
+        .unwrap();
+    expect_states(
+        &mut stream,
+        &[State::InstallingUpdate(InstallingData {
+            update: update_info(Some(1000)),
+            installation_progress: progress(Some(0.5)),
+        })],
+    )
+    .await;
+    sender
+        .send(installer::State::WaitToReboot(installer::UpdateInfoAndProgress::done(
+            installer_update_info,
+        )))
+        .await
+        .unwrap();
+    expect_states(
+        &mut stream,
+        &[State::WaitingForReboot(InstallingData {
+            update: update_info(Some(1000)),
+            installation_progress: progress(Some(1.0)),
+        })],
+    )
+    .await;
+}