[update] implement `update wait-for-commit`

This change adds the `wait-for-commit` subcommand to to the update CLI
tool. In a future CL, we'll call this in `fx ota`.

Fixed: 64593
Test: fx test -o update-lib-tests
Test: fx test -o update-integration-tests
Test: temporarily added a 40 second timer to the `wait_for_commit` fn, observed
both warning and ending log.

MULTIPLY: update-lib-tests

Change-Id: I76bb34e9cfe02d1f7aae5a75aa1c3712c7ce2234
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/458219
Commit-Queue: Zach Kirschenbaum <zkbaum@google.com>
Reviewed-by: Ben Keller <galbanum@google.com>
Testability-Review: Ben Keller <galbanum@google.com>
diff --git a/src/sys/pkg/bin/update/BUILD.gn b/src/sys/pkg/bin/update/BUILD.gn
index b796b7c..d4ac991 100644
--- a/src/sys/pkg/bin/update/BUILD.gn
+++ b/src/sys/pkg/bin/update/BUILD.gn
@@ -19,6 +19,7 @@
     "//src/lib/fuchsia-async",
     "//src/lib/fuchsia-component",
     "//src/lib/fuchsia-url",
+    "//src/lib/zircon/rust:fuchsia-zircon",
     "//src/sys/pkg/fidl/fuchsia.update.installer:fuchsia.update.installer-rustc",
     "//src/sys/pkg/lib/fidl-fuchsia-update-ext",
     "//src/sys/pkg/lib/fidl-fuchsia-update-installer-ext",
@@ -27,7 +28,10 @@
     "//third_party/rust_crates:futures",
   ]
 
-  test_deps = [ "//third_party/rust_crates:matches" ]
+  test_deps = [
+    "//third_party/rust_crates:matches",
+    "//third_party/rust_crates:parking_lot",
+  ]
 
   source_root = "src/main.rs"
   sources = [
diff --git a/src/sys/pkg/bin/update/meta/update.cmx b/src/sys/pkg/bin/update/meta/update.cmx
index 51c43df..0056948 100644
--- a/src/sys/pkg/bin/update/meta/update.cmx
+++ b/src/sys/pkg/bin/update/meta/update.cmx
@@ -7,6 +7,7 @@
     },
     "sandbox": {
         "services": [
+            "fuchsia.update.CommitStatusProvider",
             "fuchsia.update.Manager",
             "fuchsia.update.channelcontrol.ChannelControl",
             "fuchsia.update.installer.Installer"
diff --git a/src/sys/pkg/bin/update/src/args.rs b/src/sys/pkg/bin/update/src/args.rs
index 3549e89..b09b5f2 100644
--- a/src/sys/pkg/bin/update/src/args.rs
+++ b/src/sys/pkg/bin/update/src/args.rs
@@ -22,6 +22,9 @@
 
     // fuchsia.update.installer protocol:
     ForceInstall(ForceInstall),
+
+    // fuchsia.update CommitStatusProvider protocol:
+    WaitForCommit(WaitForCommit),
 }
 
 #[derive(Debug, Eq, FromArgs, PartialEq)]
@@ -99,6 +102,11 @@
     pub service_initiated: bool,
 }
 
+#[derive(Debug, Eq, FromArgs, PartialEq)]
+#[argh(subcommand, name = "wait-for-commit")]
+/// Wait for the update to be committed.
+pub struct WaitForCommit {}
+
 #[cfg(test)]
 mod tests {
     use {super::*, matches::assert_matches};
@@ -242,4 +250,10 @@
             }
         );
     }
+
+    #[test]
+    fn test_wait_for_commit() {
+        let update = Update::from_args(&["update"], &["wait-for-commit"]).unwrap();
+        assert_eq!(update, Update { cmd: Command::WaitForCommit(WaitForCommit {}) });
+    }
 }
