[session] add InputDeviceRegistry support

Serve the `fuchsia.input.injection.InputDeviceRegistry` protocol
from the session, so that we can inject input into that session
for testing purposes.

Note that further work is needed before we can actually run
a test with input injection. The overall work is tracked
in fxbug.dev/60787 (children of that bug are sufficient,
but not all strictly necessary).

Bug: 62127
Test: fx test workstation_session_tests
Change-Id: Ib91ec1ed944da1393c383fe53e09ec1c62e77058
Reviewed-on: https://fuchsia-review.googlesource.com/c/experiences/+/443760
Reviewed-by: Chase Latta <chaselatta@google.com>
Testability-Review: Chase Latta <chaselatta@google.com>
Commit-Queue: Mukesh Agrawal <quiche@google.com>
diff --git a/session_shells/ermine/session/BUILD.gn b/session_shells/ermine/session/BUILD.gn
index 31bbddb..aed89f0 100644
--- a/session_shells/ermine/session/BUILD.gn
+++ b/session_shells/ermine/session/BUILD.gn
@@ -18,6 +18,7 @@
 
   deps = [
     "//sdk/fidl/fuchsia.component:fuchsia.component-rustc",
+    "//sdk/fidl/fuchsia.input.injection:fuchsia.input.injection-rustc",
     "//sdk/fidl/fuchsia.input.report:fuchsia.input.report-rustc",
     "//sdk/fidl/fuchsia.session:fuchsia.session-rustc",
     "//sdk/fidl/fuchsia.sys:fuchsia.sys-rustc",
@@ -43,6 +44,7 @@
     "//third_party/rust_crates:anyhow",
     "//third_party/rust_crates:async-trait",
     "//third_party/rust_crates:futures",
+    "//third_party/rust_crates:matches",
     "//third_party/rust_crates:rand",
   ]
 
@@ -53,6 +55,7 @@
     "src/element_repository/event_handler.rs",
     "src/element_repository/mod.rs",
     "src/element_repository/testing_utils.rs",
+    "src/input_device_registry_server.rs",
     "src/input_testing_utilities.rs",
     "src/main.rs",
     "src/mouse_pointer_hack.rs",
diff --git a/session_shells/ermine/session/meta/workstation_session.cml b/session_shells/ermine/session/meta/workstation_session.cml
index 916f67d..f89e552 100644
--- a/session_shells/ermine/session/meta/workstation_session.cml
+++ b/session_shells/ermine/session/meta/workstation_session.cml
@@ -13,7 +13,12 @@
         },
     ],
     capabilities: [
-        { protocol: "fuchsia.session.ElementManager" },
+        {
+            protocol: [
+                "fuchsia.input.injection.InputDeviceRegistry",
+                "fuchsia.session.ElementManager",
+            ],
+        },
     ],
     use: [
         { runner: "elf" },
@@ -49,7 +54,10 @@
     ],
     expose: [
         {
-            protocol: "fuchsia.session.ElementManager",
+            protocol: [
+                "fuchsia.input.injection.InputDeviceRegistry",
+                "fuchsia.session.ElementManager",
+            ],
             from: "self",
         },
     ],
