[channel] Accept target_channel as current_channel

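If the update check finds that the system_image is already up-to-date,
then it is safe to assume that the target_channel is actually our
current_channel. In that case, write the target channel to
current_channel.json (when it differs from the stored value) and
notify Cobalt of the new channel.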

Bug: 35299
Change-Id: I6784c3c2061a65b18840089d71b1e14fdf05b465
No-Tree-Checks: true
diff --git a/garnet/bin/system-update-checker/src/channel.rs b/garnet/bin/system-update-checker/src/channel.rs
index e682053..fb92777 100644
--- a/garnet/bin/system-update-checker/src/channel.rs
+++ b/garnet/bin/system-update-checker/src/channel.rs
@@ -12,6 +12,12 @@
 use fuchsia_syslog::{fx_log_err, fx_log_info, fx_log_warn};
 use fuchsia_url::pkg_url::PkgUrl;
 use fuchsia_zircon as zx;
+use futures::{
+    channel::mpsc,
+    future::{self, Either, FutureExt},
+    sink::SinkExt,
+    stream::StreamExt,
+};
 use serde_derive::{Deserialize, Serialize};
 use serde_json;
 use std::fs;
@@ -19,14 +25,15 @@
 use std::path::{Path, PathBuf};
 use std::time::Duration;
 
-pub struct CurrentChannelNotifier<S = ServiceConnector> {
-    service_connector: S,
-    current_channel: String,
-}
+static CURRENT_CHANNEL: &'static str = "current_channel.json";
+static TARGET_CHANNEL: &'static str = "target_channel.json";
 
-impl<S: ServiceConnect> CurrentChannelNotifier<S> {
-    pub fn new(service_connector: S, dir: impl AsRef<Path>) -> Self {
-        let current_channel = read_current_channel(dir.as_ref()).unwrap_or_else(|err| {
+pub fn build_current_channel_manager_and_notifier<S: ServiceConnect>(
+    service_connector: S,
+    dir: impl Into<PathBuf>,
+) -> Result<(CurrentChannelManager, CurrentChannelNotifier<S>), failure::Error> {
+    let path = dir.into();
+    let current_channel = read_current_channel(path.as_ref()).unwrap_or_else(|err| {
             fx_log_err!(
                 "Error reading current_channel, defaulting to the empty string. This is expected before the first OTA. {}",
                 err
@@ -34,57 +41,135 @@
             String::new()
         });
 
-        CurrentChannelNotifier { service_connector, current_channel }
+    let (channel_sender, channel_receiver) = mpsc::channel(100);
+
+    Ok((
+        CurrentChannelManager::new(path, channel_sender),
+        CurrentChannelNotifier::new(service_connector, current_channel, channel_receiver),
+    ))
+}
+
+pub struct CurrentChannelNotifier<S = ServiceConnector> {
+    service_connector: S,
+    initial_channel: String,
+    channel_receiver: mpsc::Receiver<String>,
+}
+
+impl<S: ServiceConnect> CurrentChannelNotifier<S> {
+    fn new(
+        service_connector: S,
+        initial_channel: String,
+        channel_receiver: mpsc::Receiver<String>,
+    ) -> Self {
+        CurrentChannelNotifier { service_connector, initial_channel, channel_receiver }
     }
 
-    pub async fn run(self) {
+    async fn notify_cobalt(service_connector: &S, current_channel: String) {
         loop {
-            let cobalt = self.connect().await;
+            let cobalt = Self::connect(service_connector).await;
 
-            fx_log_info!("calling cobalt.SetChannel(\"{}\")", self.current_channel);
+            fx_log_info!("calling cobalt.SetChannel(\"{}\")", current_channel);
 
-            match cobalt.set_channel(&self.current_channel).await {
+            match cobalt.set_channel(&current_channel).await {
                 Ok(CobaltStatus::Ok) => {
-                    break;
+                    return;
                 }
                 Ok(CobaltStatus::EventTooBig) => {
                     fx_log_warn!("cobalt.SetChannel returned Status.EVENT_TOO_BIG, retrying");
-                    self.sleep().await;
                 }
                 Ok(status) => {
                     // Not much we can do about the other status codes but log.
                     fx_log_err!("cobalt.SetChannel returned non-OK status: {:?}", status);
-                    break;
+                    return;
                 }
                 Err(err) => {
                     // channel broken, so log the error and reconnect.
                     fx_log_warn!("cobalt.SetChannel returned error: {}, retrying", err);
-                    self.sleep().await;
+                }
+            }
+
+            Self::sleep().await;
+        }
+    }
+
+    pub async fn run(self) {
+        let Self { service_connector, initial_channel, mut channel_receiver } = self;
+        let mut notify_cobalt_task =
+            Self::notify_cobalt(&service_connector, initial_channel).boxed();
+
+        loop {
+            match future::select(channel_receiver.next(), notify_cobalt_task).await {
+                Either::Left((Some(current_channel), _)) => {
+                    fx_log_warn!(
+                        "notify_cobalt() overrun. Starting again with new channel: `{}`",
+                        current_channel
+                    );
+                    notify_cobalt_task =
+                        Self::notify_cobalt(&service_connector, current_channel).boxed();
+                }
+                Either::Left((None, notify_cobalt_future)) => {
+                    fx_log_warn!(
+                        "all channel_senders have been closed. No new messages will arrive."
+                    );
+                    notify_cobalt_future.await;
+                    return;
+                }
+                Either::Right((_, next_channel_fut)) => {
+                    if let Some(current_channel) = next_channel_fut.await {
+                        notify_cobalt_task =
+                            Self::notify_cobalt(&service_connector, current_channel).boxed();
+                    } else {
+                        fx_log_warn!(
+                            "all channel_senders have been closed. No new messages will arrive."
+                        );
+                        return;
+                    }
                 }
             }
         }
     }
 
-    async fn connect(&self) -> SystemDataUpdaterProxy {
+    async fn connect(service_connector: &S) -> SystemDataUpdaterProxy {
         loop {
-            match self.service_connector.connect_to_service::<SystemDataUpdaterMarker>() {
+            match service_connector.connect_to_service::<SystemDataUpdaterMarker>() {
                 Ok(cobalt) => {
                     return cobalt;
                 }
                 Err(err) => {
                     fx_log_err!("error connecting to cobalt: {}", err);
-                    self.sleep().await
+                    Self::sleep().await
                 }
             }
         }
     }
 
-    async fn sleep(&self) {
+    async fn sleep() {
         let delay = fasync::Time::after(Duration::from_secs(5).into());
         fasync::Timer::new(delay).await;
     }
 }
 
+#[derive(Clone)]
+pub struct CurrentChannelManager {
+    path: PathBuf,
+    channel_sender: mpsc::Sender<String>,
+}
+
+impl CurrentChannelManager {
+    fn new(path: PathBuf, channel_sender: mpsc::Sender<String>) -> Self {
+        CurrentChannelManager { path, channel_sender }
+    }
+
+    pub async fn update(&mut self) -> Result<(), failure::Error> {
+        let target_channel = read_channel(&self.path.join(TARGET_CHANNEL))?;
+        if target_channel != read_current_channel(&self.path).ok().unwrap_or_else(String::new) {
+            write_channel(&self.path.join(CURRENT_CHANNEL), &target_channel)?;
+            self.channel_sender.send(target_channel).await?;
+        }
+        Ok(())
+    }
+}
+
 pub struct TargetChannelManager<S = ServiceConnector> {
     service_connector: S,
     path: PathBuf,
@@ -94,7 +179,7 @@
 impl<S: ServiceConnect> TargetChannelManager<S> {
     pub fn new(service_connector: S, dir: impl Into<PathBuf>) -> Self {
         let mut path = dir.into();
-        path.push("target_channel.json");
+        path.push(TARGET_CHANNEL);
         let target_channel = read_channel(&path).ok();
 
         Self { service_connector, path, target_channel }
@@ -132,7 +217,7 @@
 }
 
 fn read_current_channel(p: &Path) -> Result<String, Error> {
-    read_channel(p.join("current_channel.json"))
+    read_channel(p.join(CURRENT_CHANNEL))
 }
 
 fn read_channel(path: impl AsRef<Path>) -> Result<String, Error> {
@@ -230,7 +315,7 @@
         let dir = tempfile::tempdir().unwrap();
 
         fs::write(
-            dir.path().join("current_channel.json"),
+            dir.path().join(CURRENT_CHANNEL),
             r#"{"version":"1","content":{"legacy_amber_source_name":"stable"}}"#,
         )
         .unwrap();
@@ -282,7 +367,7 @@
     fn test_read_current_channel_rejects_invalid_json() {
         let dir = tempfile::tempdir().unwrap();
 
-        fs::write(dir.path().join("current_channel.json"), "no channel here").unwrap();
+        fs::write(dir.path().join(CURRENT_CHANNEL), "no channel here").unwrap();
 
         assert_matches!(read_current_channel(dir.path()), Err(Error::Json(_)));
     }
@@ -290,7 +375,7 @@
     #[fuchsia_async::run_singlethreaded(test)]
     async fn test_current_channel_notifier() {
         let dir = tempfile::tempdir().unwrap();
-        let current_channel_path = dir.path().join("current_channel.json");
+        let current_channel_path = dir.path().join(CURRENT_CHANNEL);
 
         fs::write(
             &current_channel_path,
@@ -299,9 +384,9 @@
         .unwrap();
 
         let (connector, svc_dir) =
-            NamespacedServiceConnector::bind("/test/current_channel_notifier/svc")
+            NamespacedServiceConnector::bind("/test/current_channel_manager/svc")
                 .expect("ns to bind");
-        let c = CurrentChannelNotifier::new(connector, dir.path());
+        let (_, c) = build_current_channel_manager_and_notifier(connector, dir.path()).unwrap();
 
         let mut fs = ServiceFs::new_local();
         let channel = Arc::new(Mutex::new(None));
@@ -337,7 +422,7 @@
         let mut executor = fasync::Executor::new_with_fake_time().unwrap();
 
         let dir = tempfile::tempdir().unwrap();
-        let current_channel_path = dir.path().join("current_channel.json");
+        let current_channel_path = dir.path().join(CURRENT_CHANNEL);
 
         write_channel(&current_channel_path, "stable").unwrap();
 
@@ -448,7 +533,8 @@
         }
 
         let connector = FlakeyServiceConnector::new();
-        let c = CurrentChannelNotifier::new(connector.clone(), dir.path());
+        let (_, c) = build_current_channel_manager_and_notifier(connector.clone(), dir.path())
+            .expect("failed to construct channel_manager");
         let mut task = c.run().boxed();
         assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
 
@@ -487,7 +573,8 @@
 
         // Bails out if Cobalt responds with an unexpected status code
         let connector = FlakeyServiceConnector::new();
-        let c = CurrentChannelNotifier::new(connector.clone(), dir.path());
+        let (_, c) = build_current_channel_manager_and_notifier(connector.clone(), dir.path())
+            .expect("failed to construct channel_manager");
         let mut task = c.run().boxed();
         connector.set_flake_mode(FlakeMode::StatusOnCall(CobaltStatus::InvalidArguments));
         assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
@@ -495,10 +582,77 @@
         assert_eq!(connector.call_count(), 1);
     }
 
+    #[test]
+    fn test_current_channel_manager_writes_channel() {
+        let mut exec = fasync::Executor::new().expect("Unable to create executor");
+
+        let dir = tempfile::tempdir().unwrap();
+        let target_channel_path = dir.path().join(TARGET_CHANNEL);
+        let current_channel_path = dir.path().join(CURRENT_CHANNEL);
+
+        let target_connector = RewriteServiceConnector::new("fuchsia-pkg://devhost/update/0");
+        let mut target_channel_manager =
+            TargetChannelManager::new(target_connector.clone(), dir.path());
+
+        let (current_connector, svc_dir) =
+            NamespacedServiceConnector::bind("/test/current_channel_manager2/svc")
+                .expect("ns to bind");
+        let (mut current_channel_manager, current_channel_notifier) =
+            build_current_channel_manager_and_notifier(current_connector, dir.path()).unwrap();
+
+        let mut fs = ServiceFs::new_local();
+        let channel = Arc::new(Mutex::new(None));
+        let chan = channel.clone();
+
+        fs.add_fidl_service(move |mut stream: SystemDataUpdaterRequestStream| {
+            let chan = chan.clone();
+
+            fasync::spawn_local(async move {
+                while let Some(req) = stream.try_next().await.unwrap_or(None) {
+                    match req {
+                        SystemDataUpdaterRequest::SetChannel { current_channel, responder } => {
+                            *chan.lock() = Some(current_channel);
+                            responder.send(CobaltStatus::Ok).unwrap();
+                        }
+                        _ => unreachable!(),
+                    }
+                }
+            })
+        })
+        .serve_connection(svc_dir)
+        .expect("serve_connection");
+
+        fasync::spawn_local(fs.collect());
+
+        let mut notify_fut = current_channel_notifier.run().boxed();
+        assert_eq!(exec.run_until_stalled(&mut notify_fut), Poll::Pending);
+
+        assert_matches!(read_channel(&current_channel_path), Err(_));
+        assert_eq!(channel.lock().as_ref().map(|s| s.as_str()), Some(""));
+
+        exec.run_singlethreaded(target_channel_manager.update())
+            .expect("channel update to succeed");
+        exec.run_singlethreaded(current_channel_manager.update())
+            .expect("current channel update to succeed");
+        assert_eq!(exec.run_until_stalled(&mut notify_fut), Poll::Pending);
+
+        assert_eq!(read_channel(&current_channel_path).unwrap(), "devhost");
+        assert_eq!(channel.lock().as_ref().map(|s| s.as_str()), Some("devhost"));
+
+        // Even if the current_channel is already known, it should be overwritten.
+        write_channel(&target_channel_path, "different").unwrap();
+        exec.run_singlethreaded(current_channel_manager.update())
+            .expect("current channel update to succeed");
+        assert_eq!(exec.run_until_stalled(&mut notify_fut), Poll::Pending);
+
+        assert_eq!(read_channel(&current_channel_path).unwrap(), "different");
+        assert_eq!(channel.lock().as_ref().map(|s| s.as_str()), Some("different"));
+    }
+
     #[fuchsia_async::run_singlethreaded(test)]
     async fn test_target_channel_manager_writes_channel() {
         let dir = tempfile::tempdir().unwrap();
-        let target_channel_path = dir.path().join("target_channel.json");
+        let target_channel_path = dir.path().join(TARGET_CHANNEL);
 
         let connector = RewriteServiceConnector::new("fuchsia-pkg://devhost/update/0");
         let mut channel_manager = TargetChannelManager::new(connector.clone(), dir.path());
@@ -522,7 +676,7 @@
     #[fuchsia_async::run_singlethreaded(test)]
     async fn test_target_channel_manager_recovers_from_corrupt_data() {
         let dir = tempfile::tempdir().unwrap();
-        let target_channel_path = dir.path().join("target_channel.json");
+        let target_channel_path = dir.path().join(TARGET_CHANNEL);
 
         fs::write(&target_channel_path, r#"invalid json"#).unwrap();
 
diff --git a/garnet/bin/system-update-checker/src/info_handler.rs b/garnet/bin/system-update-checker/src/info_handler.rs
index fa0ff0d..cc8cad5 100644
--- a/garnet/bin/system-update-checker/src/info_handler.rs
+++ b/garnet/bin/system-update-checker/src/info_handler.rs
@@ -121,5 +121,4 @@
 
         assert_eq!(res.map_err(|e| e.to_string()), Ok("".into()));
     }
-
 }
diff --git a/garnet/bin/system-update-checker/src/main.rs b/garnet/bin/system-update-checker/src/main.rs
index e0f2cbb..4552abc 100644
--- a/garnet/bin/system-update-checker/src/main.rs
+++ b/garnet/bin/system-update-checker/src/main.rs
@@ -50,12 +50,16 @@
         fx_log_err!("while updating the target channel: {}", e);
     }
 
-    let channel_notifier =
-        channel::CurrentChannelNotifier::new(connect::ServiceConnector, "/misc/ota");
-    let channel_fut = channel_notifier.run();
+    let (current_channel_manager, current_channel_notifier) =
+        channel::build_current_channel_manager_and_notifier(
+            connect::ServiceConnector,
+            "/misc/ota",
+        )?;
+    let channel_fut = current_channel_notifier.run();
 
     let update_manager = Arc::new(RealUpdateManager::new(
         target_channel_manager,
+        current_channel_manager,
         inspector.root().create_child("update-manager"),
     ));
     let info_handler = InfoHandler::default();
diff --git a/garnet/bin/system-update-checker/src/poller.rs b/garnet/bin/system-update-checker/src/poller.rs
index 8bb7039..3542bf7 100644
--- a/garnet/bin/system-update-checker/src/poller.rs
+++ b/garnet/bin/system-update-checker/src/poller.rs
@@ -4,7 +4,9 @@
 
 use crate::apply::Initiator;
 use crate::config::Config;
-use crate::update_manager::{TargetChannelUpdater, UpdateApplier, UpdateChecker, UpdateManager};
+use crate::update_manager::{
+    CurrentChannelUpdater, TargetChannelUpdater, UpdateApplier, UpdateChecker, UpdateManager,
+};
 use crate::update_monitor::StateChangeCallback;
 use fidl_fuchsia_update::CheckStartedResult;
 use fuchsia_async as fasync;
@@ -12,12 +14,13 @@
 use futures::prelude::*;
 use std::sync::Arc;
 
-pub fn run_periodic_update_check<T, C, A, S>(
-    manager: Arc<UpdateManager<T, C, A, S>>,
+pub fn run_periodic_update_check<T, Ch, C, A, S>(
+    manager: Arc<UpdateManager<T, Ch, C, A, S>>,
     config: &Config,
 ) -> impl Future<Output = ()>
 where
     T: TargetChannelUpdater,
+    Ch: CurrentChannelUpdater,
     C: UpdateChecker,
     A: UpdateApplier,
     S: StateChangeCallback,
@@ -49,8 +52,8 @@
     use super::*;
     use crate::config::ConfigBuilder;
     use crate::update_manager::tests::{
-        FakeTargetChannelUpdater, FakeUpdateChecker, StateChangeCollector,
-        UnreachableStateChangeCallback, UnreachableUpdateApplier,
+        FakeCurrentChannelUpdater, FakeTargetChannelUpdater, FakeUpdateChecker,
+        StateChangeCollector, UnreachableStateChangeCallback, UnreachableUpdateApplier,
     };
     use fidl_fuchsia_update::ManagerState;
     use fuchsia_async::DurationExt;
@@ -62,9 +65,10 @@
         let mut executor = fasync::Executor::new_with_fake_time().unwrap();
 
         let checker = FakeUpdateChecker::new_up_to_date();
-        let manager: UpdateManager<_, _, _, UnreachableStateChangeCallback> =
+        let manager: UpdateManager<_, _, _, _, UnreachableStateChangeCallback> =
             UpdateManager::from_checker_and_applier(
                 FakeTargetChannelUpdater::new(),
+                FakeCurrentChannelUpdater::new(),
                 checker.clone(),
                 UnreachableUpdateApplier,
             );
@@ -84,6 +88,7 @@
         let callback = StateChangeCollector::new();
         let manager = UpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             checker.clone(),
             UnreachableUpdateApplier,
         );
@@ -133,6 +138,7 @@
         let callback = StateChangeCollector::new();
         let manager = UpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             checker.clone(),
             UnreachableUpdateApplier,
         );
diff --git a/garnet/bin/system-update-checker/src/update_manager.rs b/garnet/bin/system-update-checker/src/update_manager.rs
index 55e87c2..d06192e 100644
--- a/garnet/bin/system-update-checker/src/update_manager.rs
+++ b/garnet/bin/system-update-checker/src/update_manager.rs
@@ -3,7 +3,7 @@
 // found in the LICENSE file.
 
 use crate::apply::{apply_system_update, Initiator};
-use crate::channel::TargetChannelManager;
+use crate::channel::{CurrentChannelManager, TargetChannelManager};
 use crate::check::{check_for_system_update, SystemUpdateStatus};
 use crate::connect::ServiceConnect;
 use crate::update_monitor::{State, StateChangeCallback, UpdateMonitor};
@@ -27,38 +27,47 @@
 // periodically lock `monitor` to send status updates. Before an async task or thread can lock
 // `updater`, it must release any locks on `monitor`.
 //
-pub struct UpdateManager<T, C, A, S>
+pub struct UpdateManager<T, Ch, C, A, S>
 where
     T: TargetChannelUpdater,
+    Ch: CurrentChannelUpdater,
     C: UpdateChecker,
     A: UpdateApplier,
     S: StateChangeCallback,
 {
     monitor: Arc<Mutex<UpdateMonitor<S>>>,
-    updater: Arc<AsyncMutex<SystemInterface<T, C, A>>>,
+    updater: Arc<AsyncMutex<SystemInterface<T, Ch, C, A>>>,
 }
 
-struct SystemInterface<T, C, A>
+struct SystemInterface<T, Ch, C, A>
 where
     T: TargetChannelUpdater,
+    Ch: CurrentChannelUpdater,
     C: UpdateChecker,
     A: UpdateApplier,
 {
-    channel_updater: T,
+    target_channel_updater: T,
+    current_channel_updater: Ch,
     update_checker: C,
     update_applier: A,
 }
 
-impl<T, S> UpdateManager<T, RealUpdateChecker, RealUpdateApplier, S>
+impl<T, Ch, S> UpdateManager<T, Ch, RealUpdateChecker, RealUpdateApplier, S>
 where
     T: TargetChannelUpdater,
+    Ch: CurrentChannelUpdater,
     S: StateChangeCallback,
 {
-    pub fn new(channel_updater: T, node: finspect::Node) -> Self {
+    pub fn new(
+        target_channel_updater: T,
+        current_channel_updater: Ch,
+        node: finspect::Node,
+    ) -> Self {
         Self {
             monitor: Arc::new(Mutex::new(UpdateMonitor::from_inspect_node(node))),
             updater: Arc::new(AsyncMutex::new(SystemInterface::new(
-                channel_updater,
+                target_channel_updater,
+                current_channel_updater,
                 RealUpdateChecker,
                 RealUpdateApplier,
             ))),
@@ -66,23 +75,26 @@
     }
 }
 
-impl<T, C, A, S> UpdateManager<T, C, A, S>
+impl<T, Ch, C, A, S> UpdateManager<T, Ch, C, A, S>
 where
     T: TargetChannelUpdater,
+    Ch: CurrentChannelUpdater,
     C: UpdateChecker,
     A: UpdateApplier,
     S: StateChangeCallback,
 {
     #[cfg(test)]
     pub fn from_checker_and_applier(
-        channel_updater: T,
+        target_channel_updater: T,
+        current_channel_updater: Ch,
         update_checker: C,
         update_applier: A,
     ) -> Self {
         Self {
             monitor: Arc::new(Mutex::new(UpdateMonitor::new())),
             updater: Arc::new(AsyncMutex::new(SystemInterface::new(
-                channel_updater,
+                target_channel_updater,
+                current_channel_updater,
                 update_checker,
                 update_applier,
             ))),
@@ -127,14 +139,20 @@
     }
 }
 
-impl<T, C, A> SystemInterface<T, C, A>
+impl<T, Ch, C, A> SystemInterface<T, Ch, C, A>
 where
     T: TargetChannelUpdater,
+    Ch: CurrentChannelUpdater,
     C: UpdateChecker,
     A: UpdateApplier,
 {
-    pub fn new(channel_updater: T, update_checker: C, update_applier: A) -> Self {
-        Self { channel_updater, update_checker, update_applier }
+    pub fn new(
+        target_channel_updater: T,
+        current_channel_updater: Ch,
+        update_checker: C,
+        update_applier: A,
+    ) -> Self {
+        Self { target_channel_updater, current_channel_updater, update_checker, update_applier }
     }
 
     async fn do_system_update_check_and_return_to_idle<S: StateChangeCallback>(
@@ -172,12 +190,13 @@
             }
         );
 
-        self.channel_updater.update().await;
+        self.target_channel_updater.update().await;
 
         match self.update_checker.check().await.context("check_for_system_update failed")? {
             SystemUpdateStatus::UpToDate { system_image } => {
                 fx_log_info!("current system_image merkle: {}", system_image);
                 fx_log_info!("system_image is already up-to-date");
+                self.current_channel_updater.update().await;
                 return Ok(());
             }
             SystemUpdateStatus::UpdateAvailable { current_system_image, latest_system_image } => {
@@ -229,6 +248,19 @@
 }
 
 // For mocking
+pub trait CurrentChannelUpdater: Send + Sync + 'static {
+    fn update(&mut self) -> BoxFuture<'_, ()>;
+}
+
+impl CurrentChannelUpdater for CurrentChannelManager {
+    fn update(&mut self) -> BoxFuture<'_, ()> {
+        CurrentChannelManager::update(self)
+            .unwrap_or_else(|e| fx_log_err!("while updating current channel: {:?}", e))
+            .boxed()
+    }
+}
+
+// For mocking
 pub trait UpdateApplier: Send + Sync + 'static {
     fn apply(
         &self,
@@ -337,6 +369,28 @@
     }
 
     #[derive(Clone)]
+    pub struct FakeCurrentChannelUpdater {
+        call_count: Arc<AtomicU64>,
+    }
+    impl FakeCurrentChannelUpdater {
+        pub fn new() -> Self {
+            Self { call_count: Arc::new(AtomicU64::new(0)) }
+        }
+        pub fn call_count(&self) -> u64 {
+            self.call_count.load(Ordering::SeqCst)
+        }
+    }
+    impl CurrentChannelUpdater for FakeCurrentChannelUpdater {
+        fn update(&mut self) -> BoxFuture<'_, ()> {
+            let call_count = self.call_count.clone();
+            async move {
+                call_count.fetch_add(1, Ordering::SeqCst);
+            }
+                .boxed()
+        }
+    }
+
+    #[derive(Clone)]
     pub struct UnreachableUpdateApplier;
     impl UpdateApplier for UnreachableUpdateApplier {
         fn apply(
@@ -429,6 +483,7 @@
 
     type FakeUpdateManager = UpdateManager<
         FakeTargetChannelUpdater,
+        FakeCurrentChannelUpdater,
         FakeUpdateChecker,
         FakeUpdateApplier,
         FakeStateChangeCallback,
@@ -436,6 +491,7 @@
 
     type BlockingManagerManager = UpdateManager<
         FakeTargetChannelUpdater,
+        FakeCurrentChannelUpdater,
         BlockingUpdateChecker,
         FakeUpdateApplier,
         FakeStateChangeCallback,
@@ -486,6 +542,7 @@
     fn test_correct_initial_state() {
         let manager = FakeUpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_up_to_date(),
             FakeUpdateApplier::new_success(),
         );
@@ -498,6 +555,7 @@
         let _executor = fasync::Executor::new().expect("create test executor");
         let manager = FakeUpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_up_to_date(),
             FakeUpdateApplier::new_success(),
         );
@@ -509,6 +567,7 @@
     async fn test_temporary_callbacks_dropped_after_update_attempt() {
         let manager = FakeUpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_up_to_date(),
             FakeUpdateApplier::new_success(),
         );
@@ -536,6 +595,7 @@
     async fn test_try_start_update_callback_when_up_to_date() {
         let manager = FakeUpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_up_to_date(),
             FakeUpdateApplier::new_success(),
         );
@@ -557,6 +617,7 @@
     async fn test_try_start_update_callback_when_update_available_and_apply_errors() {
         let manager = FakeUpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_update_available(),
             FakeUpdateApplier::new_error(),
         );
@@ -580,6 +641,7 @@
     async fn test_try_start_update_callback_when_update_available_and_apply_succeeds() {
         let manager = FakeUpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_update_available(),
             FakeUpdateApplier::new_success(),
         );
@@ -602,6 +664,7 @@
     async fn test_permanent_callback_is_called() {
         let manager = FakeUpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_update_available(),
             FakeUpdateApplier::new_error(),
         );
@@ -626,6 +689,7 @@
     async fn test_permanent_callback_persists_across_attempts() {
         let manager = FakeUpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_update_available(),
             FakeUpdateApplier::new_error(),
         );
@@ -665,6 +729,7 @@
         let channel_updater = FakeTargetChannelUpdater::new();
         let manager = UpdateManager::from_checker_and_applier(
             channel_updater.clone(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_up_to_date(),
             UnreachableUpdateApplier,
         );
@@ -681,6 +746,7 @@
         let update_applier = FakeUpdateApplier::new_error();
         let manager = FakeUpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_update_available(),
             update_applier.clone(),
         );
@@ -695,8 +761,10 @@
     #[fasync::run_singlethreaded(test)]
     async fn test_update_applier_not_called_if_up_to_date() {
         let update_applier = FakeUpdateApplier::new_error();
+        let current_channel_updater = FakeCurrentChannelUpdater::new();
         let manager = FakeUpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            current_channel_updater.clone(),
             FakeUpdateChecker::new_up_to_date(),
             update_applier.clone(),
         );
@@ -706,12 +774,14 @@
         receiver.collect::<Vec<State>>().await;
 
         assert_eq!(update_applier.call_count(), 0);
+        assert_eq!(current_channel_updater.call_count(), 1);
     }
 
     #[fasync::run_singlethreaded(test)]
     async fn test_return_to_initial_state_on_update_check_error() {
         let manager = FakeUpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_error(),
             FakeUpdateApplier::new_error(),
         );
@@ -727,6 +797,7 @@
     async fn test_return_to_initial_state_on_update_apply_error() {
         let manager = FakeUpdateManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_update_available(),
             FakeUpdateApplier::new_error(),
         );
@@ -768,6 +839,7 @@
         let (blocking_update_checker, _sender) = BlockingUpdateChecker::new_checker_and_sender();
         let manager = BlockingManagerManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             blocking_update_checker,
             FakeUpdateApplier::new_error(),
         );
@@ -783,6 +855,7 @@
         let update_applier = FakeUpdateApplier::new_error();
         let manager = BlockingManagerManager::from_checker_and_applier(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             blocking_update_checker,
             update_applier.clone(),
         );
diff --git a/garnet/bin/system-update-checker/src/update_monitor.rs b/garnet/bin/system-update-checker/src/update_monitor.rs
index 903d6bf..2bc7707 100644
--- a/garnet/bin/system-update-checker/src/update_monitor.rs
+++ b/garnet/bin/system-update-checker/src/update_monitor.rs
@@ -276,7 +276,6 @@
             );
         }
     }
-
 }
 
 #[cfg(test)]
diff --git a/garnet/bin/system-update-checker/src/update_service.rs b/garnet/bin/system-update-checker/src/update_service.rs
index e004f63..1629e09 100644
--- a/garnet/bin/system-update-checker/src/update_service.rs
+++ b/garnet/bin/system-update-checker/src/update_service.rs
@@ -3,11 +3,11 @@
 // found in the LICENSE file.
 
 use crate::apply::Initiator;
-use crate::channel::TargetChannelManager;
+use crate::channel::{CurrentChannelManager, TargetChannelManager};
 use crate::connect::ServiceConnector;
 use crate::update_manager::{
-    RealUpdateApplier, RealUpdateChecker, TargetChannelUpdater, UpdateApplier, UpdateChecker,
-    UpdateManager,
+    CurrentChannelUpdater, RealUpdateApplier, RealUpdateChecker, TargetChannelUpdater,
+    UpdateApplier, UpdateChecker, UpdateManager,
 };
 use crate::update_monitor::{State, StateChangeCallback};
 use failure::{bail, Error, ResultExt};
@@ -20,28 +20,36 @@
 use std::sync::Arc;
 
 pub type RealTargetChannelUpdater = TargetChannelManager<ServiceConnector>;
-pub type RealUpdateService =
-    UpdateService<RealTargetChannelUpdater, RealUpdateChecker, RealUpdateApplier>;
+pub type RealCurrentChannelUpdater = CurrentChannelManager;
+pub type RealUpdateService = UpdateService<
+    RealTargetChannelUpdater,
+    RealCurrentChannelUpdater,
+    RealUpdateChecker,
+    RealUpdateApplier,
+>;
 pub type RealStateChangeCallback = MonitorControlHandle;
 pub type RealUpdateManager = UpdateManager<
     RealTargetChannelUpdater,
+    RealCurrentChannelUpdater,
     RealUpdateChecker,
     RealUpdateApplier,
     RealStateChangeCallback,
 >;
 
-pub struct UpdateService<T, C, A>
+pub struct UpdateService<T, Ch, C, A>
 where
     T: TargetChannelUpdater,
+    Ch: CurrentChannelUpdater,
     C: UpdateChecker,
     A: UpdateApplier,
 {
-    update_manager: Arc<UpdateManager<T, C, A, RealStateChangeCallback>>,
+    update_manager: Arc<UpdateManager<T, Ch, C, A, RealStateChangeCallback>>,
 }
 
-impl<T, C, A> Clone for UpdateService<T, C, A>
+impl<T, Ch, C, A> Clone for UpdateService<T, Ch, C, A>
 where
     T: TargetChannelUpdater,
+    Ch: CurrentChannelUpdater,
     C: UpdateChecker,
     A: UpdateApplier,
 {
@@ -56,9 +64,10 @@
     }
 }
 
-impl<T, C, A> UpdateService<T, C, A>
+impl<T, Ch, C, A> UpdateService<T, Ch, C, A>
 where
     T: TargetChannelUpdater,
+    Ch: CurrentChannelUpdater,
     C: UpdateChecker,
     A: UpdateApplier,
 {
@@ -162,7 +171,8 @@
 mod tests {
     use super::*;
     use crate::update_manager::tests::{
-        BlockingUpdateChecker, FakeTargetChannelUpdater, FakeUpdateApplier, FakeUpdateChecker,
+        BlockingUpdateChecker, FakeCurrentChannelUpdater, FakeTargetChannelUpdater,
+        FakeUpdateApplier, FakeUpdateChecker,
     };
     use fidl::endpoints::create_proxy_and_stream;
     use fidl_fuchsia_update::ManagerState;
@@ -184,20 +194,23 @@
         fidl_fuchsia_update::Options { initiator: Some(fidl_fuchsia_update::Initiator::User) }
     }
 
-    fn spawn_update_service<T, C, A>(
+    fn spawn_update_service<T, Ch, C, A>(
         channel_updater: T,
+        current_channel_updater: Ch,
         update_checker: C,
         update_applier: A,
-    ) -> (ManagerProxy, UpdateService<T, C, A>)
+    ) -> (ManagerProxy, UpdateService<T, Ch, C, A>)
     where
         T: TargetChannelUpdater,
+        Ch: CurrentChannelUpdater,
         C: UpdateChecker,
         A: UpdateApplier,
     {
-        let update_service = UpdateService::<T, C, A> {
+        let update_service = UpdateService::<T, Ch, C, A> {
             update_manager: Arc::new(
-                UpdateManager::<T, C, A, RealStateChangeCallback>::from_checker_and_applier(
+                UpdateManager::<T, Ch, C, A, RealStateChangeCallback>::from_checker_and_applier(
                     channel_updater,
+                    current_channel_updater,
                     update_checker,
                     update_applier,
                 ),
@@ -235,6 +248,7 @@
     async fn test_check_now_monitor_sees_on_state_events() {
         let proxy = spawn_update_service(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_update_available(),
             FakeUpdateApplier::new_error(),
         )
@@ -259,6 +273,7 @@
     async fn test_add_monitor_sees_on_state_events() {
         let proxy = spawn_update_service(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_update_available(),
             FakeUpdateApplier::new_error(),
         )
@@ -285,6 +300,7 @@
     async fn test_get_state() {
         let proxy = spawn_update_service(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_update_available(),
             FakeUpdateApplier::new_error(),
         )
@@ -300,6 +316,7 @@
     async fn test_multiple_clients_see_on_state_events() {
         let (proxy0, service) = spawn_update_service(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             FakeUpdateChecker::new_update_available(),
             FakeUpdateApplier::new_error(),
         );
@@ -346,6 +363,7 @@
         let fake_update_applier = FakeUpdateApplier::new_error();
         let (proxy0, service) = spawn_update_service(
             FakeTargetChannelUpdater::new(),
+            FakeCurrentChannelUpdater::new(),
             blocking_update_checker,
             fake_update_applier.clone(),
         );