| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| //! This module contains the core algorithm for `WorkScheduler`, a component manager subsystem for |
| //! dispatching batches of work. |
| //! |
| //! The subsystem's interface consists of three FIDL protocols: |
| //! |
| //! * `fuchsia.sys2.WorkScheduler`: A framework service for scheduling and canceling work. |
| //! * `fuchsia.sys2.Worker`: A service that `WorkScheduler` clients expose to the framework to be |
| //! notified when work units are dispatched. |
| //! * `fuchsia.sys2.WorkSchedulerControl`: A built-in service for controlling the period between |
| //! wakeup, batch, and dispatch cycles. |
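| //! |
| //! As a hedged, illustrative sketch of how this module's Rust API fits together (not a doc |
| //! test; the `abs_moniker` binding and the times are assumed for illustration): |
| //! |
| //! ```ignore |
| //! let work_scheduler = WorkScheduler::new(); |
| //! // Enable dispatching with a one-second batch period (all times are in nanoseconds). |
| //! assert_eq!(Ok(()), work_scheduler.set_batch_period(1_000_000_000).await); |
| //! // Schedule a one-shot work item for the component instance named by `abs_moniker`. |
| //! let request = fsys::WorkRequest { start: Some(fsys::Start::MonotonicTime(0)), period: None }; |
| //! assert_eq!(Ok(()), work_scheduler.schedule_work(&abs_moniker, "UPDATE", &request).await); |
| //! ``` |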
| |
| use { |
| crate::{ |
| framework::FrameworkCapability, |
| model::{error::ModelError, hooks::*, AbsoluteMoniker, Realm}, |
| work_scheduler::work_item::WorkItem, |
| }, |
| cm_rust::{CapabilityPath, ExposeDecl, ExposeTarget, FrameworkCapabilityDecl}, |
| failure::{format_err, Error}, |
| fidl::endpoints::ServerEnd, |
| fidl_fuchsia_sys2 as fsys, |
| fuchsia_async::{self as fasync, Time, Timer}, |
| fuchsia_zircon as zx, |
| futures::{ |
| future::{AbortHandle, Abortable, BoxFuture}, |
| lock::Mutex, |
| TryStreamExt, |
| }, |
| lazy_static::lazy_static, |
| log::warn, |
| std::{convert::TryInto, sync::Arc}, |
| }; |
| |
| lazy_static! { |
| pub static ref WORKER_CAPABILITY_PATH: CapabilityPath = |
| "/svc/fuchsia.sys2.Worker".try_into().unwrap(); |
| pub static ref WORK_SCHEDULER_CAPABILITY_PATH: CapabilityPath = |
| "/svc/fuchsia.sys2.WorkScheduler".try_into().unwrap(); |
| pub static ref WORK_SCHEDULER_CONTROL_CAPABILITY_PATH: CapabilityPath = |
| "/svc/fuchsia.sys2.WorkSchedulerControl".try_into().unwrap(); |
| pub static ref ROOT_WORK_SCHEDULER: Arc<Mutex<WorkScheduler>> = |
| Arc::new(Mutex::new(WorkScheduler::new())); |
| } |
| |
| /// A self-managed timer instantiated by `WorkScheduler` to implement the "wakeup" part of its |
| /// wakeup, batch, and dispatch cycles. |
| struct WorkSchedulerTimer { |
| /// Next absolute monotonic time when a timeout should be triggered to wake up, batch, and |
| /// dispatch work. |
| next_timeout_monotonic: i64, |
| /// The handle used to abort the next wakeup, batch, and dispatch cycle if it needs to be |
| /// replaced by a different timer that fires at a different time. |
| abort_handle: AbortHandle, |
| } |
| |
| impl WorkSchedulerTimer { |
| /// Construct a new timer that will fire at monotonic time `next_timeout_monotonic`. When the |
| /// timer fires, if it was not aborted, it will invoke `work_scheduler.dispatch_work()`. |
| fn new(next_timeout_monotonic: i64, work_scheduler: WorkScheduler) -> Self { |
| let (abort_handle, abort_registration) = AbortHandle::new_pair(); |
| |
| let future = Abortable::new( |
| Timer::new(Time::from_nanos(next_timeout_monotonic)), |
| abort_registration, |
| ); |
| fasync::spawn(async move { |
| // Dispatch work only if the abortable future was not aborted. |
| if future.await.is_ok() { |
| work_scheduler.dispatch_work().await; |
| } |
| }); |
| |
| WorkSchedulerTimer { next_timeout_monotonic, abort_handle } |
| } |
| } |
| |
| /// Automatically cancel a timer that is dropped by `WorkScheduler`. This allows `WorkScheduler` to |
| /// use patterns like: |
| /// |
| /// WorkScheduler.timer = Some(WorkSchedulerTimer::new(deadline, self.clone())) |
| /// |
| /// and expect any timer previously stored in `WorkScheduler.timer` to be aborted as a part of the |
| /// operation. |
| impl Drop for WorkSchedulerTimer { |
| fn drop(&mut self) { |
| self.abort_handle.abort(); |
| } |
| } |
| |
| /// State maintained by a `WorkScheduler`, kept consistent via a single `Mutex`. |
| struct WorkSchedulerState { |
| /// Scheduled work items that have not been dispatched. |
| work_items: Vec<WorkItem>, |
| /// Period between wakeup, batch, and dispatch cycles. Set to `None` when dispatching work is |
| /// disabled. |
| batch_period: Option<i64>, |
| /// Current timer for the next wakeup, batch, and dispatch cycle, if any. |
| timer: Option<WorkSchedulerTimer>, |
| } |
| |
| impl WorkSchedulerState { |
| pub fn new() -> Self { |
| WorkSchedulerState { work_items: Vec::new(), batch_period: None, timer: None } |
| } |
| |
| fn set_timer(&mut self, next_monotonic_deadline: i64, work_scheduler: WorkScheduler) { |
| self.timer = Some(WorkSchedulerTimer::new(next_monotonic_deadline, work_scheduler)); |
| } |
| } |
| |
| /// Provides a common facility for scheduling and canceling work. Each component instance manages |
| /// its work items in isolation from the others, but the `WorkScheduler` maintains a collection |
| /// of all items to make global scheduling decisions. |
| #[derive(Clone)] |
| pub struct WorkScheduler { |
| inner: Arc<WorkSchedulerInner>, |
| } |
| |
| struct WorkSchedulerInner { |
| state: Mutex<WorkSchedulerState>, |
| } |
| |
| impl WorkSchedulerInner { |
| pub fn new() -> Self { |
| Self { state: Mutex::new(WorkSchedulerState::new()) } |
| } |
| |
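| /// Route a request for the `fuchsia.sys2.WorkScheduler` framework capability to a |
| /// `WorkSchedulerCapability` bound to the requesting realm, after verifying that the realm |
| /// exposes `fuchsia.sys2.Worker` to the framework. All other capability requests pass through |
| /// unchanged. |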
| async fn on_route_capability_async<'a>( |
| self: Arc<Self>, |
| realm: Arc<Realm>, |
| capability_decl: &'a FrameworkCapabilityDecl, |
| capability: Option<Box<dyn FrameworkCapability>>, |
| ) -> Result<Option<Box<dyn FrameworkCapability>>, ModelError> { |
| match capability_decl { |
| FrameworkCapabilityDecl::LegacyService(capability_path) |
| if *capability_path == *WORK_SCHEDULER_CAPABILITY_PATH => |
| { |
| Self::check_for_worker(&*realm).await?; |
| Ok(Some(Box::new(WorkSchedulerCapability::new( |
| realm.abs_moniker.clone(), |
| WorkScheduler { inner: self.clone() }, |
| )) as Box<dyn FrameworkCapability>)) |
| } |
| _ => Ok(capability), |
| } |
| } |
| |
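| /// Verify that the realm's resolved declaration exposes `fuchsia.sys2.Worker` to the framework |
| /// as a legacy service. Components must expose `Worker` to be notified when their scheduled |
| /// work is dispatched. |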
| async fn check_for_worker(realm: &Realm) -> Result<(), ModelError> { |
| let realm_state = realm.lock_state().await; |
| let realm_state = realm_state.as_ref().expect("check_for_worker: not resolved"); |
| let decl = realm_state.decl(); |
| decl.exposes |
| .iter() |
| .find(|&expose| match expose { |
| ExposeDecl::LegacyService(ls) => ls.target_path == *WORKER_CAPABILITY_PATH, |
| _ => false, |
| }) |
| .map_or_else( |
| || { |
| Err(ModelError::capability_discovery_error(format_err!( |
| "component uses WorkScheduler without exposing Worker: {}", |
| realm.abs_moniker |
| ))) |
| }, |
| |expose| match expose { |
| ExposeDecl::LegacyService(ls) => match ls.target { |
| ExposeTarget::Framework => Ok(()), |
| _ => Err(ModelError::capability_discovery_error(format_err!( |
| "component exposes Worker, but not as legacy service to framework: {}", |
| realm.abs_moniker |
| ))), |
| }, |
| _ => Err(ModelError::capability_discovery_error(format_err!( |
| "component exposes Worker, but not as legacy service to framework: {}", |
| realm.abs_moniker |
| ))), |
| }, |
| ) |
| } |
| } |
| |
| impl WorkScheduler { |
| pub fn new() -> Self { |
| Self { inner: Arc::new(WorkSchedulerInner::new()) } |
| } |
| |
| pub fn hooks(&self) -> Vec<HookRegistration> { |
| vec![HookRegistration { |
| event_type: EventType::RouteFrameworkCapability, |
| callback: self.inner.clone(), |
| }] |
| } |
| |
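| /// Schedule a new work item on behalf of the component instance identified by `abs_moniker`. |
| /// Fails with `fsys::Error::InstanceAlreadyExists` if a work item with the same identity is |
| /// already scheduled. |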
| pub async fn schedule_work( |
| &self, |
| abs_moniker: &AbsoluteMoniker, |
| work_id: &str, |
| work_request: &fsys::WorkRequest, |
| ) -> Result<(), fsys::Error> { |
| let mut state = self.inner.state.lock().await; |
| let work_items = &mut state.work_items; |
| let work_item = WorkItem::try_new(abs_moniker, work_id, work_request)?; |
| |
| if work_items.contains(&work_item) { |
| return Err(fsys::Error::InstanceAlreadyExists); |
| } |
| |
| work_items.push(work_item); |
| work_items.sort_by(WorkItem::deadline_order); |
| |
| self.update_timeout(&mut *state); |
| |
| Ok(()) |
| } |
| |
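| /// Cancel the work item identified by (`abs_moniker`, `work_id`). Fails with |
| /// `fsys::Error::InstanceNotFound` if no such item is scheduled. |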
| pub async fn cancel_work( |
| &self, |
| abs_moniker: &AbsoluteMoniker, |
| work_id: &str, |
| ) -> Result<(), fsys::Error> { |
| let mut state = self.inner.state.lock().await; |
| let work_items = &mut state.work_items; |
| let work_item = WorkItem::new_by_identity(abs_moniker, work_id); |
| |
| // TODO(markdittmer): Use `work_items.remove_item(work_item)` if/when it becomes stable. |
| let mut found = false; |
| work_items.retain(|item| { |
| let matches = &work_item == item; |
| found = found || matches; |
| !matches |
| }); |
| |
| if !found { |
| return Err(fsys::Error::InstanceNotFound); |
| } |
| |
| self.update_timeout(&mut *state); |
| |
| Ok(()) |
| } |
| |
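| /// Report the current batch period. `std::i64::MAX` serves as a sentinel for "dispatching work |
| /// disabled" (no batch period set). |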
| pub async fn get_batch_period(&self) -> Result<i64, fsys::Error> { |
| let state = self.inner.state.lock().await; |
| match state.batch_period { |
| Some(batch_period) => Ok(batch_period), |
| // TODO(markdittmer): GetBatchPeriod Ok case should probably return Option<i64> to |
| // more directly reflect "dispatching work disabled". |
| None => Ok(std::i64::MAX), |
| } |
| } |
| |
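| /// Set the period between wakeup, batch, and dispatch cycles. `std::i64::MAX` disables |
| /// dispatching work; non-positive periods are rejected with `fsys::Error::InvalidArguments`. |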
| pub async fn set_batch_period(&self, batch_period: i64) -> Result<(), fsys::Error> { |
| if batch_period <= 0 { |
| return Err(fsys::Error::InvalidArguments); |
| } |
| |
| let mut state = self.inner.state.lock().await; |
| if batch_period != std::i64::MAX { |
| state.batch_period = Some(batch_period); |
| } else { |
| // TODO(markdittmer): SetBatchPeriod should probably accept Option<i64> to more directly |
| // reflect "dispatching work disabled". |
| state.batch_period = None; |
| } |
| |
| self.update_timeout(&mut *state); |
| |
| Ok(()) |
| } |
| |
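| /// Serve the `fuchsia.sys2.WorkSchedulerControl` protocol over `stream`, forwarding requests to |
| /// the root `WorkScheduler` instance. |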
| pub async fn serve_root_work_scheduler_control( |
| mut stream: fsys::WorkSchedulerControlRequestStream, |
| ) -> Result<(), Error> { |
| while let Some(request) = stream.try_next().await? { |
| match request { |
| fsys::WorkSchedulerControlRequest::GetBatchPeriod { responder, .. } => { |
| let root_work_scheduler = ROOT_WORK_SCHEDULER.lock().await; |
| let mut result = root_work_scheduler.get_batch_period().await; |
| responder.send(&mut result)?; |
| } |
| fsys::WorkSchedulerControlRequest::SetBatchPeriod { |
| responder, |
| batch_period, |
| .. |
| } => { |
| let root_work_scheduler = ROOT_WORK_SCHEDULER.lock().await; |
| let mut result = root_work_scheduler.set_batch_period(batch_period).await; |
| responder.send(&mut result)?; |
| } |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| /// Dispatch expired `work_items`. In the one-shot case, expired items are dispatched and dropped |
| /// from `work_items`. In the periodic case, expired items are retained and given a new deadline. |
| /// New deadlines must meet all of the following criteria: |
| /// |
| /// now < new_deadline |
| /// and |
| ///     new_deadline <= now + period |
| /// and |
| /// new_deadline = first_deadline + n * period |
| /// |
| /// Example: |
| /// |
| /// F = First expected dispatch time for work item |
| /// C = Current expected dispatch time for work item |
| /// N = Now |
| /// * = New expected dispatch time for work item |
| /// | = Period marker for work item (that isn't otherwise labeled) |
| /// |
| /// Period markers only: ...------|----|----|----|----|----|----|----|----|... |
| /// Fully annotated timeline: ...------F----|----C----|----|----|-N--*----|----|... |
| /// |
| /// Example of edge case: |
| /// |
| /// Now lands exactly on a period marker. |
| /// |
| /// Period markers only: ...------|----|----|----|----|----|----|----|----|... |
| /// Fully annotated timeline: ...------F----|----C----|----|----N----*----|----|... |
| /// |
| /// Example of edge case: |
| /// |
| /// Period markers only: ...------||||||||||||||||||||... |
| /// Fully annotated timeline: ...------F||C||||||||N*||||||... |
| /// |
| /// Example of edge case: |
| /// |
| /// N=C. Denote M = N=C. |
| /// |
| /// Period markers only: ...------|----|----|----|----|----|... |
| /// Fully annotated timeline: ...------F----|----M----*----|----|... |
| /// |
| /// Note that updating `WorkItem` deadlines is _independent_ of updating `WorkScheduler` batch |
| /// period. When either `work_items` (and their deadlines) change or `batch_period` changes, the |
| /// next wakeup timeout is re-evaluated, but this involves updating _only_ the wakeup timeout, |
| /// not any `WorkItem` deadlines. |
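| /// |
| /// A worked sketch of the deadline-skipping arithmetic (the constants are illustrative; they |
| /// mirror the late-timer test at the bottom of this file): |
| /// |
| /// ```ignore |
| /// let (next, period, now) = (9, 5, 116); |
| /// // The normal case does not apply: now >= next + period. |
| /// let new_deadline = next + period * (((now - next) / period) + 1); |
| /// assert_eq!(new_deadline, 119); // 9 + 22 * 5: the first aligned deadline after `now`. |
| /// ``` |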
| async fn dispatch_work(&self) { |
| let mut state = self.inner.state.lock().await; |
| let now = Time::now().into_nanos(); |
| let work_items = &mut state.work_items; |
| |
| work_items.retain(|item| { |
| // Retain future work items. |
| if item.next_deadline_monotonic > now { |
| return true; |
| } |
| |
| // TODO(markdittmer): Dispatch work item. |
| |
| // The only dispatched/past items to retain are periodic items that will recur. |
| item.period.is_some() |
| }); |
| |
| // Update deadlines on dispatched periodic items. |
| for item in work_items.iter_mut() { |
| // Stop processing items once we reach future items. |
| if item.next_deadline_monotonic > now { |
| break; |
| } |
| |
| // All retained dispatched/past items have a period (hence, safe to unwrap()). |
| let period = item.period.unwrap(); |
| item.next_deadline_monotonic += if now < item.next_deadline_monotonic + period { |
| // Normal case: next deadline after adding one period is in the future. |
| period |
| } else { |
| // Skip deadlines in the past by advancing `next_deadline_monotonic` by the smallest |
| // multiple of `period` that moves it past `now`. |
| period * (((now - item.next_deadline_monotonic) / period) + 1) |
| }; |
| } |
| |
| work_items.sort_by(WorkItem::deadline_order); |
| |
| self.update_timeout(&mut *state); |
| } |
| |
| /// Update the timeout for the next wakeup, batch, and dispatch cycle, if necessary. The timeout |
| /// should be disabled if either there are no `work_items` or there is no `batch_period`. |
| /// Otherwise, a suitable timeout may already be set. A suitable timeout is one that satisfies: |
| /// |
| /// timeout > work_deadline |
| /// and |
| /// timeout - batch_period < work_deadline |
| /// where |
| /// work_deadline is the earliest expected dispatch time of all `work_items` |
| /// |
| /// That is, a suitable timeout will trigger after there is something to schedule, but before a |
| /// full `batch_period` has elapsed since the next schedulable `WorkItem` hit its deadline. |
| /// |
| /// If the current timeout is not suitable, then the timeout is updated to the unique suitable |
| /// timeout that is aligned to a multiple of `batch_period` (in absolute monotonic time): |
| /// |
| /// timeout > work_deadline |
| /// and |
| /// timeout - batch_period < work_deadline |
| /// and |
| /// (timeout % batch_period) == 0 |
| /// where |
| /// work_deadline is the earliest expected dispatch time of all `work_items` |
| /// |
| /// This scheme avoids updating the timeout whenever possible, while maintaining that all |
| /// scheduled `WorkItem` objects will be dispatched no later than |
| /// `WorkItem.next_deadline_monotonic + WorkScheduler.batch_period`. |
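| /// |
| /// A small worked sketch of the alignment arithmetic (the constants are illustrative; they |
| /// mirror the timer tests below): |
| /// |
| /// ```ignore |
| /// let (work_deadline, batch_period) = (9, 5); |
| /// let timeout = work_deadline - (work_deadline % batch_period) + batch_period; |
| /// assert_eq!(timeout, 10); // First multiple of `batch_period` strictly after `work_deadline`. |
| /// ``` |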
| fn update_timeout(&self, state: &mut WorkSchedulerState) { |
| if state.work_items.is_empty() || state.batch_period.is_none() { |
| // No work to schedule. Abort any existing timer to wakeup and dispatch work. |
| state.timer = None; |
| return; |
| } |
| let work_deadline = state.work_items[0].next_deadline_monotonic; |
| let batch_period = state.batch_period.unwrap(); |
| |
| if let Some(timer) = &state.timer { |
| let timeout = timer.next_timeout_monotonic; |
| if timeout > work_deadline && timeout - batch_period < work_deadline { |
| // There is an active timeout that will fire after the next deadline but before a |
| // full batch period has elapsed after the deadline. Timer needs no update. |
| return; |
| } |
| } |
| |
| // Define a deadline, an absolute monotonic time, as the soonest time after `work_deadline` |
| // that is aligned with `batch_period`. |
| let new_deadline = work_deadline - (work_deadline % batch_period) + batch_period; |
| state.set_timer(new_deadline, self.clone()); |
| } |
| } |
| |
| impl Hook for WorkSchedulerInner { |
| fn on<'a>(self: Arc<Self>, event: &'a Event) -> BoxFuture<'a, Result<(), ModelError>> { |
| Box::pin(async move { |
| if let Event::RouteFrameworkCapability { realm, capability_decl, capability } = event { |
| let mut capability = capability.lock().await; |
| *capability = self |
| .on_route_capability_async(realm.clone(), capability_decl, capability.take()) |
| .await?; |
| } |
| Ok(()) |
| }) |
| } |
| } |
| |
| /// `Capability` to invoke `WorkScheduler` FIDL API bound to a particular `WorkScheduler` object and |
| /// component instance's `AbsoluteMoniker`. All FIDL operations bound to the same object and moniker |
| /// observe the same collection of `WorkItem` objects. |
| struct WorkSchedulerCapability { |
| abs_moniker: AbsoluteMoniker, |
| work_scheduler: WorkScheduler, |
| } |
| |
| impl WorkSchedulerCapability { |
| pub fn new(abs_moniker: AbsoluteMoniker, work_scheduler: WorkScheduler) -> Self { |
| WorkSchedulerCapability { abs_moniker, work_scheduler } |
| } |
| |
| /// Service an `open` invocation via an event loop that dispatches FIDL operations to |
| /// `work_scheduler`. |
| async fn open_async( |
| work_scheduler: WorkScheduler, |
| abs_moniker: &AbsoluteMoniker, |
| mut stream: fsys::WorkSchedulerRequestStream, |
| ) -> Result<(), Error> { |
| while let Some(request) = stream.try_next().await? { |
| match request { |
| fsys::WorkSchedulerRequest::ScheduleWork { |
| responder, |
| work_id, |
| work_request, |
| .. |
| } => { |
| let mut result = |
| work_scheduler.schedule_work(abs_moniker, &work_id, &work_request).await; |
| responder.send(&mut result)?; |
| } |
| fsys::WorkSchedulerRequest::CancelWork { responder, work_id, .. } => { |
| let mut result = work_scheduler.cancel_work(abs_moniker, &work_id).await; |
| responder.send(&mut result)?; |
| } |
| } |
| } |
| Ok(()) |
| } |
| } |
| |
| impl FrameworkCapability for WorkSchedulerCapability { |
| /// Spawn an event loop to service `WorkScheduler` FIDL operations. |
| fn open( |
| &self, |
| _flags: u32, |
| _open_mode: u32, |
| _relative_path: String, |
| server_end: zx::Channel, |
| ) -> BoxFuture<Result<(), ModelError>> { |
| let server_end = ServerEnd::<fsys::WorkSchedulerMarker>::new(server_end); |
| let stream: fsys::WorkSchedulerRequestStream = server_end.into_stream().unwrap(); |
| let work_scheduler = self.work_scheduler.clone(); |
| let abs_moniker = self.abs_moniker.clone(); |
| fasync::spawn(async move { |
| let result = Self::open_async(work_scheduler, &abs_moniker, stream).await; |
| if let Err(e) = result { |
| // TODO(markdittmer): Set an epitaph to indicate this was an unexpected error. |
| warn!("WorkSchedulerCapability.open failed: {}", e); |
| } |
| }); |
| |
| Box::pin(async { Ok(()) }) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::*, |
| crate::model::{AbsoluteMoniker, ChildMoniker}, |
| fuchsia_async::{Executor, Time, WaitState}, |
| futures::Future, |
| }; |
| |
| /// Time is measured in nanoseconds. This provides a constant symbol for one second. |
| const SECOND: i64 = 1000000000; |
| |
| // Use an arbitrary start monotonic time. This will surface bugs that, for example, are not |
| // apparent when "time starts at 0". |
| const FAKE_MONOTONIC_TIME: i64 = 374789234875; |
| |
| async fn get_work_status( |
| work_scheduler: &WorkScheduler, |
| abs_moniker: &AbsoluteMoniker, |
| work_id: &str, |
| ) -> Result<(i64, Option<i64>), fsys::Error> { |
| let state = work_scheduler.inner.state.lock().await; |
| let work_items = &state.work_items; |
| match work_items |
| .iter() |
| .find(|work_item| &work_item.abs_moniker == abs_moniker && work_item.id == work_id) |
| { |
| Some(work_item) => Ok((work_item.next_deadline_monotonic, work_item.period)), |
| None => Err(fsys::Error::InstanceNotFound), |
| } |
| } |
| |
| async fn get_all_by_deadline(work_scheduler: &WorkScheduler) -> Vec<WorkItem> { |
| let state = work_scheduler.inner.state.lock().await; |
| state.work_items.clone() |
| } |
| |
| fn child(parent: &AbsoluteMoniker, name: &str) -> AbsoluteMoniker { |
| parent.child(ChildMoniker::new(name.to_string(), None, 0)) |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn work_scheduler_basic() { |
| let work_scheduler = WorkScheduler::new(); |
| let root = AbsoluteMoniker::root(); |
| let a = child(&root, "a"); |
| let b = child(&a, "b"); |
| let c = child(&b, "c"); |
| |
| let now_once = fsys::WorkRequest { |
| start: Some(fsys::Start::MonotonicTime(FAKE_MONOTONIC_TIME)), |
| period: None, |
| }; |
| let each_second = fsys::WorkRequest { |
| start: Some(fsys::Start::MonotonicTime(FAKE_MONOTONIC_TIME + SECOND)), |
| period: Some(SECOND), |
| }; |
| let in_an_hour = fsys::WorkRequest { |
| start: Some(fsys::Start::MonotonicTime(FAKE_MONOTONIC_TIME + (SECOND * 60 * 60))), |
| period: None, |
| }; |
| |
| // Schedule a different 2 of the 3 requests on each component instance. |
| |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&a, "NOW_ONCE", &now_once).await); |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&a, "EACH_SECOND", &each_second).await); |
| |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&b, "EACH_SECOND", &each_second).await); |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&b, "IN_AN_HOUR", &in_an_hour).await); |
| |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&c, "IN_AN_HOUR", &in_an_hour).await); |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&c, "NOW_ONCE", &now_once).await); |
| |
| assert_eq!( |
| Ok((FAKE_MONOTONIC_TIME, None)), |
| get_work_status(&work_scheduler, &a, "NOW_ONCE").await |
| ); |
| assert_eq!( |
| Ok((FAKE_MONOTONIC_TIME + SECOND, Some(SECOND))), |
| get_work_status(&work_scheduler, &a, "EACH_SECOND").await |
| ); |
| assert_eq!( |
| Err(fsys::Error::InstanceNotFound), |
| get_work_status(&work_scheduler, &a, "IN_AN_HOUR").await |
| ); |
| |
| assert_eq!( |
| Err(fsys::Error::InstanceNotFound), |
| get_work_status(&work_scheduler, &b, "NOW_ONCE").await |
| ); |
| assert_eq!( |
| Ok((FAKE_MONOTONIC_TIME + SECOND, Some(SECOND))), |
| get_work_status(&work_scheduler, &b, "EACH_SECOND").await |
| ); |
| assert_eq!( |
| Ok((FAKE_MONOTONIC_TIME + (SECOND * 60 * 60), None)), |
| get_work_status(&work_scheduler, &b, "IN_AN_HOUR").await |
| ); |
| |
| assert_eq!( |
| Ok((FAKE_MONOTONIC_TIME, None)), |
| get_work_status(&work_scheduler, &c, "NOW_ONCE").await |
| ); |
| assert_eq!( |
| Err(fsys::Error::InstanceNotFound), |
| get_work_status(&work_scheduler, &c, "EACH_SECOND").await |
| ); |
| assert_eq!( |
| Ok((FAKE_MONOTONIC_TIME + (SECOND * 60 * 60), None)), |
| get_work_status(&work_scheduler, &c, "IN_AN_HOUR").await |
| ); |
| |
| // Cancel a's NOW_ONCE. Confirm it only affects a's scheduled work. |
| |
| assert_eq!(Ok(()), work_scheduler.cancel_work(&a, "NOW_ONCE").await); |
| |
| assert_eq!( |
| Err(fsys::Error::InstanceNotFound), |
| get_work_status(&work_scheduler, &a, "NOW_ONCE").await |
| ); |
| assert_eq!( |
| Ok((FAKE_MONOTONIC_TIME + SECOND, Some(SECOND))), |
| get_work_status(&work_scheduler, &a, "EACH_SECOND").await |
| ); |
| assert_eq!( |
| Err(fsys::Error::InstanceNotFound), |
| get_work_status(&work_scheduler, &a, "IN_AN_HOUR").await |
| ); |
| |
| assert_eq!( |
| Err(fsys::Error::InstanceNotFound), |
| get_work_status(&work_scheduler, &b, "NOW_ONCE").await |
| ); |
| assert_eq!( |
| Ok((FAKE_MONOTONIC_TIME + SECOND, Some(SECOND))), |
| get_work_status(&work_scheduler, &b, "EACH_SECOND").await |
| ); |
| assert_eq!( |
| Ok((FAKE_MONOTONIC_TIME + (SECOND * 60 * 60), None)), |
| get_work_status(&work_scheduler, &b, "IN_AN_HOUR").await |
| ); |
| |
| assert_eq!( |
| Ok((FAKE_MONOTONIC_TIME, None)), |
| get_work_status(&work_scheduler, &c, "NOW_ONCE").await |
| ); |
| assert_eq!( |
| Err(fsys::Error::InstanceNotFound), |
| get_work_status(&work_scheduler, &c, "EACH_SECOND").await |
| ); |
| assert_eq!( |
| Ok((FAKE_MONOTONIC_TIME + (SECOND * 60 * 60), None)), |
| get_work_status(&work_scheduler, &c, "IN_AN_HOUR").await |
| ); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn work_scheduler_deadline_order() { |
| let work_scheduler = WorkScheduler::new(); |
| let root = AbsoluteMoniker::root(); |
| let a = child(&root, "a"); |
| let b = child(&a, "b"); |
| let c = child(&b, "c"); |
| |
| let now_once = fsys::WorkRequest { |
| start: Some(fsys::Start::MonotonicTime(FAKE_MONOTONIC_TIME)), |
| period: None, |
| }; |
| let each_second = fsys::WorkRequest { |
| start: Some(fsys::Start::MonotonicTime(FAKE_MONOTONIC_TIME + SECOND)), |
| period: Some(SECOND), |
| }; |
| let in_an_hour = fsys::WorkRequest { |
| start: Some(fsys::Start::MonotonicTime(FAKE_MONOTONIC_TIME + (SECOND * 60 * 60))), |
| period: None, |
| }; |
| |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&a, "EACH_SECOND", &each_second).await); |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&c, "NOW_ONCE", &now_once).await); |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&b, "IN_AN_HOUR", &in_an_hour).await); |
| |
| // Order should match deadlines, not order of scheduling or component topology. |
| assert_eq!( |
| vec![ |
| WorkItem::new(&c, "NOW_ONCE", FAKE_MONOTONIC_TIME, None), |
| WorkItem::new(&a, "EACH_SECOND", FAKE_MONOTONIC_TIME + SECOND, Some(SECOND),), |
| WorkItem::new(&b, "IN_AN_HOUR", FAKE_MONOTONIC_TIME + (SECOND * 60 * 60), None,), |
| ], |
| get_all_by_deadline(&work_scheduler).await |
| ); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn work_scheduler_batch_period() { |
| let work_scheduler = WorkScheduler::new(); |
| assert_eq!(Ok(std::i64::MAX), work_scheduler.get_batch_period().await); |
| assert_eq!(Ok(()), work_scheduler.set_batch_period(SECOND).await); |
| assert_eq!(Ok(SECOND), work_scheduler.get_batch_period().await) |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn work_scheduler_batch_period_error() { |
| let work_scheduler = WorkScheduler::new(); |
| assert_eq!(Err(fsys::Error::InvalidArguments), work_scheduler.set_batch_period(0).await); |
| assert_eq!(Err(fsys::Error::InvalidArguments), work_scheduler.set_batch_period(-1).await) |
| } |
| |
| struct TestWorkUnit { |
| start: i64, |
| work_item: WorkItem, |
| } |
| |
| impl TestWorkUnit { |
| fn new( |
| start: i64, |
| abs_moniker: &AbsoluteMoniker, |
| id: &str, |
| next_deadline_monotonic: i64, |
| period: Option<i64>, |
| ) -> Self { |
| TestWorkUnit { |
| start, |
| work_item: WorkItem::new(abs_moniker, id, next_deadline_monotonic, period), |
| } |
| } |
| } |
| |
| struct TimeTest { |
| executor: Executor, |
| } |
| |
| impl TimeTest { |
| fn new() -> Self { |
| let executor = Executor::new_with_fake_time().unwrap(); |
| executor.set_fake_time(Time::from_nanos(0)); |
| TimeTest { executor } |
| } |
| |
| fn set_time(&mut self, time: i64) { |
| self.executor.set_fake_time(Time::from_nanos(time)); |
| } |
| |
| fn run_and_sync<F>(&mut self, fut: &mut F) |
| where |
| F: Future + Unpin, |
| { |
| assert!(self.executor.run_until_stalled(fut).is_ready()); |
| while self.executor.is_waiting() == WaitState::Ready { |
| assert!(self.executor.run_until_stalled(&mut Box::pin(async {})).is_ready()); |
| } |
| } |
| |
| fn set_time_and_run_timers(&mut self, time: i64) { |
| self.set_time(time); |
| assert!(self.executor.wake_expired_timers()); |
| while self.executor.is_waiting() == WaitState::Ready { |
| assert!(self.executor.run_until_stalled(&mut Box::pin(async {})).is_ready()); |
| } |
| } |
| |
| fn assert_no_timers(&mut self) { |
| assert_eq!(None, self.executor.wake_next_timer()); |
| } |
| |
| fn assert_next_timer_at(&mut self, time: i64) { |
| assert_eq!(WaitState::Waiting(Time::from_nanos(time)), self.executor.is_waiting()); |
| } |
| |
| fn assert_work_items( |
| &mut self, |
| work_scheduler: &WorkScheduler, |
| test_work_units: Vec<TestWorkUnit>, |
| ) { |
| self.run_and_sync(&mut Box::pin(async { |
| // Check collection of work items. |
| let work_items: Vec<WorkItem> = test_work_units |
| .iter() |
| .map(|test_work_unit| test_work_unit.work_item.clone()) |
| .collect(); |
| assert_eq!(work_items, get_all_by_deadline(&work_scheduler).await); |
| |
| // Check invariants on relationships between `now` and `WorkItem` state. |
| let now = Time::now().into_nanos(); |
| for test_work_unit in test_work_units.iter() { |
| let work_item = &test_work_unit.work_item; |
| let deadline = work_item.next_deadline_monotonic; |
| println!( |
| "Now: {}\nDeadline: {}\nStart: {}", |
| now, deadline, test_work_unit.start |
| ); |
| // Either: |
| // 1. This is a check for initial state, in which case allow now=deadline=0, or |
| // 2. All deadlines should be in the future. |
| assert!( |
| (now == 0 && deadline == now) || now < deadline, |
| "Expected either |
| 1. This is a check for initial state, so allow now=deadline=0, or |
| 2. All deadlines should be in the future." |
| ); |
| if let Some(period) = work_item.period { |
| println!("Period: {}", period); |
| // All periodic deadlines should be either: |
| // 1. Waiting to be dispatched for the first time, or |
| // 2. At most one period into the future (for otherwise, a period would be |
| // skipped). |
| assert!( |
| now < test_work_unit.start || now + period >= deadline, |
| "Expected all periodic deadlines should be either: |
| 1. Waiting to be dispatched for the first time, or |
| 2. At most one period into the future (for otherwise, a period would |
| be skipped" |
| ); |
| // All periodic deadlines should be aligned to: |
| // `deadline = start + n*period` for some non-negative integer, `n`. |
| assert_eq!( |
| 0, |
| (deadline - test_work_unit.start) % period, |
| "Expected all periodic deadlines should be aligned to: |
| `deadline = start + n*period` for some non-negative integer, `n`." |
| ); |
| } |
| } |
| })); |
| } |
| |
| fn assert_no_work(&mut self, work_scheduler: &WorkScheduler) { |
| self.run_and_sync(&mut Box::pin(async { |
| assert_eq!(vec![] as Vec<WorkItem>, get_all_by_deadline(&work_scheduler).await); |
| })); |
| } |
| } |
| |
| #[test] |
| fn work_scheduler_time_get_batch_period_queues_nothing() { |
| let mut t = TimeTest::new(); |
| t.run_and_sync(&mut Box::pin(async { |
| let work_scheduler = WorkScheduler::new(); |
| assert_eq!(Ok(std::i64::MAX), work_scheduler.get_batch_period().await); |
| })); |
| t.assert_no_timers(); |
| } |
| |
| #[test] |
| fn work_scheduler_time_set_batch_period_no_work_queues_nothing() { |
| let mut t = TimeTest::new(); |
| t.run_and_sync(&mut Box::pin(async { |
| let work_scheduler = WorkScheduler::new(); |
| assert_eq!(Ok(()), work_scheduler.set_batch_period(1).await); |
| })); |
| t.assert_no_timers(); |
| } |
| |
| #[test] |
| fn work_scheduler_time_schedule_inf_batch_period_queues_nothing() { |
| let mut t = TimeTest::new(); |
| t.run_and_sync(&mut Box::pin(async { |
| let work_scheduler = WorkScheduler::new(); |
| let root = AbsoluteMoniker::root(); |
| let now_once = |
| fsys::WorkRequest { start: Some(fsys::Start::MonotonicTime(0)), period: None }; |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&root, "NOW_ONCE", &now_once).await); |
| })); |
| t.assert_no_timers(); |
| } |
| |
| #[test] |
| fn work_scheduler_time_schedule_finite_batch_period_queues_and_dispatches() { |
| let mut t = TimeTest::new(); |
| let work_scheduler = WorkScheduler::new(); |
| let root = AbsoluteMoniker::root(); |
| |
| // Set batch period and queue a unit of work. |
| t.run_and_sync(&mut Box::pin(async { |
| assert_eq!(Ok(()), work_scheduler.set_batch_period(1).await); |
| let now_once = |
| fsys::WorkRequest { start: Some(fsys::Start::MonotonicTime(0)), period: None }; |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&root, "NOW_ONCE", &now_once).await); |
| })); |
| |
| // Confirm timer and work item. |
| t.assert_next_timer_at(1); |
| t.assert_work_items( |
| &work_scheduler, |
| vec![TestWorkUnit::new(0, &root, "NOW_ONCE", 0, None)], |
| ); |
| |
| // Run work stemming from timer and confirm no more work items. |
| t.set_time_and_run_timers(1); |
| t.assert_no_work(&work_scheduler); |
| } |
| |
| #[test] |
| fn work_scheduler_time_periodic_stays_queued() { |
| let mut t = TimeTest::new(); |
| let work_scheduler = WorkScheduler::new(); |
| let root = AbsoluteMoniker::root(); |
| |
| // Set batch period and queue a unit of work. |
| t.run_and_sync(&mut Box::pin(async { |
| assert_eq!(Ok(()), work_scheduler.set_batch_period(1).await); |
| let every_moment = |
| fsys::WorkRequest { start: Some(fsys::Start::MonotonicTime(0)), period: Some(1) }; |
| assert_eq!( |
| Ok(()), |
| work_scheduler.schedule_work(&root, "EVERY_MOMENT", &every_moment).await |
| ); |
| })); |
| |
| // Confirm timer and work item. |
| t.assert_next_timer_at(1); |
| t.assert_work_items( |
| &work_scheduler, |
| vec![TestWorkUnit::new(0, &root, "EVERY_MOMENT", 0, Some(1))], |
| ); |
| |
| // Dispatch work and assert next periodic work item and timer. |
| t.set_time_and_run_timers(1); |
| t.assert_work_items( |
| &work_scheduler, |
| vec![TestWorkUnit::new(0, &root, "EVERY_MOMENT", 2, Some(1))], |
| ); |
| t.assert_next_timer_at(3); |
| } |
| |
| #[test] |
| fn work_scheduler_time_timeout_updates_when_earlier_work_item_added() { |
| let mut t = TimeTest::new(); |
| let work_scheduler = WorkScheduler::new(); |
| let root = AbsoluteMoniker::root(); |
| |
| // Set batch period and queue a unit of work. |
| t.run_and_sync(&mut Box::pin(async { |
| assert_eq!(Ok(()), work_scheduler.set_batch_period(5).await); |
| let at_nine = |
| fsys::WorkRequest { start: Some(fsys::Start::MonotonicTime(9)), period: None }; |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&root, "AT_NINE", &at_nine).await); |
| })); |
| |
| // Confirm timer and work item. |
| t.assert_next_timer_at(10); |
| t.assert_work_items(&work_scheduler, vec![TestWorkUnit::new(9, &root, "AT_NINE", 9, None)]); |
| |
| // Queue unit of work with deadline _earlier_ than first unit of work. |
| t.run_and_sync(&mut Box::pin(async { |
| let at_four = |
| fsys::WorkRequest { start: Some(fsys::Start::MonotonicTime(4)), period: None }; |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&root, "AT_FOUR", &at_four).await); |
| })); |
| |
| // Confirm timer moved _back_, and work units are as expected. |
| t.assert_next_timer_at(5); |
| t.assert_work_items( |
| &work_scheduler, |
| vec![ |
| TestWorkUnit::new(4, &root, "AT_FOUR", 4, None), |
| TestWorkUnit::new(9, &root, "AT_NINE", 9, None), |
| ], |
| ); |
| |
| // Dispatch work and assert remaining work and timer. |
| t.set_time_and_run_timers(5); |
| t.assert_work_items(&work_scheduler, vec![TestWorkUnit::new(9, &root, "AT_NINE", 9, None)]); |
| t.assert_next_timer_at(10); |
| |
| // Queue unit of work with deadline _later_ than existing unit of work. |
| t.run_and_sync(&mut Box::pin(async { |
| let at_ten = |
| fsys::WorkRequest { start: Some(fsys::Start::MonotonicTime(10)), period: None }; |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&root, "AT_TEN", &at_ten).await); |
| })); |
| |
| // Confirm unchanged, and work units are as expected. |
| t.assert_next_timer_at(10); |
| t.assert_work_items( |
| &work_scheduler, |
| vec![ |
| TestWorkUnit::new(9, &root, "AT_NINE", 9, None), |
| TestWorkUnit::new(10, &root, "AT_TEN", 10, None), |
| ], |
| ); |
| |
| // Dispatch work and assert no work left. |
| t.set_time_and_run_timers(10); |
| t.assert_no_work(&work_scheduler); |
| } |
| |
| #[test] |
| fn work_scheduler_time_late_timer_fire() { |
| let mut t = TimeTest::new(); |
| let work_scheduler = WorkScheduler::new(); |
| let root = AbsoluteMoniker::root(); |
| |
| // Set period and schedule two work items, one of which _should_ be dispatched in a second |
| // cycle. |
| t.run_and_sync(&mut Box::pin(async { |
| assert_eq!(Ok(()), work_scheduler.set_batch_period(5).await); |
| let at_four = |
| fsys::WorkRequest { start: Some(fsys::Start::MonotonicTime(4)), period: None }; |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&root, "AT_FOUR", &at_four).await); |
| let at_nine = |
| fsys::WorkRequest { start: Some(fsys::Start::MonotonicTime(9)), period: None }; |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&root, "AT_NINE", &at_nine).await); |
| })); |
| |
| // Confirm timer and work items. |
| t.assert_next_timer_at(5); |
| t.assert_work_items( |
| &work_scheduler, |
| vec![ |
| TestWorkUnit::new(4, &root, "AT_FOUR", 4, None), |
| TestWorkUnit::new(9, &root, "AT_NINE", 9, None), |
| ], |
| ); |
| |
| // Simulate delayed dispatch: System load or some other factor caused dispatch of work to be |
| // delayed beyond the deadline of _both_ units of work. |
| t.set_time_and_run_timers(16); |
| |
| // Confirm timers and dispatched units. |
| t.assert_no_timers(); |
| t.assert_no_work(&work_scheduler); |
| } |
| |
| #[test] |
| fn work_scheduler_time_late_timer_fire_periodic_work_item() { |
| let mut t = TimeTest::new(); |
| let work_scheduler = WorkScheduler::new(); |
| let root = AbsoluteMoniker::root(); |
| |
| // Set period and schedule two work items, one of which _should_ be dispatched in a second |
| // cycle. |
| t.run_and_sync(&mut Box::pin(async { |
| assert_eq!(Ok(()), work_scheduler.set_batch_period(5).await); |
| let at_four = |
| fsys::WorkRequest { start: Some(fsys::Start::MonotonicTime(4)), period: None }; |
| assert_eq!(Ok(()), work_scheduler.schedule_work(&root, "AT_FOUR", &at_four).await); |
| let at_nine_periodic = |
| fsys::WorkRequest { start: Some(fsys::Start::MonotonicTime(9)), period: Some(5) }; |
| assert_eq!( |
| Ok(()), |
| work_scheduler |
| .schedule_work(&root, "AT_NINE_PERIODIC_FIVE", &at_nine_periodic) |
| .await |
| ); |
| })); |
| |
| // Confirm timer and work items. |
| t.assert_next_timer_at(5); |
| t.assert_work_items( |
| &work_scheduler, |
| vec![ |
| TestWorkUnit::new(4, &root, "AT_FOUR", 4, None), |
| TestWorkUnit::new(9, &root, "AT_NINE_PERIODIC_FIVE", 9, Some(5)), |
| ], |
| ); |
| |
| // Simulate _seriously_ delayed dispatch: System load or some other |
| // factor caused dispatch of work to be delayed _way_ beyond the |
| // deadline of _both_ units of work. |
| t.set_time_and_run_timers(116); |
| |
| // Confirm timer set to next batch period, and periodic work item still queued. |
| t.assert_next_timer_at(120); |
| t.assert_work_items( |
| &work_scheduler, |
| // Time: |
| // now=116 |
| // WorkItem: |
| // start=9 |
| // period=5 |
| // |
| // Updated WorkItem.next_deadline_monotonic should be: |
| // WorkItem.next_deadline_monotonic = 9 + 5*n |
| // where |
| // Time.now < WorkItem.next_deadline_monotonic |
| // and |
| // Time.now + WorkItem.period >= WorkItem.next_deadline_monotonic |
| // |
| // WorkItem.next_deadline_monotonic = 119 = 9 + (22 * 5). |
| vec![TestWorkUnit::new(9, &root, "AT_NINE_PERIODIC_FIVE", 119, Some(5))], |
| ); |
| } |
| } |