blob: 8517f7e6884f140753d88a8f0d0f4be47c852afc [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::fetcher::{FetchCommand, Fetcher};
use fuchsia_async::{self as fasync, TaskGroup};
use fuchsia_sync::Mutex;
use fuchsia_zircon as zx;
use persistence_config::{Config, ServiceName, Tag, TagConfig};
use std::{collections::HashMap, sync::Arc};
// This contains the logic to decide which tags to fetch at what times. It contains the state of
// each tag (when last fetched, whether currently queued). When a request arrives via FIDL, it's
// sent here and results in requests queued to the Fetcher.
#[derive(Clone)]
pub(crate) struct Scheduler {
// This is a global lock. Scheduler only does schedule() which is synchronous and quick.
state: Arc<Mutex<State>>,
}
struct State {
fetcher: Fetcher,
services: HashMap<ServiceName, HashMap<Tag, TagState>>,
tasks: TaskGroup,
}
struct TagState {
backoff: zx::Duration,
state: FetchState,
last_fetched: zx::Time,
}
impl Scheduler {
pub(crate) fn new(fetcher: Fetcher, config: &Config) -> Self {
let mut services = HashMap::new();
for (service, tags) in config {
let mut tag_states = HashMap::new();
for (tag, tag_config) in tags {
let TagConfig { min_seconds_between_fetch, .. } = tag_config;
let backoff = zx::Duration::from_seconds(*min_seconds_between_fetch);
let tag_state = TagState {
backoff,
state: FetchState::Idle,
last_fetched: zx::Time::INFINITE_PAST,
};
tag_states.insert(tag.clone(), tag_state);
}
services.insert(service.clone(), tag_states);
}
let state = State { fetcher, services, tasks: TaskGroup::new() };
Scheduler { state: Arc::new(Mutex::new(state)) }
}
/// Gets a service name and a list of valid tags, and queues any fetches that are not already
/// pending. Updates the last-fetched time on any tag it queues, setting it equal to the later
/// of the current time and the time the fetch becomes possible.
pub(crate) fn schedule(&self, service: &ServiceName, tags: Vec<Tag>) {
// Every tag we process should use the same Now
let now = zx::Time::get_monotonic();
let mut state = self.state.lock();
let Some(service_info) = state.services.get_mut(service) else {
return;
};
let mut now_tags = vec![];
let mut later_tags: Vec<(zx::Time, Tag)> = vec![];
for tag in tags {
let Some(tag_state) = service_info.get_mut(&tag) else {
return;
};
if matches!(tag_state.state, FetchState::Pending) {
continue;
}
if tag_state.last_fetched + tag_state.backoff <= now {
now_tags.push(tag);
tag_state.last_fetched = now;
} else {
let next_fetch = tag_state.last_fetched + tag_state.backoff;
tag_state.last_fetched = next_fetch;
tag_state.state = FetchState::Pending;
later_tags.push((next_fetch, tag));
}
}
if !now_tags.is_empty() {
let _ = state.fetcher.send(FetchCommand { service: service.clone(), tags: now_tags });
}
// later_tags may not all be fetchable at the same time. Batch the ones that are.
later_tags.sort_by(|a, b| a.0.cmp(&b.0));
while !later_tags.is_empty() {
// This is N^2 but N will be too small to matter.
let first_time = later_tags[0].0;
let mut first_tags = vec![];
let mut remaining_tags = vec![];
for (next_fetch, tag) in later_tags {
if next_fetch == first_time {
first_tags.push(tag);
} else {
remaining_tags.push((next_fetch, tag));
}
}
later_tags = remaining_tags;
self.enqueue(
&mut state,
first_time,
FetchCommand { service: service.clone(), tags: first_tags },
);
}
}
fn enqueue(&self, state: &mut State, time: zx::Time, command: FetchCommand) {
let this = self.clone();
let mut fetcher = state.fetcher.clone();
state.tasks.spawn(async move {
fasync::Timer::new(time).await;
{
let mut state = this.state.lock();
let Some(tag_states) = state.services.get_mut(&command.service) else {
return;
};
for tag in command.tags.iter() {
tag_states.get_mut(tag).unwrap().state = FetchState::Idle;
}
}
let _ = fetcher.send(command);
});
}
}
/// FetchState tells whether a tag is currently waiting to be dispatched or not. If it is, then
/// another request to fetch that tag should cause no change. If it's not waiting, then it can
/// either be fetched immediately (in which case its state stays Idle, but the last-fetched time
/// will be updated to Now) or it will be queued (in which case its state is Pending and its
/// last-fetched time will be set forward to the time it's going to be fetched).
enum FetchState {
Pending,
Idle,
}