diff --git a/src/sys/pkg/bin/update/src/main.rs b/src/sys/pkg/bin/update/src/main.rs
index 6541d90..72eec6f 100644
--- a/src/sys/pkg/bin/update/src/main.rs
+++ b/src/sys/pkg/bin/update/src/main.rs
@@ -5,8 +5,8 @@
 use {
     anyhow::{anyhow, Context as _, Error},
     fidl_fuchsia_update::{
-        CheckOptions, Initiator, ManagerMarker, ManagerProxy, MonitorMarker, MonitorRequest,
-        MonitorRequestStream,
+        CheckOptions, CommitStatusProviderMarker, CommitStatusProviderProxy, Initiator,
+        ManagerMarker, ManagerProxy, MonitorMarker, MonitorRequest, MonitorRequestStream,
     },
     fidl_fuchsia_update_channelcontrol::{ChannelControlMarker, ChannelControlProxy},
     fidl_fuchsia_update_ext::State,
@@ -15,11 +15,15 @@
     fuchsia_async as fasync,
     fuchsia_component::client::connect_to_service,
     fuchsia_url::pkg_url::PkgUrl,
-    futures::prelude::*,
+    fuchsia_zircon as zx,
+    futures::{future::FusedFuture, prelude::*},
+    std::time::Duration,
 };
 
 mod args;
 
+const WARNING_DURATION: Duration = Duration::from_secs(30);
+
 fn print_state(state: &State) {
     println!("State: {:?}", state);
 }
@@ -151,6 +155,77 @@
     Err(anyhow!("Installation ended unexpectedly"))
 }
 
+/// The set of events associated with the `wait-for-commit` path.
+#[derive(Debug, PartialEq)]
+enum CommitEvent {
+    Begin,
+    Warning,
+    End,
+}
+
+/// An observer of `update wait-for-commit`.
+trait CommitObserver {
+    fn on_event(&self, event: CommitEvent);
+}
+
+/// A `CommitObserver` that forwards the events to stdout.
+struct Printer;
+impl CommitObserver for Printer {
+    fn on_event(&self, event: CommitEvent) {
+        let text = match event {
+            CommitEvent::Begin => "Waiting for commit.",
+            // TODO(fxbug.dev/64590) update warning message to be more helpful.
+            CommitEvent::Warning => "It's been 30 seconds. Something is probably wrong.",
+            CommitEvent::End => "Committed!",
+        };
+        println!("{}", text);
+    }
+}
+
+/// Waits for the system to commit (e.g. when the EventPair observes a signal).
+async fn wait_for_commit(proxy: &CommitStatusProviderProxy) -> Result<(), Error> {
+    let p = proxy.is_current_system_committed().await.context("while obtaining EventPair")?;
+    fasync::OnSignals::new(&p, zx::Signals::USER_0)
+        .await
+        .context("while waiting for the commit")?;
+    Ok(())
+}
+
+/// Waits for the commit and sends updates to the observer. This is abstracted from the regular
+/// `handle_wait_for_commit` fn so we can test events without having to wait the `WARNING_DURATION`.
+/// The [testability rubric](https://fuchsia.dev/fuchsia-src/concepts/testing/testability_rubric)
+/// exempts logs from testing, but in this case we test them anyway because of the additional layer
+/// of complexity that the warning timeout introduces.
+async fn handle_wait_for_commit_impl(
+    proxy: &CommitStatusProviderProxy,
+    observer: impl CommitObserver,
+) -> Result<(), Error> {
+    let () = observer.on_event(CommitEvent::Begin);
+
+    let commit_fut = wait_for_commit(&proxy).fuse();
+    futures::pin_mut!(commit_fut);
+    let mut timer_fut = fasync::Timer::new(WARNING_DURATION).fuse();
+
+    // Send a warning after the WARNING_DURATION.
+    let () = futures::select! {
+        commit_res = commit_fut => commit_res?,
+        _ = timer_fut => observer.on_event(CommitEvent::Warning),
+    };
+
+    // If we timed out on WARNING_DURATION, try again.
+    if !commit_fut.is_terminated() {
+        let () = commit_fut.await.context("while calling wait_for_commit second")?;
+    }
+
+    let () = observer.on_event(CommitEvent::End);
+    Ok(())
+}
+
+/// Waits for the commit and prints updates to stdout.
+async fn handle_wait_for_commit(proxy: &CommitStatusProviderProxy) -> Result<(), Error> {
+    handle_wait_for_commit_impl(&proxy, Printer).await
+}
+
 async fn handle_cmd(cmd: args::Command) -> Result<(), Error> {
     match cmd {
         args::Command::Channel(args::Channel { cmd }) => {
@@ -167,6 +242,11 @@
         args::Command::ForceInstall(args) => {
             force_install(args.update_pkg_url, args.reboot, args.service_initiated).await?;
         }
+        args::Command::WaitForCommit(_) => {
+            let proxy = connect_to_service::<CommitStatusProviderMarker>()
+                .context("while connecting to fuchsia.update/CommitStatusProvider")?;
+            handle_wait_for_commit(&proxy).await?;
+        }
     }
     Ok(())
 }
@@ -179,10 +259,16 @@
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-    use fidl::endpoints::create_proxy_and_stream;
-    use fidl_fuchsia_update_channelcontrol::ChannelControlRequest;
-    use matches::assert_matches;
+    use {
+        super::*,
+        fidl::endpoints::create_proxy_and_stream,
+        fidl_fuchsia_update::CommitStatusProviderRequest,
+        fidl_fuchsia_update_channelcontrol::ChannelControlRequest,
+        fuchsia_zircon::{DurationNum, EventPair, HandleBased, Peered},
+        futures::{pin_mut, task::Poll},
+        matches::assert_matches,
+        parking_lot::Mutex,
+    };
 
     async fn perform_channel_control_test<V>(argument: args::channel::Command, verifier: V)
     where
@@ -256,4 +342,78 @@
         })
         .await;
     }
