// blob: 35bebbc6113eab3538dbb5836a104440f4b00f47 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
async_trait::async_trait,
breakpoint_system_client::Injector,
fidl_fuchsia_test_workscheduler as fws,
fuchsia_async::{Time, Timer},
futures::{
channel::*,
future::{select_all, BoxFuture},
lock::Mutex,
sink::SinkExt,
StreamExt,
},
std::{
convert::TryInto,
error::Error,
fmt::{self as fmt, Display, Formatter},
sync::Arc,
time::Duration,
},
};
/// Error reported when a wait did not complete in time.
///
/// The wrapped `Duration` is the timeout that was requested by the caller of
/// `WorkSchedulerDispatchReporter::wait_for_dispatched`.
#[derive(Debug)]
pub struct Timeout(Duration);
/// A single `OnDoWorkCalled` report received by `WorkSchedulerDispatchReporter`.
///
/// Two events compare equal when their `work_id`s are equal.
#[derive(Debug, Eq, PartialEq)]
pub struct DispatchedEvent {
// The `work_id` carried by the `OnDoWorkCalled` request.
work_id: String,
}
impl Display for Timeout {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "Operation timed out (timeout={})", self.0.as_nanos())
}
}
impl Error for Timeout {
fn source(&self) -> Option<&(dyn Error + 'static)> {
None
}
}
impl DispatchedEvent {
pub fn new(work_id: String) -> Self {
Self { work_id }
}
}
/// Outcome of the race run by `WorkSchedulerDispatchReporter::wait_for_dispatched`:
/// either an event arrived, or the timer fired first.
enum DispatchTimeout {
// An event was received from the dispatch channel.
Dispatched(DispatchedEvent),
// The timer elapsed; carries the timeout that was requested.
Timeout(Timeout),
}
/// Relays `OnDoWorkCalled` reports from a served request stream to callers of
/// `wait_for_dispatched`.
///
/// The reporter owns both ends of an mpsc channel: `serve` pushes a
/// `DispatchedEvent` for every request it handles, and `wait_for_dispatched`
/// pulls them out one at a time.
pub struct WorkSchedulerDispatchReporter {
// Sending half; cloned for each send because sending needs `&mut` access.
dispatched_tx: mpsc::Sender<DispatchedEvent>,
// Receiving half; behind an async mutex so it can be used through `&self`.
dispatched_rx: Mutex<mpsc::Receiver<DispatchedEvent>>,
}
impl WorkSchedulerDispatchReporter {
    /// Creates a reporter with a freshly created, connected sender/receiver pair.
    ///
    /// The channel is created with a buffer of `0`, i.e. no buffering beyond
    /// whatever `futures::channel::mpsc` guarantees per sender.
    pub fn new() -> Arc<Self> {
        let (tx, rx) = mpsc::channel(0);
        Arc::new(Self { dispatched_tx: tx, dispatched_rx: Mutex::new(rx) })
    }

    /// Waits for the next dispatched event, giving up after `timeout`.
    ///
    /// Returns `Ok(event)` if an event is received before the timer fires, or
    /// `Err(Timeout)` carrying the requested `timeout` otherwise. The timer is
    /// listed first in the race, so it is the first future handed to
    /// `select_all`.
    pub async fn wait_for_dispatched(&self, timeout: Duration) -> Result<DispatchedEvent, Timeout> {
        let timer = Box::pin(Self::wait_for(timeout)) as BoxFuture<'_, DispatchTimeout>;
        let dispatched_event = Box::pin(self.get_dispatched()) as BoxFuture<'_, DispatchTimeout>;
        // Whichever future loses the race is dropped here; dropping the
        // receive future also releases the lock on `dispatched_rx`.
        let (result, _, _) = select_all(vec![timer, dispatched_event]).await;
        match result {
            DispatchTimeout::Dispatched(dispatched_event) => Ok(dispatched_event),
            DispatchTimeout::Timeout(timeout) => Err(timeout),
        }
    }

    /// Resolves to `DispatchTimeout::Timeout` once `timeout` has elapsed.
    ///
    /// A `timeout` too large to express as `i64` nanoseconds is clamped, and
    /// the deadline saturates at `i64::MAX` instead of panicking or
    /// overflowing.
    async fn wait_for(timeout: Duration) -> DispatchTimeout {
        let now = Time::now().into_nanos();
        // `Duration::as_nanos` is a `u128`; clamp rather than panic on overflow.
        let delta: i64 = timeout.as_nanos().try_into().unwrap_or(std::i64::MAX);
        // NOTE(review): a deadline of `i64::MAX` is assumed to mean "never
        // fires" for `fuchsia_async::Timer` — confirm against `Time::INFINITE`.
        let deadline = Time::from_nanos(now.saturating_add(delta));
        Timer::new(deadline).await;
        DispatchTimeout::Timeout(Timeout(timeout))
    }

    /// Resolves to `DispatchTimeout::Dispatched` with the next event from the
    /// channel, holding the receiver lock while waiting.
    async fn get_dispatched(&self) -> DispatchTimeout {
        let mut rx = self.dispatched_rx.lock().await;
        let event = rx
            .next()
            .await
            .expect("dispatch channel closed even though `self` still owns a sender");
        DispatchTimeout::Dispatched(event)
    }
}
#[async_trait]
impl Injector for WorkSchedulerDispatchReporter {
    type Marker = fws::WorkSchedulerDispatchReporterMarker;

    /// Handles requests from `request_stream` until it ends or yields an
    /// error, forwarding each `OnDoWorkCalled` report to the dispatch channel
    /// as a `DispatchedEvent`.
    ///
    /// Panics if replying to the client fails or if the event cannot be sent.
    async fn serve(
        self: Arc<Self>,
        mut request_stream: fws::WorkSchedulerDispatchReporterRequestStream,
    ) {
        loop {
            let request = match request_stream.next().await {
                Some(Ok(request)) => request,
                // End of stream or a stream error: stop serving.
                _ => break,
            };
            match request {
                fws::WorkSchedulerDispatchReporterRequest::OnDoWorkCalled {
                    work_id,
                    responder,
                } => {
                    // Reply to the client first, and only then hand the report to the
                    // integration test. Otherwise, if asserting on the report is the
                    // test's final step, the test could finish and hang up on the
                    // client before the client has received its reply.
                    // TODO(markdittmer): Do something with on_do_work_called errors.
                    responder.send().unwrap();

                    // Sending needs `&mut`, so use a fresh clone of the shared sender.
                    let mut tx = self.dispatched_tx.clone();
                    tx.send(DispatchedEvent::new(work_id)).await.unwrap();
                }
            }
        }
    }
}