// blob: 32fcdba2c730401e8151f56718da126316a96317 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
constants::{
CONVERGE_SAMPLES, INITIAL_SAMPLE_POLLS, MAX_TIME_BETWEEN_SAMPLES_RANDOMIZATION,
SAMPLE_POLLS,
},
datatypes::{HttpsSample, Phase},
diagnostics::{Diagnostics, Event},
sampler::HttpsSampler,
Config,
},
anyhow::Error,
async_trait::async_trait,
fidl_fuchsia_time_external::{Properties, Status, TimeSample, Urgency},
fuchsia_async as fasync, fuchsia_zircon as zx,
futures::{channel::mpsc::Sender, future::BoxFuture, lock::Mutex, Future, SinkExt},
httpdate_hyper::{HttpsDateError, HttpsDateErrorType},
push_source::Update,
rand::Rng,
tracing::{debug, error, info},
};
/// A definition of how long an algorithm should wait between polls. Defines fixed wait durations
/// following successful poll attempts, and a capped exponential backoff following failed poll
/// attempts.
pub struct RetryStrategy {
/// Base wait after a failed poll attempt. The backoff is this duration multiplied by a power
/// of two.
pub min_between_failures: zx::Duration,
/// Upper bound on the power-of-two exponent applied to `min_between_failures`.
pub max_exponent: u32,
/// Number of consecutive failed attempts that share one exponent before it is incremented.
pub tries_per_exponent: u32,
/// Nominal wait before each sample taken in the converge phase.
pub converge_time_between_samples: zx::Duration,
/// Nominal wait before each sample taken in the maintain phase.
pub maintain_time_between_samples: zx::Duration,
}
impl RetryStrategy {
    /// Returns the duration to wait after a failed poll attempt. |attempt_index| is a zero-based
    /// index of the failed attempt, i.e. after the third failed attempt `attempt_index` = 2.
    ///
    /// The wait is `min_between_failures * 2^exponent`, where the exponent grows by one every
    /// `tries_per_exponent` attempts and is capped at `max_exponent`.
    ///
    /// The computation never panics or wraps: a `tries_per_exponent` of zero is treated as
    /// reaching `max_exponent` immediately, and results too large for an `i64` nanosecond count
    /// saturate instead of overflowing.
    fn backoff_duration(&self, attempt_index: u32) -> zx::Duration {
        // `checked_div` yields `None` only for a zero divisor; fall back to the capped exponent.
        let exponent = attempt_index
            .checked_div(self.tries_per_exponent)
            .map_or(self.max_exponent, |steps| steps.min(self.max_exponent));
        // 2^exponent does not fit in an i64 once the exponent reaches 63.
        let multiplier = 2i64.checked_pow(exponent).unwrap_or(i64::MAX);
        zx::Duration::from_nanos(self.min_between_failures.into_nanos().saturating_mul(multiplier))
    }
}
/// An |UpdateAlgorithm| that uses an `HttpsSampler` to obtain time samples at a schedule
/// dictated by a specified retry strategy.
///
/// Type parameters:
/// * `S` - the sampler that produces time samples.
/// * `D` - the recorder for diagnostic events.
/// * `N` - the future that resolves when the network availability check finishes.
pub struct HttpsDateUpdateAlgorithm<
'a,
S: HttpsSampler + Send + Sync,
D: Diagnostics,
N: Future<Output = Result<(), Error>> + Send,
> {
/// Strategy defining how long to wait after successes and failures.
retry_strategy: RetryStrategy,
/// Sampler used to produce samples.
sampler: S,
/// Object managing diagnostics output.
diagnostics: D,
/// Future that completes when the network is available. A 'None' value indicates the network
/// check has previously completed. Wrapped in a mutex so it can be taken through `&self`.
network_available_fut: Mutex<Option<N>>,
/// HttpsDate config. Supplies the per-urgency sample settings read by the pull API.
config: &'a Config,
}
impl<'a, S, D, N> HttpsDateUpdateAlgorithm<'a, S, D, N>
where
    S: HttpsSampler + Send + Sync,
    D: Diagnostics,
    N: Future<Output = Result<(), Error>> + Send,
{
    /// Awaits an in-flight sample, logs it, records a success event, and hands the sample back.
    async fn handle_produce_sample(&self, sample_fut: BoxFuture<'_, HttpsSample>) -> HttpsSample {
        let completed = sample_fut.await;
        info!(
            "Got a time sample - UTC {:?}, bound size {:?}, and polls {:?}",
            completed.utc, completed.final_bound_size, completed.polls
        );
        self.diagnostics.record(Event::Success(&completed));
        completed
    }

    /// Records a failure event for `http_error` and updates `last_error_type`.
    ///
    /// Returns `Some(new_status)` if the error changes the current status, i.e. its type differs
    /// from the previously seen one; otherwise returns `None`.
    fn handle_sample_error(
        &self,
        http_error: HttpsDateError,
        last_error_type: &mut Option<HttpsDateErrorType>,
    ) -> Option<Status> {
        let error_type = http_error.error_type();
        // The failure is always recorded, even when the status does not change.
        self.diagnostics.record(Event::Failure(error_type));

        if Some(error_type) == *last_error_type {
            return None;
        }
        *last_error_type = Some(error_type);

        let new_status = match error_type {
            HttpsDateErrorType::NetworkError => {
                debug!("Failed to poll time: {:?}", http_error);
                Status::Network
            }
            HttpsDateErrorType::InvalidHostname | HttpsDateErrorType::SchemeNotHttps => {
                // TODO(https://fxbug.dev/42137876) - decide how to surface irrecoverable
                // errors to clients
                error!(
                    "Got an unexpected error {:?}, which indicates a bad \
                    configuration.",
                    http_error
                );
                Status::UnknownUnhealthy
            }
            _ => {
                debug!("Failed to poll time: {:?}", http_error);
                Status::Protocol
            }
        };
        Some(new_status)
    }
}
#[async_trait]
impl<'a, S, D, N> push_source::UpdateAlgorithm for HttpsDateUpdateAlgorithm<'a, S, D, N>
where
    S: HttpsSampler + Send + Sync,
    D: Diagnostics,
    N: Future<Output = Result<(), Error>> + Send,
{
    /// Accepts device properties; they are currently ignored.
    async fn update_device_properties(&self, _properties: Properties) {
        // since our samples are polled independently for now, we don't need to use
        // device properties yet.
    }

    /// Pushes status and sample updates to `sink`.
    ///
    /// Runs the one-time network check (if it has not been consumed yet), then produces samples
    /// through the initial, converge and maintain phases. The maintain phase loops forever, so
    /// this only returns when sending on `sink` fails.
    async fn generate_updates(&self, mut sink: Sender<Update>) -> Result<(), Error> {
        {
            // The guard is held while the check runs and released at the end of this scope.
            let mut pending_network_check = self.network_available_fut.lock().await;
            if let Some(network_check) = pending_network_check.take() {
                info!("Waiting for network to become available.");
                match network_check.await {
                    Err(e) => {
                        error!("Network check failed, polling for time anyway: {:?}", e);
                        sink.send(Status::Network.into()).await?;
                    }
                    Ok(()) => {
                        info!("Network check completed.");
                        sink.send(Status::Ok.into()).await?;
                        self.diagnostics.record(Event::NetworkCheckSuccessful);
                    }
                }
            }
        }

        // randomize poll timings somewhat so polls across devices will not be synchronized
        let jitter = rand::thread_rng().gen_range(
            -MAX_TIME_BETWEEN_SAMPLES_RANDOMIZATION..MAX_TIME_BETWEEN_SAMPLES_RANDOMIZATION,
        );
        let random_factor = 1f32 + jitter;
        let converge_wait =
            mult_duration(self.retry_strategy.converge_time_between_samples, random_factor);
        let maintain_wait =
            mult_duration(self.retry_strategy.maintain_time_between_samples, random_factor);

        // Initial phase: a single sample, taken immediately.
        self.diagnostics.record(Event::Phase(Phase::Initial));
        self.try_generate_sample_until_successful(INITIAL_SAMPLE_POLLS, &mut sink).await?;

        // Converge phase: a fixed number of samples, each preceded by the converge wait.
        self.diagnostics.record(Event::Phase(Phase::Converge));
        for _ in 0..CONVERGE_SAMPLES {
            fasync::Timer::new(fasync::Time::after(converge_wait)).await;
            self.try_generate_sample_until_successful(SAMPLE_POLLS, &mut sink).await?;
        }

        // Maintain phase: keep sampling indefinitely, each preceded by the maintain wait.
        self.diagnostics.record(Event::Phase(Phase::Maintain));
        loop {
            fasync::Timer::new(fasync::Time::after(maintain_wait)).await;
            self.try_generate_sample_until_successful(SAMPLE_POLLS, &mut sink).await?;
        }
    }
}
#[async_trait]
impl<'a, S, D, N> pull_source::UpdateAlgorithm for HttpsDateUpdateAlgorithm<'a, S, D, N>
where
    S: HttpsSampler + Send + Sync,
    D: Diagnostics,
    N: Future<Output = Result<(), Error>> + Send,
{
    /// Accepts device properties; they are currently ignored.
    async fn update_device_properties(&self, _properties: Properties) {
        // since our samples are polled independently for now, we don't need to use
        // device properties yet.
    }

    /// Produces a single time sample on demand.
    ///
    /// The attempt and poll counts come from the config entry for `urgency`. Each failed attempt
    /// is followed by the retry strategy's backoff, including the final one. Once the attempts
    /// are exhausted, configuration-type errors (and the case where no attempt was made) map to
    /// `SampleError::Internal`; every other error maps to `SampleError::Network`.
    async fn sample(&self, urgency: Urgency) -> Result<TimeSample, pull_source::SampleError> {
        let sample_config = match self.config.sample_config_by_urgency.get(&urgency) {
            Some(found) => found,
            None => {
                error!("No config for urgency level {:?}", urgency);
                return Err(pull_source::SampleError::Internal);
            }
        };
        let num_attempts = sample_config.max_attempts;
        let num_polls = match usize::try_from(sample_config.num_polls) {
            Ok(polls) => polls,
            Err(e) => {
                error!("num_polls numeric overflow: {:?}", e);
                return Err(pull_source::SampleError::Internal);
            }
        };

        let mut last_error_type = None;
        for attempt in 0u32..num_attempts {
            match self.sampler.produce_sample(num_polls).await {
                Ok(sample_fut) => {
                    return Ok(self.handle_produce_sample(sample_fut).await.into());
                }
                Err(http_error) => {
                    // Status changes are irrelevant for the pull API; only the error type is
                    // remembered.
                    let _ = self.handle_sample_error(http_error, &mut last_error_type);
                }
            }
            let backoff = self.retry_strategy.backoff_duration(attempt);
            fasync::Timer::new(fasync::Time::after(backoff)).await;
        }

        // Every attempt failed (or none was allowed); translate the last failure.
        match last_error_type {
            Some(e) => {
                debug!("Exhausted attempts to fetch a sample, last error type {:?}.", e);
                match e {
                    HttpsDateErrorType::InvalidHostname | HttpsDateErrorType::SchemeNotHttps => {
                        Err(pull_source::SampleError::Internal)
                    }
                    _ => Err(pull_source::SampleError::Network),
                }
            }
            None => {
                debug!("Exhausted attempts to fetch a sample, empty last error type.");
                Err(pull_source::SampleError::Internal)
            }
        }
    }

    /// Reports the earliest time another sample may be requested; currently always "now".
    async fn next_possible_sample_time(&self) -> zx::Time {
        // TODO(https://fxbug.dev/42065019): Implement rate limiting if required.
        zx::Time::get_monotonic()
    }
}
impl<'a, S, D, N> HttpsDateUpdateAlgorithm<'a, S, D, N>
where
    S: HttpsSampler + Send + Sync,
    D: Diagnostics,
    N: Future<Output = Result<(), Error>> + Send,
{
    /// Builds an algorithm from its collaborators. The network check future is stored so that
    /// it can be consumed once, the first time updates are generated.
    pub fn new(
        retry_strategy: RetryStrategy,
        diagnostics: D,
        sampler: S,
        network_available_fut: N,
        config: &'a Config,
    ) -> Self {
        let network_available_fut = Mutex::new(Some(network_available_fut));
        Self { retry_strategy, sampler, diagnostics, network_available_fut, config }
    }

    /// Repeatedly poll for a time until one sample is successfully retrieved. Pushes updates to
    /// |sink|: an OK status followed by the sample on success, and a status update whenever a
    /// failure changes the error type. Failed attempts are separated by the retry strategy's
    /// backoff. Returns an error only if sending on |sink| fails.
    async fn try_generate_sample_until_successful(
        &self,
        num_polls: usize,
        sink: &mut Sender<Update>,
    ) -> Result<(), Error> {
        let mut attempts = 0u32..;
        let mut previous_error_type = None;
        loop {
            let attempt = attempts.next().unwrap_or(u32::MAX);

            let http_error = match self.sampler.produce_sample(num_polls).await {
                Ok(pending_sample) => {
                    // Report OK as soon as the sampler accepts the request, then wait for the
                    // sample itself.
                    sink.send(Status::Ok.into()).await?;
                    let sample = self.handle_produce_sample(pending_sample).await;
                    sink.send(sample.into()).await?;
                    return Ok(());
                }
                Err(failure) => failure,
            };

            if let Some(new_status) =
                self.handle_sample_error(http_error, &mut previous_error_type)
            {
                sink.send(new_status.into()).await?;
            }

            let backoff = self.retry_strategy.backoff_duration(attempt);
            fasync::Timer::new(fasync::Time::after(backoff)).await;
        }
    }
}
/// Scales `duration` by `factor`. The product is computed in `f64` and converted back to a whole
/// number of nanoseconds with an `as i64` cast.
fn mult_duration(duration: zx::Duration, factor: f32) -> zx::Duration {
    let scaled_nanos = f64::from(factor) * (duration.into_nanos() as f64);
    zx::Duration::from_nanos(scaled_nanos as i64)
}
#[cfg(test)]
mod test {
use {
super::*,
crate::{
datatypes::Poll, diagnostics::FakeDiagnostics, sampler::FakeSampler, SampleConfig,
},
anyhow::format_err,
assert_matches::assert_matches,
futures::{channel::mpsc::channel, future::ready, stream::StreamExt, task::Poll as FPoll},
lazy_static::lazy_static,
pull_source::UpdateAlgorithm as _,
push_source::UpdateAlgorithm as _,
std::sync::Arc,
};
/// Test retry strategy with minimal wait periods: the base failure backoff and both
/// between-sample waits are 100ns, and the failure backoff can double at most once.
const TEST_RETRY_STRATEGY: RetryStrategy = RetryStrategy {
min_between_failures: zx::Duration::from_nanos(100),
max_exponent: 1,
tries_per_exponent: 1,
converge_time_between_samples: zx::Duration::from_nanos(100),
maintain_time_between_samples: zx::Duration::from_nanos(100),
};
// Canned samples returned by the fake sampler in the tests below.
lazy_static! {
// Sample with no recorded polls.
static ref TEST_SAMPLE_1: HttpsSample = HttpsSample {
utc: zx::Time::from_nanos(111_222_333_444_555),
monotonic: zx::Time::from_nanos(666_777_888_999_000),
standard_deviation: zx::Duration::from_millis(101),
final_bound_size: zx::Duration::from_millis(20),
polls: vec![],
};
// Sample with different timestamps and a single recorded poll.
static ref TEST_SAMPLE_2: HttpsSample = HttpsSample {
utc: zx::Time::from_nanos(999_999_999_999_999),
monotonic: zx::Time::from_nanos(777_777_777_777_777),
standard_deviation: zx::Duration::from_millis(102),
final_bound_size: zx::Duration::from_millis(30),
polls: vec![Poll { round_trip_time: zx::Duration::from_millis(23) }],
};
}
/// Builds the FIDL `TimeSample` expected for `sample`: UTC, monotonic time and standard
/// deviation in nanoseconds, with every other field left at its default.
fn to_fidl_time_sample(sample: &HttpsSample) -> TimeSample {
    let utc_nanos = sample.utc.into_nanos();
    let monotonic_nanos = sample.monotonic.into_nanos();
    let std_dev_nanos = sample.standard_deviation.into_nanos();
    TimeSample {
        utc: Some(utc_nanos),
        monotonic: Some(monotonic_nanos),
        standard_deviation: Some(std_dev_nanos),
        ..Default::default()
    }
}
/// Same as `to_fidl_time_sample`, with the result wrapped in an `Arc`.
fn to_fidl_time_sample_arc(sample: &HttpsSample) -> Arc<TimeSample> {
    let fidl_sample = to_fidl_time_sample(sample);
    Arc::new(fidl_sample)
}
/// Builds a config whose attempt and poll counts differ for each urgency level, so tests can
/// tell which entry was used.
fn make_test_config() -> Config {
    Config {
        https_timeout: zx::Duration::from_seconds(10),
        standard_deviation_bound_percentage: 30,
        first_rtt_time_factor: 5,
        use_pull_api: false,
        sample_config_by_urgency: [
            (Urgency::Low, SampleConfig { max_attempts: 2, num_polls: 2 }),
            (Urgency::Medium, SampleConfig { max_attempts: 3, num_polls: 3 }),
            (Urgency::High, SampleConfig { max_attempts: 5, num_polls: 5 }),
        ]
        .into_iter()
        .collect(),
    }
}
#[fuchsia::test]
fn test_retry_strategy() {
    let strategy = RetryStrategy {
        min_between_failures: zx::Duration::from_seconds(1),
        max_exponent: 3,
        tries_per_exponent: 3,
        converge_time_between_samples: zx::Duration::from_seconds(10),
        maintain_time_between_samples: zx::Duration::from_seconds(10),
    };
    // Expected backoff, in seconds, for each consecutive failed attempt: the wait doubles every
    // three attempts and stops growing at 2^3.
    let expected_seconds: &[i64] = &[1, 1, 1, 2, 2, 2, 4, 4, 4, 8, 8, 8, 8, 8];
    for (i, seconds) in expected_seconds.iter().enumerate() {
        let expected = zx::Duration::from_seconds(*seconds);
        let actual = strategy.backoff_duration(i as u32);
        assert_eq!(
            actual, expected,
            "backoff after iteration {} should be {:?} but was {:?}",
            i, expected, actual
        );
    }
}
#[fuchsia::test]
fn test_update_task_blocks_until_update_processed() {
// Tests that our update loop blocks execution when run using a channel with zero capacity
// as is done from PushSource. This verifies that each update is processed before another
// is produced.
// TODO(satsukiu) - use a generator instead and remove this test.
let mut executor = fasync::TestExecutor::new();
let (sampler, _response_complete_fut) =
FakeSampler::with_responses(vec![Ok(TEST_SAMPLE_1.clone()), Ok(TEST_SAMPLE_2.clone())]);
let diagnostics = Arc::new(FakeDiagnostics::new());
let config = &make_test_config();
let update_algorithm = HttpsDateUpdateAlgorithm::new(
TEST_RETRY_STRATEGY,
diagnostics,
sampler,
ready(Ok(())),
config,
);
let (sender, mut receiver) = channel(0);
let mut update_fut = update_algorithm.generate_updates(sender);
// After running to a stall, the network check complete update is available, and nothing
// else is queued behind it.
assert!(executor.run_until_stalled(&mut update_fut).is_pending());
assert_eq!(
executor.run_until_stalled(&mut receiver.next()),
FPoll::Ready(Some(Status::Ok.into()))
);
assert!(executor.run_until_stalled(&mut receiver.next()).is_pending());
// After running to a stall again, the OK status that precedes the first sample is
// available, again with nothing queued behind it.
assert!(executor.run_until_stalled(&mut update_fut).is_pending());
assert_eq!(
executor.run_until_stalled(&mut receiver.next()),
FPoll::Ready(Some(Status::Ok.into()))
);
assert!(executor.run_until_stalled(&mut receiver.next()).is_pending());
// Running the update task again to a stall produces the next update, the sample itself.
assert!(executor.run_until_stalled(&mut update_fut).is_pending());
assert_matches!(
executor.run_until_stalled(&mut receiver.next()),
FPoll::Ready(Some(Update::Sample(_)))
);
}
#[fuchsia::test]
async fn test_successful_updates() {
let expected_samples = vec![TEST_SAMPLE_1.clone(), TEST_SAMPLE_2.clone()];
let (sampler, response_complete_fut) =
FakeSampler::with_responses(expected_samples.iter().map(|sample| Ok(sample.clone())));
let diagnostics = Arc::new(FakeDiagnostics::new());
let (sender, receiver) = channel(0);
let _update_task = fasync::Task::spawn({
let diagnostics = Arc::clone(&diagnostics);
async move {
let config = make_test_config();
let update_algorithm = HttpsDateUpdateAlgorithm::new(
TEST_RETRY_STRATEGY,
diagnostics,
sampler,
ready(Ok(())),
&config,
);
update_algorithm.generate_updates(sender).await
}
});
let updates = receiver.take_until(response_complete_fut).collect::<Vec<_>>().await;
// The first update should indicate status OK, and any subsequent status updates should
// indicate OK.
assert_eq!(updates.first().unwrap(), &Status::Ok.into());
assert!(updates
.iter()
.filter(|update| update.is_status())
.all(|update| update == &Status::Ok.into()));
let samples = updates
.iter()
.filter_map(|update| match update {
Update::Sample(s) => Some(Arc::clone(s)),
Update::Status(_) => None,
})
.collect::<Vec<_>>();
assert_eq!(
samples,
expected_samples.iter().map(to_fidl_time_sample_arc).collect::<Vec<_>>()
);
let expected_events = vec![
Event::NetworkCheckSuccessful,
Event::Phase(Phase::Initial),
Event::Success(&*TEST_SAMPLE_1),
Event::Phase(Phase::Converge),
Event::Success(&*TEST_SAMPLE_2),
];
diagnostics.assert_events(expected_events);
}
#[fuchsia::test]
async fn test_retry_until_successful() {
    // Two network failures and one protocol-level failure precede the only good sample.
    let network_error = || Err(HttpsDateError::new(HttpsDateErrorType::NetworkError));
    let injected_responses = vec![
        network_error(),
        network_error(),
        Err(HttpsDateError::new(HttpsDateErrorType::NoCertificatesPresented)),
        Ok(TEST_SAMPLE_1.clone()),
    ];
    let (sampler, response_complete_fut) = FakeSampler::with_responses(injected_responses);
    let diagnostics = Arc::new(FakeDiagnostics::new());
    let (sender, receiver) = channel(0);

    let task_diagnostics = Arc::clone(&diagnostics);
    let _update_task = fasync::Task::spawn(async move {
        let config = make_test_config();
        let algorithm = HttpsDateUpdateAlgorithm::new(
            TEST_RETRY_STRATEGY,
            task_diagnostics,
            sampler,
            ready(Ok(())),
            &config,
        );
        algorithm.generate_updates(sender).await
    });
    let updates: Vec<Update> = receiver.take_until(response_complete_fut).collect().await;

    // The initial OK from the network check, and each injected status should be reported.
    // The repeated network error yields a single status update.
    let expected_status_updates: Vec<Update> = vec![
        Status::Ok.into(),
        Status::Network.into(),
        Status::Protocol.into(),
        Status::Ok.into(),
    ];
    let received_status_updates: Vec<Update> =
        updates.iter().filter(|update| update.is_status()).cloned().collect();
    assert_eq!(expected_status_updates, received_status_updates);

    // Last update should be the new sample.
    let last_update = updates.last().unwrap();
    if let Update::Sample(sample) = last_update {
        assert_eq!(*sample, to_fidl_time_sample_arc(&*TEST_SAMPLE_1));
    } else {
        panic!("Expected a sample but got an update");
    }

    // depending on how futures get polled, there may or may not be an event to update the
    // phase to converge, so only the leading events are checked.
    diagnostics.assert_events_starts_with(vec![
        Event::NetworkCheckSuccessful,
        Event::Phase(Phase::Initial),
        Event::Failure(HttpsDateErrorType::NetworkError),
        Event::Failure(HttpsDateErrorType::NetworkError),
        Event::Failure(HttpsDateErrorType::NoCertificatesPresented),
        Event::Success(&TEST_SAMPLE_1),
    ]);
}
#[fuchsia::test]
async fn test_poll_even_if_network_check_fails() {
    let (sampler, response_complete_fut) =
        FakeSampler::with_responses(vec![Ok(TEST_SAMPLE_1.clone())]);
    let diagnostics = Arc::new(FakeDiagnostics::new());
    let (sender, receiver) = channel(0);

    let task_diagnostics = Arc::clone(&diagnostics);
    let _update_task = fasync::Task::spawn(async move {
        let config = make_test_config();
        // The network check resolves with an error instead of success.
        let algorithm = HttpsDateUpdateAlgorithm::new(
            TEST_RETRY_STRATEGY,
            task_diagnostics,
            sampler,
            ready(Err(format_err!("network check error"))),
            &config,
        );
        algorithm.generate_updates(sender).await
    });
    let updates: Vec<Update> = receiver.take_until(response_complete_fut).collect().await;

    // The initial error from the network check, and OK for the sample should be reported.
    let expected_status_updates: Vec<Update> = vec![Status::Network.into(), Status::Ok.into()];
    let received_status_updates: Vec<Update> =
        updates.iter().filter(|update| update.is_status()).cloned().collect();
    assert_eq!(expected_status_updates, received_status_updates);

    // Last update should be the new sample.
    let last_update = updates.last().unwrap();
    if let Update::Sample(sample) = last_update {
        assert_eq!(*sample, to_fidl_time_sample_arc(&*TEST_SAMPLE_1));
    } else {
        panic!("Expected a sample but got an update");
    }

    // Network check event not emitted if the check fails.
    diagnostics.assert_events_starts_with(vec![Event::Phase(Phase::Initial)]);
}
#[fuchsia::test]
async fn test_phases() {
let expected_num_samples = 1 /*initial*/ + CONVERGE_SAMPLES + 1 /*maintain*/;
let expected_samples = vec![TEST_SAMPLE_1.clone(); expected_num_samples];
let (sampler, response_complete_fut) =
FakeSampler::with_responses(expected_samples.iter().map(|sample| Ok(sample.clone())));
let sampler = Arc::new(sampler);
let diagnostics = Arc::new(FakeDiagnostics::new());
let (sender, receiver) = channel(0);
let _update_task = fasync::Task::spawn({
let diagnostics = Arc::clone(&diagnostics);
let sampler = Arc::clone(&sampler);
async move {
let config = make_test_config();
let update_algorithm = HttpsDateUpdateAlgorithm::new(
TEST_RETRY_STRATEGY,
diagnostics,
sampler,
ready(Ok(())),
&config,
);
update_algorithm.generate_updates(sender).await
}
});
let updates = receiver.take_until(response_complete_fut).collect::<Vec<_>>().await;
// All status updates should indicate OK.
assert!(updates
.iter()
.filter(|update| update.is_status())
.all(|update| *update == Update::Status(Status::Ok)));
let expected_events = vec![
vec![
Event::NetworkCheckSuccessful,
Event::Phase(Phase::Initial),
Event::Success(&TEST_SAMPLE_1),
Event::Phase(Phase::Converge),
],
vec![Event::Success(&TEST_SAMPLE_1); CONVERGE_SAMPLES],
vec![Event::Phase(Phase::Maintain), Event::Success(&TEST_SAMPLE_1)],
]
.concat();
diagnostics.assert_events(expected_events);
// Samples should be requested using the number of polls appropriate for the phase.
// Samples should be requested with offset metrics only in the maintain phase.
let expected_sample_requests = vec![
vec![INITIAL_SAMPLE_POLLS],
vec![SAMPLE_POLLS; CONVERGE_SAMPLES],
vec![SAMPLE_POLLS],
]
.concat();
sampler.assert_produce_sample_requests(&expected_sample_requests).await;
}
#[fuchsia::test]
async fn test_pull_sample() {
    // A single successful response is all the sampler has to offer.
    let (sampler, _response_complete_fut) =
        FakeSampler::with_responses(vec![Ok(TEST_SAMPLE_1.clone())]);
    let config = make_test_config();
    let update_algorithm = HttpsDateUpdateAlgorithm::new(
        TEST_RETRY_STRATEGY,
        Arc::new(FakeDiagnostics::new()),
        sampler,
        ready(Ok(())),
        &config,
    );

    let pulled = update_algorithm.sample(Urgency::Medium).await.unwrap();
    assert_eq!(pulled, to_fidl_time_sample(&*TEST_SAMPLE_1));
}
#[fuchsia::test]
async fn test_pull_retries_on_error() {
    // Two failures fit within the three attempts allowed for medium urgency, so the third
    // attempt returns the sample.
    let (sampler, _response_complete_fut) = FakeSampler::with_responses(vec![
        Err(HttpsDateError::new(HttpsDateErrorType::NetworkError)),
        Err(HttpsDateError::new(HttpsDateErrorType::NoCertificatesPresented)),
        Ok(TEST_SAMPLE_1.clone()),
    ]);
    let config = make_test_config();
    let update_algorithm = HttpsDateUpdateAlgorithm::new(
        TEST_RETRY_STRATEGY,
        Arc::new(FakeDiagnostics::new()),
        sampler,
        ready(Ok(())),
        &config,
    );

    let pulled = update_algorithm.sample(Urgency::Medium).await.unwrap();
    assert_eq!(pulled, to_fidl_time_sample(&*TEST_SAMPLE_1));
}
#[fuchsia::test]
async fn test_pull_exhausts_num_attempts() {
    // Medium urgency allows three attempts, so the successful fourth response is never
    // reached.
    let (sampler, _response_complete_fut) = FakeSampler::with_responses(vec![
        Err(HttpsDateError::new(HttpsDateErrorType::NetworkError)),
        Err(HttpsDateError::new(HttpsDateErrorType::NetworkError)),
        Err(HttpsDateError::new(HttpsDateErrorType::NoCertificatesPresented)),
        Ok(TEST_SAMPLE_1.clone()),
    ]);
    let config = make_test_config();
    let update_algorithm = HttpsDateUpdateAlgorithm::new(
        TEST_RETRY_STRATEGY,
        Arc::new(FakeDiagnostics::new()),
        sampler,
        ready(Ok(())),
        &config,
    );

    let failure = update_algorithm.sample(Urgency::Medium).await.unwrap_err();
    assert_eq!(failure, pull_source::SampleError::Network);
}
#[fuchsia::test]
async fn test_pull_produce_sample_requests() {
    // One successful response per urgency level.
    let (sampler, _response_complete_fut) = FakeSampler::with_responses(vec![
        Ok(TEST_SAMPLE_1.clone()),
        Ok(TEST_SAMPLE_1.clone()),
        Ok(TEST_SAMPLE_1.clone()),
    ]);
    let sampler = Arc::new(sampler);
    let config = make_test_config();
    let update_algorithm = HttpsDateUpdateAlgorithm::new(
        TEST_RETRY_STRATEGY,
        Arc::new(FakeDiagnostics::new()),
        Arc::clone(&sampler),
        ready(Ok(())),
        &config,
    );

    // Each request should use the poll count configured for its urgency, in request order.
    let urgencies = [Urgency::Low, Urgency::Medium, Urgency::High];
    let expected_sample_requests: Vec<usize> = urgencies
        .iter()
        .map(|urgency| config.sample_config_by_urgency.get(urgency).unwrap().num_polls as usize)
        .collect();

    for urgency in urgencies {
        let _sample = update_algorithm.sample(urgency).await;
    }

    sampler.assert_produce_sample_requests(&expected_sample_requests).await;
}
}