| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use super::input_controller::{InputController, InputError, Request}; |
| use super::types::InputInfo; |
| use crate::input::types::{DeviceStateSource, InputDevice, InputDeviceType}; |
| use anyhow::{Error, anyhow}; |
| use async_utils::hanging_get::server; |
| use fidl_fuchsia_settings::{ |
| Error as SettingsError, InputRequest, InputRequestStream, InputSettings, InputState, |
| InputWatchResponder, |
| }; |
| use fuchsia_async as fasync; |
| use futures::StreamExt; |
| use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; |
| use futures::channel::oneshot; |
| use settings_common::inspect::event::{ |
| RequestType, ResponseType, UsagePublisher, UsageResponsePublisher, |
| }; |
| |
| fn to_request(fidl_input_states: Vec<InputState>) -> Result<Vec<InputDevice>, Error> { |
| // Every device requires at least a device type and state flags. |
| let mut input_states_invalid_args = fidl_input_states |
| .iter() |
| .filter(|input_state| input_state.device_type.is_none() || input_state.state.is_none()); |
| |
| // If any devices were filtered out, the args were invalid, so exit. |
| if input_states_invalid_args.next().is_some() { |
| return Err(anyhow!("Failed to parse input request: missing args")); |
| } |
| |
| let input_states = fidl_input_states |
| .iter() |
| .map(|input_state| { |
| let device_type: InputDeviceType = input_state.device_type.unwrap().into(); |
| let device_state = input_state.state.clone().unwrap().into(); |
| let device_name = input_state.name.clone().unwrap_or_else(|| device_type.to_string()); |
| let source_states = [(DeviceStateSource::SOFTWARE, device_state)].into(); |
| InputDevice { name: device_name, device_type, state: device_state, source_states } |
| }) |
| .collect(); |
| |
| Ok(input_states) |
| } |
| |
| pub(super) type SubscriberObject = (UsageResponsePublisher<InputInfo>, InputWatchResponder); |
| type HangingGetFn = fn(&InputInfo, SubscriberObject) -> bool; |
| pub(super) type HangingGet = server::HangingGet<InputInfo, SubscriberObject, HangingGetFn>; |
| pub(super) type Publisher = server::Publisher<InputInfo, SubscriberObject, HangingGetFn>; |
| pub(super) type Subscriber = server::Subscriber<InputInfo, SubscriberObject, HangingGetFn>; |
| |
| pub struct InputFidlHandler { |
| hanging_get: HangingGet, |
| controller_tx: UnboundedSender<Request>, |
| usage_publisher: UsagePublisher<InputInfo>, |
| } |
| |
| impl InputFidlHandler { |
| pub(crate) fn new( |
| input_controller: &mut InputController, |
| usage_publisher: UsagePublisher<InputInfo>, |
| initial_value: InputInfo, |
| ) -> (Self, UnboundedReceiver<Request>) { |
| let hanging_get = HangingGet::new(initial_value, Self::hanging_get); |
| input_controller.register_publisher(hanging_get.new_publisher()); |
| let (controller_tx, controller_rx) = mpsc::unbounded(); |
| (Self { hanging_get, controller_tx, usage_publisher }, controller_rx) |
| } |
| |
| fn hanging_get(info: &InputInfo, (usage_responder, responder): SubscriberObject) -> bool { |
| usage_responder.respond(format!("{info:?}"), ResponseType::OkSome); |
| if let Err(e) = responder.send(&InputSettings::from(info)) { |
| log::warn!("Failed to respond to watch request: {e:?}"); |
| return false; |
| } |
| true |
| } |
| |
| pub fn handle_stream(&mut self, mut stream: InputRequestStream) { |
| let request_handler = RequestHandler { |
| subscriber: self.hanging_get.new_subscriber(), |
| controller_tx: self.controller_tx.clone(), |
| usage_publisher: self.usage_publisher.clone(), |
| }; |
| fasync::Task::local(async move { |
| while let Some(Ok(request)) = stream.next().await { |
| request_handler.handle_request(request).await; |
| } |
| }) |
| .detach(); |
| } |
| } |
| |
| #[derive(Debug)] |
| enum HandlerError { |
| AlreadySubscribed, |
| InvalidArgument( |
| // Error used by Debug impl for inspect logs. |
| #[allow(dead_code)] Error, |
| ), |
| ControllerStopped, |
| Controller(InputError), |
| } |
| |
| impl From<&HandlerError> for ResponseType { |
| fn from(error: &HandlerError) -> Self { |
| match error { |
| HandlerError::AlreadySubscribed => ResponseType::AlreadySubscribed, |
| HandlerError::InvalidArgument(_) => ResponseType::InvalidArgument, |
| HandlerError::ControllerStopped => ResponseType::UnexpectedError, |
| HandlerError::Controller(e) => ResponseType::from(e), |
| } |
| } |
| } |
| |
| struct RequestHandler { |
| subscriber: Subscriber, |
| controller_tx: UnboundedSender<Request>, |
| usage_publisher: UsagePublisher<InputInfo>, |
| } |
| |
| impl RequestHandler { |
| async fn handle_request(&self, request: InputRequest) { |
| match request { |
| InputRequest::Watch { responder } => { |
| let usage_res = self.usage_publisher.request("Watch".to_string(), RequestType::Get); |
| if let Err((usage_res, responder)) = |
| self.subscriber.register2((usage_res, responder)) |
| { |
| let e = HandlerError::AlreadySubscribed; |
| usage_res.respond(format!("Err({e:?})"), ResponseType::from(&e)); |
| drop(responder); |
| } |
| } |
| InputRequest::Set { input_states, responder } => { |
| let usage_res = self |
| .usage_publisher |
| .request(format!("Set{{input_states:{input_states:?}}}"), RequestType::Set); |
| if let Err(e) = self.set(input_states).await { |
| usage_res.respond(format!("Err({e:?}"), ResponseType::from(&e)); |
| let _ = responder.send(Err(SettingsError::Failed)); |
| } else { |
| usage_res.respond("Ok(())".to_string(), ResponseType::OkNone); |
| let _ = responder.send(Ok(())); |
| } |
| } |
| } |
| } |
| |
| async fn set(&self, input_states: Vec<InputState>) -> Result<(), HandlerError> { |
| let (set_tx, set_rx) = oneshot::channel(); |
| let info = to_request(input_states).map_err(|e| HandlerError::InvalidArgument(e))?; |
| self.controller_tx |
| .unbounded_send(Request::Set(info, set_tx)) |
| .map_err(|_| HandlerError::ControllerStopped)?; |
| set_rx |
| .await |
| .map_err(|_| HandlerError::ControllerStopped) |
| .and_then(|res| res.map_err(HandlerError::Controller)) |
| } |
| } |