+
+    struct TestObserver {
+        events: Mutex<Vec<CommitEvent>>,
+    }
+    impl TestObserver {
+        fn new() -> Self {
+            Self { events: Mutex::new(vec![]) }
+        }
+        fn assert_events(&self, expected_events: &[CommitEvent]) {
+            assert_eq!(self.events.lock().as_slice(), expected_events);
+        }
+    }
+    impl CommitObserver for &TestObserver {
+        fn on_event(&self, event: CommitEvent) {
+            self.events.lock().push(event);
+        }
+    }
+
+    #[test]
+    fn wait_for_commit() {
+        let mut executor = fasync::Executor::new_with_fake_time().unwrap();
+
+        let (proxy, mut stream) =
+            fidl::endpoints::create_proxy_and_stream::<CommitStatusProviderMarker>().unwrap();
+        let (p, p_stream) = EventPair::create().unwrap();
+        fasync::Task::spawn(async move {
+            while let Some(req) = stream.try_next().await.unwrap() {
+                let CommitStatusProviderRequest::IsCurrentSystemCommitted { responder } = req;
+                let pair = p_stream.duplicate_handle(zx::Rights::BASIC).unwrap();
+                let () = responder.send(pair).unwrap();
+            }
+        })
+        .detach();
+
+        let observer = TestObserver::new();
+
+        let fut = handle_wait_for_commit_impl(&proxy, &observer);
+        pin_mut!(fut);
+
+        // Begin the `wait_for_commit`.
+        match executor.run_until_stalled(&mut fut) {
+            Poll::Ready(res) => panic!("future unexpectedly completed with: {:?}", res),
+            Poll::Pending => (),
+        };
+        observer.assert_events(&[CommitEvent::Begin]);
+
+        // We should observe no new events when both the system is not committed and we are within
+        // the warning duration.
+        executor
+            .set_fake_time(fasync::Time::after((WARNING_DURATION - Duration::from_secs(1)).into()));
+        assert!(!executor.wake_expired_timers());
+        match executor.run_until_stalled(&mut fut) {
+            Poll::Ready(res) => panic!("future unexpectedly completed with: {:?}", res),
+            Poll::Pending => (),
+        };
+        observer.assert_events(&[CommitEvent::Begin]);
+
+        // Once we hit the warning duration, we should get a warning event.
+        executor.set_fake_time(fasync::Time::after(1.seconds()));
+        assert!(executor.wake_expired_timers());
+        match executor.run_until_stalled(&mut fut) {
+            Poll::Ready(res) => panic!("future unexpectedly completed with: {:?}", res),
+            Poll::Pending => (),
+        };
+        observer.assert_events(&[CommitEvent::Begin, CommitEvent::Warning]);
+
+        // Once we get the commit signal, the future should complete.
+        let () = p.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
+        match executor.run_until_stalled(&mut fut) {
+            Poll::Ready(res) => res.unwrap(),
+            Poll::Pending => panic!("future unexpectedly pending"),
+        };
+        observer.assert_events(&[CommitEvent::Begin, CommitEvent::Warning, CommitEvent::End]);
+    }
 }
