| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use crate::{ |
| common::{App, AppSet, CheckOptions, CheckTiming}, |
| configuration::Config, |
| http_request::{self, HttpRequest}, |
| installer::{Installer, Plan}, |
| metrics::{ClockType, Metrics, MetricsReporter, UpdateCheckFailureReason}, |
| policy::{CheckDecision, PolicyEngine, UpdateDecision}, |
| protocol::{ |
| self, |
| request::{Event, EventErrorCode, EventResult, EventType, InstallSource, GUID}, |
| response::{parse_json_response, OmahaStatus, Response}, |
| }, |
| request_builder::{self, RequestBuilder, RequestParams}, |
| storage::{Storage, StorageExt}, |
| time::{ComplexTime, PartialComplexTime, TimeSource, Timer}, |
| }; |
| |
| use anyhow::anyhow; |
| use futures::{ |
| channel::{mpsc, oneshot}, |
| future::{self, BoxFuture, Fuse}, |
| lock::Mutex, |
| prelude::*, |
| select, |
| }; |
| use http::{response::Parts, Response as HttpResponse}; |
| use log::{error, info, warn}; |
| use std::{ |
| cmp::min, |
| convert::TryInto, |
| rc::Rc, |
| str::Utf8Error, |
| time::{Duration, Instant, SystemTime}, |
| }; |
| use thiserror::Error; |
| |
| pub mod update_check; |
| |
| mod builder; |
| pub use builder::StateMachineBuilder; |
| |
| mod observer; |
| use observer::StateMachineProgressObserver; |
| pub use observer::{InstallProgress, StateMachineEvent}; |
| |
| const INSTALL_PLAN_ID: &str = "install_plan_id"; |
| const UPDATE_FIRST_SEEN_TIME: &str = "update_first_seen_time"; |
| const UPDATE_FINISH_TIME: &str = "update_finish_time"; |
| const TARGET_VERSION: &str = "target_version"; |
| const CONSECUTIVE_FAILED_INSTALL_ATTEMPTS: &str = "consecutive_failed_install_attempts"; |
| // How long do we wait after not allowed to reboot to check again. |
| const CHECK_REBOOT_ALLOWED_INTERVAL: Duration = Duration::from_secs(30 * 60); |
| // This header contains the number of seconds client must not contact server again. |
| const X_RETRY_AFTER: &str = "X-Retry-After"; |
| // How many requests we will make to Omaha before giving up. |
| const MAX_OMAHA_REQUEST_ATTEMPTS: u64 = 3; |
| |
| /// This is the core state machine for a client's update check. It is instantiated and used to |
| /// perform update checks over time or to perform a single update check process. |
| #[derive(Debug)] |
| pub struct StateMachine<PE, HR, IN, TM, MR, ST> |
| where |
| PE: PolicyEngine, |
| HR: HttpRequest, |
| IN: Installer, |
| TM: Timer, |
| MR: MetricsReporter, |
| ST: Storage, |
| { |
| /// The immutable configuration of the client itself. |
| config: Config, |
| |
| policy_engine: PE, |
| |
| http: HR, |
| |
| installer: IN, |
| |
| timer: TM, |
| |
| time_source: PE::TimeSource, |
| |
| metrics_reporter: MR, |
| |
| storage_ref: Rc<Mutex<ST>>, |
| |
| /// Context for update check. |
| context: update_check::Context, |
| |
| /// The current State of the StateMachine. |
| state: State, |
| |
| /// The list of apps used for update check. |
| app_set: AppSet, |
| } |
| |
| #[derive(Copy, Clone, Debug, Eq, PartialEq)] |
| pub enum State { |
| Idle, |
| CheckingForUpdates, |
| ErrorCheckingForUpdate, |
| NoUpdateAvailable, |
| InstallationDeferredByPolicy, |
| InstallingUpdate, |
| WaitingForReboot, |
| InstallationError, |
| } |
| |
| /// This is the set of errors that can occur when making a request to Omaha. This is an internal |
| /// collection of error types. |
| #[derive(Error, Debug)] |
| pub enum OmahaRequestError { |
| #[error("Unexpected JSON error constructing update check: {0}")] |
| Json(#[from] serde_json::Error), |
| |
| #[error("Error building update check HTTP request: {0}")] |
| HttpBuilder(#[from] http::Error), |
| |
| // TODO: This still contains hyper user error which should be split out. |
| #[error("HTTP transport error performing update check: {0}")] |
| HttpTransport(#[from] http_request::Error), |
| |
| #[error("HTTP error performing update check: {0}")] |
| HttpStatus(hyper::StatusCode), |
| } |
| |
| impl From<request_builder::Error> for OmahaRequestError { |
| fn from(err: request_builder::Error) -> Self { |
| match err { |
| request_builder::Error::Json(e) => OmahaRequestError::Json(e), |
| request_builder::Error::Http(e) => OmahaRequestError::HttpBuilder(e), |
| } |
| } |
| } |
| |
| impl From<http::StatusCode> for OmahaRequestError { |
| fn from(sc: http::StatusCode) -> Self { |
| OmahaRequestError::HttpStatus(sc) |
| } |
| } |
| |
| /// This is the set of errors that can occur when parsing the response body from Omaha. This is an |
| /// internal collection of error types. |
| #[derive(Error, Debug)] |
| #[allow(dead_code)] |
| pub enum ResponseParseError { |
| #[error("Response was not valid UTF-8")] |
| Utf8(Utf8Error), |
| |
| #[error("Unexpected JSON error parsing update check response: {}", _0)] |
| Json(serde_json::Error), |
| } |
| |
| #[derive(Error, Debug)] |
| pub enum UpdateCheckError { |
| #[error("Error checking with Omaha: {:?}", _0)] |
| OmahaRequest(OmahaRequestError), |
| |
| #[error("Error parsing Omaha response: {:?}", _0)] |
| ResponseParser(ResponseParseError), |
| |
| #[error("Unable to create an install plan: {:?}", _0)] |
| InstallPlan(anyhow::Error), |
| } |
| |
| /// A handle to interact with the state machine running in another task. |
| #[derive(Clone)] |
| pub struct ControlHandle(mpsc::Sender<ControlRequest>); |
| |
| /// Error indicating that the state machine task no longer exists. |
| #[derive(Debug, Clone, Error, PartialEq, Eq)] |
| #[error("state machine dropped before all its control handles")] |
| pub struct StateMachineGone; |
| |
| impl From<mpsc::SendError> for StateMachineGone { |
| fn from(_: mpsc::SendError) -> Self { |
| StateMachineGone |
| } |
| } |
| |
| impl From<oneshot::Canceled> for StateMachineGone { |
| fn from(_: oneshot::Canceled) -> Self { |
| StateMachineGone |
| } |
| } |
| |
| enum ControlRequest { |
| StartUpdateCheck { options: CheckOptions, responder: oneshot::Sender<StartUpdateCheckResponse> }, |
| } |
| |
| /// Responses to a request to start an update check now. |
| #[derive(Debug, Clone, PartialEq, Eq)] |
| pub enum StartUpdateCheckResponse { |
| /// The state machine was idle and the request triggered an update check. |
| Started, |
| |
| /// The state machine was already processing an update check and ignored this request and |
| /// options. |
| AlreadyRunning, |
| |
| /// The update check was throttled by policy. |
| Throttled, |
| } |
| |
| impl ControlHandle { |
| /// Ask the state machine to start an update check with the provided options, returning whether |
| /// or not the state machine started a check or was already running one. |
| pub async fn start_update_check( |
| &mut self, |
| options: CheckOptions, |
| ) -> Result<StartUpdateCheckResponse, StateMachineGone> { |
| let (responder, receive_response) = oneshot::channel(); |
| self.0.send(ControlRequest::StartUpdateCheck { options, responder }).await?; |
| Ok(receive_response.await?) |
| } |
| } |
| |
| impl<PE, HR, IN, TM, MR, ST> StateMachine<PE, HR, IN, TM, MR, ST> |
| where |
| PE: PolicyEngine, |
| HR: HttpRequest, |
| IN: Installer, |
| TM: Timer, |
| MR: MetricsReporter, |
| ST: Storage, |
| { |
| /// Ask policy engine for the next update check time and update the context and yield event. |
| async fn update_next_update_time( |
| &mut self, |
| co: &mut async_generator::Yield<StateMachineEvent>, |
| ) -> CheckTiming { |
| let apps = self.app_set.to_vec().await; |
| let timing = self |
| .policy_engine |
| .compute_next_update_time(&apps, &self.context.schedule, &self.context.state) |
| .await; |
| self.context.schedule.next_update_time = Some(timing); |
| |
| co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule.clone())).await; |
| info!("Calculated check timing: {}", timing); |
| timing |
| } |
| |
| /// Return a future that will wait until the given check timing. |
| async fn make_wait_to_next_check( |
| &mut self, |
| check_timing: CheckTiming, |
| ) -> Fuse<BoxFuture<'static, ()>> { |
| if let Some(minimum_wait) = check_timing.minimum_wait { |
| // If there's a minimum wait, also wait at least that long, by joining the two |
| // timers so that both need to be true (in case `next_update_time` turns out to be |
| // very close to now) |
| future::join( |
| self.timer.wait_for(minimum_wait), |
| self.timer.wait_until(check_timing.time), |
| ) |
| .map(|_| ()) |
| .boxed() |
| .fuse() |
| } else { |
| // Otherwise just setup the timer for the waiting until the next time. This is a |
| // wait until either the monotonic or wall times have passed. |
| self.timer.wait_until(check_timing.time).fuse() |
| } |
| } |
| |
| async fn run( |
| mut self, |
| mut control: mpsc::Receiver<ControlRequest>, |
| mut co: async_generator::Yield<StateMachineEvent>, |
| ) { |
| if !self.app_set.valid().await { |
| error!( |
| "App set not valid, not starting state machine: {:#?}", |
| self.app_set.to_vec().await |
| ); |
| return; |
| } |
| |
| let state_machine_start_monotonic_time = self.time_source.now_in_monotonic(); |
| |
| let mut should_report_waited_for_reboot_duration = false; |
| |
| let update_finish_time = { |
| let storage = self.storage_ref.lock().await; |
| let update_finish_time = storage.get_time(UPDATE_FINISH_TIME).await; |
| if update_finish_time.is_some() { |
| if let Some(target_version) = storage.get_string(TARGET_VERSION).await { |
| if target_version == self.config.os.version { |
| should_report_waited_for_reboot_duration = true; |
| } |
| } |
| } |
| update_finish_time |
| }; |
| |
| loop { |
| info!("Initial context: {:?}", self.context); |
| |
| if should_report_waited_for_reboot_duration { |
| match self.report_waited_for_reboot_duration( |
| update_finish_time.unwrap(), |
| state_machine_start_monotonic_time, |
| self.time_source.now(), |
| ) { |
| Ok(()) => { |
| // If the report was successful, don't try again on the next loop. |
| should_report_waited_for_reboot_duration = false; |
| |
| let mut storage = self.storage_ref.lock().await; |
| storage.remove_or_log(UPDATE_FINISH_TIME).await; |
| storage.remove_or_log(TARGET_VERSION).await; |
| storage.commit_or_log().await; |
| } |
| Err(e) => { |
| warn!("Couldn't report wait for reboot duration: {:#}, will try again", e); |
| } |
| } |
| } |
| |
| let (mut options, responder) = { |
| let check_timing = self.update_next_update_time(&mut co).await; |
| let mut wait_to_next_check = self.make_wait_to_next_check(check_timing).await; |
| |
| // Wait for either the next check time or a request to start an update check. Use |
| // the default check options with the timed check, or those sent with a request. |
| select! { |
| () = wait_to_next_check => (CheckOptions::default(), None), |
| ControlRequest::StartUpdateCheck{options, responder} = control.select_next_some() => { |
| (options, Some(responder)) |
| } |
| } |
| }; |
| |
| { |
| let apps = self.app_set.to_vec().await; |
| info!("Checking to see if an update check is allowed at this time for {:?}", apps); |
| let decision = self |
| .policy_engine |
| .update_check_allowed( |
| &apps, |
| &self.context.schedule, |
| &self.context.state, |
| &options, |
| ) |
| .await; |
| |
| info!("The update check decision is: {:?}", decision); |
| |
| let request_params = match decision { |
| // Positive results, will continue with the update check process |
| CheckDecision::Ok(rp) | CheckDecision::OkUpdateDeferred(rp) => rp, |
| |
| // Negative results, exit early |
| CheckDecision::TooSoon |
| | CheckDecision::ThrottledByPolicy |
| | CheckDecision::DeniedByPolicy => { |
| info!("The update check is not allowed at this time."); |
| if let Some(responder) = responder { |
| let _ = responder.send(StartUpdateCheckResponse::Throttled); |
| } |
| continue; |
| } |
| }; |
| if let Some(responder) = responder { |
| let _ = responder.send(StartUpdateCheckResponse::Started); |
| } |
| |
| // "start" the update check itself (well, create the future that is the update check) |
| let update_check = self.start_update_check(request_params, &mut co).fuse(); |
| futures::pin_mut!(update_check); |
| |
| // Wait for the update check to complete, handling any control requests that come in |
| // during the check. |
| loop { |
| select! { |
| () = update_check => break, |
| ControlRequest::StartUpdateCheck{ |
| options: new_options, |
| responder |
| } = control.select_next_some() => { |
| if new_options.source == InstallSource::OnDemand { |
| info!("Got on demand update check request, ensuring ongoing check is on demand"); |
| // TODO(63180): merge CheckOptions in Policy, not here. |
| options.source = InstallSource::OnDemand; |
| } |
| |
| let _ = responder.send(StartUpdateCheckResponse::AlreadyRunning); |
| } |
| } |
| } |
| } |
| |
| // TODO: This is the last place we read self.state, we should see if we can find another |
| // way to achieve this so that we can remove self.state entirely. |
| if self.state == State::WaitingForReboot { |
| self.wait_for_reboot(options, &mut control, &mut co).await; |
| } |
| |
| self.set_state(State::Idle, &mut co).await; |
| } |
| } |
| |
| async fn wait_for_reboot( |
| &mut self, |
| mut options: CheckOptions, |
| control: &mut mpsc::Receiver<ControlRequest>, |
| co: &mut async_generator::Yield<StateMachineEvent>, |
| ) { |
| if !self.policy_engine.reboot_allowed(&options).await { |
| let wait_to_see_if_reboot_allowed = |
| self.timer.wait_for(CHECK_REBOOT_ALLOWED_INTERVAL).fuse(); |
| futures::pin_mut!(wait_to_see_if_reboot_allowed); |
| |
| let check_timing = self.update_next_update_time(co).await; |
| let wait_to_next_ping = self.make_wait_to_next_check(check_timing).await; |
| futures::pin_mut!(wait_to_next_ping); |
| |
| loop { |
| // Wait for either the next time to check if reboot allowed or the next |
| // ping time or a request to start an update check. |
| select! { |
| () = wait_to_see_if_reboot_allowed => { |
| if self.policy_engine.reboot_allowed(&options).await { |
| break; |
| } |
| info!("Reboot not allowed at the moment, will try again in 30 minutes..."); |
| wait_to_see_if_reboot_allowed.set( |
| self.timer.wait_for(CHECK_REBOOT_ALLOWED_INTERVAL).fuse() |
| ); |
| }, |
| () = wait_to_next_ping => { |
| self.ping_omaha(co).await; |
| let check_timing = self.update_next_update_time(co).await; |
| wait_to_next_ping.set(self.make_wait_to_next_check(check_timing).await); |
| }, |
| ControlRequest::StartUpdateCheck{ |
| options: new_options, |
| responder |
| } = control.select_next_some() => { |
| let _ = responder.send(StartUpdateCheckResponse::AlreadyRunning); |
| if new_options.source == InstallSource::OnDemand { |
| info!("Waiting for reboot, but ensuring that InstallSource is OnDemand"); |
| options.source = InstallSource::OnDemand; |
| |
| if self.policy_engine.reboot_allowed(&options).await { |
| info!("Upgraded update check request to on demand, policy allowed reboot"); |
| break; |
| } |
| }; |
| } |
| } |
| } |
| } |
| info!("Rebooting the system at the end of a successful update"); |
| if let Err(e) = self.installer.perform_reboot().await { |
| error!("Unable to reboot the system: {}", e); |
| } |
| } |
| |
| /// Report the duration the previous boot waited to reboot based on the update finish time in |
| /// storage, and the current time. Does not report a metric if there's an inconsistency in the |
| /// times stored or computed, i.e. if the reboot time is later than the current time. |
| /// Returns an error if time seems incorrect, e.g. update_finish_time is in the future. |
| fn report_waited_for_reboot_duration( |
| &mut self, |
| update_finish_time: SystemTime, |
| state_machine_start_monotonic_time: Instant, |
| now: ComplexTime, |
| ) -> Result<(), anyhow::Error> { |
| // If `update_finish_time` is in the future we don't have correct time, try again |
| // on the next loop. |
| let update_finish_time_to_now = |
| now.wall_duration_since(update_finish_time).map_err(|e| { |
| anyhow!( |
| "Update finish time later than now, can't report waited for reboot duration, |
| update finish time: {:?}, now: {:?}, error: {:?}", |
| update_finish_time, |
| now, |
| e, |
| ) |
| })?; |
| |
| // It might take a while for us to get here, but we only want to report the |
| // time from update finish to state machine start after reboot, so we subtract |
| // the duration since then using monotonic time. |
| |
| // We only want to report this metric if we can actually compute it. |
| // If for whatever reason the clock was wrong on the previous boot, or monotonic |
| // time is going backwards, better not to report this metric than to report an |
| // incorrect default value. |
| let state_machine_start_to_now = now |
| .mono |
| .checked_duration_since(state_machine_start_monotonic_time) |
| .ok_or_else(|| { |
| error!("Monotonic time appears to have gone backwards"); |
| anyhow!( |
| "State machine start later than now, can't report waited for reboot duration. \ |
| State machine start: {:?}, now: {:?}", |
| state_machine_start_monotonic_time, |
| now.mono, |
| ) |
| })?; |
| |
| let waited_for_reboot_duration = |
| update_finish_time_to_now.checked_sub(state_machine_start_to_now).ok_or_else(|| { |
| anyhow!( |
| "Can't report waiting for reboot duration, update finish time to now smaller \ |
| than state machine start to now. Update finish time to now: {:?}, state \ |
| machine start to now: {:?}", |
| update_finish_time_to_now, |
| state_machine_start_to_now, |
| ) |
| })?; |
| |
| info!("Waited {} seconds for reboot.", waited_for_reboot_duration.as_secs()); |
| self.report_metrics(Metrics::WaitedForRebootDuration(waited_for_reboot_duration)); |
| Ok(()) |
| } |
| |
| /// Report update check interval based on the last check time stored in storage. |
| /// It will also persist the new last check time to storage. |
| async fn report_check_interval(&mut self, install_source: InstallSource) { |
| let now = self.time_source.now(); |
| |
| match self.context.schedule.last_update_check_time { |
| // This is our first run; report the interval between that time and now, |
| // and update the context with the complex time. |
| Some(PartialComplexTime::Wall(t)) => match now.wall_duration_since(t) { |
| Ok(interval) => self.report_metrics(Metrics::UpdateCheckInterval { |
| interval, |
| clock: ClockType::Wall, |
| install_source, |
| }), |
| Err(e) => warn!("Last check time is in the future: {}", e), |
| }, |
| |
| // We've reported an update check before, or we at least have a |
| // PartialComplexTime with a monotonic component. Report our interval |
| // between these Instants. (N.B. strictly speaking, we should only |
| // ever have a PCT::Complex here.) |
| Some(PartialComplexTime::Complex(t)) => match now.mono.checked_duration_since(t.mono) { |
| Some(interval) => self.report_metrics(Metrics::UpdateCheckInterval { |
| interval, |
| clock: ClockType::Monotonic, |
| install_source, |
| }), |
| None => error!("Monotonic time in the past"), |
| }, |
| |
| // No last check time in storage, and no big deal. We'll continue from |
| // monotonic time from now on. This is the only place other than loading |
| // context from storage where the time can be set, so it's either unset |
| // because no storage, or a complex time. No need to match |
| // Some(PartialComplexTime::Monotonic) |
| _ => {} |
| } |
| |
| self.context.schedule.last_update_check_time = now.into(); |
| } |
| |
| /// Perform update check and handle the result, including updating the update check context |
| /// and cohort. |
| pub async fn start_update_check( |
| &mut self, |
| request_params: RequestParams, |
| co: &mut async_generator::Yield<StateMachineEvent>, |
| ) { |
| let apps = self.app_set.to_vec().await; |
| let result = self.perform_update_check(request_params, apps, co).await; |
| match &result { |
| Ok(result) => { |
| info!("Update check result: {:?}", result); |
| // Update check succeeded, update |last_update_time|. |
| self.context.schedule.last_update_time = Some(self.time_source.now().into()); |
| |
| // Determine if any app failed to install, or we had a successful upadte. |
| let install_result = result.app_responses.iter().fold(None, |result, app| { |
| match (result, &app.result) { |
| (_, update_check::Action::InstallPlanExecutionError) => Some(false), |
| (None, update_check::Action::Updated) => Some(true), |
| (result, _) => result, |
| } |
| }); |
| |
| // Update check succeeded, reset |consecutive_failed_update_checks| to 0 and |
| // report metrics. |
| self.report_attempts_to_successful_check(true).await; |
| |
| self.app_set.update_from_omaha(&result.app_responses).await; |
| |
| // Only report |attempts_to_successful_install| if we get an error trying to |
| // install, or we succeed to install an update without error. |
| if let Some(success) = install_result { |
| self.report_attempts_to_successful_install(success).await; |
| } |
| |
| // TODO: update consecutive_proxied_requests |
| } |
| Err(error) => { |
| error!("Update check failed: {:?}", error); |
| |
| let failure_reason = match error { |
| UpdateCheckError::ResponseParser(_) | UpdateCheckError::InstallPlan(_) => { |
| // We talked to Omaha, update |last_update_time|. |
| self.context.schedule.last_update_time = |
| Some(self.time_source.now().into()); |
| |
| UpdateCheckFailureReason::Omaha |
| } |
| UpdateCheckError::OmahaRequest(request_error) => match request_error { |
| OmahaRequestError::Json(_) | OmahaRequestError::HttpBuilder(_) => { |
| UpdateCheckFailureReason::Internal |
| } |
| OmahaRequestError::HttpTransport(_) | OmahaRequestError::HttpStatus(_) => { |
| UpdateCheckFailureReason::Network |
| } |
| }, |
| }; |
| self.report_metrics(Metrics::UpdateCheckFailureReason(failure_reason)); |
| |
| self.report_attempts_to_successful_check(false).await; |
| } |
| } |
| |
| co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule.clone())).await; |
| co.yield_(StateMachineEvent::ProtocolStateChange(self.context.state.clone())).await; |
| co.yield_(StateMachineEvent::UpdateCheckResult(result)).await; |
| |
| self.persist_data().await; |
| } |
| |
| // Update self.context.state.consecutive_failed_update_checks and report the metric if |
| // `success`. Does not persist the value to storage, but rather relies on the caller. |
| async fn report_attempts_to_successful_check(&mut self, success: bool) { |
| let attempts = self.context.state.consecutive_failed_update_checks + 1; |
| if success { |
| self.context.state.consecutive_failed_update_checks = 0; |
| self.report_metrics(Metrics::AttemptsToSuccessfulCheck(attempts as u64)); |
| } else { |
| self.context.state.consecutive_failed_update_checks = attempts; |
| } |
| } |
| |
| /// Update `CONSECUTIVE_FAILED_INSTALL_ATTEMPTS` in storage and report the metrics if |
| /// `success`. Does not commit the change to storage. |
| async fn report_attempts_to_successful_install(&mut self, success: bool) { |
| let storage_ref = self.storage_ref.clone(); |
| let mut storage = storage_ref.lock().await; |
| let attempts = storage.get_int(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS).await.unwrap_or(0) + 1; |
| |
| self.report_metrics(Metrics::AttemptsToSuccessfulInstall { |
| count: attempts as u64, |
| successful: success, |
| }); |
| |
| if success { |
| storage.remove_or_log(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS).await; |
| } else { |
| if let Err(e) = storage.set_int(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS, attempts).await { |
| error!("Unable to persist {}: {}", CONSECUTIVE_FAILED_INSTALL_ATTEMPTS, e); |
| } |
| } |
| } |
| |
| /// Persist all necessary data to storage. |
| async fn persist_data(&self) { |
| let mut storage = self.storage_ref.lock().await; |
| self.context.persist(&mut *storage).await; |
| self.app_set.persist(&mut *storage).await; |
| |
| storage.commit_or_log().await; |
| } |
| |
| /// This function constructs the chain of async futures needed to perform all of the async tasks |
| /// that comprise an update check. |
| async fn perform_update_check( |
| &mut self, |
| request_params: RequestParams, |
| apps: Vec<App>, |
| co: &mut async_generator::Yield<StateMachineEvent>, |
| ) -> Result<update_check::Response, UpdateCheckError> { |
| self.set_state(State::CheckingForUpdates, co).await; |
| |
| self.report_check_interval(request_params.source.clone()).await; |
| |
| // Construct a request for the app(s). |
| let config = self.config.clone(); |
| let mut request_builder = RequestBuilder::new(&config, &request_params); |
| for app in &apps { |
| request_builder = request_builder.add_update_check(app).add_ping(app); |
| } |
| let session_id = GUID::new(); |
| request_builder = request_builder.session_id(session_id.clone()); |
| |
| let mut omaha_request_attempt = 1; |
| |
| // Attempt in an loop of up to MAX_OMAHA_REQUEST_ATTEMPTS to communicate with Omaha. |
| // exit the loop early on success or an error that isn't related to a transport issue. |
| let loop_result = loop { |
| // Mark the start time for the request to omaha. |
| let omaha_check_start_time = self.time_source.now_in_monotonic(); |
| request_builder = request_builder.request_id(GUID::new()); |
| let result = self.do_omaha_request_and_update_context(&request_builder, co).await; |
| |
| // Report the response time of the omaha request. |
| { |
| // don't use Instant::elapsed(), it doesn't use the right TimeSource, and can panic! |
| // as a result |
| let now = self.time_source.now_in_monotonic(); |
| let duration = now.checked_duration_since(omaha_check_start_time); |
| |
| if let Some(response_time) = duration { |
| self.report_metrics(Metrics::UpdateCheckResponseTime { |
| response_time, |
| successful: result.is_ok(), |
| }); |
| } else { |
| // If this happens, it's a bug. |
| error!( |
| "now: {:?}, is before omaha_check_start_time: {:?}", |
| now, omaha_check_start_time |
| ); |
| } |
| } |
| |
| match result { |
| Ok(res) => { |
| break Ok(res); |
| } |
| Err(OmahaRequestError::Json(e)) => { |
| error!("Unable to construct request body! {:?}", e); |
| self.set_state(State::ErrorCheckingForUpdate, co).await; |
| break Err(UpdateCheckError::OmahaRequest(e.into())); |
| } |
| Err(OmahaRequestError::HttpBuilder(e)) => { |
| error!("Unable to construct HTTP request! {:?}", e); |
| self.set_state(State::ErrorCheckingForUpdate, co).await; |
| break Err(UpdateCheckError::OmahaRequest(e.into())); |
| } |
| Err(OmahaRequestError::HttpTransport(e)) => { |
| warn!("Unable to contact Omaha: {:?}", e); |
| // Don't retry if the error was caused by user code, which means we weren't |
| // using the library correctly. |
| if omaha_request_attempt >= MAX_OMAHA_REQUEST_ATTEMPTS |
| || e.is_user() |
| || self.context.state.server_dictated_poll_interval.is_some() |
| { |
| self.set_state(State::ErrorCheckingForUpdate, co).await; |
| break Err(UpdateCheckError::OmahaRequest(e.into())); |
| } |
| } |
| Err(OmahaRequestError::HttpStatus(e)) => { |
| warn!("Unable to contact Omaha: {:?}", e); |
| if omaha_request_attempt >= MAX_OMAHA_REQUEST_ATTEMPTS |
| || self.context.state.server_dictated_poll_interval.is_some() |
| { |
| self.set_state(State::ErrorCheckingForUpdate, co).await; |
| break Err(UpdateCheckError::OmahaRequest(e.into())); |
| } |
| } |
| } |
| |
| // TODO(fxbug.dev/41738): Move this to Policy. |
| // Randomized exponential backoff of 1, 2, & 4 seconds, +/- 500ms. |
| let backoff_time_secs = 1 << (omaha_request_attempt - 1); |
| let backoff_time = randomize(backoff_time_secs * 1000, 1000); |
| info!("Waiting {} ms before retrying...", backoff_time); |
| self.timer.wait_for(Duration::from_millis(backoff_time)).await; |
| |
| omaha_request_attempt += 1; |
| }; |
| |
| self.report_metrics(Metrics::RequestsPerCheck { |
| count: omaha_request_attempt, |
| successful: loop_result.is_ok(), |
| }); |
| |
| let (_parts, data) = loop_result?; |
| |
| let response = match Self::parse_omaha_response(&data) { |
| Ok(res) => res, |
| Err(err) => { |
| warn!("Unable to parse Omaha response: {:?}", err); |
| self.set_state(State::ErrorCheckingForUpdate, co).await; |
| self.report_omaha_event_and_update_context( |
| &request_params, |
| Event::error(EventErrorCode::ParseResponse), |
| &apps, |
| &session_id, |
| vec![None; apps.len()], |
| None, |
| co, |
| ) |
| .await; |
| return Err(UpdateCheckError::ResponseParser(err)); |
| } |
| }; |
| |
| info!("result: {:?}", response); |
| |
| co.yield_(StateMachineEvent::OmahaServerResponse(response.clone())).await; |
| |
| let statuses = Self::get_app_update_statuses(&response); |
| for (app_id, status) in &statuses { |
| // TODO: Report or metric statuses other than 'no-update' and 'ok' |
| info!("Omaha update check status: {} => {:?}", app_id, status); |
| } |
| |
| let some_app_has_update = statuses.iter().any(|(_id, status)| **status == OmahaStatus::Ok); |
| if !some_app_has_update { |
| // A successful, no-update, check |
| |
| self.set_state(State::NoUpdateAvailable, co).await; |
| Ok(Self::make_response(response, update_check::Action::NoUpdate)) |
| } else { |
| info!( |
| "At least one app has an update, proceeding to build and process an Install Plan" |
| ); |
| let next_versions: Vec<_> = response |
| .apps |
| .iter() |
| .map(|app| { |
| app.update_check.as_ref().and_then(|update_check| { |
| update_check.manifest.as_ref().map(|manifest| manifest.version.clone()) |
| }) |
| }) |
| .collect(); |
| let install_plan = match IN::InstallPlan::try_create_from(&request_params, &response) { |
| Ok(plan) => plan, |
| Err(e) => { |
| error!("Unable to construct install plan! {}", e); |
| self.set_state(State::InstallingUpdate, co).await; |
| self.set_state(State::InstallationError, co).await; |
| self.report_omaha_event_and_update_context( |
| &request_params, |
| Event::error(EventErrorCode::ConstructInstallPlan), |
| &apps, |
| &session_id, |
| next_versions.clone(), |
| None, |
| co, |
| ) |
| .await; |
| return Err(UpdateCheckError::InstallPlan(e.into())); |
| } |
| }; |
| |
| info!("Validating Install Plan with Policy"); |
| let install_plan_decision = self.policy_engine.update_can_start(&install_plan).await; |
| match install_plan_decision { |
| UpdateDecision::Ok => { |
| info!("Proceeding with install plan."); |
| } |
| UpdateDecision::DeferredByPolicy => { |
| info!("Install plan was deferred by Policy."); |
| // Report "error" to Omaha (as this is an event that needs reporting as the |
| // install isn't starting immediately. |
| let event = Event { |
| event_type: EventType::UpdateComplete, |
| event_result: EventResult::UpdateDeferred, |
| ..Event::default() |
| }; |
| self.report_omaha_event_and_update_context( |
| &request_params, |
| event, |
| &apps, |
| &session_id, |
| next_versions.clone(), |
| None, |
| co, |
| ) |
| .await; |
| |
| self.set_state(State::InstallationDeferredByPolicy, co).await; |
| return Ok(Self::make_response( |
| response, |
| update_check::Action::DeferredByPolicy, |
| )); |
| } |
| UpdateDecision::DeniedByPolicy => { |
| warn!("Install plan was denied by Policy, see Policy logs for reasoning"); |
| self.report_omaha_event_and_update_context( |
| &request_params, |
| Event::error(EventErrorCode::DeniedByPolicy), |
| &apps, |
| &session_id, |
| next_versions.clone(), |
| None, |
| co, |
| ) |
| .await; |
| return Ok(Self::make_response(response, update_check::Action::DeniedByPolicy)); |
| } |
| } |
| |
| self.set_state(State::InstallingUpdate, co).await; |
| self.report_omaha_event_and_update_context( |
| &request_params, |
| Event::success(EventType::UpdateDownloadStarted), |
| &apps, |
| &session_id, |
| next_versions.clone(), |
| None, |
| co, |
| ) |
| .await; |
| |
| let install_plan_id = install_plan.id(); |
| let update_start_time = self.time_source.now_in_walltime(); |
| let update_first_seen_time = |
| self.record_update_first_seen_time(&install_plan_id, update_start_time).await; |
| |
| let (send, mut recv) = mpsc::channel(0); |
| let observer = StateMachineProgressObserver(send); |
| let perform_install = async { |
| let result = self.installer.perform_install(&install_plan, Some(&observer)).await; |
| // Drop observer so that we can stop waiting for the next progress. |
| drop(observer); |
| result |
| }; |
| let yield_progress = async { |
| while let Some(progress) = recv.next().await { |
| co.yield_(StateMachineEvent::InstallProgressChange(progress)).await; |
| } |
| }; |
| |
| let (install_result, ()) = future::join(perform_install, yield_progress).await; |
| let update_finish_time = self.time_source.now_in_walltime(); |
| let install_duration = match update_finish_time.duration_since(update_start_time) { |
| Ok(duration) => { |
| let metrics = if install_result.is_ok() { |
| Metrics::SuccessfulUpdateDuration(duration) |
| } else { |
| Metrics::FailedUpdateDuration(duration) |
| }; |
| self.report_metrics(metrics); |
| Some(duration) |
| } |
| Err(e) => { |
| warn!("Update start time is in the future: {}", e); |
| None |
| } |
| }; |
| if let Err(e) = install_result { |
| co.yield_(StateMachineEvent::InstallerError(Some(Box::new(e)))).await; |
| self.set_state(State::InstallationError, co).await; |
| self.report_omaha_event_and_update_context( |
| &request_params, |
| Event::error(EventErrorCode::Installation), |
| &apps, |
| &session_id, |
| next_versions.clone(), |
| install_duration, |
| co, |
| ) |
| .await; |
| |
| return Ok(Self::make_response( |
| response, |
| update_check::Action::InstallPlanExecutionError, |
| )); |
| } |
| |
| self.report_omaha_event_and_update_context( |
| &request_params, |
| Event::success(EventType::UpdateDownloadFinished), |
| &apps, |
| &session_id, |
| next_versions.clone(), |
| install_duration, |
| co, |
| ) |
| .await; |
| |
| // TODO: Verify downloaded update if needed. |
| |
| self.report_omaha_event_and_update_context( |
| &request_params, |
| Event::success(EventType::UpdateComplete), |
| &apps, |
| &session_id, |
| next_versions.clone(), |
| install_duration, |
| co, |
| ) |
| .await; |
| |
| match update_finish_time.duration_since(update_first_seen_time) { |
| Ok(duration) => { |
| self.report_metrics(Metrics::SuccessfulUpdateFromFirstSeen(duration)) |
| } |
| Err(e) => warn!("Update first seen time is in the future: {}", e), |
| } |
| { |
| let mut storage = self.storage_ref.lock().await; |
| if let Err(e) = storage.set_time(UPDATE_FINISH_TIME, update_finish_time).await { |
| error!("Unable to persist {}: {}", UPDATE_FINISH_TIME, e); |
| } |
| let target_version = next_versions[0].as_deref().unwrap_or_else(|| { |
| error!("Target version string not found in Omaha response."); |
| "UNKNOWN" |
| }); |
| if let Err(e) = storage.set_string(TARGET_VERSION, target_version).await { |
| error!("Unable to persist {}: {}", TARGET_VERSION, e); |
| } |
| storage.commit_or_log().await; |
| } |
| self.set_state(State::WaitingForReboot, co).await; |
| Ok(Self::make_response(response, update_check::Action::Updated)) |
| } |
| } |
| |
| /// Report the given |event| to Omaha, errors occurred during reporting are logged but not |
| /// acted on. |
| async fn report_omaha_event_and_update_context<'a>( |
| &'a mut self, |
| request_params: &'a RequestParams, |
| event: Event, |
| apps: &'a Vec<App>, |
| session_id: &GUID, |
| next_versions: Vec<Option<String>>, |
| install_duration: Option<Duration>, |
| co: &mut async_generator::Yield<StateMachineEvent>, |
| ) { |
| let config = self.config.clone(); |
| let mut request_builder = RequestBuilder::new(&config, &request_params); |
| for (app, next_version) in apps.iter().zip(next_versions) { |
| let event = Event { |
| previous_version: Some(app.version.to_string()), |
| next_version, |
| download_time_ms: install_duration.and_then(|d| d.as_millis().try_into().ok()), |
| ..event.clone() |
| }; |
| request_builder = request_builder.add_event(app, &event); |
| } |
| request_builder = request_builder.session_id(session_id.clone()).request_id(GUID::new()); |
| if let Err(e) = self.do_omaha_request_and_update_context(&request_builder, co).await { |
| self.report_metrics(Metrics::OmahaEventLost(event)); |
| warn!("Unable to report event to Omaha: {:?}", e); |
| } |
| } |
| |
| /// Sends a ping to Omaha and updates context and app_set. |
| async fn ping_omaha(&mut self, co: &mut async_generator::Yield<StateMachineEvent>) { |
| let apps = self.app_set.to_vec().await; |
| let request_params = |
| RequestParams { source: InstallSource::ScheduledTask, use_configured_proxies: true }; |
| let config = self.config.clone(); |
| let mut request_builder = RequestBuilder::new(&config, &request_params); |
| for app in &apps { |
| request_builder = request_builder.add_ping(app); |
| } |
| request_builder = request_builder.session_id(GUID::new()).request_id(GUID::new()); |
| |
| let (_parts, data) = |
| match self.do_omaha_request_and_update_context(&request_builder, co).await { |
| Ok(res) => res, |
| Err(e) => { |
| error!("Ping Omaha failed: {:#}", anyhow!(e)); |
| self.context.state.consecutive_failed_update_checks += 1; |
| self.persist_data().await; |
| return; |
| } |
| }; |
| |
| let response = match Self::parse_omaha_response(&data) { |
| Ok(res) => res, |
| Err(e) => { |
| error!("Unable to parse Omaha response: {:#}", anyhow!(e)); |
| self.context.state.consecutive_failed_update_checks += 1; |
| self.persist_data().await; |
| return; |
| } |
| }; |
| |
| self.context.state.consecutive_failed_update_checks = 0; |
| |
| // Even though this is a ping, we should still update the last_update_time for |
| // policy to compute the next ping time. |
| self.context.schedule.last_update_time = Some(self.time_source.now().into()); |
| co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule.clone())).await; |
| |
| let result = Self::make_response(response, update_check::Action::NoUpdate); |
| |
| self.app_set.update_from_omaha(&result.app_responses).await; |
| |
| self.persist_data().await; |
| } |
| |
| /// Make an http request to Omaha, and collect the response into an error or a blob of bytes |
| /// that can be parsed. |
| /// |
| /// Given the http client and the request build, this makes the http request, and then coalesces |
| /// the various errors into a single error type for easier error handling by the make process |
| /// flow. |
| /// |
| /// This function also converts an HTTP error response into an Error, to divert those into the |
| /// error handling paths instead of the Ok() path. |
| /// |
| /// If a valid X-Retry-After header is found in the response, this function will update the |
| /// server dictated poll interval in context. |
| async fn do_omaha_request_and_update_context<'a>( |
| &'a mut self, |
| builder: &RequestBuilder<'a>, |
| co: &mut async_generator::Yield<StateMachineEvent>, |
| ) -> Result<(Parts, Vec<u8>), OmahaRequestError> { |
| let (parts, body) = |
| Self::make_request(&mut self.http, builder.build()?).await?.into_parts(); |
| // Clients MUST respect this header even if paired with non-successful HTTP response code. |
| let server_dictated_poll_interval = parts.headers.get(X_RETRY_AFTER).and_then(|header| { |
| match header |
| .to_str() |
| .map_err(|e| anyhow!(e)) |
| .and_then(|s| s.parse::<u64>().map_err(|e| anyhow!(e))) |
| { |
| Ok(seconds) => { |
| // Servers SHOULD NOT send a value in excess of 86400 (24 hours), and clients |
| // SHOULD treat values greater than 86400 as 86400. |
| Some(Duration::from_secs(min(seconds, 86400))) |
| } |
| Err(e) => { |
| error!("Unable to parse {} header: {:#}", X_RETRY_AFTER, e); |
| None |
| } |
| } |
| }); |
| if self.context.state.server_dictated_poll_interval != server_dictated_poll_interval { |
| self.context.state.server_dictated_poll_interval = server_dictated_poll_interval; |
| co.yield_(StateMachineEvent::ProtocolStateChange(self.context.state.clone())).await; |
| let mut storage = self.storage_ref.lock().await; |
| self.context.persist(&mut *storage).await; |
| storage.commit_or_log().await; |
| } |
| if !parts.status.is_success() { |
| // Convert HTTP failure responses into Errors. |
| Err(OmahaRequestError::HttpStatus(parts.status)) |
| } else { |
| // Pass successful responses to the caller. |
| info!("Omaha HTTP response: {}", parts.status); |
| Ok((parts, body)) |
| } |
| } |
| |
| /// Make an http request and collect the response body into a Vec of bytes. |
| /// |
| /// Specifically, this takes the body of the response and concatenates it into a single Vec of |
| /// bytes so that any errors in receiving it can be captured immediately, instead of needing to |
| /// handle them as part of parsing the response body. |
| async fn make_request( |
| http_client: &mut HR, |
| request: http::Request<hyper::Body>, |
| ) -> Result<HttpResponse<Vec<u8>>, http_request::Error> { |
| info!("Making http request to: {}", request.uri()); |
| http_client.request(request).await.map_err(|err| { |
| warn!("Unable to perform request: {}", err); |
| err |
| }) |
| } |
| |
| /// This method takes the response bytes from Omaha, and converts them into a protocol::Response |
| /// struct, returning all of the various errors that can occur in that process as a consolidated |
| /// error enum. |
| fn parse_omaha_response(data: &[u8]) -> Result<Response, ResponseParseError> { |
| parse_json_response(&data).map_err(ResponseParseError::Json) |
| } |
| |
| /// Utility to extract pairs of app id => omaha status response, to make it easier to ask |
| /// questions about the response. |
| fn get_app_update_statuses(response: &Response) -> Vec<(&str, &OmahaStatus)> { |
| response |
| .apps |
| .iter() |
| .filter_map(|app| match &app.update_check { |
| None => None, |
| Some(u) => Some((app.id.as_str(), &u.status)), |
| }) |
| .collect() |
| } |
| |
| /// Utility to take a set of protocol::response::Apps and then construct a response from the |
| /// update check based on those app IDs. |
| /// |
| /// TODO: Change the Policy and Installer to return a set of results, one for each app ID, then |
| /// make this match that. |
| fn make_response( |
| response: protocol::response::Response, |
| action: update_check::Action, |
| ) -> update_check::Response { |
| update_check::Response { |
| app_responses: response |
| .apps |
| .iter() |
| .map(|app| update_check::AppResponse { |
| app_id: app.id.clone(), |
| cohort: app.cohort.clone(), |
| user_counting: response.daystart.clone().into(), |
| result: action.clone(), |
| }) |
| .collect(), |
| } |
| } |
| |
| /// Update the state internally and send it to the observer. |
| async fn set_state( |
| &mut self, |
| state: State, |
| co: &mut async_generator::Yield<StateMachineEvent>, |
| ) { |
| self.state = state.clone(); |
| co.yield_(StateMachineEvent::StateChange(state)).await; |
| } |
| |
| fn report_metrics(&mut self, metrics: Metrics) { |
| if let Err(err) = self.metrics_reporter.report_metrics(metrics) { |
| warn!("Unable to report metrics: {:?}", err); |
| } |
| } |
| |
| async fn record_update_first_seen_time( |
| &mut self, |
| install_plan_id: &str, |
| now: SystemTime, |
| ) -> SystemTime { |
| let mut storage = self.storage_ref.lock().await; |
| let previous_id = storage.get_string(INSTALL_PLAN_ID).await; |
| if let Some(previous_id) = previous_id { |
| if previous_id == install_plan_id { |
| return storage.get_time(UPDATE_FIRST_SEEN_TIME).await.unwrap_or(now); |
| } |
| } |
| // Update INSTALL_PLAN_ID and UPDATE_FIRST_SEEN_TIME for new update. |
| if let Err(e) = storage.set_string(INSTALL_PLAN_ID, install_plan_id).await { |
| error!("Unable to persist {}: {}", INSTALL_PLAN_ID, e); |
| return now; |
| } |
| if let Err(e) = storage.set_time(UPDATE_FIRST_SEEN_TIME, now).await { |
| error!("Unable to persist {}: {}", UPDATE_FIRST_SEEN_TIME, e); |
| let _ = storage.remove(INSTALL_PLAN_ID).await; |
| return now; |
| } |
| storage.commit_or_log().await; |
| now |
| } |
| } |
| |
| /// Return a random number in [n - range / 2, n - range / 2 + range). |
| fn randomize(n: u64, range: u64) -> u64 { |
| n - range / 2 + rand::random::<u64>() % range |
| } |
| |
| #[cfg(test)] |
| impl<PE, HR, IN, TM, MR, ST> StateMachine<PE, HR, IN, TM, MR, ST> |
| where |
| PE: PolicyEngine, |
| HR: HttpRequest, |
| IN: Installer, |
| TM: Timer, |
| MR: MetricsReporter, |
| ST: Storage, |
| { |
| /// Run perform_update_check once, returning the update check result. |
| pub async fn oneshot(&mut self) -> Result<update_check::Response, UpdateCheckError> { |
| let request_params = RequestParams::default(); |
| |
| let apps = self.app_set.to_vec().await; |
| |
| async_generator::generate(move |mut co| async move { |
| self.perform_update_check(request_params, apps, &mut co).await |
| }) |
| .into_complete() |
| .await |
| } |
| |
| /// Run start_upate_check once, discarding its states. |
| pub async fn run_once(&mut self) { |
| let request_params = RequestParams::default(); |
| |
| async_generator::generate(move |mut co| async move { |
| self.start_update_check(request_params, &mut co).await; |
| }) |
| .map(|_| ()) |
| .collect::<()>() |
| .await; |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::update_check::*; |
| use super::*; |
| use crate::{ |
| common::{ |
| App, CheckOptions, PersistedApp, ProtocolState, UpdateCheckSchedule, UserCounting, |
| }, |
| configuration::Updater, |
| http_request::mock::MockHttpRequest, |
| installer::{ |
| stub::{StubInstallErrors, StubInstaller, StubPlan}, |
| ProgressObserver, |
| }, |
| metrics::MockMetricsReporter, |
| policy::{MockPolicyEngine, StubPolicyEngine}, |
| protocol::{request::OS, response, Cohort}, |
| storage::MemStorage, |
| time::{ |
| timers::{BlockingTimer, MockTimer, RequestedWait}, |
| MockTimeSource, PartialComplexTime, |
| }, |
| }; |
| use futures::executor::{block_on, LocalPool}; |
| use futures::future::BoxFuture; |
| use futures::task::LocalSpawnExt; |
| use log::info; |
| use matches::assert_matches; |
| use pretty_assertions::assert_eq; |
| use serde_json::json; |
| use std::cell::RefCell; |
| use std::time::Duration; |
| use version::Version; |
| |
| fn make_test_app_set() -> AppSet { |
| AppSet::new(vec![App::builder("{00000000-0000-0000-0000-000000000001}", [1, 2, 3, 4]) |
| .with_cohort(Cohort::new("stable-channel")) |
| .build()]) |
| } |
| |
| // Assert that the last request made to |http| is equal to the request built by |
| // |request_builder|. |
| async fn assert_request<'a>(http: MockHttpRequest, request_builder: RequestBuilder<'a>) { |
| let body = request_builder.build().unwrap().into_body(); |
| let body = body |
| .try_fold(Vec::new(), |mut vec, b| async move { |
| vec.extend(b); |
| Ok(vec) |
| }) |
| .await |
| .unwrap(); |
| // Compare string instead of Vec<u8> for easier debugging. |
| let body_str = String::from_utf8_lossy(&body); |
| http.assert_body_str(&body_str).await; |
| } |
| |
| #[test] |
| fn run_simple_check_with_noupdate_result() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "noupdate" |
| } |
| }] |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| |
| StateMachineBuilder::new_stub().http(http).oneshot().await.unwrap(); |
| |
| info!("update check complete!"); |
| }); |
| } |
| |
| #[test] |
| fn test_cohort_returned_with_noupdate_result() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "cohort": "1", |
| "cohortname": "stable-channel", |
| "updatecheck": { |
| "status": "noupdate" |
| } |
| }] |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| |
| let response = StateMachineBuilder::new_stub().http(http).oneshot().await.unwrap(); |
| assert_eq!("{00000000-0000-0000-0000-000000000001}", response.app_responses[0].app_id); |
| assert_eq!(Some("1".into()), response.app_responses[0].cohort.id); |
| assert_eq!(Some("stable-channel".into()), response.app_responses[0].cohort.name); |
| assert_eq!(None, response.app_responses[0].cohort.hint); |
| }); |
| } |
| |
| #[test] |
| fn test_report_parse_response_error() { |
| block_on(async { |
| let http = MockHttpRequest::new(HttpResponse::new("invalid response".into())); |
| |
| let mut state_machine = StateMachineBuilder::new_stub().http(http).build().await; |
| |
| let response = state_machine.oneshot().await; |
| assert_matches!(response, Err(UpdateCheckError::ResponseParser(_))); |
| |
| let request_params = RequestParams::default(); |
| let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params); |
| let event = Event { |
| previous_version: Some("1.2.3.4".to_string()), |
| ..Event::error(EventErrorCode::ParseResponse) |
| }; |
| let apps = state_machine.app_set.to_vec().await; |
| request_builder = request_builder |
| .add_event(&apps[0], &event) |
| .session_id(GUID::from_u128(0)) |
| .request_id(GUID::from_u128(2)); |
| assert_request(state_machine.http, request_builder).await; |
| }); |
| } |
| |
| #[test] |
| fn test_report_construct_install_plan_error() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "4.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| |
| let mut state_machine = StateMachineBuilder::new_stub().http(http).build().await; |
| |
| let response = state_machine.oneshot().await; |
| assert_matches!(response, Err(UpdateCheckError::InstallPlan(_))); |
| |
| let request_params = RequestParams::default(); |
| let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params); |
| let event = Event { |
| previous_version: Some("1.2.3.4".to_string()), |
| ..Event::error(EventErrorCode::ConstructInstallPlan) |
| }; |
| let apps = state_machine.app_set.to_vec().await; |
| request_builder = request_builder |
| .add_event(&apps[0], &event) |
| .session_id(GUID::from_u128(0)) |
| .request_id(GUID::from_u128(2)); |
| assert_request(state_machine.http, request_builder).await; |
| }); |
| } |
| |
| #[test] |
| fn test_report_installation_error() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok", |
| "manifest": { |
| "version": "5.6.7.8", |
| "actions": { |
| "action": [], |
| }, |
| "packages": { |
| "package": [], |
| }, |
| } |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .http(http) |
| .installer(StubInstaller { should_fail: true }) |
| .build() |
| .await; |
| |
| let response = state_machine.oneshot().await.unwrap(); |
| assert_eq!(Action::InstallPlanExecutionError, response.app_responses[0].result); |
| |
| let request_params = RequestParams::default(); |
| let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params); |
| let event = Event { |
| previous_version: Some("1.2.3.4".to_string()), |
| next_version: Some("5.6.7.8".to_string()), |
| download_time_ms: Some(0), |
| ..Event::error(EventErrorCode::Installation) |
| }; |
| let apps = state_machine.app_set.to_vec().await; |
| request_builder = request_builder |
| .add_event(&apps[0], &event) |
| .session_id(GUID::from_u128(0)) |
| .request_id(GUID::from_u128(3)); |
| assert_request(state_machine.http, request_builder).await; |
| }); |
| } |
| |
| // Test that our observer can see when there's an installation error, and that it gets |
| // the right error type. |
| #[test] |
| fn test_observe_installation_error() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok", |
| "manifest": { |
| "version": "5.6.7.8", |
| "actions": { |
| "action": [], |
| }, |
| "packages": { |
| "package": [], |
| }, |
| } |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| |
| let actual_errors = StateMachineBuilder::new_stub() |
| .http(http) |
| .installer(StubInstaller { should_fail: true }) |
| .oneshot_check() |
| .await |
| .filter_map(|event| { |
| future::ready(match event { |
| StateMachineEvent::InstallerError(Some(e)) => { |
| Some(*e.downcast::<StubInstallErrors>().unwrap()) |
| } |
| _ => None, |
| }) |
| }) |
| .collect::<Vec<StubInstallErrors>>() |
| .await; |
| |
| let expected_errors = vec![StubInstallErrors::Failed]; |
| assert_eq!(actual_errors, expected_errors); |
| }); |
| } |
| |
| #[test] |
| fn test_report_deferred_by_policy() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| |
| let policy_engine = MockPolicyEngine { |
| update_decision: UpdateDecision::DeferredByPolicy, |
| ..MockPolicyEngine::default() |
| }; |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .policy_engine(policy_engine) |
| .http(http) |
| .build() |
| .await; |
| |
| let response = state_machine.oneshot().await.unwrap(); |
| assert_eq!(Action::DeferredByPolicy, response.app_responses[0].result); |
| |
| let request_params = RequestParams::default(); |
| let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params); |
| let event = Event { |
| event_type: EventType::UpdateComplete, |
| event_result: EventResult::UpdateDeferred, |
| previous_version: Some("1.2.3.4".to_string()), |
| ..Event::default() |
| }; |
| let apps = state_machine.app_set.to_vec().await; |
| request_builder = request_builder |
| .add_event(&apps[0], &event) |
| .session_id(GUID::from_u128(0)) |
| .request_id(GUID::from_u128(2)); |
| assert_request(state_machine.http, request_builder).await; |
| }); |
| } |
| |
| #[test] |
| fn test_report_denied_by_policy() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| let policy_engine = MockPolicyEngine { |
| update_decision: UpdateDecision::DeniedByPolicy, |
| ..MockPolicyEngine::default() |
| }; |
| |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .policy_engine(policy_engine) |
| .http(http) |
| .build() |
| .await; |
| |
| let response = state_machine.oneshot().await.unwrap(); |
| assert_eq!(Action::DeniedByPolicy, response.app_responses[0].result); |
| |
| let request_params = RequestParams::default(); |
| let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params); |
| let event = Event { |
| previous_version: Some("1.2.3.4".to_string()), |
| ..Event::error(EventErrorCode::DeniedByPolicy) |
| }; |
| let apps = state_machine.app_set.to_vec().await; |
| request_builder = request_builder |
| .add_event(&apps[0], &event) |
| .session_id(GUID::from_u128(0)) |
| .request_id(GUID::from_u128(2)); |
| assert_request(state_machine.http, request_builder).await; |
| }); |
| } |
| |
| #[test] |
| fn test_wait_timer() { |
| let mut pool = LocalPool::new(); |
| let mock_time = MockTimeSource::new_from_now(); |
| let next_update_time = mock_time.now() + Duration::from_secs(111); |
| let (timer, mut timers) = BlockingTimer::new(); |
| let policy_engine = MockPolicyEngine { |
| check_timing: Some(CheckTiming::builder().time(next_update_time).build()), |
| time_source: mock_time, |
| ..MockPolicyEngine::default() |
| }; |
| |
| let (_ctl, state_machine) = pool.run_until( |
| StateMachineBuilder::new_stub().policy_engine(policy_engine).timer(timer).start(), |
| ); |
| |
| pool.spawner().spawn_local(state_machine.map(|_| ()).collect()).unwrap(); |
| |
| // With otherwise stub implementations, the pool stalls when a timer is awaited. Dropping |
| // the state machine will panic if any timer durations were not used. |
| let blocked_timer = pool.run_until(timers.next()).unwrap(); |
| assert_eq!(blocked_timer.requested_wait(), RequestedWait::Until(next_update_time.into())); |
| } |
| |
| #[test] |
| fn test_cohort_and_user_counting_updates_are_used_in_subsequent_requests() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "daystart": { |
| "elapsed_days": 1234567, |
| "elapsed_seconds": 3645 |
| }, |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "cohort": "1", |
| "cohortname": "stable-channel", |
| "updatecheck": { |
| "status": "noupdate" |
| } |
| }] |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let mut http = MockHttpRequest::new(HttpResponse::new(response.clone())); |
| http.add_response(HttpResponse::new(response)); |
| let last_request_viewer = MockHttpRequest::from_request_cell(http.get_request_cell()); |
| let apps = make_test_app_set(); |
| |
| let mut state_machine = |
| StateMachineBuilder::new_stub().http(http).app_set(apps.clone()).build().await; |
| |
| // Run it the first time. |
| state_machine.run_once().await; |
| |
| let apps = apps.to_vec().await; |
| assert_eq!(Some("1".to_string()), apps[0].cohort.id); |
| assert_eq!(None, apps[0].cohort.hint); |
| assert_eq!(Some("stable-channel".to_string()), apps[0].cohort.name); |
| assert_eq!(UserCounting::ClientRegulatedByDate(Some(1234567)), apps[0].user_counting); |
| |
| // Run it the second time. |
| state_machine.run_once().await; |
| |
| let request_params = RequestParams::default(); |
| let expected_request_builder = |
| RequestBuilder::new(&state_machine.config, &request_params) |
| .add_update_check(&apps[0]) |
| .add_ping(&apps[0]) |
| .session_id(GUID::from_u128(2)) |
| .request_id(GUID::from_u128(3)); |
| // Check that the second update check used the new app. |
| assert_request(last_request_viewer, expected_request_builder).await; |
| }); |
| } |
| |
| #[test] |
| fn test_user_counting_returned() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "daystart": { |
| "elapsed_days": 1234567, |
| "elapsed_seconds": 3645 |
| }, |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "cohort": "1", |
| "cohortname": "stable-channel", |
| "updatecheck": { |
| "status": "noupdate" |
| } |
| }] |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| |
| let response = StateMachineBuilder::new_stub().http(http).oneshot().await.unwrap(); |
| |
| assert_eq!( |
| UserCounting::ClientRegulatedByDate(Some(1234567)), |
| response.app_responses[0].user_counting |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_observe_state() { |
| block_on(async { |
| let actual_states = StateMachineBuilder::new_stub() |
| .oneshot_check() |
| .await |
| .filter_map(|event| { |
| future::ready(match event { |
| StateMachineEvent::StateChange(state) => Some(state), |
| _ => None, |
| }) |
| }) |
| .collect::<Vec<State>>() |
| .await; |
| |
| let expected_states = vec![State::CheckingForUpdates, State::ErrorCheckingForUpdate]; |
| assert_eq!(actual_states, expected_states); |
| }); |
| } |
| |
| #[test] |
| fn test_observe_schedule() { |
| block_on(async { |
| let mock_time = MockTimeSource::new_from_now(); |
| let actual_schedules = StateMachineBuilder::new_stub() |
| .policy_engine(StubPolicyEngine::new(&mock_time)) |
| .oneshot_check() |
| .await |
| .filter_map(|event| { |
| future::ready(match event { |
| StateMachineEvent::ScheduleChange(schedule) => Some(schedule), |
| _ => None, |
| }) |
| }) |
| .collect::<Vec<UpdateCheckSchedule>>() |
| .await; |
| |
| // The resultant schedule should only contain the timestamp of the above update check. |
| let expected_schedule = UpdateCheckSchedule::builder() |
| .last_time(mock_time.now()) |
| .last_check_time(mock_time.now()) |
| .build(); |
| |
| assert_eq!(actual_schedules, vec![expected_schedule]); |
| }); |
| } |
| |
| #[test] |
| fn test_observe_protocol_state() { |
| block_on(async { |
| let actual_protocol_states = StateMachineBuilder::new_stub() |
| .oneshot_check() |
| .await |
| .filter_map(|event| { |
| future::ready(match event { |
| StateMachineEvent::ProtocolStateChange(state) => Some(state), |
| _ => None, |
| }) |
| }) |
| .collect::<Vec<ProtocolState>>() |
| .await; |
| |
| let expected_protocol_state = |
| ProtocolState { consecutive_failed_update_checks: 1, ..ProtocolState::default() }; |
| |
| assert_eq!(actual_protocol_states, vec![expected_protocol_state]); |
| }); |
| } |
| |
| #[test] |
| fn test_observe_omaha_server_response() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "cohort": "1", |
| "cohortname": "stable-channel", |
| "updatecheck": { |
| "status": "noupdate" |
| } |
| }] |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let expected_omaha_response = response::parse_json_response(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| |
| let actual_omaha_response = StateMachineBuilder::new_stub() |
| .http(http) |
| .oneshot_check() |
| .await |
| .filter_map(|event| { |
| future::ready(match event { |
| StateMachineEvent::OmahaServerResponse(response) => Some(response), |
| _ => None, |
| }) |
| }) |
| .collect::<Vec<response::Response>>() |
| .await; |
| |
| assert_eq!(actual_omaha_response, vec![expected_omaha_response]); |
| }); |
| } |
| |
| #[test] |
| fn test_metrics_report_omaha_event_lost() { |
| block_on(async { |
| // This is sufficient to trigger a lost Omaha event as oneshot triggers an |
| // update check, which gets the invalid response (but hasn't checked the |
| // validity yet). This invalid response still contains an OK status, resulting |
| // in the UpdateCheckResponseTime and RequestsPerCheck events being generated |
| // reporting success. |
| // |
| // The response is then parsed and found to be incorrect; this parse error is |
| // attempted to be sent back to Omaha as an event with the ParseResponse error |
| // associated. However, the MockHttpRequest has already consumed the one |
| // response it knew how to give; this event is reported via HTTP, but is "lost" |
| // because the mock responds with a 500 error when it has no responses left to |
| // return. |
| // |
| // That finally results in the OmahaEventLost. |
| let http = MockHttpRequest::new(HttpResponse::new("invalid response".into())); |
| let mut metrics_reporter = MockMetricsReporter::new(); |
| let _response = StateMachineBuilder::new_stub() |
| .http(http) |
| .metrics_reporter(&mut metrics_reporter) |
| .oneshot() |
| .await; |
| |
| // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice |
| // patterns yet. |
| #[rustfmt::skip] |
| assert_matches!( |
| metrics_reporter.metrics.as_slice(), |
| [ |
| Metrics::UpdateCheckResponseTime { response_time: _, successful: true }, |
| Metrics::RequestsPerCheck { count: 1, successful: true }, |
| Metrics::OmahaEventLost(Event { |
| event_type: EventType::UpdateComplete, |
| event_result: EventResult::Error, |
| errorcode: Some(EventErrorCode::ParseResponse), |
| previous_version: None, |
| next_version: None, |
| download_time_ms: None, |
| }) |
| ] |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_metrics_report_update_check_response_time() { |
| block_on(async { |
| let mut metrics_reporter = MockMetricsReporter::new(); |
| let _response = StateMachineBuilder::new_stub() |
| .metrics_reporter(&mut metrics_reporter) |
| .oneshot() |
| .await; |
| |
| // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice |
| // patterns yet. |
| #[rustfmt::skip] |
| assert_matches!( |
| metrics_reporter.metrics.as_slice(), |
| [ |
| Metrics::UpdateCheckResponseTime { response_time: _, successful: true }, |
| Metrics::RequestsPerCheck { count: 1, successful: true }, |
| ] |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_metrics_report_update_check_response_time_on_failure() { |
| block_on(async { |
| let mut metrics_reporter = MockMetricsReporter::new(); |
| let mut http = MockHttpRequest::default(); |
| |
| for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS { |
| http.add_error(http_request::mock_errors::make_transport_error()); |
| } |
| |
| // Note: we exit the update loop before we fetch the successful result, so we never see |
| // this result. |
| http.add_response(hyper::Response::default()); |
| |
| let _response = StateMachineBuilder::new_stub() |
| .http(http) |
| .metrics_reporter(&mut metrics_reporter) |
| .oneshot() |
| .await; |
| |
| // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice |
| // patterns yet. |
| #[rustfmt::skip] |
| assert_matches!( |
| metrics_reporter.metrics.as_slice(), |
| [ |
| Metrics::UpdateCheckResponseTime { response_time: _, successful: false }, |
| Metrics::UpdateCheckResponseTime { response_time: _, successful: false }, |
| Metrics::UpdateCheckResponseTime { response_time: _, successful: false }, |
| Metrics::RequestsPerCheck { count: 3, successful: false }, |
| ] |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_metrics_report_update_check_response_time_on_failure_followed_by_success() { |
| block_on(async { |
| let mut metrics_reporter = MockMetricsReporter::new(); |
| let mut http = MockHttpRequest::default(); |
| |
| for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS - 1 { |
| http.add_error(http_request::mock_errors::make_transport_error()); |
| } |
| http.add_response(hyper::Response::default()); |
| |
| let _response = StateMachineBuilder::new_stub() |
| .http(http) |
| .metrics_reporter(&mut metrics_reporter) |
| .oneshot() |
| .await; |
| |
| // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice |
| // patterns yet. |
| #[rustfmt::skip] |
| assert_matches!( |
| metrics_reporter.metrics.as_slice(), |
| [ |
| Metrics::UpdateCheckResponseTime { response_time: _, successful: false }, |
| Metrics::UpdateCheckResponseTime { response_time: _, successful: false }, |
| Metrics::UpdateCheckResponseTime { response_time: _, successful: true }, |
| Metrics::RequestsPerCheck { count: 3, successful: true }, |
| Metrics::OmahaEventLost(Event { |
| event_type: EventType::UpdateComplete, |
| event_result: EventResult::Error, |
| errorcode: Some(EventErrorCode::ParseResponse), |
| previous_version: None, |
| next_version: None, |
| download_time_ms: None |
| }), |
| ] |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_metrics_report_requests_per_check() { |
| block_on(async { |
| let mut metrics_reporter = MockMetricsReporter::new(); |
| let _response = StateMachineBuilder::new_stub() |
| .metrics_reporter(&mut metrics_reporter) |
| .oneshot() |
| .await; |
| |
| assert!(metrics_reporter |
| .metrics |
| .contains(&Metrics::RequestsPerCheck { count: 1, successful: true })); |
| }); |
| } |
| |
| #[test] |
| fn test_metrics_report_requests_per_check_on_failure_followed_by_success() { |
| block_on(async { |
| let mut metrics_reporter = MockMetricsReporter::new(); |
| let mut http = MockHttpRequest::default(); |
| |
| for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS - 1 { |
| http.add_error(http_request::mock_errors::make_transport_error()); |
| } |
| |
| http.add_response(hyper::Response::default()); |
| |
| let _response = StateMachineBuilder::new_stub() |
| .http(http) |
| .metrics_reporter(&mut metrics_reporter) |
| .oneshot() |
| .await; |
| |
| assert!(!metrics_reporter.metrics.is_empty()); |
| assert!(metrics_reporter.metrics.contains(&Metrics::RequestsPerCheck { |
| count: MAX_OMAHA_REQUEST_ATTEMPTS, |
| successful: true |
| })); |
| }); |
| } |
| |
| #[test] |
| fn test_metrics_report_requests_per_check_on_failure() { |
| block_on(async { |
| let mut metrics_reporter = MockMetricsReporter::new(); |
| let mut http = MockHttpRequest::default(); |
| |
| for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS { |
| http.add_error(http_request::mock_errors::make_transport_error()); |
| } |
| |
| // Note we will give up before we get this successful request. |
| http.add_response(hyper::Response::default()); |
| |
| let _response = StateMachineBuilder::new_stub() |
| .http(http) |
| .metrics_reporter(&mut metrics_reporter) |
| .oneshot() |
| .await; |
| |
| assert!(!metrics_reporter.metrics.is_empty()); |
| assert!(metrics_reporter.metrics.contains(&Metrics::RequestsPerCheck { |
| count: MAX_OMAHA_REQUEST_ATTEMPTS, |
| successful: false |
| })); |
| }); |
| } |
| |
| #[test] |
| fn test_requests_per_check_backoff_with_mock_timer() { |
| block_on(async { |
| let mut timer = MockTimer::new(); |
| timer.expect_for_range(Duration::from_millis(500), Duration::from_millis(1500)); |
| timer.expect_for_range(Duration::from_millis(1500), Duration::from_millis(2500)); |
| let requested_waits = timer.get_requested_waits_view(); |
| let response = StateMachineBuilder::new_stub() |
| .http(MockHttpRequest::empty()) |
| .timer(timer) |
| .oneshot() |
| .await; |
| |
| let waits = requested_waits.borrow(); |
| assert_eq!(waits.len(), 2); |
| assert_matches!( |
| waits[0], |
| RequestedWait::For(d) if d >= Duration::from_millis(500) && d <= Duration::from_millis(1500) |
| ); |
| assert_matches!( |
| waits[1], |
| RequestedWait::For(d) if d >= Duration::from_millis(1500) && d <= Duration::from_millis(2500) |
| ); |
| |
| assert_matches!( |
| response, |
| Err(UpdateCheckError::OmahaRequest(OmahaRequestError::HttpStatus(_))) |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_metrics_report_update_check_failure_reason_omaha() { |
| block_on(async { |
| let mut metrics_reporter = MockMetricsReporter::new(); |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .metrics_reporter(&mut metrics_reporter) |
| .build() |
| .await; |
| |
| state_machine.run_once().await; |
| |
| assert!(metrics_reporter |
| .metrics |
| .contains(&Metrics::UpdateCheckFailureReason(UpdateCheckFailureReason::Omaha))); |
| }); |
| } |
| |
| #[test] |
| fn test_metrics_report_update_check_failure_reason_network() { |
| block_on(async { |
| let mut metrics_reporter = MockMetricsReporter::new(); |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .http(MockHttpRequest::empty()) |
| .metrics_reporter(&mut metrics_reporter) |
| .build() |
| .await; |
| |
| state_machine.run_once().await; |
| |
| assert!(metrics_reporter |
| .metrics |
| .contains(&Metrics::UpdateCheckFailureReason(UpdateCheckFailureReason::Network))); |
| }); |
| } |
| |
| #[test] |
| fn test_persist_last_update_time() { |
| block_on(async { |
| let storage = Rc::new(Mutex::new(MemStorage::new())); |
| |
| StateMachineBuilder::new_stub() |
| .storage(Rc::clone(&storage)) |
| .oneshot_check() |
| .await |
| .map(|_| ()) |
| .collect::<()>() |
| .await; |
| |
| let storage = storage.lock().await; |
| storage.get_int(LAST_UPDATE_TIME).await.unwrap(); |
| assert_eq!(true, storage.committed()); |
| }); |
| } |
| |
| #[test] |
| fn test_persist_server_dictated_poll_interval() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "noupdate" |
| } |
| }] |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let response = |
| HttpResponse::builder().header(X_RETRY_AFTER, 1234).body(response).unwrap(); |
| let http = MockHttpRequest::new(response); |
| let storage = Rc::new(Mutex::new(MemStorage::new())); |
| |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .http(http) |
| .storage(Rc::clone(&storage)) |
| .build() |
| .await; |
| state_machine.oneshot().await.unwrap(); |
| |
| assert_eq!( |
| state_machine.context.state.server_dictated_poll_interval, |
| Some(Duration::from_secs(1234)) |
| ); |
| |
| let storage = storage.lock().await; |
| assert_eq!(storage.get_int(SERVER_DICTATED_POLL_INTERVAL).await, Some(1234000000)); |
| assert!(storage.committed()); |
| }); |
| } |
| |
| #[test] |
| fn test_persist_server_dictated_poll_interval_http_error() { |
| block_on(async { |
| let response = HttpResponse::builder() |
| .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) |
| .header(X_RETRY_AFTER, 1234) |
| .body(vec![]) |
| .unwrap(); |
| let http = MockHttpRequest::new(response); |
| let storage = Rc::new(Mutex::new(MemStorage::new())); |
| |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .http(http) |
| .storage(Rc::clone(&storage)) |
| .build() |
| .await; |
| assert_matches!( |
| state_machine.oneshot().await, |
| Err(UpdateCheckError::OmahaRequest(OmahaRequestError::HttpStatus(_))) |
| ); |
| |
| assert_eq!( |
| state_machine.context.state.server_dictated_poll_interval, |
| Some(Duration::from_secs(1234)) |
| ); |
| |
| let storage = storage.lock().await; |
| assert_eq!(storage.get_int(SERVER_DICTATED_POLL_INTERVAL).await, Some(1234000000)); |
| assert!(storage.committed()); |
| }); |
| } |
| |
| #[test] |
| fn test_persist_server_dictated_poll_interval_max_duration() { |
| block_on(async { |
| let response = HttpResponse::builder() |
| .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) |
| .header(X_RETRY_AFTER, 123456789) |
| .body(vec![]) |
| .unwrap(); |
| let http = MockHttpRequest::new(response); |
| let storage = Rc::new(Mutex::new(MemStorage::new())); |
| |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .http(http) |
| .storage(Rc::clone(&storage)) |
| .build() |
| .await; |
| assert_matches!( |
| state_machine.oneshot().await, |
| Err(UpdateCheckError::OmahaRequest(OmahaRequestError::HttpStatus(_))) |
| ); |
| |
| assert_eq!( |
| state_machine.context.state.server_dictated_poll_interval, |
| Some(Duration::from_secs(86400)) |
| ); |
| |
| let storage = storage.lock().await; |
| assert_eq!(storage.get_int(SERVER_DICTATED_POLL_INTERVAL).await, Some(86400000000)); |
| assert!(storage.committed()); |
| }); |
| } |
| |
| #[test] |
| fn test_server_dictated_poll_interval_with_transport_error_no_retry() { |
| block_on(async { |
| let mut http = MockHttpRequest::empty(); |
| http.add_error(http_request::mock_errors::make_transport_error()); |
| let mut storage = MemStorage::new(); |
| storage.set_int(SERVER_DICTATED_POLL_INTERVAL, 1234000000); |
| storage.commit(); |
| let storage = Rc::new(Mutex::new(storage)); |
| |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .http(http) |
| .storage(Rc::clone(&storage)) |
| .build() |
| .await; |
| // This verifies that state machine does not retry because MockHttpRequest will only |
| // return the transport error on the first request, any additional requests will get |
| // HttpStatus error. |
| assert_matches!( |
| state_machine.oneshot().await, |
| Err(UpdateCheckError::OmahaRequest(OmahaRequestError::HttpTransport(_))) |
| ); |
| |
| assert_eq!( |
| state_machine.context.state.server_dictated_poll_interval, |
| Some(Duration::from_secs(1234)) |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_persist_app() { |
| block_on(async { |
| let storage = Rc::new(Mutex::new(MemStorage::new())); |
| let app_set = make_test_app_set(); |
| |
| StateMachineBuilder::new_stub() |
| .storage(Rc::clone(&storage)) |
| .app_set(app_set.clone()) |
| .oneshot_check() |
| .await |
| .map(|_| ()) |
| .collect::<()>() |
| .await; |
| |
| let storage = storage.lock().await; |
| let apps = app_set.to_vec().await; |
| storage.get_string(&apps[0].id).await.unwrap(); |
| assert!(storage.committed()); |
| }); |
| } |
| |
| #[test] |
| fn test_load_last_update_time() { |
| block_on(async { |
| let mut storage = MemStorage::new(); |
| let mut mock_time = MockTimeSource::new_from_now(); |
| mock_time.truncate_submicrosecond_walltime(); |
| let last_update_time = mock_time.now_in_walltime() - Duration::from_secs(999); |
| storage.set_time(LAST_UPDATE_TIME, last_update_time).await.unwrap(); |
| |
| let state_machine = StateMachineBuilder::new_stub() |
| .policy_engine(StubPolicyEngine::new(&mock_time)) |
| .storage(Rc::new(Mutex::new(storage))) |
| .build() |
| .await; |
| |
| assert_eq!( |
| state_machine.context.schedule.last_update_time.unwrap(), |
| PartialComplexTime::Wall(last_update_time) |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_load_server_dictated_poll_interval() { |
| block_on(async { |
| let mut storage = MemStorage::new(); |
| storage.set_int(SERVER_DICTATED_POLL_INTERVAL, 56789).await.unwrap(); |
| |
| let state_machine = |
| StateMachineBuilder::new_stub().storage(Rc::new(Mutex::new(storage))).build().await; |
| |
| assert_eq!( |
| Some(Duration::from_micros(56789)), |
| state_machine.context.state.server_dictated_poll_interval |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_load_app() { |
| block_on(async { |
| let app_set = AppSet::new(vec![App::builder( |
| "{00000000-0000-0000-0000-000000000001}", |
| [1, 2, 3, 4], |
| ) |
| .build()]); |
| let mut storage = MemStorage::new(); |
| let persisted_app = PersistedApp { |
| cohort: Cohort { |
| id: Some("cohort_id".to_string()), |
| hint: Some("test_channel".to_string()), |
| name: None, |
| }, |
| user_counting: UserCounting::ClientRegulatedByDate(Some(22222)), |
| }; |
| let json = serde_json::to_string(&persisted_app).unwrap(); |
| let apps = app_set.to_vec().await; |
| storage.set_string(&apps[0].id, &json).await.unwrap(); |
| |
| let _state_machine = StateMachineBuilder::new_stub() |
| .storage(Rc::new(Mutex::new(storage))) |
| .app_set(app_set.clone()) |
| .build() |
| .await; |
| |
| let apps = app_set.to_vec().await; |
| assert_eq!(persisted_app.cohort, apps[0].cohort); |
| assert_eq!(UserCounting::ClientRegulatedByDate(Some(22222)), apps[0].user_counting); |
| }); |
| } |
| |
| #[test] |
| fn test_report_check_interval_with_no_storage() { |
| block_on(async { |
| let mut mock_time = MockTimeSource::new_from_now(); |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .policy_engine(StubPolicyEngine::new(mock_time.clone())) |
| .metrics_reporter(MockMetricsReporter::new()) |
| .build() |
| .await; |
| |
| state_machine.report_check_interval(InstallSource::ScheduledTask).await; |
| // No metrics should be reported because no LAST_UPDATE_TIME in storage. |
| assert!(state_machine.metrics_reporter.metrics.is_empty()); |
| |
| // A second update check should report metrics. |
| let interval = Duration::from_micros(999999); |
| mock_time.advance(interval); |
| |
| state_machine.report_check_interval(InstallSource::ScheduledTask).await; |
| |
| assert_eq!( |
| state_machine.metrics_reporter.metrics, |
| vec![Metrics::UpdateCheckInterval { |
| interval, |
| clock: ClockType::Monotonic, |
| install_source: InstallSource::ScheduledTask, |
| }] |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_report_check_interval_mono_transition() { |
| block_on(async { |
| let mut mock_time = MockTimeSource::new_from_now(); |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .policy_engine(StubPolicyEngine::new(mock_time.clone())) |
| .metrics_reporter(MockMetricsReporter::new()) |
| .build() |
| .await; |
| |
| // Make sure that, provided a wall time, we get an initial report |
| // using the wall time. |
| let initial_duration = Duration::from_secs(999); |
| let initial_time = mock_time.now_in_walltime() - initial_duration; |
| state_machine.context.schedule.last_update_check_time = |
| Some(PartialComplexTime::Wall(initial_time)); |
| state_machine.report_check_interval(InstallSource::ScheduledTask).await; |
| |
| // Advance one more time, and this time we should see a monotonic delta. |
| let interval = Duration::from_micros(999999); |
| mock_time.advance(interval); |
| state_machine.report_check_interval(InstallSource::ScheduledTask).await; |
| |
| // One final time, to demonstrate monotonic time edges to |
| // monotonic time. |
| mock_time.advance(interval); |
| state_machine.report_check_interval(InstallSource::ScheduledTask).await; |
| assert_eq!( |
| state_machine.metrics_reporter.metrics, |
| vec![ |
| Metrics::UpdateCheckInterval { |
| interval: initial_duration, |
| clock: ClockType::Wall, |
| install_source: InstallSource::ScheduledTask, |
| }, |
| Metrics::UpdateCheckInterval { |
| interval, |
| clock: ClockType::Monotonic, |
| install_source: InstallSource::ScheduledTask, |
| }, |
| Metrics::UpdateCheckInterval { |
| interval, |
| clock: ClockType::Monotonic, |
| install_source: InstallSource::ScheduledTask, |
| }, |
| ] |
| ); |
| }); |
| } |
| |
| #[derive(Debug)] |
| pub struct TestInstaller { |
| reboot_called: Rc<RefCell<bool>>, |
| install_fails: usize, |
| mock_time: MockTimeSource, |
| } |
| struct TestInstallerBuilder { |
| install_fails: usize, |
| mock_time: MockTimeSource, |
| } |
| impl TestInstaller { |
| fn builder(mock_time: MockTimeSource) -> TestInstallerBuilder { |
| TestInstallerBuilder { install_fails: 0, mock_time } |
| } |
| } |
| impl TestInstallerBuilder { |
| fn add_install_fail(mut self) -> Self { |
| self.install_fails += 1; |
| self |
| } |
| fn build(self) -> TestInstaller { |
| TestInstaller { |
| reboot_called: Rc::new(RefCell::new(false)), |
| install_fails: self.install_fails, |
| mock_time: self.mock_time, |
| } |
| } |
| } |
| const INSTALL_DURATION: Duration = Duration::from_micros(98765433); |
| |
| impl Installer for TestInstaller { |
| type InstallPlan = StubPlan; |
| type Error = StubInstallErrors; |
| fn perform_install<'a>( |
| &'a mut self, |
| _install_plan: &StubPlan, |
| observer: Option<&'a dyn ProgressObserver>, |
| ) -> BoxFuture<'a, Result<(), Self::Error>> { |
| if self.install_fails > 0 { |
| self.install_fails -= 1; |
| future::ready(Err(StubInstallErrors::Failed)).boxed() |
| } else { |
| self.mock_time.advance(INSTALL_DURATION); |
| async move { |
| if let Some(observer) = observer { |
| observer.receive_progress(None, 0.0, None, None).await; |
| observer.receive_progress(None, 0.3, None, None).await; |
| observer.receive_progress(None, 0.9, None, None).await; |
| observer.receive_progress(None, 1.0, None, None).await; |
| } |
| Ok(()) |
| } |
| .boxed() |
| } |
| } |
| |
| fn perform_reboot(&mut self) -> BoxFuture<'_, Result<(), anyhow::Error>> { |
| self.reboot_called.replace(true); |
| future::ready(Ok(())).boxed() |
| } |
| } |
| |
| #[test] |
| fn test_report_successful_update_duration() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| let storage = Rc::new(Mutex::new(MemStorage::new())); |
| |
| let mut mock_time = MockTimeSource::new_from_now(); |
| mock_time.truncate_submicrosecond_walltime(); |
| let now = mock_time.now(); |
| |
| let update_completed_time = now + INSTALL_DURATION; |
| let expected_update_duration = update_completed_time.wall_duration_since(now).unwrap(); |
| |
| let first_seen_time = now - Duration::from_micros(100000000); |
| |
| let expected_duration_since_first_seen = |
| update_completed_time.wall_duration_since(first_seen_time).unwrap(); |
| |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .http(http) |
| .installer(TestInstaller::builder(mock_time.clone()).build()) |
| .policy_engine(StubPolicyEngine::new(mock_time.clone())) |
| .metrics_reporter(MockMetricsReporter::new()) |
| .storage(Rc::clone(&storage)) |
| .build() |
| .await; |
| |
| { |
| let mut storage = storage.lock().await; |
| storage.set_string(INSTALL_PLAN_ID, "").await.unwrap(); |
| storage.set_time(UPDATE_FIRST_SEEN_TIME, first_seen_time).await.unwrap(); |
| storage.commit().await.unwrap(); |
| } |
| |
| state_machine.run_once().await; |
| |
| #[rustfmt::skip] |
| assert_matches!( |
| state_machine.metrics_reporter.metrics.as_slice(), |
| [ |
| Metrics::UpdateCheckResponseTime { response_time: _, successful: true }, |
| Metrics::RequestsPerCheck { count: 1, successful: true }, |
| Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadStarted, event_result: EventResult::Success, .. }), |
| Metrics::SuccessfulUpdateDuration(install_duration), |
| Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadFinished, event_result: EventResult::Success, .. }), |
| Metrics::OmahaEventLost(Event { event_type: EventType::UpdateComplete, event_result: EventResult::Success, .. }), |
| Metrics::SuccessfulUpdateFromFirstSeen(duration_since_first_seen), |
| Metrics::AttemptsToSuccessfulCheck(1), |
| Metrics::AttemptsToSuccessfulInstall { count: 1, successful: true }, |
| ] |
| if |
| *install_duration == expected_update_duration && |
| *duration_since_first_seen == expected_duration_since_first_seen |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_report_failed_update_duration() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .http(http) |
| .installer(StubInstaller { should_fail: true }) |
| .metrics_reporter(MockMetricsReporter::new()) |
| .build() |
| .await; |
| // clock::mock::set(time::i64_to_time(123456789)); |
| |
| state_machine.run_once().await; |
| |
| assert!(state_machine |
| .metrics_reporter |
| .metrics |
| .contains(&Metrics::FailedUpdateDuration(Duration::from_micros(0)))); |
| }); |
| } |
| |
| #[test] |
| fn test_record_update_first_seen_time() { |
| block_on(async { |
| let storage = Rc::new(Mutex::new(MemStorage::new())); |
| let mut state_machine = |
| StateMachineBuilder::new_stub().storage(Rc::clone(&storage)).build().await; |
| |
| let mut mock_time = MockTimeSource::new_from_now(); |
| mock_time.truncate_submicrosecond_walltime(); |
| let now = mock_time.now_in_walltime(); |
| assert_eq!(state_machine.record_update_first_seen_time("id", now).await, now); |
| { |
| let storage = storage.lock().await; |
| assert_eq!(storage.get_string(INSTALL_PLAN_ID).await, Some("id".to_string())); |
| assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now)); |
| assert_eq!(storage.len(), 2); |
| assert!(storage.committed()); |
| } |
| |
| mock_time.advance(Duration::from_secs(1000)); |
| let now2 = mock_time.now_in_walltime(); |
| assert_eq!(state_machine.record_update_first_seen_time("id", now2).await, now); |
| { |
| let storage = storage.lock().await; |
| assert_eq!(storage.get_string(INSTALL_PLAN_ID).await, Some("id".to_string())); |
| assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now)); |
| assert_eq!(storage.len(), 2); |
| assert!(storage.committed()); |
| } |
| assert_eq!(state_machine.record_update_first_seen_time("id2", now2).await, now2); |
| { |
| let storage = storage.lock().await; |
| assert_eq!(storage.get_string(INSTALL_PLAN_ID).await, Some("id2".to_string())); |
| assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now2)); |
| assert_eq!(storage.len(), 2); |
| assert!(storage.committed()); |
| } |
| }); |
| } |
| |
| #[test] |
| fn test_report_attempts_to_successful_check() { |
| block_on(async { |
| let storage = Rc::new(Mutex::new(MemStorage::new())); |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .installer(StubInstaller { should_fail: true }) |
| .metrics_reporter(MockMetricsReporter::new()) |
| .storage(Rc::clone(&storage)) |
| .build() |
| .await; |
| |
| state_machine.report_attempts_to_successful_check(true).await; |
| |
| // consecutive_failed_update_attempts should be zero (there were no previous failures) |
| // but we should record an attempt in metrics |
| assert_eq!(state_machine.context.state.consecutive_failed_update_checks, 0); |
| assert_eq!( |
| state_machine.metrics_reporter.metrics, |
| vec![Metrics::AttemptsToSuccessfulCheck(1)] |
| ); |
| |
| state_machine.report_attempts_to_successful_check(false).await; |
| assert_eq!(state_machine.context.state.consecutive_failed_update_checks, 1); |
| |
| state_machine.report_attempts_to_successful_check(false).await; |
| assert_eq!(state_machine.context.state.consecutive_failed_update_checks, 2); |
| |
| // consecutive_failed_update_attempts should be reset to zero on success |
| // but we should record the previous number of failed attempts (2) + 1 in metrics |
| state_machine.report_attempts_to_successful_check(true).await; |
| assert_eq!(state_machine.context.state.consecutive_failed_update_checks, 0); |
| assert_eq!( |
| state_machine.metrics_reporter.metrics, |
| vec![Metrics::AttemptsToSuccessfulCheck(1), Metrics::AttemptsToSuccessfulCheck(3)] |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_ping_omaha_updates_consecutive_failed_update_checks_and_persists() { |
| block_on(async { |
| let mut http = MockHttpRequest::empty(); |
| http.add_error(http_request::mock_errors::make_transport_error()); |
| http.add_response(HttpResponse::new(vec![])); |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| http.add_response(HttpResponse::new(response)); |
| |
| let storage = Rc::new(Mutex::new(MemStorage::new())); |
| |
| // Start out with a value in storage... |
| { |
| let mut storage = storage.lock().await; |
| storage.set_int(CONSECUTIVE_FAILED_UPDATE_CHECKS, 1); |
| storage.commit(); |
| } |
| |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .storage(Rc::clone(&storage)) |
| .http(http) |
| .build() |
| .await; |
| |
| async_generator::generate(move |mut co| async move { |
| // Failed ping increases `consecutive_failed_update_checks`, adding the value from |
| // storage. |
| state_machine.ping_omaha(&mut co).await; |
| assert_eq!(state_machine.context.state.consecutive_failed_update_checks, 2); |
| { |
| let storage = storage.lock().await; |
| assert_eq!(storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await, Some(2)); |
| } |
| |
| state_machine.ping_omaha(&mut co).await; |
| assert_eq!(state_machine.context.state.consecutive_failed_update_checks, 3); |
| { |
| let storage = storage.lock().await; |
| assert_eq!(storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await, Some(3)); |
| } |
| |
| // Successful ping resets `consecutive_failed_update_checks`. |
| state_machine.ping_omaha(&mut co).await; |
| assert_eq!(state_machine.context.state.consecutive_failed_update_checks, 0); |
| { |
| let storage = storage.lock().await; |
| assert_eq!(storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await, None); |
| } |
| }) |
| .into_complete() |
| .await; |
| }); |
| } |
| |
| #[test] |
| fn test_report_attempts_to_successful_install() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response.clone())); |
| let storage = Rc::new(Mutex::new(MemStorage::new())); |
| |
| let mock_time = MockTimeSource::new_from_now(); |
| |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .http(http) |
| .installer(TestInstaller::builder(mock_time.clone()).build()) |
| .policy_engine(StubPolicyEngine::new(mock_time.clone())) |
| .metrics_reporter(MockMetricsReporter::new()) |
| .storage(Rc::clone(&storage)) |
| .build() |
| .await; |
| |
| state_machine.run_once().await; |
| |
| // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice |
| // patterns yet. |
| #[rustfmt::skip] |
| assert_matches!( |
| state_machine.metrics_reporter.metrics.as_slice(), |
| [ |
| Metrics::UpdateCheckResponseTime { response_time: _, successful: true }, |
| Metrics::RequestsPerCheck { count: 1, successful: true }, |
| Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadStarted, event_result: EventResult::Success, .. }), |
| Metrics::SuccessfulUpdateDuration(_), |
| Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadFinished, event_result: EventResult::Success, .. }), |
| Metrics::OmahaEventLost(Event { event_type: EventType::UpdateComplete, event_result: EventResult::Success, .. }), |
| Metrics::SuccessfulUpdateFromFirstSeen(_), |
| Metrics::AttemptsToSuccessfulCheck(1), |
| Metrics::AttemptsToSuccessfulInstall { count: 1, successful: true }, |
| ] |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_report_attempts_to_successful_install_fails_then_succeeds() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let mut http = MockHttpRequest::new(HttpResponse::new(response.clone())); |
| // Responses to events. This first batch corresponds to the install failure, so these |
| // should be the update download started, and another for a failed install. |
| // `Event::error(EventErrorCode::Installation)`. |
| http.add_response(HttpResponse::new(response.clone())); |
| http.add_response(HttpResponse::new(response.clone())); |
| |
| // Respond to the next request. |
| http.add_response(HttpResponse::new(response.clone())); |
| // Responses to events. This cooresponds to the update download started, and the other |
| // for a successful install. |
| http.add_response(HttpResponse::new(response.clone())); |
| http.add_response(HttpResponse::new(response.clone())); |
| |
| let storage = Rc::new(Mutex::new(MemStorage::new())); |
| let mock_time = MockTimeSource::new_from_now(); |
| |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .http(http) |
| .installer(TestInstaller::builder(mock_time.clone()).add_install_fail().build()) |
| .policy_engine(StubPolicyEngine::new(mock_time.clone())) |
| .metrics_reporter(MockMetricsReporter::new()) |
| .storage(Rc::clone(&storage)) |
| .build() |
| .await; |
| |
| state_machine.run_once().await; |
| state_machine.run_once().await; |
| |
| // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice |
| // patterns yet. |
| #[rustfmt::skip] |
| assert_matches!( |
| state_machine.metrics_reporter.metrics.as_slice(), |
| [ |
| Metrics::UpdateCheckResponseTime { response_time: _, successful: true }, |
| Metrics::RequestsPerCheck { count: 1, successful: true }, |
| Metrics::FailedUpdateDuration(_), |
| Metrics::AttemptsToSuccessfulCheck(1), |
| Metrics::AttemptsToSuccessfulInstall { count: 1, successful: false }, |
| Metrics::UpdateCheckInterval { .. }, |
| Metrics::UpdateCheckResponseTime { response_time: _, successful: true }, |
| Metrics::RequestsPerCheck { count: 1, successful: true }, |
| Metrics::SuccessfulUpdateDuration(_), |
| Metrics::OmahaEventLost(Event { .. }), |
| Metrics::SuccessfulUpdateFromFirstSeen(_), |
| Metrics::AttemptsToSuccessfulCheck(1), |
| Metrics::AttemptsToSuccessfulInstall { count: 2, successful: true } |
| ] |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_report_attempts_to_successful_install_does_not_report_for_no_update() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "noupdate", |
| "info": "no update for you" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response.clone())); |
| |
| let storage = Rc::new(Mutex::new(MemStorage::new())); |
| let mock_time = MockTimeSource::new_from_now(); |
| |
| let mut state_machine = StateMachineBuilder::new_stub() |
| .http(http) |
| .installer(TestInstaller::builder(mock_time.clone()).build()) |
| .policy_engine(StubPolicyEngine::new(mock_time.clone())) |
| .metrics_reporter(MockMetricsReporter::new()) |
| .storage(Rc::clone(&storage)) |
| .build() |
| .await; |
| |
| state_machine.run_once().await; |
| |
| // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice |
| // patterns yet. |
| #[rustfmt::skip] |
| assert_matches!( |
| state_machine.metrics_reporter.metrics.as_slice(), |
| [ |
| Metrics::UpdateCheckResponseTime { response_time: _, successful: true }, |
| Metrics::RequestsPerCheck { count: 1, successful: true }, |
| Metrics::AttemptsToSuccessfulCheck(1), |
| ] |
| ); |
| }); |
| } |
| |
| #[test] |
| fn test_successful_update_triggers_reboot() { |
| let mut pool = LocalPool::new(); |
| let spawner = pool.spawner(); |
| |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| let mock_time = MockTimeSource::new_from_now(); |
| let next_update_time = mock_time.now(); |
| let (timer, mut timers) = BlockingTimer::new(); |
| |
| let installer = TestInstaller::builder(mock_time.clone()).build(); |
| let reboot_called = Rc::clone(&installer.reboot_called); |
| let (_ctl, state_machine) = pool.run_until( |
| StateMachineBuilder::new_stub() |
| .http(http) |
| .installer(installer) |
| .policy_engine(StubPolicyEngine::new(mock_time)) |
| .timer(timer) |
| .start(), |
| ); |
| let observer = TestObserver::default(); |
| spawner.spawn_local(observer.observe(state_machine)).unwrap(); |
| |
| let blocked_timer = pool.run_until(timers.next()).unwrap(); |
| assert_eq!(blocked_timer.requested_wait(), RequestedWait::Until(next_update_time.into())); |
| blocked_timer.unblock(); |
| pool.run_until_stalled(); |
| |
| assert!(*reboot_called.borrow()); |
| } |
| |
| #[test] |
| fn test_failed_update_does_not_trigger_reboot() { |
| let mut pool = LocalPool::new(); |
| let spawner = pool.spawner(); |
| |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| let mock_time = MockTimeSource::new_from_now(); |
| let next_update_time = mock_time.now(); |
| let (timer, mut timers) = BlockingTimer::new(); |
| |
| let installer = TestInstaller::builder(mock_time.clone()).add_install_fail().build(); |
| let reboot_called = Rc::clone(&installer.reboot_called); |
| let (_ctl, state_machine) = pool.run_until( |
| StateMachineBuilder::new_stub() |
| .http(http) |
| .installer(installer) |
| .policy_engine(StubPolicyEngine::new(mock_time)) |
| .timer(timer) |
| .start(), |
| ); |
| let observer = TestObserver::default(); |
| spawner.spawn_local(observer.observe(state_machine)).unwrap(); |
| |
| let blocked_timer = pool.run_until(timers.next()).unwrap(); |
| assert_eq!(blocked_timer.requested_wait(), RequestedWait::Until(next_update_time.into())); |
| blocked_timer.unblock(); |
| pool.run_until_stalled(); |
| |
| assert!(!*reboot_called.borrow()); |
| } |
| |
| // Verify that if we are in the middle of checking for or applying an update, a new OnDemand |
| // update check request will "upgrade" the inflight check request to behave as if it was |
| // OnDemand. In particular, this should cause an immediate reboot. |
| #[test] |
| fn test_reboots_immediately_if_user_initiated_update_requests_occurs_during_install() { |
| let mut pool = LocalPool::new(); |
| let spawner = pool.spawner(); |
| |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| let mock_time = MockTimeSource::new_from_now(); |
| |
| let (send_install, mut recv_install) = mpsc::channel(0); |
| let (send_reboot, mut recv_reboot) = mpsc::channel(0); |
| let reboot_check_options_received = Rc::new(RefCell::new(vec![])); |
| let policy_engine = MockPolicyEngine { |
| reboot_check_options_received: Rc::clone(&reboot_check_options_received), |
| check_timing: Some(CheckTiming::builder().time(mock_time.now()).build()), |
| ..MockPolicyEngine::default() |
| }; |
| |
| let (mut ctl, state_machine) = pool.run_until( |
| StateMachineBuilder::new_stub() |
| .http(http) |
| .installer(BlockingInstaller { |
| on_install: send_install, |
| on_reboot: Some(send_reboot), |
| }) |
| .policy_engine(policy_engine) |
| .start(), |
| ); |
| |
| let observer = TestObserver::default(); |
| spawner.spawn_local(observer.observe(state_machine)).unwrap(); |
| |
| let unblock_install = pool.run_until(recv_install.next()).unwrap(); |
| pool.run_until_stalled(); |
| assert_eq!( |
| observer.take_states(), |
| vec![State::CheckingForUpdates, State::InstallingUpdate] |
| ); |
| |
| pool.run_until(async { |
| assert_eq!( |
| ctl.start_update_check(CheckOptions { source: InstallSource::OnDemand }).await, |
| Ok(StartUpdateCheckResponse::AlreadyRunning) |
| ); |
| }); |
| |
| pool.run_until_stalled(); |
| assert_eq!(observer.take_states(), vec![]); |
| |
| unblock_install.send(Ok(())).unwrap(); |
| pool.run_until_stalled(); |
| assert_eq!(observer.take_states(), vec![State::WaitingForReboot]); |
| |
| let unblock_reboot = pool.run_until(recv_reboot.next()).unwrap(); |
| pool.run_until_stalled(); |
| unblock_reboot.send(Ok(())).unwrap(); |
| |
| // Make sure when we checked whether we could reboot, it was from an OnDemand source |
| assert_eq!( |
| *reboot_check_options_received.borrow(), |
| vec![CheckOptions { source: InstallSource::OnDemand }] |
| ); |
| } |
| |
| // Verifies that if the state machine is done with an install and waiting for a reboot, and a |
| // user-initiated UpdateCheckRequest comes in, we reboot immediately. |
| #[test] |
| fn test_reboots_immediately_when_check_now_comes_in_during_wait() { |
| let mut pool = LocalPool::new(); |
| let spawner = pool.spawner(); |
| |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let mut http = MockHttpRequest::new(HttpResponse::new(response.clone())); |
| // Responses to events. |
| http.add_response(HttpResponse::new(response.clone())); |
| http.add_response(HttpResponse::new(response.clone())); |
| http.add_response(HttpResponse::new(response.clone())); |
| // Response to the ping. |
| http.add_response(HttpResponse::new(response)); |
| let mut mock_time = MockTimeSource::new_from_now(); |
| mock_time.truncate_submicrosecond_walltime(); |
| let next_update_time = mock_time.now() + Duration::from_secs(1000); |
| let (timer, mut timers) = BlockingTimer::new(); |
| let reboot_allowed = Rc::new(RefCell::new(false)); |
| let reboot_check_options_received = Rc::new(RefCell::new(vec![])); |
| let policy_engine = MockPolicyEngine { |
| time_source: mock_time.clone(), |
| reboot_allowed: Rc::clone(&reboot_allowed), |
| check_timing: Some(CheckTiming::builder().time(next_update_time).build()), |
| reboot_check_options_received: Rc::clone(&reboot_check_options_received), |
| ..MockPolicyEngine::default() |
| }; |
| let installer = TestInstaller::builder(mock_time.clone()).build(); |
| let reboot_called = Rc::clone(&installer.reboot_called); |
| let storage_ref = Rc::new(Mutex::new(MemStorage::new())); |
| let apps = make_test_app_set(); |
| |
| let (mut ctl, state_machine) = pool.run_until( |
| StateMachineBuilder::new_stub() |
| .app_set(apps.clone()) |
| .http(http) |
| .installer(installer) |
| .policy_engine(policy_engine) |
| .timer(timer) |
| .storage(Rc::clone(&storage_ref)) |
| .start(), |
| ); |
| |
| let observer = TestObserver::default(); |
| spawner.spawn_local(observer.observe(state_machine)).unwrap(); |
| |
| // The first wait before update check. |
| let blocked_timer = pool.run_until(timers.next()).unwrap(); |
| assert_eq!(blocked_timer.requested_wait(), RequestedWait::Until(next_update_time.into())); |
| blocked_timer.unblock(); |
| pool.run_until_stalled(); |
| |
| // The timers for reboot and ping, even though the order should be deterministic, but that |
| // is an implementation detail, the test should still pass if that order changes. |
| let blocked_timer1 = pool.run_until(timers.next()).unwrap(); |
| let blocked_timer2 = pool.run_until(timers.next()).unwrap(); |
| let (wait_for_reboot_timer, _wait_for_next_ping_timer) = |
| match blocked_timer1.requested_wait() { |
| RequestedWait::For(_) => (blocked_timer1, blocked_timer2), |
| RequestedWait::Until(_) => (blocked_timer2, blocked_timer1), |
| }; |
| // This is the timer waiting for next reboot_allowed check. |
| assert_eq!( |
| wait_for_reboot_timer.requested_wait(), |
| RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL) |
| ); |
| |
| // If we send an update check request that's from a user (source == OnDemand), we should |
| // short-circuit the wait for reboot, and update immediately. |
| assert!(!*reboot_called.borrow()); |
| *reboot_allowed.borrow_mut() = true; |
| pool.run_until(async { |
| assert_eq!( |
| ctl.start_update_check(CheckOptions { source: InstallSource::OnDemand }).await, |
| Ok(StartUpdateCheckResponse::AlreadyRunning) |
| ); |
| }); |
| pool.run_until_stalled(); |
| assert!(*reboot_called.borrow()); |
| |
| // Check that we got one check for reboot from a Scheduled Task (the start of the wait), |
| // and then another came in with OnDemand, as we "upgraded it" with our OnDemand check |
| // request |
| assert_eq!( |
| *reboot_check_options_received.borrow(), |
| vec![ |
| CheckOptions { source: InstallSource::ScheduledTask }, |
| CheckOptions { source: InstallSource::OnDemand }, |
| ] |
| ); |
| } |
| |
| // Verifies that if reboot is not allowed, state machine will send pings to Omaha while waiting |
| // for reboot, and it will reply AlreadyRunning to any StartUpdateCheck requests, and when it's |
| // finally time to reboot, it will trigger reboot. |
| #[test] |
| fn test_wait_for_reboot() { |
| let mut pool = LocalPool::new(); |
| let spawner = pool.spawner(); |
| |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let mut http = MockHttpRequest::new(HttpResponse::new(response.clone())); |
| // Responses to events. |
| http.add_response(HttpResponse::new(response.clone())); |
| http.add_response(HttpResponse::new(response.clone())); |
| http.add_response(HttpResponse::new(response.clone())); |
| // Response to the ping. |
| http.add_response(HttpResponse::new(response)); |
| let ping_request_viewer = MockHttpRequest::from_request_cell(http.get_request_cell()); |
| let second_ping_request_viewer = |
| MockHttpRequest::from_request_cell(http.get_request_cell()); |
| let mut mock_time = MockTimeSource::new_from_now(); |
| mock_time.truncate_submicrosecond_walltime(); |
| let next_update_time = mock_time.now() + Duration::from_secs(1000); |
| let (timer, mut timers) = BlockingTimer::new(); |
| let reboot_allowed = Rc::new(RefCell::new(false)); |
| let policy_engine = MockPolicyEngine { |
| time_source: mock_time.clone(), |
| reboot_allowed: Rc::clone(&reboot_allowed), |
| check_timing: Some(CheckTiming::builder().time(next_update_time).build()), |
| ..MockPolicyEngine::default() |
| }; |
| let installer = TestInstaller::builder(mock_time.clone()).build(); |
| let reboot_called = Rc::clone(&installer.reboot_called); |
| let storage_ref = Rc::new(Mutex::new(MemStorage::new())); |
| let apps = make_test_app_set(); |
| |
| let (mut ctl, state_machine) = pool.run_until( |
| StateMachineBuilder::new_stub() |
| .app_set(apps.clone()) |
| .http(http) |
| .installer(installer) |
| .policy_engine(policy_engine) |
| .timer(timer) |
| .storage(Rc::clone(&storage_ref)) |
| .start(), |
| ); |
| |
| let observer = TestObserver::default(); |
| spawner.spawn_local(observer.observe(state_machine)).unwrap(); |
| |
| // The first wait before update check. |
| let blocked_timer = pool.run_until(timers.next()).unwrap(); |
| assert_eq!(blocked_timer.requested_wait(), RequestedWait::Until(next_update_time.into())); |
| blocked_timer.unblock(); |
| pool.run_until_stalled(); |
| |
| // The timers for reboot and ping, even though the order should be deterministic, but that |
| // is an implementation detail, the test should still pass if that order changes. |
| let blocked_timer1 = pool.run_until(timers.next()).unwrap(); |
| let blocked_timer2 = pool.run_until(timers.next()).unwrap(); |
| let (wait_for_reboot_timer, wait_for_next_ping_timer) = |
| match blocked_timer1.requested_wait() { |
| RequestedWait::For(_) => (blocked_timer1, blocked_timer2), |
| RequestedWait::Until(_) => (blocked_timer2, blocked_timer1), |
| }; |
| // This is the timer waiting for next reboot_allowed check. |
| assert_eq!( |
| wait_for_reboot_timer.requested_wait(), |
| RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL) |
| ); |
| // This is the timer waiting for the next ping. |
| assert_eq!( |
| wait_for_next_ping_timer.requested_wait(), |
| RequestedWait::Until(next_update_time.into()) |
| ); |
| // Unblock the ping. |
| mock_time.advance(Duration::from_secs(1000)); |
| wait_for_next_ping_timer.unblock(); |
| pool.run_until_stalled(); |
| |
| // Verify that it sends a ping. |
| let config = crate::configuration::test_support::config_generator(); |
| let request_params = RequestParams::default(); |
| let apps = pool.run_until(apps.to_vec()); |
| let mut expected_request_builder = RequestBuilder::new(&config, &request_params) |
| // 0: session id for update check |
| // 1: request id for update check |
| // 2-4: request id for events |
| .session_id(GUID::from_u128(5)) |
| .request_id(GUID::from_u128(6)); |
| for app in &apps { |
| expected_request_builder = expected_request_builder.add_ping(&app); |
| } |
| pool.run_until(assert_request(ping_request_viewer, expected_request_builder)); |
| |
| pool.run_until(async { |
| assert_eq!( |
| ctl.start_update_check(CheckOptions::default()).await, |
| Ok(StartUpdateCheckResponse::AlreadyRunning) |
| ); |
| }); |
| |
| // Last update time is updated in storage. |
| pool.run_until(async { |
| let storage = storage_ref.lock().await; |
| let context = update_check::Context::load(&*storage).await; |
| assert_eq!(context.schedule.last_update_time, Some(mock_time.now_in_walltime().into())); |
| }); |
| |
| // State machine should be waiting for the next ping. |
| let wait_for_next_ping_timer = pool.run_until(timers.next()).unwrap(); |
| assert_eq!( |
| wait_for_next_ping_timer.requested_wait(), |
| RequestedWait::Until(next_update_time.into()) |
| ); |
| |
| // Let state machine check reboot_allowed again, but still don't allow it. |
| wait_for_reboot_timer.unblock(); |
| pool.run_until_stalled(); |
| assert!(!*reboot_called.borrow()); |
| |
| // State machine should be waiting for the next reboot. |
| let wait_for_reboot_timer = pool.run_until(timers.next()).unwrap(); |
| assert_eq!( |
| wait_for_reboot_timer.requested_wait(), |
| RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL) |
| ); |
| |
| // Time for a second ping. |
| wait_for_next_ping_timer.unblock(); |
| pool.run_until_stalled(); |
| |
| // Verify that it sends another ping. |
| let mut expected_request_builder = RequestBuilder::new(&config, &request_params) |
| .session_id(GUID::from_u128(7)) |
| .request_id(GUID::from_u128(8)); |
| for app in &apps { |
| expected_request_builder = expected_request_builder.add_ping(&app); |
| } |
| pool.run_until(assert_request(second_ping_request_viewer, expected_request_builder)); |
| |
| assert!(!*reboot_called.borrow()); |
| |
| // Now allow reboot. |
| *reboot_called.borrow_mut() = true; |
| wait_for_reboot_timer.unblock(); |
| pool.run_until_stalled(); |
| assert!(*reboot_called.borrow()); |
| } |
| |
| #[derive(Debug)] |
| struct BlockingInstaller { |
| on_install: mpsc::Sender<oneshot::Sender<Result<(), StubInstallErrors>>>, |
| on_reboot: Option<mpsc::Sender<oneshot::Sender<Result<(), anyhow::Error>>>>, |
| } |
| |
| impl Installer for BlockingInstaller { |
| type InstallPlan = StubPlan; |
| type Error = StubInstallErrors; |
| |
| fn perform_install( |
| &mut self, |
| _install_plan: &StubPlan, |
| _observer: Option<&dyn ProgressObserver>, |
| ) -> BoxFuture<'_, Result<(), StubInstallErrors>> { |
| let (send, recv) = oneshot::channel(); |
| let send_fut = self.on_install.send(send); |
| |
| async move { |
| send_fut.await.unwrap(); |
| recv.await.unwrap() |
| } |
| .boxed() |
| } |
| |
| fn perform_reboot(&mut self) -> BoxFuture<'_, Result<(), anyhow::Error>> { |
| match &mut self.on_reboot { |
| Some(on_reboot) => { |
| let (send, recv) = oneshot::channel(); |
| let send_fut = on_reboot.send(send); |
| |
| async move { |
| send_fut.await.unwrap(); |
| recv.await.unwrap() |
| } |
| .boxed() |
| } |
| None => future::ready(Ok(())).boxed(), |
| } |
| } |
| } |
| |
| #[derive(Debug, Default)] |
| struct TestObserver { |
| states: Rc<RefCell<Vec<State>>>, |
| } |
| |
| impl TestObserver { |
| fn observe(&self, s: impl Stream<Item = StateMachineEvent>) -> impl Future<Output = ()> { |
| let states = Rc::clone(&self.states); |
| async move { |
| futures::pin_mut!(s); |
| while let Some(event) = s.next().await { |
| match event { |
| StateMachineEvent::StateChange(state) => { |
| states.borrow_mut().push(state); |
| } |
| _ => {} |
| } |
| } |
| } |
| } |
| |
| fn observe_until_terminal( |
| &self, |
| s: impl Stream<Item = StateMachineEvent>, |
| ) -> impl Future<Output = ()> { |
| let states = Rc::clone(&self.states); |
| async move { |
| futures::pin_mut!(s); |
| while let Some(event) = s.next().await { |
| match event { |
| StateMachineEvent::StateChange(state) => { |
| states.borrow_mut().push(state); |
| match state { |
| State::Idle | State::WaitingForReboot => return, |
| _ => {} |
| } |
| } |
| _ => {} |
| } |
| } |
| } |
| } |
| |
| fn take_states(&self) -> Vec<State> { |
| std::mem::replace(&mut *self.states.borrow_mut(), vec![]) |
| } |
| } |
| |
| #[test] |
| fn test_start_update_during_update_replies_with_in_progress() { |
| let mut pool = LocalPool::new(); |
| let spawner = pool.spawner(); |
| |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| let (send_install, mut recv_install) = mpsc::channel(0); |
| let (mut ctl, state_machine) = pool.run_until( |
| StateMachineBuilder::new_stub() |
| .http(http) |
| .installer(BlockingInstaller { on_install: send_install, on_reboot: None }) |
| .start(), |
| ); |
| |
| let observer = TestObserver::default(); |
| spawner.spawn_local(observer.observe_until_terminal(state_machine)).unwrap(); |
| |
| let unblock_install = pool.run_until(recv_install.next()).unwrap(); |
| pool.run_until_stalled(); |
| assert_eq!( |
| observer.take_states(), |
| vec![State::CheckingForUpdates, State::InstallingUpdate] |
| ); |
| |
| pool.run_until(async { |
| assert_eq!( |
| ctl.start_update_check(CheckOptions::default()).await, |
| Ok(StartUpdateCheckResponse::AlreadyRunning) |
| ); |
| }); |
| pool.run_until_stalled(); |
| assert_eq!(observer.take_states(), vec![]); |
| |
| unblock_install.send(Ok(())).unwrap(); |
| pool.run_until_stalled(); |
| |
| assert_eq!(observer.take_states(), vec![State::WaitingForReboot]); |
| } |
| |
| #[test] |
| fn test_start_update_during_timer_starts_update() { |
| let mut pool = LocalPool::new(); |
| let spawner = pool.spawner(); |
| |
| let mut mock_time = MockTimeSource::new_from_now(); |
| let next_update_time = mock_time.now() + Duration::from_secs(321); |
| |
| let (timer, mut timers) = BlockingTimer::new(); |
| let policy_engine = MockPolicyEngine { |
| check_timing: Some(CheckTiming::builder().time(next_update_time).build()), |
| time_source: mock_time.clone(), |
| ..MockPolicyEngine::default() |
| }; |
| let (mut ctl, state_machine) = pool.run_until( |
| StateMachineBuilder::new_stub().policy_engine(policy_engine).timer(timer).start(), |
| ); |
| |
| let observer = TestObserver::default(); |
| spawner.spawn_local(observer.observe(state_machine)).unwrap(); |
| |
| let blocked_timer = pool.run_until(timers.next()).unwrap(); |
| assert_eq!(blocked_timer.requested_wait(), RequestedWait::Until(next_update_time.into())); |
| mock_time.advance(Duration::from_secs(200)); |
| assert_eq!(observer.take_states(), vec![]); |
| |
| // Nothing happens while the timer is waiting. |
| pool.run_until_stalled(); |
| assert_eq!(observer.take_states(), vec![]); |
| |
| blocked_timer.unblock(); |
| let blocked_timer = pool.run_until(timers.next()).unwrap(); |
| assert_eq!(blocked_timer.requested_wait(), RequestedWait::Until(next_update_time.into())); |
| assert_eq!( |
| observer.take_states(), |
| vec![State::CheckingForUpdates, State::ErrorCheckingForUpdate, State::Idle] |
| ); |
| |
| // Unless a control signal to start an update check comes in. |
| pool.run_until(async { |
| assert_eq!( |
| ctl.start_update_check(CheckOptions::default()).await, |
| Ok(StartUpdateCheckResponse::Started) |
| ); |
| }); |
| pool.run_until_stalled(); |
| assert_eq!( |
| observer.take_states(), |
| vec![State::CheckingForUpdates, State::ErrorCheckingForUpdate, State::Idle] |
| ); |
| } |
| |
| #[test] |
| fn test_start_update_check_returns_throttled() { |
| let mut pool = LocalPool::new(); |
| let spawner = pool.spawner(); |
| |
| let mut mock_time = MockTimeSource::new_from_now(); |
| let next_update_time = mock_time.now() + Duration::from_secs(321); |
| |
| let (timer, mut timers) = BlockingTimer::new(); |
| let policy_engine = MockPolicyEngine { |
| check_timing: Some(CheckTiming::builder().time(next_update_time).build()), |
| time_source: mock_time.clone(), |
| check_decision: CheckDecision::ThrottledByPolicy, |
| ..MockPolicyEngine::default() |
| }; |
| let (mut ctl, state_machine) = pool.run_until( |
| StateMachineBuilder::new_stub().policy_engine(policy_engine).timer(timer).start(), |
| ); |
| |
| let observer = TestObserver::default(); |
| spawner.spawn_local(observer.observe(state_machine)).unwrap(); |
| |
| let blocked_timer = pool.run_until(timers.next()).unwrap(); |
| assert_eq!(blocked_timer.requested_wait(), RequestedWait::Until(next_update_time.into())); |
| mock_time.advance(Duration::from_secs(200)); |
| assert_eq!(observer.take_states(), vec![]); |
| |
| pool.run_until(async { |
| assert_eq!( |
| ctl.start_update_check(CheckOptions::default()).await, |
| Ok(StartUpdateCheckResponse::Throttled) |
| ); |
| }); |
| pool.run_until_stalled(); |
| assert_eq!(observer.take_states(), vec![]); |
| } |
| |
| #[test] |
| fn test_progress_observer() { |
| block_on(async { |
| let response = json!({"response":{ |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok" |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| let mock_time = MockTimeSource::new_from_now(); |
| let progresses = StateMachineBuilder::new_stub() |
| .http(http) |
| .installer(TestInstaller::builder(mock_time.clone()).build()) |
| .policy_engine(StubPolicyEngine::new(mock_time)) |
| .oneshot_check() |
| .await |
| .filter_map(|event| { |
| future::ready(match event { |
| StateMachineEvent::InstallProgressChange(InstallProgress { progress }) => { |
| Some(progress) |
| } |
| _ => None, |
| }) |
| }) |
| .collect::<Vec<f32>>() |
| .await; |
| assert_eq!(progresses, [0.0, 0.3, 0.9, 1.0]); |
| }); |
| } |
| |
| #[test] |
| // A scenario in which |
| // (now_in_monotonic - state_machine_start_in_monotonic) > (update_finish_time - now_in_wall) |
| // should not panic. |
| fn test_report_waited_for_reboot_duration_doesnt_panic_on_wrong_current_time() { |
| block_on(async { |
| let metrics_reporter = MockMetricsReporter::new(); |
| |
| let state_machine_start_monotonic = Instant::now(); |
| let update_finish_time = SystemTime::now(); |
| |
| // Set the monotonic increase in time larger than the wall time increase since the end |
| // of the last update. |
| // This can happen if we don't have a reliable current wall time. |
| let now_wall = update_finish_time + Duration::from_secs(1); |
| let now_monotonic = state_machine_start_monotonic + Duration::from_secs(10); |
| |
| let mut state_machine = |
| StateMachineBuilder::new_stub().metrics_reporter(metrics_reporter).build().await; |
| |
| // Time has advanced monotonically since we noted the start of the state machine for |
| // longer than the wall time difference between update finish time and now. |
| // This computation should currently overflow. |
| state_machine |
| .report_waited_for_reboot_duration( |
| update_finish_time, |
| state_machine_start_monotonic, |
| ComplexTime { wall: now_wall, mono: now_monotonic }, |
| ) |
| .expect_err("should overflow and error out"); |
| |
| // We should have reported no metrics |
| assert!(state_machine.metrics_reporter.metrics.is_empty()); |
| }); |
| } |
| |
| #[test] |
| fn test_report_waited_for_reboot_duration() { |
| let mut pool = LocalPool::new(); |
| let spawner = pool.spawner(); |
| |
| let response = json!({"response": { |
| "server": "prod", |
| "protocol": "3.0", |
| "app": [{ |
| "appid": "{00000000-0000-0000-0000-000000000001}", |
| "status": "ok", |
| "updatecheck": { |
| "status": "ok", |
| "manifest": { |
| "version": "1.2.3.5", |
| "actions": { |
| "action": [], |
| }, |
| "packages": { |
| "package": [], |
| }, |
| } |
| } |
| }], |
| }}); |
| let response = serde_json::to_vec(&response).unwrap(); |
| let http = MockHttpRequest::new(HttpResponse::new(response)); |
| let mut mock_time = MockTimeSource::new_from_now(); |
| mock_time.truncate_submicrosecond_walltime(); |
| let storage = Rc::new(Mutex::new(MemStorage::new())); |
| |
| // Do one update. |
| assert_matches!( |
| pool.run_until( |
| StateMachineBuilder::new_stub() |
| .http(http) |
| .policy_engine(StubPolicyEngine::new(mock_time.clone())) |
| .storage(Rc::clone(&storage)) |
| .oneshot() |
| ), |
| Ok(_) |
| ); |
| |
| mock_time.advance(Duration::from_secs(999)); |
| |
| // Execute state machine `run()`, simulating that we already rebooted. |
| let config = Config { |
| updater: Updater { name: "updater".to_string(), version: Version::from([0, 1]) }, |
| os: OS { version: "1.2.3.5".to_string(), ..OS::default() }, |
| service_url: "http://example.com/".to_string(), |
| }; |
| let metrics_reporter = Rc::new(RefCell::new(MockMetricsReporter::new())); |
| let (_ctl, state_machine) = pool.run_until( |
| StateMachineBuilder::new_stub() |
| .config(config) |
| .metrics_reporter(Rc::clone(&metrics_reporter)) |
| .policy_engine(StubPolicyEngine::new(mock_time.clone())) |
| .storage(Rc::clone(&storage)) |
| .timer(MockTimer::new()) |
| .start(), |
| ); |
| |
| // Move state machine forward using observer. |
| let observer = TestObserver::default(); |
| spawner.spawn_local(observer.observe(state_machine)).unwrap(); |
| pool.run_until_stalled(); |
| |
| assert_eq!( |
| metrics_reporter |
| .borrow() |
| .metrics |
| .iter() |
| .filter(|m| match m { |
| Metrics::WaitedForRebootDuration(_) => true, |
| _ => false, |
| }) |
| .collect::<Vec<_>>(), |
| vec![&Metrics::WaitedForRebootDuration(Duration::from_secs(999))] |
| ); |
| |
| // Verify that storage is cleaned up. |
| pool.run_until(async { |
| let storage = storage.lock().await; |
| assert_eq!(storage.get_time(UPDATE_FINISH_TIME).await, None); |
| assert_eq!(storage.get_string(TARGET_VERSION).await, None); |
| assert!(storage.committed()); |
| }) |
| } |
| } |