[channel] Accept target_channel as current_channel
If the system_image has been verified as being up-to-date, then it is
safe to assume that the target_channel is actually our current_channel.
Bug: 35299
Change-Id: I6784c3c2061a65b18840089d71b1e14fdf05b465
No-Tree-Checks: true
diff --git a/garnet/bin/system-update-checker/src/channel.rs b/garnet/bin/system-update-checker/src/channel.rs
index e682053..fb92777 100644
--- a/garnet/bin/system-update-checker/src/channel.rs
+++ b/garnet/bin/system-update-checker/src/channel.rs
@@ -12,6 +12,12 @@
use fuchsia_syslog::{fx_log_err, fx_log_info, fx_log_warn};
use fuchsia_url::pkg_url::PkgUrl;
use fuchsia_zircon as zx;
+use futures::{
+ channel::mpsc,
+ future::{self, Either, FutureExt},
+ sink::SinkExt,
+ stream::StreamExt,
+};
use serde_derive::{Deserialize, Serialize};
use serde_json;
use std::fs;
@@ -19,14 +25,15 @@
use std::path::{Path, PathBuf};
use std::time::Duration;
-pub struct CurrentChannelNotifier<S = ServiceConnector> {
- service_connector: S,
- current_channel: String,
-}
+static CURRENT_CHANNEL: &'static str = "current_channel.json";
+static TARGET_CHANNEL: &'static str = "target_channel.json";
-impl<S: ServiceConnect> CurrentChannelNotifier<S> {
- pub fn new(service_connector: S, dir: impl AsRef<Path>) -> Self {
- let current_channel = read_current_channel(dir.as_ref()).unwrap_or_else(|err| {
+pub fn build_current_channel_manager_and_notifier<S: ServiceConnect>(
+ service_connector: S,
+ dir: impl Into<PathBuf>,
+) -> Result<(CurrentChannelManager, CurrentChannelNotifier<S>), failure::Error> {
+ let path = dir.into();
+ let current_channel = read_current_channel(path.as_ref()).unwrap_or_else(|err| {
fx_log_err!(
"Error reading current_channel, defaulting to the empty string. This is expected before the first OTA. {}",
err
@@ -34,57 +41,135 @@
String::new()
});
- CurrentChannelNotifier { service_connector, current_channel }
+ let (channel_sender, channel_receiver) = mpsc::channel(100);
+
+ Ok((
+ CurrentChannelManager::new(path, channel_sender),
+ CurrentChannelNotifier::new(service_connector, current_channel, channel_receiver),
+ ))
+}
+
+pub struct CurrentChannelNotifier<S = ServiceConnector> {
+ service_connector: S,
+ initial_channel: String,
+ channel_receiver: mpsc::Receiver<String>,
+}
+
+impl<S: ServiceConnect> CurrentChannelNotifier<S> {
+ fn new(
+ service_connector: S,
+ initial_channel: String,
+ channel_receiver: mpsc::Receiver<String>,
+ ) -> Self {
+ CurrentChannelNotifier { service_connector, initial_channel, channel_receiver }
}
- pub async fn run(self) {
+ async fn notify_cobalt(service_connector: &S, current_channel: String) {
loop {
- let cobalt = self.connect().await;
+ let cobalt = Self::connect(service_connector).await;
- fx_log_info!("calling cobalt.SetChannel(\"{}\")", self.current_channel);
+ fx_log_info!("calling cobalt.SetChannel(\"{}\")", current_channel);
- match cobalt.set_channel(&self.current_channel).await {
+ match cobalt.set_channel(&current_channel).await {
Ok(CobaltStatus::Ok) => {
- break;
+ return;
}
Ok(CobaltStatus::EventTooBig) => {
fx_log_warn!("cobalt.SetChannel returned Status.EVENT_TOO_BIG, retrying");
- self.sleep().await;
}
Ok(status) => {
// Not much we can do about the other status codes but log.
fx_log_err!("cobalt.SetChannel returned non-OK status: {:?}", status);
- break;
+ return;
}
Err(err) => {
// channel broken, so log the error and reconnect.
fx_log_warn!("cobalt.SetChannel returned error: {}, retrying", err);
- self.sleep().await;
+ }
+ }
+
+ Self::sleep().await;
+ }
+ }
+
+ pub async fn run(self) {
+ let Self { service_connector, initial_channel, mut channel_receiver } = self;
+ let mut notify_cobalt_task =
+ Self::notify_cobalt(&service_connector, initial_channel).boxed();
+
+ loop {
+ match future::select(channel_receiver.next(), notify_cobalt_task).await {
+ Either::Left((Some(current_channel), _)) => {
+ fx_log_warn!(
+ "notify_cobalt() overrun. Starting again with new channel: `{}`",
+ current_channel
+ );
+ notify_cobalt_task =
+ Self::notify_cobalt(&service_connector, current_channel).boxed();
+ }
+ Either::Left((None, notify_cobalt_future)) => {
+ fx_log_warn!(
+ "all channel_senders have been closed. No new messages will arrive."
+ );
+ notify_cobalt_future.await;
+ return;
+ }
+ Either::Right((_, next_channel_fut)) => {
+ if let Some(current_channel) = next_channel_fut.await {
+ notify_cobalt_task =
+ Self::notify_cobalt(&service_connector, current_channel).boxed();
+ } else {
+ fx_log_warn!(
+ "all channel_senders have been closed. No new messages will arrive."
+ );
+ return;
+ }
}
}
}
}
- async fn connect(&self) -> SystemDataUpdaterProxy {
+ async fn connect(service_connector: &S) -> SystemDataUpdaterProxy {
loop {
- match self.service_connector.connect_to_service::<SystemDataUpdaterMarker>() {
+ match service_connector.connect_to_service::<SystemDataUpdaterMarker>() {
Ok(cobalt) => {
return cobalt;
}
Err(err) => {
fx_log_err!("error connecting to cobalt: {}", err);
- self.sleep().await
+ Self::sleep().await
}
}
}
}
- async fn sleep(&self) {
+ async fn sleep() {
let delay = fasync::Time::after(Duration::from_secs(5).into());
fasync::Timer::new(delay).await;
}
}
+#[derive(Clone)]
+pub struct CurrentChannelManager {
+ path: PathBuf,
+ channel_sender: mpsc::Sender<String>,
+}
+
+impl CurrentChannelManager {
+ fn new(path: PathBuf, channel_sender: mpsc::Sender<String>) -> Self {
+ CurrentChannelManager { path, channel_sender }
+ }
+
+ pub async fn update(&mut self) -> Result<(), failure::Error> {
+ let target_channel = read_channel(&self.path.join(TARGET_CHANNEL))?;
+ if target_channel != read_current_channel(&self.path).ok().unwrap_or_else(String::new) {
+ write_channel(&self.path.join(CURRENT_CHANNEL), &target_channel)?;
+ self.channel_sender.send(target_channel).await?;
+ }
+ Ok(())
+ }
+}
+
pub struct TargetChannelManager<S = ServiceConnector> {
service_connector: S,
path: PathBuf,
@@ -94,7 +179,7 @@
impl<S: ServiceConnect> TargetChannelManager<S> {
pub fn new(service_connector: S, dir: impl Into<PathBuf>) -> Self {
let mut path = dir.into();
- path.push("target_channel.json");
+ path.push(TARGET_CHANNEL);
let target_channel = read_channel(&path).ok();
Self { service_connector, path, target_channel }
@@ -132,7 +217,7 @@
}
fn read_current_channel(p: &Path) -> Result<String, Error> {
- read_channel(p.join("current_channel.json"))
+ read_channel(p.join(CURRENT_CHANNEL))
}
fn read_channel(path: impl AsRef<Path>) -> Result<String, Error> {
@@ -230,7 +315,7 @@
let dir = tempfile::tempdir().unwrap();
fs::write(
- dir.path().join("current_channel.json"),
+ dir.path().join(CURRENT_CHANNEL),
r#"{"version":"1","content":{"legacy_amber_source_name":"stable"}}"#,
)
.unwrap();
@@ -282,7 +367,7 @@
fn test_read_current_channel_rejects_invalid_json() {
let dir = tempfile::tempdir().unwrap();
- fs::write(dir.path().join("current_channel.json"), "no channel here").unwrap();
+ fs::write(dir.path().join(CURRENT_CHANNEL), "no channel here").unwrap();
assert_matches!(read_current_channel(dir.path()), Err(Error::Json(_)));
}
@@ -290,7 +375,7 @@
#[fuchsia_async::run_singlethreaded(test)]
async fn test_current_channel_notifier() {
let dir = tempfile::tempdir().unwrap();
- let current_channel_path = dir.path().join("current_channel.json");
+ let current_channel_path = dir.path().join(CURRENT_CHANNEL);
fs::write(
&current_channel_path,
@@ -299,9 +384,9 @@
.unwrap();
let (connector, svc_dir) =
- NamespacedServiceConnector::bind("/test/current_channel_notifier/svc")
+ NamespacedServiceConnector::bind("/test/current_channel_manager/svc")
.expect("ns to bind");
- let c = CurrentChannelNotifier::new(connector, dir.path());
+ let (_, c) = build_current_channel_manager_and_notifier(connector, dir.path()).unwrap();
let mut fs = ServiceFs::new_local();
let channel = Arc::new(Mutex::new(None));
@@ -337,7 +422,7 @@
let mut executor = fasync::Executor::new_with_fake_time().unwrap();
let dir = tempfile::tempdir().unwrap();
- let current_channel_path = dir.path().join("current_channel.json");
+ let current_channel_path = dir.path().join(CURRENT_CHANNEL);
write_channel(&current_channel_path, "stable").unwrap();
@@ -448,7 +533,8 @@
}
let connector = FlakeyServiceConnector::new();
- let c = CurrentChannelNotifier::new(connector.clone(), dir.path());
+ let (_, c) = build_current_channel_manager_and_notifier(connector.clone(), dir.path())
+ .expect("failed to construct channel_manager");
let mut task = c.run().boxed();
assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
@@ -487,7 +573,8 @@
// Bails out if Cobalt responds with an unexpected status code
let connector = FlakeyServiceConnector::new();
- let c = CurrentChannelNotifier::new(connector.clone(), dir.path());
+ let (_, c) = build_current_channel_manager_and_notifier(connector.clone(), dir.path())
+ .expect("failed to construct channel_manager");
let mut task = c.run().boxed();
connector.set_flake_mode(FlakeMode::StatusOnCall(CobaltStatus::InvalidArguments));
assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
@@ -495,10 +582,77 @@
assert_eq!(connector.call_count(), 1);
}
+ #[test]
+ fn test_current_channel_manager_writes_channel() {
+ let mut exec = fasync::Executor::new().expect("Unable to create executor");
+
+ let dir = tempfile::tempdir().unwrap();
+ let target_channel_path = dir.path().join(TARGET_CHANNEL);
+ let current_channel_path = dir.path().join(CURRENT_CHANNEL);
+
+ let target_connector = RewriteServiceConnector::new("fuchsia-pkg://devhost/update/0");
+ let mut target_channel_manager =
+ TargetChannelManager::new(target_connector.clone(), dir.path());
+
+ let (current_connector, svc_dir) =
+ NamespacedServiceConnector::bind("/test/current_channel_manager2/svc")
+ .expect("ns to bind");
+ let (mut current_channel_manager, current_channel_notifier) =
+ build_current_channel_manager_and_notifier(current_connector, dir.path()).unwrap();
+
+ let mut fs = ServiceFs::new_local();
+ let channel = Arc::new(Mutex::new(None));
+ let chan = channel.clone();
+
+ fs.add_fidl_service(move |mut stream: SystemDataUpdaterRequestStream| {
+ let chan = chan.clone();
+
+ fasync::spawn_local(async move {
+ while let Some(req) = stream.try_next().await.unwrap_or(None) {
+ match req {
+ SystemDataUpdaterRequest::SetChannel { current_channel, responder } => {
+ *chan.lock() = Some(current_channel);
+ responder.send(CobaltStatus::Ok).unwrap();
+ }
+ _ => unreachable!(),
+ }
+ }
+ })
+ })
+ .serve_connection(svc_dir)
+ .expect("serve_connection");
+
+ fasync::spawn_local(fs.collect());
+
+ let mut notify_fut = current_channel_notifier.run().boxed();
+ assert_eq!(exec.run_until_stalled(&mut notify_fut), Poll::Pending);
+
+ assert_matches!(read_channel(&current_channel_path), Err(_));
+ assert_eq!(channel.lock().as_ref().map(|s| s.as_str()), Some(""));
+
+ exec.run_singlethreaded(target_channel_manager.update())
+ .expect("channel update to succeed");
+ exec.run_singlethreaded(current_channel_manager.update())
+ .expect("current channel update to succeed");
+ assert_eq!(exec.run_until_stalled(&mut notify_fut), Poll::Pending);
+
+ assert_eq!(read_channel(&current_channel_path).unwrap(), "devhost");
+ assert_eq!(channel.lock().as_ref().map(|s| s.as_str()), Some("devhost"));
+
+ // Even if the current_channel is already known, it should be overwritten.
+ write_channel(&target_channel_path, "different").unwrap();
+ exec.run_singlethreaded(current_channel_manager.update())
+ .expect("current channel update to succeed");
+ assert_eq!(exec.run_until_stalled(&mut notify_fut), Poll::Pending);
+
+ assert_eq!(read_channel(&current_channel_path).unwrap(), "different");
+ assert_eq!(channel.lock().as_ref().map(|s| s.as_str()), Some("different"));
+ }
+
#[fuchsia_async::run_singlethreaded(test)]
async fn test_target_channel_manager_writes_channel() {
let dir = tempfile::tempdir().unwrap();
- let target_channel_path = dir.path().join("target_channel.json");
+ let target_channel_path = dir.path().join(TARGET_CHANNEL);
let connector = RewriteServiceConnector::new("fuchsia-pkg://devhost/update/0");
let mut channel_manager = TargetChannelManager::new(connector.clone(), dir.path());
@@ -522,7 +676,7 @@
#[fuchsia_async::run_singlethreaded(test)]
async fn test_target_channel_manager_recovers_from_corrupt_data() {
let dir = tempfile::tempdir().unwrap();
- let target_channel_path = dir.path().join("target_channel.json");
+ let target_channel_path = dir.path().join(TARGET_CHANNEL);
fs::write(&target_channel_path, r#"invalid json"#).unwrap();
diff --git a/garnet/bin/system-update-checker/src/info_handler.rs b/garnet/bin/system-update-checker/src/info_handler.rs
index fa0ff0d..cc8cad5 100644
--- a/garnet/bin/system-update-checker/src/info_handler.rs
+++ b/garnet/bin/system-update-checker/src/info_handler.rs
@@ -121,5 +121,4 @@
assert_eq!(res.map_err(|e| e.to_string()), Ok("".into()));
}
-
}
diff --git a/garnet/bin/system-update-checker/src/main.rs b/garnet/bin/system-update-checker/src/main.rs
index e0f2cbb..4552abc 100644
--- a/garnet/bin/system-update-checker/src/main.rs
+++ b/garnet/bin/system-update-checker/src/main.rs
@@ -50,12 +50,16 @@
fx_log_err!("while updating the target channel: {}", e);
}
- let channel_notifier =
- channel::CurrentChannelNotifier::new(connect::ServiceConnector, "/misc/ota");
- let channel_fut = channel_notifier.run();
+ let (current_channel_manager, current_channel_notifier) =
+ channel::build_current_channel_manager_and_notifier(
+ connect::ServiceConnector,
+ "/misc/ota",
+ )?;
+ let channel_fut = current_channel_notifier.run();
let update_manager = Arc::new(RealUpdateManager::new(
target_channel_manager,
+ current_channel_manager,
inspector.root().create_child("update-manager"),
));
let info_handler = InfoHandler::default();
diff --git a/garnet/bin/system-update-checker/src/poller.rs b/garnet/bin/system-update-checker/src/poller.rs
index 8bb7039..3542bf7 100644
--- a/garnet/bin/system-update-checker/src/poller.rs
+++ b/garnet/bin/system-update-checker/src/poller.rs
@@ -4,7 +4,9 @@
use crate::apply::Initiator;
use crate::config::Config;
-use crate::update_manager::{TargetChannelUpdater, UpdateApplier, UpdateChecker, UpdateManager};
+use crate::update_manager::{
+ CurrentChannelUpdater, TargetChannelUpdater, UpdateApplier, UpdateChecker, UpdateManager,
+};
use crate::update_monitor::StateChangeCallback;
use fidl_fuchsia_update::CheckStartedResult;
use fuchsia_async as fasync;
@@ -12,12 +14,13 @@
use futures::prelude::*;
use std::sync::Arc;
-pub fn run_periodic_update_check<T, C, A, S>(
- manager: Arc<UpdateManager<T, C, A, S>>,
+pub fn run_periodic_update_check<T, Ch, C, A, S>(
+ manager: Arc<UpdateManager<T, Ch, C, A, S>>,
config: &Config,
) -> impl Future<Output = ()>
where
T: TargetChannelUpdater,
+ Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
S: StateChangeCallback,
@@ -49,8 +52,8 @@
use super::*;
use crate::config::ConfigBuilder;
use crate::update_manager::tests::{
- FakeTargetChannelUpdater, FakeUpdateChecker, StateChangeCollector,
- UnreachableStateChangeCallback, UnreachableUpdateApplier,
+ FakeCurrentChannelUpdater, FakeTargetChannelUpdater, FakeUpdateChecker,
+ StateChangeCollector, UnreachableStateChangeCallback, UnreachableUpdateApplier,
};
use fidl_fuchsia_update::ManagerState;
use fuchsia_async::DurationExt;
@@ -62,9 +65,10 @@
let mut executor = fasync::Executor::new_with_fake_time().unwrap();
let checker = FakeUpdateChecker::new_up_to_date();
- let manager: UpdateManager<_, _, _, UnreachableStateChangeCallback> =
+ let manager: UpdateManager<_, _, _, _, UnreachableStateChangeCallback> =
UpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
checker.clone(),
UnreachableUpdateApplier,
);
@@ -84,6 +88,7 @@
let callback = StateChangeCollector::new();
let manager = UpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
checker.clone(),
UnreachableUpdateApplier,
);
@@ -133,6 +138,7 @@
let callback = StateChangeCollector::new();
let manager = UpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
checker.clone(),
UnreachableUpdateApplier,
);
diff --git a/garnet/bin/system-update-checker/src/update_manager.rs b/garnet/bin/system-update-checker/src/update_manager.rs
index 55e87c2..d06192e 100644
--- a/garnet/bin/system-update-checker/src/update_manager.rs
+++ b/garnet/bin/system-update-checker/src/update_manager.rs
@@ -3,7 +3,7 @@
// found in the LICENSE file.
use crate::apply::{apply_system_update, Initiator};
-use crate::channel::TargetChannelManager;
+use crate::channel::{CurrentChannelManager, TargetChannelManager};
use crate::check::{check_for_system_update, SystemUpdateStatus};
use crate::connect::ServiceConnect;
use crate::update_monitor::{State, StateChangeCallback, UpdateMonitor};
@@ -27,38 +27,47 @@
// periodically lock `monitor` to send status updates. Before an async task or thread can lock
// `updater`, it must release any locks on `monitor`.
//
-pub struct UpdateManager<T, C, A, S>
+pub struct UpdateManager<T, Ch, C, A, S>
where
T: TargetChannelUpdater,
+ Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
S: StateChangeCallback,
{
monitor: Arc<Mutex<UpdateMonitor<S>>>,
- updater: Arc<AsyncMutex<SystemInterface<T, C, A>>>,
+ updater: Arc<AsyncMutex<SystemInterface<T, Ch, C, A>>>,
}
-struct SystemInterface<T, C, A>
+struct SystemInterface<T, Ch, C, A>
where
T: TargetChannelUpdater,
+ Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
{
- channel_updater: T,
+ target_channel_updater: T,
+ current_channel_updater: Ch,
update_checker: C,
update_applier: A,
}
-impl<T, S> UpdateManager<T, RealUpdateChecker, RealUpdateApplier, S>
+impl<T, Ch, S> UpdateManager<T, Ch, RealUpdateChecker, RealUpdateApplier, S>
where
T: TargetChannelUpdater,
+ Ch: CurrentChannelUpdater,
S: StateChangeCallback,
{
- pub fn new(channel_updater: T, node: finspect::Node) -> Self {
+ pub fn new(
+ target_channel_updater: T,
+ current_channel_updater: Ch,
+ node: finspect::Node,
+ ) -> Self {
Self {
monitor: Arc::new(Mutex::new(UpdateMonitor::from_inspect_node(node))),
updater: Arc::new(AsyncMutex::new(SystemInterface::new(
- channel_updater,
+ target_channel_updater,
+ current_channel_updater,
RealUpdateChecker,
RealUpdateApplier,
))),
@@ -66,23 +75,26 @@
}
}
-impl<T, C, A, S> UpdateManager<T, C, A, S>
+impl<T, Ch, C, A, S> UpdateManager<T, Ch, C, A, S>
where
T: TargetChannelUpdater,
+ Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
S: StateChangeCallback,
{
#[cfg(test)]
pub fn from_checker_and_applier(
- channel_updater: T,
+ target_channel_updater: T,
+ current_channel_updater: Ch,
update_checker: C,
update_applier: A,
) -> Self {
Self {
monitor: Arc::new(Mutex::new(UpdateMonitor::new())),
updater: Arc::new(AsyncMutex::new(SystemInterface::new(
- channel_updater,
+ target_channel_updater,
+ current_channel_updater,
update_checker,
update_applier,
))),
@@ -127,14 +139,20 @@
}
}
-impl<T, C, A> SystemInterface<T, C, A>
+impl<T, Ch, C, A> SystemInterface<T, Ch, C, A>
where
T: TargetChannelUpdater,
+ Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
{
- pub fn new(channel_updater: T, update_checker: C, update_applier: A) -> Self {
- Self { channel_updater, update_checker, update_applier }
+ pub fn new(
+ target_channel_updater: T,
+ current_channel_updater: Ch,
+ update_checker: C,
+ update_applier: A,
+ ) -> Self {
+ Self { target_channel_updater, current_channel_updater, update_checker, update_applier }
}
async fn do_system_update_check_and_return_to_idle<S: StateChangeCallback>(
@@ -172,12 +190,13 @@
}
);
- self.channel_updater.update().await;
+ self.target_channel_updater.update().await;
match self.update_checker.check().await.context("check_for_system_update failed")? {
SystemUpdateStatus::UpToDate { system_image } => {
fx_log_info!("current system_image merkle: {}", system_image);
fx_log_info!("system_image is already up-to-date");
+ self.current_channel_updater.update().await;
return Ok(());
}
SystemUpdateStatus::UpdateAvailable { current_system_image, latest_system_image } => {
@@ -229,6 +248,19 @@
}
// For mocking
+pub trait CurrentChannelUpdater: Send + Sync + 'static {
+ fn update(&mut self) -> BoxFuture<'_, ()>;
+}
+
+impl CurrentChannelUpdater for CurrentChannelManager {
+ fn update(&mut self) -> BoxFuture<'_, ()> {
+ CurrentChannelManager::update(self)
+ .unwrap_or_else(|e| fx_log_err!("while updating current channel: {:?}", e))
+ .boxed()
+ }
+}
+
+// For mocking
pub trait UpdateApplier: Send + Sync + 'static {
fn apply(
&self,
@@ -337,6 +369,28 @@
}
#[derive(Clone)]
+ pub struct FakeCurrentChannelUpdater {
+ call_count: Arc<AtomicU64>,
+ }
+ impl FakeCurrentChannelUpdater {
+ pub fn new() -> Self {
+ Self { call_count: Arc::new(AtomicU64::new(0)) }
+ }
+ pub fn call_count(&self) -> u64 {
+ self.call_count.load(Ordering::SeqCst)
+ }
+ }
+ impl CurrentChannelUpdater for FakeCurrentChannelUpdater {
+ fn update(&mut self) -> BoxFuture<'_, ()> {
+ let call_count = self.call_count.clone();
+ async move {
+ call_count.fetch_add(1, Ordering::SeqCst);
+ }
+ .boxed()
+ }
+ }
+
+ #[derive(Clone)]
pub struct UnreachableUpdateApplier;
impl UpdateApplier for UnreachableUpdateApplier {
fn apply(
@@ -429,6 +483,7 @@
type FakeUpdateManager = UpdateManager<
FakeTargetChannelUpdater,
+ FakeCurrentChannelUpdater,
FakeUpdateChecker,
FakeUpdateApplier,
FakeStateChangeCallback,
@@ -436,6 +491,7 @@
type BlockingManagerManager = UpdateManager<
FakeTargetChannelUpdater,
+ FakeCurrentChannelUpdater,
BlockingUpdateChecker,
FakeUpdateApplier,
FakeStateChangeCallback,
@@ -486,6 +542,7 @@
fn test_correct_initial_state() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_up_to_date(),
FakeUpdateApplier::new_success(),
);
@@ -498,6 +555,7 @@
let _executor = fasync::Executor::new().expect("create test executor");
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_up_to_date(),
FakeUpdateApplier::new_success(),
);
@@ -509,6 +567,7 @@
async fn test_temporary_callbacks_dropped_after_update_attempt() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_up_to_date(),
FakeUpdateApplier::new_success(),
);
@@ -536,6 +595,7 @@
async fn test_try_start_update_callback_when_up_to_date() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_up_to_date(),
FakeUpdateApplier::new_success(),
);
@@ -557,6 +617,7 @@
async fn test_try_start_update_callback_when_update_available_and_apply_errors() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
);
@@ -580,6 +641,7 @@
async fn test_try_start_update_callback_when_update_available_and_apply_succeeds() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_success(),
);
@@ -602,6 +664,7 @@
async fn test_permanent_callback_is_called() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
);
@@ -626,6 +689,7 @@
async fn test_permanent_callback_persists_across_attempts() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
);
@@ -665,6 +729,7 @@
let channel_updater = FakeTargetChannelUpdater::new();
let manager = UpdateManager::from_checker_and_applier(
channel_updater.clone(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_up_to_date(),
UnreachableUpdateApplier,
);
@@ -681,6 +746,7 @@
let update_applier = FakeUpdateApplier::new_error();
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
update_applier.clone(),
);
@@ -695,8 +761,10 @@
#[fasync::run_singlethreaded(test)]
async fn test_update_applier_not_called_if_up_to_date() {
let update_applier = FakeUpdateApplier::new_error();
+ let current_channel_updater = FakeCurrentChannelUpdater::new();
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ current_channel_updater.clone(),
FakeUpdateChecker::new_up_to_date(),
update_applier.clone(),
);
@@ -706,12 +774,14 @@
receiver.collect::<Vec<State>>().await;
assert_eq!(update_applier.call_count(), 0);
+ assert_eq!(current_channel_updater.call_count(), 1);
}
#[fasync::run_singlethreaded(test)]
async fn test_return_to_initial_state_on_update_check_error() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_error(),
FakeUpdateApplier::new_error(),
);
@@ -727,6 +797,7 @@
async fn test_return_to_initial_state_on_update_apply_error() {
let manager = FakeUpdateManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
);
@@ -768,6 +839,7 @@
let (blocking_update_checker, _sender) = BlockingUpdateChecker::new_checker_and_sender();
let manager = BlockingManagerManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
blocking_update_checker,
FakeUpdateApplier::new_error(),
);
@@ -783,6 +855,7 @@
let update_applier = FakeUpdateApplier::new_error();
let manager = BlockingManagerManager::from_checker_and_applier(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
blocking_update_checker,
update_applier.clone(),
);
diff --git a/garnet/bin/system-update-checker/src/update_monitor.rs b/garnet/bin/system-update-checker/src/update_monitor.rs
index 903d6bf..2bc7707 100644
--- a/garnet/bin/system-update-checker/src/update_monitor.rs
+++ b/garnet/bin/system-update-checker/src/update_monitor.rs
@@ -276,7 +276,6 @@
);
}
}
-
}
#[cfg(test)]
diff --git a/garnet/bin/system-update-checker/src/update_service.rs b/garnet/bin/system-update-checker/src/update_service.rs
index e004f63..1629e09 100644
--- a/garnet/bin/system-update-checker/src/update_service.rs
+++ b/garnet/bin/system-update-checker/src/update_service.rs
@@ -3,11 +3,11 @@
// found in the LICENSE file.
use crate::apply::Initiator;
-use crate::channel::TargetChannelManager;
+use crate::channel::{CurrentChannelManager, TargetChannelManager};
use crate::connect::ServiceConnector;
use crate::update_manager::{
- RealUpdateApplier, RealUpdateChecker, TargetChannelUpdater, UpdateApplier, UpdateChecker,
- UpdateManager,
+ CurrentChannelUpdater, RealUpdateApplier, RealUpdateChecker, TargetChannelUpdater,
+ UpdateApplier, UpdateChecker, UpdateManager,
};
use crate::update_monitor::{State, StateChangeCallback};
use failure::{bail, Error, ResultExt};
@@ -20,28 +20,36 @@
use std::sync::Arc;
pub type RealTargetChannelUpdater = TargetChannelManager<ServiceConnector>;
-pub type RealUpdateService =
- UpdateService<RealTargetChannelUpdater, RealUpdateChecker, RealUpdateApplier>;
+pub type RealCurrentChannelUpdater = CurrentChannelManager;
+pub type RealUpdateService = UpdateService<
+ RealTargetChannelUpdater,
+ RealCurrentChannelUpdater,
+ RealUpdateChecker,
+ RealUpdateApplier,
+>;
pub type RealStateChangeCallback = MonitorControlHandle;
pub type RealUpdateManager = UpdateManager<
RealTargetChannelUpdater,
+ RealCurrentChannelUpdater,
RealUpdateChecker,
RealUpdateApplier,
RealStateChangeCallback,
>;
-pub struct UpdateService<T, C, A>
+pub struct UpdateService<T, Ch, C, A>
where
T: TargetChannelUpdater,
+ Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
{
- update_manager: Arc<UpdateManager<T, C, A, RealStateChangeCallback>>,
+ update_manager: Arc<UpdateManager<T, Ch, C, A, RealStateChangeCallback>>,
}
-impl<T, C, A> Clone for UpdateService<T, C, A>
+impl<T, Ch, C, A> Clone for UpdateService<T, Ch, C, A>
where
T: TargetChannelUpdater,
+ Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
{
@@ -56,9 +64,10 @@
}
}
-impl<T, C, A> UpdateService<T, C, A>
+impl<T, Ch, C, A> UpdateService<T, Ch, C, A>
where
T: TargetChannelUpdater,
+ Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
{
@@ -162,7 +171,8 @@
mod tests {
use super::*;
use crate::update_manager::tests::{
- BlockingUpdateChecker, FakeTargetChannelUpdater, FakeUpdateApplier, FakeUpdateChecker,
+ BlockingUpdateChecker, FakeCurrentChannelUpdater, FakeTargetChannelUpdater,
+ FakeUpdateApplier, FakeUpdateChecker,
};
use fidl::endpoints::create_proxy_and_stream;
use fidl_fuchsia_update::ManagerState;
@@ -184,20 +194,23 @@
fidl_fuchsia_update::Options { initiator: Some(fidl_fuchsia_update::Initiator::User) }
}
- fn spawn_update_service<T, C, A>(
+ fn spawn_update_service<T, Ch, C, A>(
channel_updater: T,
+ current_channel_updater: Ch,
update_checker: C,
update_applier: A,
- ) -> (ManagerProxy, UpdateService<T, C, A>)
+ ) -> (ManagerProxy, UpdateService<T, Ch, C, A>)
where
T: TargetChannelUpdater,
+ Ch: CurrentChannelUpdater,
C: UpdateChecker,
A: UpdateApplier,
{
- let update_service = UpdateService::<T, C, A> {
+ let update_service = UpdateService::<T, Ch, C, A> {
update_manager: Arc::new(
- UpdateManager::<T, C, A, RealStateChangeCallback>::from_checker_and_applier(
+ UpdateManager::<T, Ch, C, A, RealStateChangeCallback>::from_checker_and_applier(
channel_updater,
+ current_channel_updater,
update_checker,
update_applier,
),
@@ -235,6 +248,7 @@
async fn test_check_now_monitor_sees_on_state_events() {
let proxy = spawn_update_service(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
)
@@ -259,6 +273,7 @@
async fn test_add_monitor_sees_on_state_events() {
let proxy = spawn_update_service(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
)
@@ -285,6 +300,7 @@
async fn test_get_state() {
let proxy = spawn_update_service(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
)
@@ -300,6 +316,7 @@
async fn test_multiple_clients_see_on_state_events() {
let (proxy0, service) = spawn_update_service(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
FakeUpdateChecker::new_update_available(),
FakeUpdateApplier::new_error(),
);
@@ -346,6 +363,7 @@
let fake_update_applier = FakeUpdateApplier::new_error();
let (proxy0, service) = spawn_update_service(
FakeTargetChannelUpdater::new(),
+ FakeCurrentChannelUpdater::new(),
blocking_update_checker,
fake_update_applier.clone(),
);