blob: e59693ebc60c68e549fba08cbd4209d461e33e28 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#![warn(missing_docs)]
use {
anyhow::{format_err, Context as _, Error},
fidl_fuchsia_input_report::{
InputReport, InputReportsReaderRequest, InputReportsReaderRequestStream,
},
futures::{StreamExt, TryStreamExt},
std::convert::TryFrom as _,
};
/// Implements the server side of the `fuchsia.input.report.InputReportsReader`
/// protocol. Used by `modern_backend::InputDevice`.
pub(super) struct InputReportsReader {
pub(super) request_stream: InputReportsReaderRequestStream,
/// FIFO queue of reports to be consumed by calls to
/// `fuchsia.input.report.InputReportsReader.ReadInputReports()`.
pub(super) report_receiver: futures::channel::mpsc::UnboundedReceiver<InputReport>,
}
impl InputReportsReader {
/// Returns a `Future` that resolves when
/// * `self.reports` is empty, or
/// * `self.request_stream` yields `None`, or
/// * an error occurs (invalid FIDL request, failure to send FIDL response).
///
/// # Resolves to
/// * `Ok(())` if all reports were written successfully
/// * `Err` otherwise
///
/// # Corner cases
/// If `self.reports` is _initially_ empty, the returned `Future` will resolve immediately.
///
/// # Note
/// When the future resolves, `InputReports` may still be sitting unread in the
/// channel to the `fuchsia.input.report.InputReportsReader` client. (The client will
/// typically be an input pipeline implementation.)
pub(super) async fn into_future(self) -> Result<(), Error> {
// Group `reports` into chunks, to respect the requirements of the `InputReportsReader`
// protocol. Then `zip()` each chunk with a `InputReportsReader` protocol request.
// * If there are more chunks than requests, then some of the `InputReport`s were
// not sent to the `InputReportsReader` client. In this case, this function
// will report an error by checking `reports.is_done()` below.
// * If there are more requests than reports, no special-case handling is needed.
// This is because an input pipeline implementation will normally issue
// `ReadInputReports` requests indefinitely.
let chunk_size = usize::try_from(fidl_fuchsia_input_report::MAX_DEVICE_REPORT_COUNT)
.context("converting MAX_DEVICE_REPORT_COUNT to usize")?;
let mut reports = self.report_receiver.ready_chunks(chunk_size).fuse();
self.request_stream
.zip(reports.by_ref())
.map(|(request, reports)| match request {
Ok(request) => Ok((request, reports)),
Err(e) => Err(anyhow::Error::from(e).context("while reading reader request")),
})
.try_for_each(|request_and_reports| async {
match request_and_reports {
(InputReportsReaderRequest::ReadInputReports { responder }, reports) => {
responder
.send(Ok(&reports))
.map_err(anyhow::Error::from)
.context("while sending reports")
}
}
})
.await?;
match reports.is_done() {
true => Ok(()),
false => Err(format_err!("request_stream terminated with reports still pending")),
}
}
}
#[cfg(test)]
mod tests {
use {
super::{InputReport, InputReportsReader},
anyhow::{Context as _, Error},
fidl::endpoints,
fidl_fuchsia_input_report::{InputReportsReaderMarker, MAX_DEVICE_REPORT_COUNT},
fuchsia_async as fasync, fuchsia_zircon as zx,
futures::future,
};
mod report_count {
use {
super::*,
futures::{pin_mut, task::Poll},
};
#[fasync::run_until_stalled(test)]
async fn serves_single_report() -> Result<(), Error> {
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>()
.context("creating InputReportsReader proxy and stream")?;
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
let reports_fut = proxy.read_input_reports();
std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
let (_, reports_result) = future::join(reader_fut, reports_fut).await;
let reports = reports_result
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error");
assert_eq!(reports.len(), 1, "incorrect reports length");
Ok(())
}
#[fasync::run_until_stalled(test)]
async fn serves_max_report_count_reports() -> Result<(), Error> {
let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
.context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>()
.context("creating InputReportsReader proxy and stream")?;
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
for _ in 0..max_reports {
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
}
let reports_fut = proxy.read_input_reports();
std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
let (_, reports_result) = future::join(reader_fut, reports_fut).await;
let reports = reports_result
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error");
assert_eq!(reports.len(), max_reports, "incorrect reports length");
Ok(())
}
#[test]
fn splits_overflowed_reports_to_next_read() -> Result<(), Error> {
let mut executor = fasync::TestExecutor::new();
let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
.context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>()
.context("creating InputReportsReader proxy and stream")?;
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
for _ in 0..max_reports + 1 {
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
}
pin_mut!(reader_fut);
// Note: this test deliberately serializes its FIDL requests. Concurrent requests
// are tested separately, in `super::fidl_interactions::preserves_query_order()`.
let reports_fut = proxy.read_input_reports();
let _ = executor.run_until_stalled(&mut reader_fut);
pin_mut!(reports_fut);
match executor.run_until_stalled(&mut reports_fut) {
Poll::Pending => panic!("read did not complete (1st query)"),
Poll::Ready(res) => {
let reports = res
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error");
assert_eq!(reports.len(), max_reports, "incorrect reports length (1st query)");
}
}
let reports_fut = proxy.read_input_reports();
std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
let _ = executor.run_until_stalled(&mut reader_fut);
pin_mut!(reports_fut);
match executor.run_until_stalled(&mut reports_fut) {
Poll::Pending => panic!("read did not complete (2nd query)"),
Poll::Ready(res) => {
let reports = res
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error");
assert_eq!(reports.len(), 1, "incorrect reports length (2nd query)");
}
}
Ok(())
}
}
mod future_resolution {
use super::*;
use assert_matches::assert_matches;
#[fasync::run_until_stalled(test)]
async fn resolves_to_ok_when_all_reports_are_written() -> Result<(), Error> {
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>()
.context("creating InputReportsReader proxy and stream")?;
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
let _reports_fut = proxy.read_input_reports();
std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
assert_matches!(reader_fut.await, Ok(()));
Ok(())
}
#[fasync::run_until_stalled(test)]
async fn resolves_to_err_when_request_stream_is_terminated_before_reports_are_written(
) -> Result<(), Error> {
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>()
.context("creating InputReportsReader proxy and stream")?;
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
assert_matches!(reader_fut.await, Err(_));
Ok(())
}
#[fasync::run_until_stalled(test)]
async fn resolves_to_err_if_request_stream_yields_error() -> Result<(), Error> {
let (client_end, request_stream) =
endpoints::create_request_stream::<InputReportsReaderMarker>()
.context("creating InputReportsReader client_end and stream")?;
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
client_end
.into_channel()
.write(b"not a valid FIDL message", /* handles */ &mut [])
.expect("internal error writing to channel");
assert_matches!(reader_fut.await, Err(_)); // while reading reader request
Ok(())
}
#[fasync::run_until_stalled(test)]
async fn resolves_to_ok_if_client_closes_channel_and_ignores_reply() -> Result<(), Error> {
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>()
.context("creating InputReportsReader proxy and stream")?;
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
let result_fut = proxy.read_input_reports(); // Send query.
std::mem::drop(result_fut); // Close handle to channel.
std::mem::drop(proxy); // Close other handle to channel.
std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
assert_matches!(reader_fut.await, Ok(()));
Ok(())
}
#[fasync::run_until_stalled(test)]
async fn immediately_resolves_to_ok_when_reports_is_initially_empty() -> Result<(), Error> {
let (_proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>()
.context("creating InputReportsReader proxy and stream")?;
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
assert_matches!(reader_fut.await, Ok(()));
Ok(())
}
}
mod fidl_interactions {
use {
super::*,
assert_matches::assert_matches,
futures::{pin_mut, task::Poll},
};
#[test]
fn closes_channel_after_reports_are_consumed() -> Result<(), Error> {
let mut executor = fasync::TestExecutor::new();
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>()
.context("creating InputReportsReader proxy and stream")?;
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
let reports_fut = proxy.read_input_reports();
std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
// Process the first query. This should close the FIDL connection.
let futures = future::join(reader_fut, reports_fut);
pin_mut!(futures);
std::mem::drop(executor.run_until_stalled(&mut futures));
// Try sending another query. This should fail.
assert_matches!(
executor.run_until_stalled(&mut proxy.read_input_reports()),
Poll::Ready(Err(fidl::Error::ClientChannelClosed { .. }))
);
Ok(())
}
#[fasync::run_until_stalled(test)]
async fn preserves_query_order() -> Result<(), Error> {
let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
.context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>()
.context("creating InputReportsReader proxy and stream")?;
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
for _ in 0..max_reports + 1 {
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
}
let first_reports_fut = proxy.read_input_reports();
let second_reports_fut = proxy.read_input_reports();
std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
let (_, first_reports_result, second_reports_result) =
futures::join!(reader_fut, first_reports_fut, second_reports_fut);
let first_reports = first_reports_result
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error");
let second_reports = second_reports_result
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error");
assert_eq!(first_reports.len(), max_reports, "incorrect reports length (1st query)");
assert_eq!(second_reports.len(), 1, "incorrect reports length (2nd query)");
Ok(())
}
}
#[fasync::run_until_stalled(test)]
async fn preserves_report_order() -> Result<(), Error> {
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>()
.context("creating InputReportsReader proxy and stream")?;
let (report_sender, report_receiver) = futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
report_sender
.unbounded_send(InputReport { event_time: Some(1), ..Default::default() })
.expect("sending first InputReport");
report_sender
.unbounded_send(InputReport { event_time: Some(2), ..Default::default() })
.expect("sending second InputReport");
let reports_fut = proxy.read_input_reports();
std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
assert_eq!(
future::join(reader_fut, reports_fut)
.await
.1
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error")
.iter()
.map(|report| report.event_time)
.collect::<Vec<_>>(),
[Some(1), Some(2)]
);
Ok(())
}
}