blob: 06952d8586ea61c58c2d17b414b0a3610d49f999 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::input_reports_reader::InputReportsReader;
use anyhow::{Context as _, Error};
use async_utils::event::Event as AsyncEvent;
use fidl::endpoints::ServerEnd;
use fidl::Error as FidlError;
use fidl_fuchsia_input_report::{
DeviceDescriptor, FeatureReport, InputDeviceRequest, InputDeviceRequestStream, InputReport,
InputReportsReaderMarker,
};
use fuchsia_async as fasync;
use futures::channel::mpsc;
use futures::{future, pin_mut, StreamExt, TryFutureExt};
/// Implements the server side of the
/// `fuchsia.input.report.InputDevice` FIDL protocol. This struct also enables users to inject
/// input reports `as fuchsia.ui.input.InputReport`.
///
/// # Notes
/// * Some of the methods of `fuchsia.input.report.InputDevice` are not relevant to
/// input injection, so this implemnentation does not support them:
/// * `SendOutputReport` provides a way to change keyboard LED state.
/// If these FIDL methods are invoked, `InputDevice::flush()` will resolve to Err.
/// * This implementation does not support multiple calls to `GetInputReportsReader`,
/// since:
/// * The ideal semantics for multiple calls are not obvious, and
/// * Each `InputDevice` has a single FIDL client (an input pipeline implementation),
/// and the current input pipeline implementation is happy to use a single
/// `InputReportsReader` for the lifetime of the `InputDevice`.
pub(crate) struct InputDevice {
/// FIFO queue of reports to be consumed by calls to
/// `fuchsia.input.report.InputReportsReader.ReadInputReports()`.
/// Populated by `input_device::InputDevice`.
report_sender: futures::channel::mpsc::UnboundedSender<InputReport>,
/// `Task` to keep serving the `fuchsia.input.report.InputDevice` protocol.
_input_device_task: fasync::Task<()>,
}
impl InputDevice {
/// Creates a new `InputDevice` that will create a task to:
/// a) process requests from `request_stream`, and
/// b) respond to `GetDescriptor` calls with the descriptor generated by `descriptor_generator()`
pub(super) fn new(
request_stream: InputDeviceRequestStream,
descriptor: DeviceDescriptor,
got_input_reports_reader: AsyncEvent,
) -> Self {
let (report_sender, report_receiver) = mpsc::unbounded::<InputReport>();
// Create a `Task` to keep serving the `fuchsia.input.report.InputDevice` protocol.
let input_device_task = fasync::Task::local(Self::serve_reports(
request_stream,
descriptor,
report_receiver,
got_input_reports_reader,
));
Self { report_sender, _input_device_task: input_device_task }
}
/// Enqueues an input report, to be read by the input reports reader.
pub(super) fn send_input_report(&self, input_report: InputReport) -> Result<(), Error> {
self.report_sender
.unbounded_send(input_report)
.context("failed to send input report to reader")
}
/// Returns a `Future` which resolves when all input reports for this device
/// have been sent to the FIDL peer.
///
/// # Note
/// When the future resolves, input reports may still be sitting unread in the
/// channel to the FIDL peer.
#[cfg(test)]
pub(super) async fn flush(self) {
let Self { _input_device_task: input_device_task, report_sender } = self;
std::mem::drop(report_sender); // Drop `report_sender` to close channel.
input_device_task.await
}
/// Returns a `Future` which resolves when all `InputReport`s for this device
/// have been sent to a `fuchsia.input.InputReportsReader` client.
///
/// # Notes
/// * This function `panic()`s on error, to ensure that the error is reported
/// synchronously. Otherwise, the original error might lead to additional errors, and
/// make the integration tests that use this library harder to debug.
/// * When the `Future` resolves, `InputReports` may still be sitting unread in the
/// channel to the `fuchsia.input.InputReportsReader` client. (The client will
/// typically be an input pipeline implementation.)
///
/// # Corner cases
/// Resolves to `Err` if the `fuchsia.input.InputDevice` client did not call
/// `GetInputReportsReader()`, even if no `InputReport`s were queued.
async fn serve_reports(
request_stream: InputDeviceRequestStream,
descriptor: DeviceDescriptor,
report_receiver: mpsc::UnboundedReceiver<InputReport>,
got_input_reports_reader: AsyncEvent,
) {
// Process `fuchsia.input.report.InputDevice` requests, waiting for the `InputDevice`
// client to provide a `ServerEnd<InputReportsReader>` by calling `GetInputReportsReader()`.
let mut input_reports_reader_server_end_stream = request_stream.filter_map(|r| {
future::ready(Self::handle_device_request(
r,
&descriptor,
got_input_reports_reader.clone(),
))
});
let input_reports_reader_fut = {
let reader_server_end = input_reports_reader_server_end_stream
.next()
.await
.unwrap_or_else(|| panic!("stream ended without a call to GetInputReportsReader"));
InputReportsReader {
request_stream: reader_server_end.into_stream().unwrap_or_else(|e| {
panic!("failed to convert ServerEnd<InputReportsReader>: {e}")
}),
report_receiver,
}
.into_future()
};
pin_mut!(input_reports_reader_fut);
// Create a `Future` to keep serving the `fuchsia.input.report.InputDevice` protocol.
// This time, receiving a `ServerEnd<InputReportsReaderMarker>` will be an `Err`.
let input_device_server_fut = async {
match input_reports_reader_server_end_stream.next().await {
Some(_server_end) => {
// There are no obvious "best" semantics for how to handle multiple
// `GetInputReportsReader` calls, and there is no current need to
// do so. Instead of taking a guess at what the client might want
// in such a case, just `panic()`.
panic!("InputDevice does not support multiple GetInputReportsReader calls")
}
None => Ok(()),
}
};
pin_mut!(input_device_server_fut);
// Now, process both `fuchsia.input.report.InputDevice` requests, and
// `fuchsia.input.report.InputReportsReader` requests. And keep processing
// `InputReportsReader` requests even if the `InputDevice` connection
// is severed.
future::select(
input_device_server_fut.and_then(|_: ()| future::pending()),
input_reports_reader_fut,
)
.await
.factor_first()
.0
.unwrap_or_else(|e| panic!("processing FIDL requests: {e}"))
}
/// Processes a single request from an `InputDeviceRequestStream`
///
/// # Returns
/// * Some(ServerEnd<InputReportsReaderMarker>) if the request yielded an
/// `InputReportsReader`. `InputDevice` should route its `InputReports` to the yielded
/// `InputReportsReader`.
/// * None if the request was fully processed by `handle_device_request()`
///
/// # Note
/// * This function `panic()`s on error. See `serve_reports()` for the reason why.
fn handle_device_request(
request: Result<InputDeviceRequest, FidlError>,
descriptor: &DeviceDescriptor,
got_input_reports_reader: AsyncEvent,
) -> Option<ServerEnd<InputReportsReaderMarker>> {
match request {
Ok(InputDeviceRequest::GetInputReportsReader { reader: reader_server_end, .. }) => {
let _ = got_input_reports_reader.signal();
Some(reader_server_end)
}
Ok(InputDeviceRequest::GetDescriptor { responder }) => {
match responder.send(&descriptor) {
Ok(()) => None,
Err(e) => panic!("failed to send GetDescriptor response: {e}"),
}
}
Ok(InputDeviceRequest::GetFeatureReport { responder }) => {
match responder.send(Ok(&FeatureReport::default())) {
Ok(()) => None,
Err(e) => panic!("failed to send GetFeatureReport response: {e}"),
}
}
Err(e) => {
panic!("failed to read `InputReportsReader` request: {:?}", &e);
}
_ => {
panic!(
"InputDevice::handle_device_request does not support this request: {:?}",
&request
);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use fidl::endpoints;
use fidl_fuchsia_input_report::{DeviceDescriptor, InputDeviceMarker};
use fuchsia_async as fasync;
mod responds_to_get_feature_report_request {
use super::*;
#[fasync::run_until_stalled(test)]
async fn single_request_before_call_to_get_feature_report() -> Result<(), Error> {
let (proxy, request_stream) = endpoints::create_proxy_and_stream::<InputDeviceMarker>()
.context("creating InputDevice proxy and stream")?;
let input_device_server_fut = Box::new(InputDevice::new(
request_stream,
DeviceDescriptor::default(),
AsyncEvent::new(),
))
.flush();
let get_feature_report_fut = proxy.get_feature_report();
// Avoid unrelated `panic()`: `InputDevice` requires clients to get an input
// reports reader, to help debug integration test failures where no component
// read events from the fake device.
let (_input_reports_reader_proxy, input_reports_reader_server_end) =
endpoints::create_proxy::<InputReportsReaderMarker>()
.context("internal error creating InputReportsReader proxy and server end")?;
let _ = proxy.get_input_reports_reader(input_reports_reader_server_end);
std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
let (_, get_feature_report_result) =
future::join(input_device_server_fut, get_feature_report_fut).await;
assert_eq!(
get_feature_report_result.context("fidl error")?,
Ok(FeatureReport::default())
);
Ok(())
}
}
mod responds_to_get_descriptor_request {
use super::utils::{make_input_device_proxy_and_struct, make_touchscreen_descriptor};
use super::*;
use assert_matches::assert_matches;
use futures::task::Poll;
#[fasync::run_until_stalled(test)]
async fn single_request_before_call_to_get_input_reports_reader() -> Result<(), Error> {
let (proxy, request_stream) = endpoints::create_proxy_and_stream::<InputDeviceMarker>()
.context("creating InputDevice proxy and stream")?;
let input_device_server_fut = Box::new(InputDevice::new(
request_stream,
make_touchscreen_descriptor(),
AsyncEvent::new(),
))
.flush();
let get_descriptor_fut = proxy.get_descriptor();
// Avoid unrelated `panic()`: `InputDevice` requires clients to get an input
// reports reader, to help debug integration test failures where no component
// read events from the fake device.
let (_input_reports_reader_proxy, input_reports_reader_server_end) =
endpoints::create_proxy::<InputReportsReaderMarker>()
.context("internal error creating InputReportsReader proxy and server end")?;
let _ = proxy.get_input_reports_reader(input_reports_reader_server_end);
std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
let (_, get_descriptor_result) =
future::join(input_device_server_fut, get_descriptor_fut).await;
assert_eq!(get_descriptor_result.context("fidl error")?, make_touchscreen_descriptor());
Ok(())
}
#[test]
fn multiple_requests_before_call_to_get_input_reports_reader() -> Result<(), Error> {
let mut executor = fasync::TestExecutor::new();
let (proxy, request_stream) = endpoints::create_proxy_and_stream::<InputDeviceMarker>()
.context("creating InputDevice proxy and stream")?;
let input_device_server_fut = Box::new(InputDevice::new(
request_stream,
make_touchscreen_descriptor(),
AsyncEvent::new(),
))
.flush();
pin_mut!(input_device_server_fut);
let mut get_descriptor_fut = proxy.get_descriptor();
assert_matches!(
executor.run_until_stalled(&mut input_device_server_fut),
Poll::Pending
);
std::mem::drop(executor.run_until_stalled(&mut get_descriptor_fut));
let mut get_descriptor_fut = proxy.get_descriptor();
let _ = executor.run_until_stalled(&mut input_device_server_fut);
assert_matches!(
executor.run_until_stalled(&mut get_descriptor_fut),
Poll::Ready(Ok(_))
);
Ok(())
}
#[test]
fn after_call_to_get_input_reports_reader_with_report_pending() -> Result<(), Error> {
let mut executor = fasync::TestExecutor::new();
let (input_device_proxy, input_device, got_input_reports_reader) =
make_input_device_proxy_and_struct();
input_device
.send_input_report(InputReport {
event_time: None,
touch: None,
..Default::default()
})
.context("internal error queuing input event")?;
let input_device_server_fut = input_device.flush();
pin_mut!(input_device_server_fut);
let (_input_reports_reader_proxy, input_reports_reader_server_end) =
endpoints::create_proxy::<InputReportsReaderMarker>()
.context("internal error creating InputReportsReader proxy and server end")?;
input_device_proxy
.get_input_reports_reader(input_reports_reader_server_end)
.context("sending get_input_reports_reader request")?;
assert_matches!(
executor.run_until_stalled(&mut input_device_server_fut),
Poll::Pending
);
let mut get_descriptor_fut = input_device_proxy.get_descriptor();
assert_matches!(
executor.run_until_stalled(&mut input_device_server_fut),
Poll::Pending
);
assert_matches!(executor.run_until_stalled(&mut get_descriptor_fut), Poll::Ready(_));
let mut got_input_reports_reader_fut = got_input_reports_reader.wait();
assert_matches!(
executor.run_until_stalled(&mut got_input_reports_reader_fut),
Poll::Ready(_)
);
Ok(())
}
}
mod future_resolution {
use super::utils::{make_input_device_proxy_and_struct, make_input_reports_reader_proxy};
use super::*;
use futures::task::Poll;
mod resolves_after_all_reports_are_sent_to_input_reports_reader {
use super::*;
use assert_matches::assert_matches;
#[test]
fn if_device_request_channel_was_closed() {
let mut executor = fasync::TestExecutor::new();
let (input_device_proxy, input_device, _got_input_reports_reader) =
make_input_device_proxy_and_struct();
let input_reports_reader_proxy =
make_input_reports_reader_proxy(&input_device_proxy);
input_device
.send_input_report(InputReport {
event_time: None,
touch: None,
..Default::default()
})
.expect("queuing input report");
let _input_reports_fut = input_reports_reader_proxy.read_input_reports();
let input_device_fut = input_device.flush();
pin_mut!(input_device_fut);
std::mem::drop(input_device_proxy); // Close device request channel.
assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Ready(()));
}
#[test]
fn even_if_device_request_channel_is_open() {
let mut executor = fasync::TestExecutor::new();
let (input_device_proxy, input_device, _got_input_reports_reader) =
make_input_device_proxy_and_struct();
let input_reports_reader_proxy =
make_input_reports_reader_proxy(&input_device_proxy);
input_device
.send_input_report(InputReport {
event_time: None,
touch: None,
..Default::default()
})
.expect("queuing input report");
let _input_reports_fut = input_reports_reader_proxy.read_input_reports();
let input_device_fut = input_device.flush();
pin_mut!(input_device_fut);
assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Ready(()));
}
#[test]
fn even_if_reports_was_empty_and_device_request_channel_is_open() {
let mut executor = fasync::TestExecutor::new();
let (input_device_proxy, input_device, _got_input_reports_reader) =
make_input_device_proxy_and_struct();
let input_reports_reader_proxy =
make_input_reports_reader_proxy(&input_device_proxy);
let _input_reports_fut = input_reports_reader_proxy.read_input_reports();
let input_device_fut = input_device.flush();
pin_mut!(input_device_fut);
assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Ready(()));
}
}
mod panics_if_peer_closed_device_channel_without_calling_get_input_reports_reader {
use super::*;
use assert_matches::assert_matches;
#[test]
#[should_panic]
fn if_reports_were_available() {
let mut executor = fasync::TestExecutor::new();
let (input_device_proxy, input_device, _got_input_reports_reader) =
make_input_device_proxy_and_struct();
input_device
.send_input_report(InputReport {
event_time: None,
touch: None,
..Default::default()
})
.expect("queuing input report");
let input_device_fut = input_device.flush();
pin_mut!(input_device_fut);
std::mem::drop(input_device_proxy);
// Run the executor until the `InputDevice` causes a panic.
assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Pending);
}
#[test]
#[should_panic]
fn even_if_no_reports_were_available() {
let mut executor = fasync::TestExecutor::new();
let (input_device_proxy, input_device, _got_input_reports_reader) =
make_input_device_proxy_and_struct();
let input_device_fut = input_device.flush();
pin_mut!(input_device_fut);
std::mem::drop(input_device_proxy);
// Run the executor until the `InputDevice` causes a panic.
assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Pending);
}
}
mod is_pending_if_peer_has_device_channel_open_and_has_not_called_get_input_reports_reader {
use super::*;
use assert_matches::assert_matches;
#[test]
fn if_reports_were_available() {
let mut executor = fasync::TestExecutor::new();
let (_input_device_proxy, input_device, _got_input_reports_reader) =
make_input_device_proxy_and_struct();
input_device
.send_input_report(InputReport {
event_time: None,
touch: None,
..Default::default()
})
.expect("queuing input report");
let input_device_fut = input_device.flush();
pin_mut!(input_device_fut);
assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Pending)
}
#[test]
fn even_if_no_reports_were_available() {
let mut executor = fasync::TestExecutor::new();
let (_input_device_proxy, input_device, _got_input_reports_reader) =
make_input_device_proxy_and_struct();
let input_device_fut = input_device.flush();
pin_mut!(input_device_fut);
assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Pending)
}
#[test]
fn even_if_get_device_descriptor_has_been_called() {
let mut executor = fasync::TestExecutor::new();
let (input_device_proxy, input_device, _got_input_reports_reader) =
make_input_device_proxy_and_struct();
let input_device_fut = input_device.flush();
pin_mut!(input_device_fut);
let _get_descriptor_fut = input_device_proxy.get_descriptor();
assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Pending)
}
}
mod is_pending_if_peer_has_not_read_any_reports_when_a_report_is_available {
use super::*;
use assert_matches::assert_matches;
#[test]
fn if_device_request_channel_is_open() {
let mut executor = fasync::TestExecutor::new();
let (input_device_proxy, input_device, _got_input_reports_reader) =
make_input_device_proxy_and_struct();
let _input_reports_reader_proxy =
make_input_reports_reader_proxy(&input_device_proxy);
input_device
.send_input_report(InputReport {
event_time: None,
touch: None,
..Default::default()
})
.expect("queuing input report");
let input_device_fut = input_device.flush();
pin_mut!(input_device_fut);
assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Pending)
}
#[test]
fn even_if_device_channel_is_closed() {
let mut executor = fasync::TestExecutor::new();
let (input_device_proxy, input_device, _got_input_reports_reader) =
make_input_device_proxy_and_struct();
let _input_reports_reader_proxy =
make_input_reports_reader_proxy(&input_device_proxy);
input_device
.send_input_report(InputReport {
event_time: None,
touch: None,
..Default::default()
})
.expect("queuing input report");
let input_device_fut = input_device.flush();
std::mem::drop(input_device_proxy); // Terminate `InputDeviceRequestStream`.
pin_mut!(input_device_fut);
assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Pending)
}
}
mod is_pending_if_peer_did_not_read_all_reports {
use super::*;
use assert_matches::assert_matches;
use fidl_fuchsia_input_report::MAX_DEVICE_REPORT_COUNT;
#[test]
fn if_device_request_channel_is_open() {
let mut executor = fasync::TestExecutor::new();
let (input_device_proxy, input_device, _got_input_reports_reader) =
make_input_device_proxy_and_struct();
let input_reports_reader_proxy =
make_input_reports_reader_proxy(&input_device_proxy);
(0..=MAX_DEVICE_REPORT_COUNT).for_each(|_| {
input_device
.send_input_report(InputReport {
event_time: None,
touch: None,
..Default::default()
})
.expect("queuing input report");
});
// One query isn't enough to consume all of the reports queued above.
let _input_reports_fut = input_reports_reader_proxy.read_input_reports();
let input_device_fut = input_device.flush();
pin_mut!(input_device_fut);
assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Pending)
}
#[test]
fn even_if_device_request_channel_is_closed() {
let mut executor = fasync::TestExecutor::new();
let (input_device_proxy, input_device, _got_input_reports_reader) =
make_input_device_proxy_and_struct();
let input_reports_reader_proxy =
make_input_reports_reader_proxy(&input_device_proxy);
(0..=MAX_DEVICE_REPORT_COUNT).for_each(|_| {
input_device
.send_input_report(InputReport {
event_time: None,
touch: None,
..Default::default()
})
.expect("queuing input report");
});
// One query isn't enough to consume all of the reports queued above.
let _input_reports_fut = input_reports_reader_proxy.read_input_reports();
let input_device_fut = input_device.flush();
pin_mut!(input_device_fut);
std::mem::drop(input_device_proxy); // Terminate `InputDeviceRequestStream`.
assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Pending)
}
}
}
mod utils {
use {
super::*,
fidl_fuchsia_input_report::{
Axis, ContactInputDescriptor, InputDeviceProxy, InputReportsReaderProxy, Range,
TouchDescriptor, TouchInputDescriptor, TouchType, Unit, UnitType,
},
//fuchsia_zircon as zx,
};
/// Creates a `DeviceDescriptor` for a touchscreen that spans [-1000, 1000] on both axes.
pub(super) fn make_touchscreen_descriptor() -> DeviceDescriptor {
DeviceDescriptor {
touch: Some(TouchDescriptor {
input: Some(TouchInputDescriptor {
contacts: Some(
std::iter::repeat(ContactInputDescriptor {
position_x: Some(Axis {
range: Range { min: -1000, max: 1000 },
unit: Unit { type_: UnitType::Other, exponent: 0 },
}),
position_y: Some(Axis {
range: Range { min: -1000, max: 1000 },
unit: Unit { type_: UnitType::Other, exponent: 0 },
}),
contact_width: Some(Axis {
range: Range { min: -1000, max: 1000 },
unit: Unit { type_: UnitType::Other, exponent: 0 },
}),
contact_height: Some(Axis {
range: Range { min: -1000, max: 1000 },
unit: Unit { type_: UnitType::Other, exponent: 0 },
}),
..Default::default()
})
.take(10)
.collect(),
),
max_contacts: Some(10),
touch_type: Some(TouchType::Touchscreen),
buttons: Some(vec![]),
..Default::default()
}),
..Default::default()
}),
..Default::default()
}
}
/// Creates an `InputDeviceProxy`, for sending `fuchsia.input.report.InputDevice`
/// requests, and an `InputDevice` struct that will receive the FIDL requests
/// from the `InputDeviceProxy`.S
///
/// # Returns
/// A tuple of the proxy and struct. The struct is `Box`-ed so that the caller
/// can easily invoke `flush()`.
pub(super) fn make_input_device_proxy_and_struct(
) -> (InputDeviceProxy, Box<InputDevice>, AsyncEvent) {
let (input_device_proxy, input_device_request_stream) =
endpoints::create_proxy_and_stream::<InputDeviceMarker>()
.expect("creating InputDevice proxy and stream");
let got_input_reports_reader = AsyncEvent::new();
let input_device = Box::new(InputDevice::new(
input_device_request_stream,
DeviceDescriptor::default(),
got_input_reports_reader.clone(),
));
(input_device_proxy, input_device, got_input_reports_reader)
}
/// Creates an `InputReportsReaderProxy`, for sending
/// `fuchsia.input.report.InputReportsReader` requests, and registers that
/// `InputReportsReader` with the `InputDevice` bound to `InputDeviceProxy`.
///
/// # Returns
/// The newly created `InputReportsReaderProxy`.
pub(super) fn make_input_reports_reader_proxy(
input_device_proxy: &InputDeviceProxy,
) -> InputReportsReaderProxy {
let (input_reports_reader_proxy, input_reports_reader_server_end) =
endpoints::create_proxy::<InputReportsReaderMarker>()
.expect("internal error creating InputReportsReader proxy and server end");
input_device_proxy
.get_input_reports_reader(input_reports_reader_server_end)
.expect("sending get_input_reports_reader request");
input_reports_reader_proxy
}
}
}