diff --git a/session_shells/ermine/session/src/input_device_registry_server.rs b/session_shells/ermine/session/src/input_device_registry_server.rs
new file mode 100644
index 0000000..9f21068
--- /dev/null
+++ b/session_shells/ermine/session/src/input_device_registry_server.rs
@@ -0,0 +1,71 @@
+// Copyright 2020 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    anyhow::Error, fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
+    futures::channel::mpsc::UnboundedSender,
+};
+
+/// A struct which forwards `InputDeviceRegistryRequestStream`s over an
+/// `mpsc::UnboundedSender`.
+pub(crate) struct InputDeviceRegistryServer {
+    sender: UnboundedSender<InputDeviceRegistryRequestStream>,
+}
+
+/// Creates an `InputDeviceRegistryServer`.
+///
+/// Returns both the server, and the `mpsc::UnboundedReceiver` which can be
+/// used to receive `InputDeviceRegistryRequestStream`'s forwarded by the server.
+pub(crate) fn make_server_and_receiver() -> (
+    InputDeviceRegistryServer,
+    futures::channel::mpsc::UnboundedReceiver<InputDeviceRegistryRequestStream>,
+) {
+    let (sender, receiver) = futures::channel::mpsc::unbounded();
+    (InputDeviceRegistryServer { sender }, receiver)
+}
+
+impl InputDeviceRegistryServer {
+    /// Handles the incoming `InputDeviceRegistryRequestStream`.
+    ///
+    /// Simply forwards the stream over the `mpsc::UnboundedSender`.
+    pub async fn handle_request(
+        &self,
+        stream: InputDeviceRegistryRequestStream,
+    ) -> Result<(), Error> {
+        self.sender.unbounded_send(stream).map_err(anyhow::Error::from)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {
+        super::*, fidl::endpoints::create_proxy_and_stream,
+        fidl_fuchsia_input_injection::InputDeviceRegistryMarker, fuchsia_async as fasync,
+        matches::assert_matches,
+    };
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_handle_request_forwards_stream_and_returns_ok() {
+        let (server, mut receiver) = make_server_and_receiver();
+        let (_proxy, stream) = create_proxy_and_stream::<InputDeviceRegistryMarker>()
+            .expect("should make proxy/stream");
+        assert_matches!(server.handle_request(stream).await, Ok(()));
+
+        // Note: can't use `assert_matches!()` here, because `InputDeviceRegistryRequestStream`
+        // does not implement `Debug`.
+        match receiver.try_next() {
+            Ok(opt) => assert!(opt.is_some()),
+            Err(e) => panic!("reading failed with {:#?}", e),
+        }
+    }
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_handle_request_returns_error_on_disconnected_receiver() {
+        let (server, receiver) = make_server_and_receiver();
+        let (_proxy, stream) = create_proxy_and_stream::<InputDeviceRegistryMarker>()
+            .expect("should make proxy/stream");
+        std::mem::drop(receiver);
+        assert_matches!(server.handle_request(stream).await, Err(_));
+    }
+}
diff --git a/session_shells/ermine/session/src/main.rs b/session_shells/ermine/session/src/main.rs
index b3f3762..67d091f 100644
--- a/session_shells/ermine/session/src/main.rs
+++ b/session_shells/ermine/session/src/main.rs
@@ -8,6 +8,7 @@
 #[macro_use]
 mod input_testing_utilities;
 mod element_repository;
+mod input_device_registry_server;
 mod mouse_pointer_hack;
 mod pointer_hack_server;
 mod touch_pointer_hack;
@@ -16,11 +17,13 @@
 use {
     crate::{
         element_repository::{ElementEventHandler, ElementManagerServer, ElementRepository},
+        input_device_registry_server::InputDeviceRegistryServer,
         pointer_hack_server::PointerHackServer,
     },
     anyhow::{Context as _, Error},
     element_management::SimpleElementManager,
     fidl::endpoints::DiscoverableService,
+    fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
     fidl_fuchsia_session::{
         ElementManagerMarker, ElementManagerRequestStream, GraphicalPresenterMarker,
     },
@@ -36,7 +39,7 @@
         client::{connect_to_service, launch_with_options, App, LaunchOptions},
         server::ServiceFs,
     },
-    fuchsia_syslog::fx_log_info,
+    fuchsia_syslog::{fx_log_err, fx_log_info},
     fuchsia_zircon as zx,
     futures::{try_join, StreamExt},
     scene_management::{self, SceneManager},
@@ -46,6 +49,7 @@
 
 enum ExposedServices {
     ElementManager(ElementManagerRequestStream),
+    InputDeviceRegistry(InputDeviceRegistryRequestStream),
 }
 
 /// The maximum number of open requests to this component.
@@ -84,15 +88,19 @@
 
 async fn expose_services(
     element_server: ElementManagerServer<SimpleElementManager>,
+    input_device_registry_server: InputDeviceRegistryServer,
 ) -> Result<(), Error> {
     let mut fs = ServiceFs::new_local();
-    fs.dir("svc").add_fidl_service(ExposedServices::ElementManager);
+    fs.dir("svc")
+        .add_fidl_service(ExposedServices::ElementManager)
+        .add_fidl_service(ExposedServices::InputDeviceRegistry);
 
     fs.take_and_serve_directory_handle()?;
 
     // create a reference so that we can use this within the `for_each_concurrent` generator.
     // If we do not create a ref we will run into issues with the borrow checker.
     let element_server_ref = &element_server;
+    let input_device_registry_server_ref = &input_device_registry_server;
     fs.for_each_concurrent(
         NUM_CONCURRENT_REQUESTS,
         move |service_request: ExposedServices| async move {
@@ -102,6 +110,32 @@
                     fx_log_info!("received incoming element manager request");
                     let _ = element_server_ref.handle_request(request_stream).await;
                 }
+                ExposedServices::InputDeviceRegistry(request_stream) => {
+                    match input_device_registry_server_ref.handle_request(request_stream).await {
+                        Ok(()) => (),
+                        Err(e) => {
+                            // If `handle_request()` returns `Err`, then the `unbounded_send()` call
+                            // from `handle_request()` failed with either:
+                            // * `TrySendError::SendErrorKind::Full`, or
+                            // * `TrySendError::SendErrorKind::Disconnected`.
+                            //
+                            // These are unexpected, because:
+                            // * `Full` can't happen, because `InputDeviceRegistryServer`
+                            //   uses an `UnboundedSender`.
+                            // * `Disconnected` is highly unlikely, because the corresponding
+                            //   `UnboundedReceiver` lives in `main::input_fut`, and `input_fut`'s
+                            //   lifetime is nearly as long as `input_device_registry_server`'s.
+                            //
+                            // Nonetheless, InputDeviceRegistry isn't critical to production use.
+                            // So we just log the error and move on.
+                            fx_log_err!(
+                                "failed to forward InputDeviceRegistryRequestStream: {:?}; \
+                                 must restart to enable input injection",
+                                e
+                            )
+                        }
+                    }
+                }
             }
         },
     )
