| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use crate::connect::*; |
| use failure::Fail; |
| use fidl_fuchsia_cobalt::{ |
| Status as CobaltStatus, SystemDataUpdaterMarker, SystemDataUpdaterProxy, |
| }; |
| use fidl_fuchsia_pkg_rewrite::EngineMarker; |
| use fuchsia_async as fasync; |
| use fuchsia_syslog::{fx_log_err, fx_log_info, fx_log_warn}; |
| use fuchsia_url::pkg_url::PkgUrl; |
| use fuchsia_zircon as zx; |
| use futures::{ |
| channel::mpsc, |
| future::{self, Either, FutureExt}, |
| sink::SinkExt, |
| stream::StreamExt, |
| }; |
| use serde_derive::{Deserialize, Serialize}; |
| use serde_json; |
| use std::fs; |
| use std::io; |
| use std::path::{Path, PathBuf}; |
| use std::time::Duration; |
| |
| static CURRENT_CHANNEL: &'static str = "current_channel.json"; |
| static TARGET_CHANNEL: &'static str = "target_channel.json"; |
| |
| pub fn build_current_channel_manager_and_notifier<S: ServiceConnect>( |
| service_connector: S, |
| dir: impl Into<PathBuf>, |
| ) -> Result<(CurrentChannelManager, CurrentChannelNotifier<S>), failure::Error> { |
| let path = dir.into(); |
| let current_channel = read_current_channel(path.as_ref()).unwrap_or_else(|err| { |
| fx_log_err!( |
| "Error reading current_channel, defaulting to the empty string. This is expected before the first OTA. {}", |
| err |
| ); |
| String::new() |
| }); |
| |
| let (channel_sender, channel_receiver) = mpsc::channel(100); |
| |
| Ok(( |
| CurrentChannelManager::new(path, channel_sender), |
| CurrentChannelNotifier::new(service_connector, current_channel, channel_receiver), |
| )) |
| } |
| |
| pub struct CurrentChannelNotifier<S = ServiceConnector> { |
| service_connector: S, |
| initial_channel: String, |
| channel_receiver: mpsc::Receiver<String>, |
| } |
| |
| impl<S: ServiceConnect> CurrentChannelNotifier<S> { |
| fn new( |
| service_connector: S, |
| initial_channel: String, |
| channel_receiver: mpsc::Receiver<String>, |
| ) -> Self { |
| CurrentChannelNotifier { service_connector, initial_channel, channel_receiver } |
| } |
| |
| async fn notify_cobalt(service_connector: &S, current_channel: String) { |
| loop { |
| let cobalt = Self::connect(service_connector).await; |
| |
| fx_log_info!("calling cobalt.SetChannel(\"{}\")", current_channel); |
| |
| match cobalt.set_channel(¤t_channel).await { |
| Ok(CobaltStatus::Ok) => { |
| return; |
| } |
| Ok(CobaltStatus::EventTooBig) => { |
| fx_log_warn!("cobalt.SetChannel returned Status.EVENT_TOO_BIG, retrying"); |
| } |
| Ok(status) => { |
| // Not much we can do about the other status codes but log. |
| fx_log_err!("cobalt.SetChannel returned non-OK status: {:?}", status); |
| return; |
| } |
| Err(err) => { |
| // channel broken, so log the error and reconnect. |
| fx_log_warn!("cobalt.SetChannel returned error: {}, retrying", err); |
| } |
| } |
| |
| Self::sleep().await; |
| } |
| } |
| |
| pub async fn run(self) { |
| let Self { service_connector, initial_channel, mut channel_receiver } = self; |
| let mut notify_cobalt_task = |
| Self::notify_cobalt(&service_connector, initial_channel).boxed(); |
| |
| loop { |
| match future::select(channel_receiver.next(), notify_cobalt_task).await { |
| Either::Left((Some(current_channel), _)) => { |
| fx_log_warn!( |
| "notify_cobalt() overrun. Starting again with new channel: `{}`", |
| current_channel |
| ); |
| notify_cobalt_task = |
| Self::notify_cobalt(&service_connector, current_channel).boxed(); |
| } |
| Either::Left((None, notify_cobalt_future)) => { |
| fx_log_warn!( |
| "all channel_senders have been closed. No new messages will arrive." |
| ); |
| notify_cobalt_future.await; |
| return; |
| } |
| Either::Right((_, next_channel_fut)) => { |
| if let Some(current_channel) = next_channel_fut.await { |
| notify_cobalt_task = |
| Self::notify_cobalt(&service_connector, current_channel).boxed(); |
| } else { |
| fx_log_warn!( |
| "all channel_senders have been closed. No new messages will arrive." |
| ); |
| return; |
| } |
| } |
| } |
| } |
| } |
| |
| async fn connect(service_connector: &S) -> SystemDataUpdaterProxy { |
| loop { |
| match service_connector.connect_to_service::<SystemDataUpdaterMarker>() { |
| Ok(cobalt) => { |
| return cobalt; |
| } |
| Err(err) => { |
| fx_log_err!("error connecting to cobalt: {}", err); |
| Self::sleep().await |
| } |
| } |
| } |
| } |
| |
| async fn sleep() { |
| let delay = fasync::Time::after(Duration::from_secs(5).into()); |
| fasync::Timer::new(delay).await; |
| } |
| } |
| |
| #[derive(Clone)] |
| pub struct CurrentChannelManager { |
| path: PathBuf, |
| channel_sender: mpsc::Sender<String>, |
| } |
| |
| impl CurrentChannelManager { |
| fn new(path: PathBuf, channel_sender: mpsc::Sender<String>) -> Self { |
| CurrentChannelManager { path, channel_sender } |
| } |
| |
| pub async fn update(&mut self) -> Result<(), failure::Error> { |
| let target_channel = read_channel(&self.path.join(TARGET_CHANNEL))?; |
| if target_channel != read_current_channel(&self.path).ok().unwrap_or_else(String::new) { |
| write_channel(&self.path.join(CURRENT_CHANNEL), &target_channel)?; |
| self.channel_sender.send(target_channel).await?; |
| } |
| Ok(()) |
| } |
| } |
| |
| pub struct TargetChannelManager<S = ServiceConnector> { |
| service_connector: S, |
| path: PathBuf, |
| target_channel: Option<String>, |
| } |
| |
| impl<S: ServiceConnect> TargetChannelManager<S> { |
| pub fn new(service_connector: S, dir: impl Into<PathBuf>) -> Self { |
| let mut path = dir.into(); |
| path.push(TARGET_CHANNEL); |
| let target_channel = read_channel(&path).ok(); |
| |
| Self { service_connector, path, target_channel } |
| } |
| |
| pub async fn update(&mut self) -> Result<(), failure::Error> { |
| let target_channel = self.lookup_target_channel().await?; |
| if self.target_channel.as_ref().map_or(false, |c| c == &target_channel) { |
| return Ok(()); |
| } |
| |
| write_channel(&self.path, target_channel.clone())?; |
| self.target_channel = Some(target_channel); |
| Ok(()) |
| } |
| |
| async fn lookup_target_channel(&self) -> Result<String, failure::Error> { |
| let rewrite_engine = self.service_connector.connect_to_service::<EngineMarker>()?; |
| let rewritten: PkgUrl = rewrite_engine |
| .test_apply("fuchsia-pkg://fuchsia.com/update/0") |
| .await? |
| .map_err(|s| zx::Status::from_raw(s))? |
| .parse()?; |
| let channel = host_to_channel(rewritten.host()); |
| |
| Ok(channel.to_owned()) |
| } |
| } |
| |
| #[derive(Serialize, Deserialize)] |
| #[serde(tag = "version", content = "content", deny_unknown_fields)] |
| enum Channel { |
| #[serde(rename = "1")] |
| Version1 { legacy_amber_source_name: String }, |
| } |
| |
| fn read_current_channel(p: &Path) -> Result<String, Error> { |
| read_channel(p.join(CURRENT_CHANNEL)) |
| } |
| |
| fn read_channel(path: impl AsRef<Path>) -> Result<String, Error> { |
| let f = fs::File::open(path.as_ref())?; |
| match serde_json::from_reader(f)? { |
| Channel::Version1 { legacy_amber_source_name } => Ok(legacy_amber_source_name), |
| } |
| } |
| |
| fn write_channel(path: impl AsRef<Path>, channel: impl Into<String>) -> Result<(), io::Error> { |
| let path = path.as_ref(); |
| let channel = Channel::Version1 { legacy_amber_source_name: channel.into() }; |
| |
| let mut temp_path = path.to_owned().into_os_string(); |
| temp_path.push(".new"); |
| let temp_path = PathBuf::from(temp_path); |
| { |
| if let Some(dir) = temp_path.parent() { |
| fs::create_dir_all(dir)?; |
| } |
| let f = fs::File::create(&temp_path)?; |
| serde_json::to_writer(f, &channel)?; |
| }; |
| fs::rename(temp_path, path) |
| } |
| |
| #[derive(Debug, Fail)] |
| enum Error { |
| #[fail(display = "io error: {}", _0)] |
| Io(#[cause] io::Error), |
| |
| #[fail(display = "json error: {}", _0)] |
| Json(#[cause] serde_json::Error), |
| } |
| |
| impl From<io::Error> for Error { |
| fn from(err: io::Error) -> Self { |
| Error::Io(err) |
| } |
| } |
| |
| impl From<serde_json::Error> for Error { |
| fn from(err: serde_json::Error) -> Self { |
| Error::Json(err) |
| } |
| } |
| |
| fn host_to_channel(host: &str) -> &str { |
| if let Some(n) = host.rfind(".fuchsia.com") { |
| let (prefix, _) = host.split_at(n); |
| prefix.split('.').nth(1).unwrap_or(host) |
| } else { |
| host |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use fidl::endpoints::{DiscoverableService, RequestStream}; |
| use fidl_fuchsia_cobalt::{SystemDataUpdaterRequest, SystemDataUpdaterRequestStream}; |
| use fidl_fuchsia_pkg_rewrite::{EngineRequest, EngineRequestStream}; |
| use fuchsia_async::DurationExt; |
| use fuchsia_component::server::ServiceFs; |
| use fuchsia_zircon::DurationNum; |
| use futures::prelude::*; |
| use futures::task::Poll; |
| use matches::assert_matches; |
| use parking_lot::Mutex; |
| use serde_json::{json, Value}; |
| use std::sync::Arc; |
| use tempfile; |
| |
| #[test] |
| fn test_host_to_channel_identities() { |
| for s in vec![ |
| "devhost", |
| "fuchsia.com", |
| "example.com", |
| "test.fuchsia.com", |
| "test.example.com", |
| "a.b-c.d.example.com", |
| ] { |
| assert_eq!(host_to_channel(s), s); |
| } |
| } |
| |
| #[test] |
| fn test_host_to_channel_extracts_proper_subdomain() { |
| assert_eq!(host_to_channel("a.b-c.d.fuchsia.com"), "b-c"); |
| } |
| |
| #[test] |
| fn test_read_current_channel() { |
| let dir = tempfile::tempdir().unwrap(); |
| |
| fs::write( |
| dir.path().join(CURRENT_CHANNEL), |
| r#"{"version":"1","content":{"legacy_amber_source_name":"stable"}}"#, |
| ) |
| .unwrap(); |
| |
| assert_matches!(read_current_channel(dir.path()), Ok(ref channel) if channel == "stable"); |
| } |
| |
| #[test] |
| fn test_write_channel() { |
| let dir = tempfile::tempdir().unwrap(); |
| let path = dir.path().join("channel.json"); |
| |
| assert_matches!(write_channel(&path, "test"), Ok(())); |
| |
| let f = fs::File::open(path).expect("file to exist"); |
| let value: Value = serde_json::from_reader(f).expect("valid json"); |
| assert_eq!( |
| value, |
| json!({ |
| "version": "1", |
| "content": { |
| "legacy_amber_source_name": "test", |
| } |
| }) |
| ); |
| } |
| |
| #[test] |
| fn test_write_channel_create_subdir() { |
| let dir = tempfile::tempdir().unwrap(); |
| let path = dir.path().join("subdir").join("channel.json"); |
| |
| assert_matches!(write_channel(&path, "test"), Ok(())); |
| |
| let f = fs::File::open(path).expect("file to exist"); |
| let value: Value = serde_json::from_reader(f).expect("valid json"); |
| assert_eq!( |
| value, |
| json!({ |
| "version": "1", |
| "content": { |
| "legacy_amber_source_name": "test", |
| } |
| }) |
| ); |
| } |
| |
| #[test] |
| fn test_read_current_channel_rejects_invalid_json() { |
| let dir = tempfile::tempdir().unwrap(); |
| |
| fs::write(dir.path().join(CURRENT_CHANNEL), "no channel here").unwrap(); |
| |
| assert_matches!(read_current_channel(dir.path()), Err(Error::Json(_))); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_current_channel_notifier() { |
| let dir = tempfile::tempdir().unwrap(); |
| let current_channel_path = dir.path().join(CURRENT_CHANNEL); |
| |
| fs::write( |
| ¤t_channel_path, |
| r#"{"version":"1","content":{"legacy_amber_source_name":"stable"}}"#, |
| ) |
| .unwrap(); |
| |
| let (connector, svc_dir) = |
| NamespacedServiceConnector::bind("/test/current_channel_manager/svc") |
| .expect("ns to bind"); |
| let (_, c) = build_current_channel_manager_and_notifier(connector, dir.path()).unwrap(); |
| |
| let mut fs = ServiceFs::new_local(); |
| let channel = Arc::new(Mutex::new(None)); |
| let chan = channel.clone(); |
| |
| fs.add_fidl_service(move |mut stream: SystemDataUpdaterRequestStream| { |
| let chan = chan.clone(); |
| |
| fasync::spawn_local(async move { |
| while let Some(req) = stream.try_next().await.unwrap_or(None) { |
| match req { |
| SystemDataUpdaterRequest::SetChannel { current_channel, responder } => { |
| *chan.lock() = Some(current_channel); |
| responder.send(CobaltStatus::Ok).unwrap(); |
| } |
| _ => unreachable!(), |
| } |
| } |
| }) |
| }) |
| .serve_connection(svc_dir) |
| .expect("serve_connection"); |
| |
| fasync::spawn_local(fs.collect()); |
| |
| c.run().await; |
| |
| assert_eq!(channel.lock().as_ref().map(|s| s.as_str()), Some("stable")); |
| } |
| |
| #[test] |
| fn test_current_channel_notifier_retries() { |
| let mut executor = fasync::Executor::new_with_fake_time().unwrap(); |
| |
| let dir = tempfile::tempdir().unwrap(); |
| let current_channel_path = dir.path().join(CURRENT_CHANNEL); |
| |
| write_channel(¤t_channel_path, "stable").unwrap(); |
| |
| #[derive(Debug, Clone)] |
| enum FlakeMode { |
| ErrorOnConnect, |
| DropConnection, |
| StatusOnCall(CobaltStatus), |
| } |
| |
| #[derive(Debug, Clone)] |
| struct State { |
| mode: Option<FlakeMode>, |
| channel: Option<String>, |
| connect_count: u64, |
| call_count: u64, |
| } |
| |
| #[derive(Clone, Debug)] |
| struct FlakeyServiceConnector { |
| state: Arc<Mutex<State>>, |
| }; |
| |
| impl FlakeyServiceConnector { |
| fn new() -> Self { |
| Self { |
| state: Arc::new(Mutex::new(State { |
| mode: Some(FlakeMode::ErrorOnConnect), |
| channel: None, |
| connect_count: 0, |
| call_count: 0, |
| })), |
| } |
| } |
| fn set_flake_mode(&self, mode: impl Into<Option<FlakeMode>>) { |
| self.state.lock().mode = mode.into(); |
| } |
| fn channel(&self) -> Option<String> { |
| self.state.lock().channel.clone() |
| } |
| fn connect_count(&self) -> u64 { |
| self.state.lock().connect_count |
| } |
| fn call_count(&self) -> u64 { |
| self.state.lock().call_count |
| } |
| } |
| |
| impl ServiceConnect for FlakeyServiceConnector { |
| fn connect_to_service<S: DiscoverableService>( |
| &self, |
| ) -> Result<S::Proxy, failure::Error> { |
| assert_eq!(S::SERVICE_NAME, SystemDataUpdaterMarker::SERVICE_NAME); |
| self.state.lock().connect_count += 1; |
| match self.state.lock().mode { |
| Some(FlakeMode::ErrorOnConnect) => failure::bail!("test error on connect"), |
| Some(FlakeMode::DropConnection) => { |
| let (proxy, _stream) = fidl::endpoints::create_proxy::<S>().unwrap(); |
| Ok(proxy) |
| } |
| Some(FlakeMode::StatusOnCall(status)) => { |
| let (proxy, stream) = |
| fidl::endpoints::create_proxy_and_stream::<S>().unwrap(); |
| let mut stream: SystemDataUpdaterRequestStream = stream.cast_stream(); |
| |
| let state = self.state.clone(); |
| fasync::spawn_local(async move { |
| while let Some(req) = stream.try_next().await.unwrap() { |
| match req { |
| SystemDataUpdaterRequest::SetChannel { |
| current_channel: _current_channel, |
| responder, |
| } => { |
| state.lock().call_count += 1; |
| responder.send(status).unwrap(); |
| } |
| _ => unreachable!(), |
| } |
| } |
| }); |
| Ok(proxy) |
| } |
| None => { |
| let (proxy, stream) = |
| fidl::endpoints::create_proxy_and_stream::<S>().unwrap(); |
| let mut stream: SystemDataUpdaterRequestStream = stream.cast_stream(); |
| |
| let state = self.state.clone(); |
| fasync::spawn_local(async move { |
| while let Some(req) = stream.try_next().await.unwrap() { |
| match req { |
| SystemDataUpdaterRequest::SetChannel { |
| current_channel, |
| responder, |
| } => { |
| state.lock().call_count += 1; |
| state.lock().channel = Some(current_channel); |
| responder.send(CobaltStatus::Ok).unwrap(); |
| } |
| _ => unreachable!(), |
| } |
| } |
| }); |
| Ok(proxy) |
| } |
| } |
| } |
| } |
| |
| let connector = FlakeyServiceConnector::new(); |
| let (_, c) = build_current_channel_manager_and_notifier(connector.clone(), dir.path()) |
| .expect("failed to construct channel_manager"); |
| let mut task = c.run().boxed(); |
| assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending); |
| |
| // Retries if connecting fails |
| assert_eq!(executor.wake_expired_timers(), false); |
| executor.set_fake_time(5.seconds().after_now()); |
| assert_eq!(executor.wake_expired_timers(), true); |
| assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending); |
| assert_eq!(connector.connect_count(), 2); |
| |
| // Retries if a fidl error occurs during the request |
| connector.set_flake_mode(FlakeMode::DropConnection); |
| executor.set_fake_time(5.seconds().after_now()); |
| assert_eq!(executor.wake_expired_timers(), true); |
| assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending); |
| assert_eq!(connector.connect_count(), 3); |
| |
| // Retries on expected Cobalt error status codes |
| connector.set_flake_mode(FlakeMode::StatusOnCall(CobaltStatus::EventTooBig)); |
| executor.set_fake_time(5.seconds().after_now()); |
| assert_eq!(connector.call_count(), 0); |
| assert_eq!(executor.wake_expired_timers(), true); |
| assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending); |
| assert_eq!(connector.connect_count(), 4); |
| assert_eq!(connector.call_count(), 1); |
| |
| // Stops trying when it eventually succeeds |
| connector.set_flake_mode(None); |
| executor.set_fake_time(5.seconds().after_now()); |
| assert_eq!(executor.wake_expired_timers(), true); |
| assert_eq!(connector.channel(), None); |
| assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(())); |
| assert_eq!(connector.connect_count(), 5); |
| assert_eq!(connector.call_count(), 2); |
| assert_eq!(connector.channel(), Some("stable".to_owned())); |
| |
| // Bails out if Cobalt responds with an unexpected status code |
| let connector = FlakeyServiceConnector::new(); |
| let (_, c) = build_current_channel_manager_and_notifier(connector.clone(), dir.path()) |
| .expect("failed to construct channel_manager"); |
| let mut task = c.run().boxed(); |
| connector.set_flake_mode(FlakeMode::StatusOnCall(CobaltStatus::InvalidArguments)); |
| assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(())); |
| assert_eq!(connector.connect_count(), 1); |
| assert_eq!(connector.call_count(), 1); |
| } |
| |
| #[test] |
| fn test_current_channel_manager_writes_channel() { |
| let mut exec = fasync::Executor::new().expect("Unable to create executor"); |
| |
| let dir = tempfile::tempdir().unwrap(); |
| let target_channel_path = dir.path().join(TARGET_CHANNEL); |
| let current_channel_path = dir.path().join(CURRENT_CHANNEL); |
| |
| let target_connector = RewriteServiceConnector::new("fuchsia-pkg://devhost/update/0"); |
| let mut target_channel_manager = |
| TargetChannelManager::new(target_connector.clone(), dir.path()); |
| |
| let (current_connector, svc_dir) = |
| NamespacedServiceConnector::bind("/test/current_channel_manager2/svc") |
| .expect("ns to bind"); |
| let (mut current_channel_manager, current_channel_notifier) = |
| build_current_channel_manager_and_notifier(current_connector, dir.path()).unwrap(); |
| |
| let mut fs = ServiceFs::new_local(); |
| let channel = Arc::new(Mutex::new(None)); |
| let chan = channel.clone(); |
| |
| fs.add_fidl_service(move |mut stream: SystemDataUpdaterRequestStream| { |
| let chan = chan.clone(); |
| |
| fasync::spawn_local(async move { |
| while let Some(req) = stream.try_next().await.unwrap_or(None) { |
| match req { |
| SystemDataUpdaterRequest::SetChannel { current_channel, responder } => { |
| *chan.lock() = Some(current_channel); |
| responder.send(CobaltStatus::Ok).unwrap(); |
| } |
| _ => unreachable!(), |
| } |
| } |
| }) |
| }) |
| .serve_connection(svc_dir) |
| .expect("serve_connection"); |
| |
| fasync::spawn_local(fs.collect()); |
| |
| let mut notify_fut = current_channel_notifier.run().boxed(); |
| assert_eq!(exec.run_until_stalled(&mut notify_fut), Poll::Pending); |
| |
| assert_matches!(read_channel(¤t_channel_path), Err(_)); |
| assert_eq!(channel.lock().as_ref().map(|s| s.as_str()), Some("")); |
| |
| exec.run_singlethreaded(target_channel_manager.update()) |
| .expect("channel update to succeed"); |
| exec.run_singlethreaded(current_channel_manager.update()) |
| .expect("current channel update to succeed"); |
| assert_eq!(exec.run_until_stalled(&mut notify_fut), Poll::Pending); |
| |
| assert_eq!(read_channel(¤t_channel_path).unwrap(), "devhost"); |
| assert_eq!(channel.lock().as_ref().map(|s| s.as_str()), Some("devhost")); |
| |
| // Even if the current_channel is already known, it should be overwritten. |
| write_channel(&target_channel_path, "different").unwrap(); |
| exec.run_singlethreaded(current_channel_manager.update()) |
| .expect("current channel update to succeed"); |
| assert_eq!(exec.run_until_stalled(&mut notify_fut), Poll::Pending); |
| |
| assert_eq!(read_channel(¤t_channel_path).unwrap(), "different"); |
| assert_eq!(channel.lock().as_ref().map(|s| s.as_str()), Some("different")); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_target_channel_manager_writes_channel() { |
| let dir = tempfile::tempdir().unwrap(); |
| let target_channel_path = dir.path().join(TARGET_CHANNEL); |
| |
| let connector = RewriteServiceConnector::new("fuchsia-pkg://devhost/update/0"); |
| let mut channel_manager = TargetChannelManager::new(connector.clone(), dir.path()); |
| |
| // First write of the file with the correct data. |
| assert_matches!(read_channel(&target_channel_path), Err(_)); |
| channel_manager.update().await.expect("channel update to succeed"); |
| assert_eq!(read_channel(&target_channel_path).unwrap(), "devhost"); |
| |
| // If the file changes while the service is running, an update doesn't know to replace it. |
| write_channel(&target_channel_path, "unique").unwrap(); |
| channel_manager.update().await.expect("channel update to succeed"); |
| assert_eq!(read_channel(&target_channel_path).unwrap(), "unique"); |
| |
| // If the update package changes, however, the file will be updated. |
| connector.set("fuchsia-pkg://hello.world.fuchsia.com/update/0"); |
| channel_manager.update().await.expect("channel update to succeed"); |
| assert_eq!(read_channel(&target_channel_path).unwrap(), "world"); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_target_channel_manager_recovers_from_corrupt_data() { |
| let dir = tempfile::tempdir().unwrap(); |
| let target_channel_path = dir.path().join(TARGET_CHANNEL); |
| |
| fs::write(&target_channel_path, r#"invalid json"#).unwrap(); |
| |
| let connector = RewriteServiceConnector::new("fuchsia-pkg://a.b.c.fuchsia.com/update/0"); |
| let mut channel_manager = TargetChannelManager::new(connector, dir.path()); |
| |
| assert!(read_channel(&target_channel_path).is_err()); |
| channel_manager.update().await.expect("channel update to succeed"); |
| assert_eq!(read_channel(&target_channel_path).unwrap(), "b"); |
| } |
| |
| #[derive(Clone)] |
| struct RewriteServiceConnector { |
| target: Arc<Mutex<String>>, |
| } |
| |
| impl RewriteServiceConnector { |
| fn new(target: impl Into<String>) -> Self { |
| Self { target: Arc::new(Mutex::new(target.into())) } |
| } |
| fn set(&self, target: impl Into<String>) { |
| *self.target.lock() = target.into(); |
| } |
| } |
| |
| impl ServiceConnect for RewriteServiceConnector { |
| fn connect_to_service<S: DiscoverableService>(&self) -> Result<S::Proxy, failure::Error> { |
| let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<S>().unwrap(); |
| assert_eq!(S::SERVICE_NAME, EngineMarker::SERVICE_NAME); |
| let mut stream: EngineRequestStream = stream.cast_stream(); |
| |
| let target = self.target.lock().clone(); |
| fasync::spawn_local(async move { |
| while let Some(req) = stream.try_next().await.unwrap() { |
| match req { |
| EngineRequest::TestApply { url, responder } => { |
| assert_eq!(url, "fuchsia-pkg://fuchsia.com/update/0"); |
| |
| responder.send(&mut Ok(target.clone())).unwrap(); |
| } |
| _ => unreachable!(), |
| } |
| } |
| }); |
| Ok(proxy) |
| } |
| } |
| } |