diff --git a/src/sys/pkg/tests/update/BUILD.gn b/src/sys/pkg/tests/update/BUILD.gn
index 3abb0f0..a25aa6d 100644
--- a/src/sys/pkg/tests/update/BUILD.gn
+++ b/src/sys/pkg/tests/update/BUILD.gn
@@ -16,6 +16,7 @@
     "//sdk/fidl/fuchsia.update:fuchsia.update-rustc",
     "//src/lib/fuchsia-async",
     "//src/lib/fuchsia-component",
+    "//src/lib/zircon/rust:fuchsia-zircon",
     "//src/sys/pkg/fidl/fuchsia.update.installer:fuchsia.update.installer-rustc",
     "//src/sys/pkg/lib/fidl-fuchsia-update-ext",
     "//src/sys/pkg/lib/fidl-fuchsia-update-installer-ext",
diff --git a/src/sys/pkg/tests/update/src/lib.rs b/src/sys/pkg/tests/update/src/lib.rs
index 3da71eb..b6e19f9 100644
--- a/src/sys/pkg/tests/update/src/lib.rs
+++ b/src/sys/pkg/tests/update/src/lib.rs
@@ -16,6 +16,7 @@
         client::{AppBuilder, Output},
         server::{NestedEnvironment, ServiceFs},
     },
+    fuchsia_zircon::{self as zx, EventPair, HandleBased, Peered},
     futures::prelude::*,
     matches::assert_matches,
     mock_installer::{
@@ -26,10 +27,22 @@
     std::sync::Arc,
 };
 
+async fn run_commit_status_provider_service(
+    mut stream: fidl_update::CommitStatusProviderRequestStream,
+    p: Arc<EventPair>,
+) {
+    while let Some(req) = stream.try_next().await.unwrap() {
+        let fidl_update::CommitStatusProviderRequest::IsCurrentSystemCommitted { responder } = req;
+        let pair = p.duplicate_handle(zx::Rights::BASIC).unwrap();
+        let () = responder.send(pair).unwrap();
+    }
+}
+
 #[derive(Default)]
 struct TestEnvBuilder {
     manager_states: Vec<State>,
     installer_states: Vec<installer::State>,
+    commit_status_provider_response: Option<EventPair>,
 }
 
 impl TestEnvBuilder {
@@ -41,6 +54,10 @@
         Self { installer_states, ..self }
     }
 
+    fn commit_status_provider_response(self, response: EventPair) -> Self {
+        Self { commit_status_provider_response: Some(response), ..self }
+    }
+
     fn build(self) -> TestEnv {
         let mut fs = ServiceFs::new();
 
@@ -57,6 +74,17 @@
             fasync::Task::spawn(Arc::clone(&update_installer_clone).run_service(stream)).detach()
         });
 
+        if let Some(response) = self.commit_status_provider_response {
+            let response = Arc::new(response);
+            fs.add_fidl_service(move |stream| {
+                fasync::Task::spawn(run_commit_status_provider_service(
+                    stream,
+                    Arc::clone(&response),
+                ))
+                .detach()
+            });
+        }
+
         let env = fs
             .create_salted_nested_environment("update_env")
             .expect("nested environment to create successfully");
@@ -519,3 +547,21 @@
         monitor_present: true,
     }]);
 }
+
+#[fasync::run_singlethreaded(test)]
+async fn wait_for_commit_success() {
+    let (p0, p1) = EventPair::create().unwrap();
+    let env = TestEnv::builder().commit_status_provider_response(p1).build();
+
+    let () = p0.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
+
+    let output = env.run_update(vec!["wait-for-commit"]).await;
+
+    assert_output(
+        &output,
+        "Waiting for commit.\n\
+         Committed!\n",
+        "",
+        0,
+    );
+}