blob: d62b72389062edccac394a43cb2a508929d083bc [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::agent::storage::device_storage::DeviceStorageAccess;
use crate::base::SettingInfo;
use crate::display::light_sensor::{open_sensor, read_sensor, Sensor};
use crate::display::types::LightData;
use crate::handler::base::Request;
use crate::handler::setting_handler::{
controller, ClientImpl, ControllerError, ControllerStateResult, Event, SettingHandlerResult,
State,
};
use async_trait::async_trait;
use fidl_fuchsia_input_report::InputDeviceMarker;
use fuchsia_async::{self as fasync, DurationExt, Task};
use fuchsia_syslog::fx_log_err;
use fuchsia_zircon::Duration;
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::channel::oneshot::{self, Receiver, Sender};
use futures::future::{AbortHandle, Abortable, FusedFuture};
use futures::lock::Mutex;
use futures::prelude::*;
use std::fs::File;
use std::pin::Pin;
use std::sync::Arc;
pub const LIGHT_SENSOR_SERVICE_NAME: &str = "light_sensor_hid";
pub const LIGHT_SENSOR_CONFIG_PATH: &str = "/config/data/light_sensor_configuration.json";
const SCAN_DURATION_MS: i64 = 1000;
pub struct LightSensorController {
client: Arc<ClientImpl>,
sensor: Sensor,
sensor_task: Option<(Task<()>, Sender<()>)>,
current_value: Arc<Mutex<LightData>>,
notifier_abort: Option<AbortHandle>,
}
impl DeviceStorageAccess for LightSensorController {
const STORAGE_KEYS: &'static [&'static str] = &[];
}
#[async_trait]
impl controller::Create for LightSensorController {
async fn create(client: Arc<ClientImpl>) -> Result<Self, ControllerError> {
let service_context = client.get_service_context();
let sensor_proxy_result =
service_context.connect_named::<InputDeviceMarker>(LIGHT_SENSOR_SERVICE_NAME).await;
let sensor = if let Ok(proxy) = sensor_proxy_result {
Sensor::new(&proxy, &service_context).await.map_err(|e| {
fx_log_err!("Failed to connect to sensor: {:?}", e);
ControllerError::InitFailure(format!("Could not connect to proxy: {:?}", e).into())
})?
} else {
let file = File::open(LIGHT_SENSOR_CONFIG_PATH).map_err(|e| {
ControllerError::InitFailure(
format!("Could not open sensor configuration file: {:?}", e).into(),
)
})?;
let config = serde_json::from_reader(file).map_err(|e| {
ControllerError::InitFailure(
format!("Could not read sensor configuration file: {:?}", e).into(),
)
})?;
open_sensor(service_context, config).await.map_err(|e| {
ControllerError::InitFailure(format!("Could not connect to proxy: {:?}", e).into())
})?
};
let current_data = Arc::new(Mutex::new(get_sensor_data(&sensor).await));
Ok(Self {
client,
sensor,
sensor_task: None,
current_value: current_data,
notifier_abort: None,
})
}
}
#[async_trait]
impl controller::Handle for LightSensorController {
async fn handle(&self, request: Request) -> Option<SettingHandlerResult> {
match request {
Request::Get => {
Some(Ok(Some(SettingInfo::LightSensor(*self.current_value.lock().await))))
}
_ => None,
}
}
async fn change_state(&mut self, state: State) -> Option<ControllerStateResult> {
match state {
State::Listen => {
let (cancellation_tx, cancellation_rx) = oneshot::channel();
let (task, change_receiver) = start_light_sensor_scanner(
self.sensor.clone(),
|| fasync::Timer::new(Duration::from_millis(SCAN_DURATION_MS).after_now()),
cancellation_rx,
);
self.sensor_task = Some((task, cancellation_tx));
self.notifier_abort = Some(
notify_on_change(
change_receiver,
ClientNotifier::create(Arc::clone(&self.client)),
self.current_value.clone(),
)
.await,
);
}
State::EndListen => {
if let Some((task, cancellation_tx)) = self.sensor_task.take() {
// If this fails to cancel, the task may already be ending since that implies
// the receiving end is already dropped.
let _ = cancellation_tx.send(());
// Wait for the task to cleanly exit. If we don't we could break the proxy to
// the light sensor and prevent any future calls from returning successfully.
task.await;
}
if let Some(abort_handle) = &self.notifier_abort {
abort_handle.abort();
}
self.notifier_abort = None;
}
State::Teardown => {
if self.sensor_task.is_some() {
fx_log_err!("Sensor task is still running when teardown requested!");
}
}
_ => {}
};
None
}
}
#[async_trait]
trait LightNotifier {
async fn notify(&self, setting_info: SettingInfo);
}
struct ClientNotifier {
client: Arc<ClientImpl>,
}
impl ClientNotifier {
fn create(client: Arc<ClientImpl>) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(Self { client }))
}
}
#[async_trait]
impl LightNotifier for ClientNotifier {
async fn notify(&self, setting_info: SettingInfo) {
self.client.notify(Event::Changed(setting_info)).await;
}
}
/// Sends out a notificaiton when value changes. Abortable to allow for
/// startListen and endListen.
async fn notify_on_change(
mut change_receiver: UnboundedReceiver<LightData>,
notifier: Arc<Mutex<dyn LightNotifier + Send + Sync>>,
current_value: Arc<Mutex<LightData>>,
) -> AbortHandle {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
fasync::Task::spawn(
Abortable::new(
async move {
while let Some(value) = change_receiver.next().await {
*current_value.lock().await = value;
notifier.lock().await.notify(value.into()).await;
}
},
abort_registration,
)
.unwrap_or_else(|_| ()),
)
.detach();
abort_handle
}
/// Runs a task that periodically checks for changes on the light sensor and sends notifications
/// when the value changes. Will not send any initial value if nothing changes. This terminates when
/// the receiver closes without panicking.
///
/// Arguments:
///
/// * `sensor`: The sensor to read events from.
/// * `trigger_factory`: A factory that creates futures that, once resolved, signify it's time to
/// process the next event from the sensor.
/// * `cancellation_rx`: A receiver for a cancellation event. Once a signal is received, any
/// event currently being handled will finish processing, but no further events
/// will be processed.
fn start_light_sensor_scanner<F>(
sensor: Sensor,
trigger_factory: impl Fn() -> F + Send + 'static,
cancellation_rx: Receiver<()>,
) -> (Task<()>, UnboundedReceiver<LightData>)
where
F: Future<Output = ()> + Send,
{
let (sender, receiver) = unbounded::<LightData>();
(
fasync::Task::spawn(async move {
let stream = futures::stream::unfold(sensor, |sensor| async move {
let light_data = get_sensor_data(&sensor).await;
Some((light_data, sensor))
});
let trigger: Pin<Box<dyn FusedFuture<Output = ()> + Send>> =
Box::pin(futures::future::ready(()).fuse());
futures::pin_mut!(trigger, stream, cancellation_rx);
let mut data = stream.next().await.expect("There should always be a first value");
loop {
futures::select! {
_ = trigger => {
if let Some(new_data) = stream.next().await {
if data != new_data {
data = new_data;
if sender.unbounded_send(data).is_err() {
break;
}
}
}
*trigger = Box::pin(trigger_factory().fuse());
}
_ = cancellation_rx => break,
}
}
}),
receiver,
)
}
async fn get_sensor_data(sensor: &Sensor) -> LightData {
loop {
if let Some(sensor_data) =
read_sensor(&sensor).await.expect("Could not read from the sensor")
{
let lux = sensor_data.illuminance as f32;
let red = sensor_data.red as f32;
let green = sensor_data.green as f32;
let blue = sensor_data.blue as f32;
return LightData {
illuminance: lux,
color: fidl_fuchsia_ui_types::ColorRgb { red, green, blue },
};
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::base::SettingType;
use crate::display::light_sensor::testing;
use crate::service_context::{ExternalServiceProxy, ServiceContext};
use assert_matches::assert_matches;
use fidl_fuchsia_input_report::{InputReport, InputReportsReaderReadInputReportsResponder};
use fuchsia_async as fasync;
use fuchsia_zircon as zx;
use futures::channel::mpsc::{self, UnboundedSender};
type Notifier = UnboundedSender<SettingType>;
struct TestNotifier {
notifier: Notifier,
}
impl TestNotifier {
fn create(notifier: Notifier) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(Self { notifier }))
}
}
#[async_trait]
impl LightNotifier for TestNotifier {
async fn notify(&self, _: SettingInfo) {
self.notifier.unbounded_send(SettingType::LightSensor).unwrap();
}
}
struct DataProducer<F>
where
F: Fn() -> Vec<InputReport>,
{
producer: F,
turn_on: bool,
}
impl<F> DataProducer<F>
where
F: Fn() -> Vec<InputReport>,
{
fn new(producer: F) -> Self {
Self { producer, turn_on: false }
}
fn trigger(&mut self) {
self.turn_on = true;
}
fn produce(&self) -> Vec<InputReport> {
let mut data = (self.producer)();
if self.turn_on {
// Set illuminance value on second data report to 32
data[0].sensor.as_mut().unwrap().values.as_mut().unwrap()[1] = 32;
}
data
}
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_start_auto_brightness_task() {
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<InputDeviceMarker>().unwrap();
let proxy = ExternalServiceProxy::new(proxy, None);
let (sensor_axes, data_fn) = testing::get_mock_sensor_response();
let data_producer = Arc::new(Mutex::new(DataProducer::new(data_fn)));
let data_producer_clone = Arc::clone(&data_producer);
let data_fn = move || {
let data_producer = Arc::clone(&data_producer_clone);
async move { data_producer.lock().await.produce() }
};
testing::spawn_mock_sensor_with_data(stream, sensor_axes, data_fn);
let service_context = ServiceContext::new(None, None);
let sensor = Sensor::new(&proxy, &service_context).await.unwrap();
let (cancellation_tx, cancellation_rx) = oneshot::channel();
let (mut trigger_tx, trigger_rx) = unbounded();
let (create_tx, mut create_rx) = unbounded();
let trigger_rx = Arc::new(Mutex::new(trigger_rx));
let trigger_factory = move || {
let mut create_tx = create_tx.clone();
let trigger_rx = Arc::clone(&trigger_rx);
async move {
create_tx.send(()).await.expect("should be able to send create signal");
trigger_rx.lock().await.next().await.expect("should be able to get trigger")
}
};
let (task, mut receiver) =
start_light_sensor_scanner(sensor, trigger_factory, cancellation_rx);
// Wait for the the factory to be called to get a new trigger.
create_rx.next().await.expect("should be able to get creation signal");
// Signal the trigger so the inner loop begins.
trigger_tx.send(()).await.expect("should be able to trigger scanner");
// Wait for the factory to be called again so we now the previous trigger was processed.
create_rx.next().await.expect("should be able to get creation signal");
let next = receiver.try_next();
if next.is_ok() {
panic!("No notifications should happen before value changes")
};
data_producer.lock().await.trigger();
// Signal the trigger so the inner loop begins.
trigger_tx.send(()).await.expect("should be able to trigger scanner");
// Wait for the factory to be called again so we now the previous trigger was processed.
create_rx.next().await.expect("should be able to get creation signal");
let data = receiver.next().await;
// For verifying default that's directly converted from u32. Direct float comparison is ok.
#[allow(clippy::float_cmp)]
{
assert_eq!(data.unwrap().illuminance, 32.0);
}
cancellation_tx.send(()).expect("could not send cancellation message");
task.await;
receiver.close();
// Make sure the final future is dropped.
assert_matches!(
trigger_tx.send(()).await,
Err(send_error) if send_error.is_disconnected()
);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_start_light_sensor_scanner_scope() {
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<InputDeviceMarker>().unwrap();
let proxy = ExternalServiceProxy::new(proxy, None);
let receiver: Arc<Mutex<Option<UnboundedReceiver<LightData>>>> = Arc::new(Mutex::new(None));
let (axes, data_fn) = testing::get_mock_sensor_response();
let mut counter = 0;
let receiver_clone = Arc::clone(&receiver);
let (completed_tx, mut completed_rx) = mpsc::channel(0);
let data_fn = move |responder: InputReportsReaderReadInputReportsResponder| {
let mut data = data_fn();
counter += 1;
// Close the receiver on the second request (once in the loop)
let should_close_receiver = counter == 2;
let receiver = Arc::clone(&receiver_clone);
let mut completed_tx = completed_tx.clone();
async move {
if should_close_receiver {
receiver.lock().await.as_mut().unwrap().close();
}
// Trigger a change
data[0].sensor.as_mut().unwrap().values.as_mut().unwrap()[3] += counter;
responder.send(&mut Ok(data)).unwrap();
if should_close_receiver {
completed_tx.send(()).await.expect("Could not send completion signal");
}
}
};
// likely needs to take fn that allows control of when responder sends
testing::spawn_mock_sensor_with_handler(stream, axes, data_fn);
let service_context = ServiceContext::new(None, None);
let sensor = Sensor::new(&proxy, &service_context).await.unwrap();
let (cancellation_tx, cancellation_rx) = oneshot::channel();
let trigger_factory = || futures::future::pending();
let (task, data_receiver) =
start_light_sensor_scanner(sensor, trigger_factory, cancellation_rx);
*receiver.lock().await = Some(data_receiver);
assert_eq!(completed_rx.next().await, Some(()));
cancellation_tx.send(()).expect("could not send cancellation message");
task.await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_notify_on_change() {
let data1 = LightData {
illuminance: 10.0,
color: fidl_fuchsia_ui_types::ColorRgb { red: 3.0, green: 6.0, blue: 1.0 },
};
let data2 = LightData {
illuminance: 15.0,
color: fidl_fuchsia_ui_types::ColorRgb { red: 1.0, green: 9.0, blue: 5.0 },
};
let (light_sender, light_receiver) = unbounded::<LightData>();
let (notifier_sender, mut notifier_receiver) = unbounded::<SettingType>();
let data: Arc<Mutex<LightData>> = Arc::new(Mutex::new(data1));
let aborter =
notify_on_change(light_receiver, TestNotifier::create(notifier_sender), data).await;
light_sender.unbounded_send(data2).unwrap();
assert_eq!(notifier_receiver.next().await.unwrap(), SettingType::LightSensor);
let next = notifier_receiver.try_next();
if next.is_ok() {
panic!("Only one change should have happened")
};
aborter.abort();
let sleep_duration = zx::Duration::from_millis(5);
fasync::Timer::new(sleep_duration.after_now()).await;
assert!(light_sender.is_closed());
}
}