@@ -163,8 +197,15 @@
         scene_manager.add_view_to_scene(view_provider, Some("Ermine".to_string())).await?;
     let set_focus_fut = set_view_focus(Arc::downgrade(&scene_manager.focuser), view_ref);
 
-    let services_fut = expose_services(element_repository.make_server());
-    let input_fut = workstation_input_pipeline::handle_input(scene_manager, &pointer_hack_server);
+    let (input_device_registry_server, input_device_registry_request_stream_receiver) =
+        input_device_registry_server::make_server_and_receiver();
+    let services_fut =
+        expose_services(element_repository.make_server(), input_device_registry_server);
+    let input_fut = workstation_input_pipeline::handle_input(
+        scene_manager,
+        &pointer_hack_server,
+        input_device_registry_request_stream_receiver,
+    );
     let element_manager_fut = element_repository.run_with_handler(&mut handler);
     let focus_fut = input::focus_listening::handle_focus_changes();
 
diff --git a/session_shells/ermine/session/src/workstation_input_pipeline.rs b/session_shells/ermine/session/src/workstation_input_pipeline.rs
index cbda697..3e593c8 100644
--- a/session_shells/ermine/session/src/workstation_input_pipeline.rs
+++ b/session_shells/ermine/session/src/workstation_input_pipeline.rs
@@ -7,13 +7,20 @@
     crate::pointer_hack_server::PointerHackServer,
     crate::touch_pointer_hack::*,
     anyhow::{Context, Error},
+    fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
     fidl_fuchsia_ui_shortcut as ui_shortcut, fuchsia_async as fasync,
     fuchsia_component::client::connect_to_service,
+    fuchsia_syslog::fx_log_warn,
     futures::StreamExt,
     input::{
-        ime_handler::ImeHandler, input_device, input_handler::InputHandler,
-        input_pipeline::InputPipeline, mouse_handler::MouseHandler,
-        shortcut_handler::ShortcutHandler, touch_handler::TouchHandler, Position, Size,
+        ime_handler::ImeHandler,
+        input_device,
+        input_handler::InputHandler,
+        input_pipeline::{self, InputPipeline},
+        mouse_handler::MouseHandler,
+        shortcut_handler::ShortcutHandler,
+        touch_handler::TouchHandler,
+        Position, Size,
     },
     scene_management::{self, FlatSceneManager, SceneManager, ScreenCoordinates},
 };
@@ -28,6 +35,9 @@
 pub async fn handle_input(
     scene_manager: FlatSceneManager,
     pointer_hack_server: &PointerHackServer,
+    input_device_registry_request_stream_receiver: futures::channel::mpsc::UnboundedReceiver<
+        InputDeviceRegistryRequestStream,
+    >,
 ) -> Result<(), Error> {
     let input_pipeline = InputPipeline::new(
         vec![
@@ -40,7 +50,14 @@
     .await
     .context("Failed to create InputPipeline.")?;
 
-    input_pipeline.handle_input_events().await;
+    let input_device_registry_fut = handle_input_device_registry_request_streams(
+        input_device_registry_request_stream_receiver,
+        input_pipeline.input_device_types.clone(),
+        input_pipeline.input_event_sender.clone(),
+        input_pipeline.input_device_bindings.clone(),
+    );
+    let input_pipeline_fut = input_pipeline.handle_input_events();
+    futures::join!(input_device_registry_fut, input_pipeline_fut);
     Ok(())
 }
 
@@ -144,3 +161,35 @@
 
     handlers.push(Box::new(touch_hack));
 }
+
+async fn handle_input_device_registry_request_streams(
+    stream_receiver: futures::channel::mpsc::UnboundedReceiver<InputDeviceRegistryRequestStream>,
+    input_device_types: Vec<input_device::InputDeviceType>,
+    input_event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
+    input_device_bindings: input_pipeline::InputDeviceBindingHashMap,
+) {
+    // It's unlikely that multiple clients will concurrently connect to the InputDeviceRegistry.
+    // However, if multiple clients do connect concurrently, we don't want said clients
+    // depending on the serialization that would be provided by `for_each()`.
+    stream_receiver
+        .for_each_concurrent(None, |stream| async {
+            match InputPipeline::handle_input_device_registry_request_stream(
+                stream,
+                &input_device_types,
+                &input_event_sender,
+                &input_device_bindings,
+            )
+            .await
+            {
+                Ok(()) => (),
+                Err(e) => {
+                    fx_log_warn!(
+                        "failure while serving InputDeviceRegistry: {}; \
+                         will continue serving other clients",
+                        e
+                    );
+                }
+            }
+        })
+        .await;
+}