| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::base::{SettingInfo, SettingType}, |
| crate::handler::base::{Error, Payload, Request}, |
| crate::message::base::Audience, |
| crate::service, |
| crate::service::TryFromWithClient, |
| fuchsia_async as fasync, |
| futures::channel::mpsc::UnboundedSender, |
| futures::lock::Mutex, |
| futures::stream::StreamExt, |
| std::collections::HashMap, |
| std::convert::TryFrom, |
| std::hash::Hash, |
| std::marker::PhantomData, |
| std::sync::Arc, |
| }; |
| |
| type ChangeFunction<T> = Box<dyn Fn(&T, &T) -> bool + Send + Sync + 'static>; |
| |
| /// Controller that determines whether or not a change should be sent to the |
| /// hanging get. T is the type of the value to be watched and sent to back to |
| /// the client via the sender ST. |
| struct HangingGetController<T, ST> |
| where |
| T: From<SettingInfo> + Send + Sync + 'static, |
| ST: Sender<T> + Send + Sync + 'static, |
| { |
| /// The last value that was sent to the client |
| last_sent_value: Option<T>, |
| /// Function called on change. If function returns true, tells the |
| /// handler that it should send to the hanging get. |
| change_function: ChangeFunction<T>, |
| /// If true, should send value next time watch |
| /// is called or if there is a hanging watch. |
| should_send: bool, |
| /// List of responders to send the changed value to. |
| pending_responders: Vec<(ST, Option<UnboundedSender<()>>)>, |
| } |
| |
| impl<T, ST> HangingGetController<T, ST> |
| where |
| T: From<SettingInfo> + Send + Sync + 'static, |
| ST: Sender<T> + Send + Sync + 'static, |
| { |
| fn new(change_function: ChangeFunction<T>) -> HangingGetController<T, ST> { |
| let controller = HangingGetController { |
| last_sent_value: None, |
| change_function, |
| should_send: true, |
| pending_responders: Vec::new(), |
| }; |
| |
| controller |
| } |
| |
| fn initialize(&mut self) { |
| self.last_sent_value = None; |
| self.should_send = true; |
| self.change_function = Box::new(|_old: &T, _new: &T| true); |
| } |
| |
| /// Add a pending responder that wants to be notified of the next value change. |
| fn add_pending_responder(&mut self, responder: ST, error_sender: Option<UnboundedSender<()>>) { |
| self.pending_responders.push((responder, error_sender)); |
| } |
| |
| fn on_error(&mut self, error: &anyhow::Error) { |
| self.initialize(); |
| |
| // Notify responders of error. |
| while let Some((responder, optional_exit_tx)) = self.pending_responders.pop() { |
| responder.on_error(&error); |
| if let Some(exit_tx) = optional_exit_tx { |
| exit_tx.unbounded_send(()).ok(); |
| } |
| } |
| } |
| |
| /// Should be called whenever the underlying value changes. |
| fn on_change(&mut self, new_value: &T) -> bool { |
| self.should_send = match self.last_sent_value.as_ref() { |
| Some(last_value) => (self.change_function)(&last_value, new_value), |
| None => true, |
| }; |
| self.should_send |
| } |
| |
| /// Should be called to check if we should immediately return the hanging |
| /// get. |
| fn on_watch(&self) -> bool { |
| self.should_send |
| } |
| |
| /// Called when receiving a notification that value has changed. |
| async fn send_if_needed(&mut self, response: SettingInfo) { |
| if !self.pending_responders.is_empty() { |
| while let Some((responder, _)) = self.pending_responders.pop() { |
| responder.send_response(T::from(response.clone())); |
| } |
| self.on_send(T::from(response)); |
| } |
| } |
| |
| /// Should be called whenever a value is sent to the hanging get. |
| fn on_send(&mut self, new_value: T) { |
| self.last_sent_value = Some(new_value); |
| self.should_send = false; |
| } |
| } |
| |
| /// Handler for hanging gets. |
| /// We never use the data type T directly, but it is used to constrain ST as the sender |
| /// for that type. |
| /// To use, one should implement a sender, as well as a way to convert SettingInfo into |
| /// something that sender can use. |
| /// K is the type of the key for the change_function. |
| pub struct HangingGetHandler<T, ST, K> |
| where |
| T: From<SettingInfo> + Send + Sync + 'static, |
| ST: Sender<T> + Send + Sync + 'static, |
| K: Eq + Hash + Clone + Send + Sync + 'static, |
| { |
| messenger: service::message::Messenger, |
| listen_exit_tx: Option<UnboundedSender<()>>, |
| data_type: PhantomData<T>, |
| setting_type: SettingType, |
| |
| // This controller is used for generic watch calls without type parameters. |
| default_controller: HangingGetController<T, ST>, |
| |
| // Controllers for watch calls with type parameters are stored here. |
| controllers_by_key: HashMap<K, HangingGetController<T, ST>>, |
| command_tx: UnboundedSender<ListenCommand>, |
| } |
| |
| /// Trait that should be implemented to send data to the hanging get watcher. |
| pub trait Sender<T> { |
| fn send_response(self, data: T); |
| fn on_error(self, error: &anyhow::Error); |
| } |
| |
| enum ListenCommand { |
| Change(SettingInfo), |
| Exit, |
| } |
| |
| impl<T, ST, K> Drop for HangingGetHandler<T, ST, K> |
| where |
| T: From<SettingInfo> + Send + Sync + 'static, |
| ST: Sender<T> + Send + Sync + 'static, |
| K: Eq + Hash + Clone + Send + Sync + 'static, |
| { |
| fn drop(&mut self) { |
| self.close(); |
| } |
| } |
| |
| impl<T, ST, K> HangingGetHandler<T, ST, K> |
| where |
| T: From<SettingInfo> + Send + Sync + 'static, |
| ST: Sender<T> + Send + Sync + 'static, |
| K: Eq + Hash + Clone + Send + Sync + 'static, |
| { |
| pub async fn create( |
| messenger: service::message::Messenger, |
| setting_type: SettingType, |
| ) -> Arc<Mutex<HangingGetHandler<T, ST, K>>> { |
| let (on_command_sender, mut on_command_receiver) = |
| futures::channel::mpsc::unbounded::<ListenCommand>(); |
| let hanging_get_handler = Arc::new(Mutex::new(HangingGetHandler::<T, ST, K> { |
| messenger: messenger, |
| listen_exit_tx: None, |
| data_type: PhantomData, |
| setting_type, |
| default_controller: HangingGetController::new(Box::new(|_old: &T, _new: &T| true)), |
| controllers_by_key: Default::default(), |
| command_tx: on_command_sender.clone(), |
| })); |
| |
| { |
| let hanging_get_handler_clone = hanging_get_handler.clone(); |
| fasync::Task::spawn(async move { |
| while let Some(command) = on_command_receiver.next().await { |
| match command { |
| ListenCommand::Change(setting_info) => { |
| let mut handler_lock = hanging_get_handler_clone.lock().await; |
| handler_lock.on_change(setting_info).await; |
| } |
| ListenCommand::Exit => { |
| return; |
| } |
| } |
| } |
| }) |
| .detach(); |
| } |
| |
| hanging_get_handler |
| } |
| |
| pub fn close(&mut self) { |
| if let Some(exit_tx) = self.listen_exit_tx.take() { |
| exit_tx.unbounded_send(()).ok(); |
| } |
| |
| self.command_tx.unbounded_send(ListenCommand::Exit).ok(); |
| } |
| |
| /// Park a new hanging get in the handler |
| pub async fn watch(&mut self, responder: ST, error_sender: Option<UnboundedSender<()>>) { |
| self.watch_with_change_fn( |
| None, |
| Box::new(|_old: &T, _new: &T| true), |
| responder, |
| error_sender, |
| ) |
| .await; |
| } |
| |
| /// Park a new hanging get in the handler, along with a change function. |
| /// The hanging get will only return when the change function evaluates |
| /// to true when comparing the last value sent to the client and the current |
| /// value obtained by the hanging_get_handler. |
| /// A change function is applied on change only, and not on the current state. |
| pub async fn watch_with_change_fn( |
| &mut self, |
| change_function_key: Option<K>, |
| change_function: ChangeFunction<T>, |
| responder: ST, |
| error_sender: Option<UnboundedSender<()>>, |
| ) { |
| let controller = match change_function_key.clone() { |
| None => &mut self.default_controller, |
| Some(key) => self |
| .controllers_by_key |
| .entry(key) |
| .or_insert(HangingGetController::new(change_function)), |
| }; |
| |
| controller.add_pending_responder(responder, error_sender); |
| |
| if self.listen_exit_tx.is_none() { |
| let command_tx_clone = self.command_tx.clone(); |
| let receptor = self |
| .messenger |
| .message( |
| Payload::Request(Request::Listen).into(), |
| Audience::Address(service::Address::Handler(self.setting_type)), |
| ) |
| .send(); |
| |
| let (exit_tx, mut exit_rx) = futures::channel::mpsc::unbounded::<()>(); |
| self.listen_exit_tx = Some(exit_tx); |
| |
| fasync::Task::spawn(async move { |
| let receptor_fuse = receptor.fuse(); |
| futures::pin_mut!(receptor_fuse); |
| |
| loop { |
| futures::select! { |
| update = receptor_fuse.select_next_some() => { |
| if let Ok((Payload::Response(Ok(Some(setting_info))), _)) = |
| Payload::try_from_with_client(update) { |
| command_tx_clone.unbounded_send( |
| ListenCommand::Change(setting_info)).ok(); |
| } |
| } |
| _ = exit_rx.next() => { |
| return; |
| } |
| } |
| } |
| }) |
| .detach(); |
| } |
| |
| if !controller.on_watch() { |
| // Value hasn't changed, no need to send to responder. |
| return; |
| } |
| |
| match self.get_response().await { |
| Ok(response) => { |
| // We have to borrow the controllers again since |
| // self.get_response expects an immutable borrow, so we can't |
| // use it in between uses of the local variable controller. |
| match change_function_key { |
| None => self.default_controller.send_if_needed(response).await, |
| Some(key) => { |
| self.controllers_by_key |
| .get_mut(&key) |
| .unwrap() |
| .send_if_needed(response) |
| .await |
| } |
| }; |
| } |
| Err(error) => { |
| self.on_error(&error); |
| } |
| } |
| } |
| |
| /// Called when receiving a notification that value has changed. |
| async fn on_change(&mut self, setting_info: SettingInfo) { |
| let response: SettingInfo = setting_info.into(); |
| for controller in self.controllers_by_key.values_mut() { |
| if controller.on_change(&T::from(response.clone())) { |
| controller.send_if_needed(response.clone()).await; |
| } |
| } |
| if self.default_controller.on_change(&T::from(response.clone())) { |
| self.default_controller.send_if_needed(response).await; |
| } |
| } |
| |
| fn on_error(&mut self, error: &anyhow::Error) { |
| if let Some(exit_tx) = self.listen_exit_tx.take() { |
| exit_tx.unbounded_send(()).ok(); |
| } |
| |
| self.default_controller.on_error(&error); |
| for controller in self.controllers_by_key.values_mut() { |
| controller.on_error(&error); |
| } |
| } |
| |
| async fn get_response(&self) -> Result<SettingInfo, anyhow::Error> { |
| let mut receptor = self |
| .messenger |
| .message( |
| Payload::Request(Request::Get).into(), |
| Audience::Address(service::Address::Handler(self.setting_type)), |
| ) |
| .send(); |
| |
| match Payload::try_from( |
| receptor |
| .next_payload() |
| .await |
| .map_err(|_| anyhow::Error::new(Error::UnhandledType(self.setting_type)))? |
| .0, |
| ) |
| .map_err(|_| { |
| anyhow::Error::new(Error::UnexpectedError("Could not convert payload".into())) |
| })? { |
| Payload::Response(Ok(Some(setting_response))) => Ok(setting_response), |
| Payload::Response(Err(err)) => Err(anyhow::Error::new(err)), |
| _ => Err(anyhow::Error::new(Error::UnexpectedError("Unexpected payload".into()))), |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use fuchsia_async::DurationExt; |
| use fuchsia_zircon as zx; |
| use futures::channel::mpsc::UnboundedSender; |
| use std::borrow::Cow; |
| |
| use crate::base::SettingInfo; |
| use crate::display::types::{DisplayInfo, LowLightMode}; |
| use crate::message::base::MessengerType; |
| |
| use super::*; |
| |
| const ID1: f32 = 1.0; |
| const ID2: f32 = 2.0; |
| |
| const SET_ERROR: &str = "set failure"; |
| const DEFAULT_AUTO_BRIGHTNESS_VALUE: f32 = 0.5; |
| |
| #[derive(PartialEq, Debug, Clone)] |
| struct TestStruct { |
| id: f32, |
| } |
| |
| #[derive(PartialEq, Debug, Clone)] |
| enum Event { |
| Data(TestStruct), |
| Error(Error), |
| UnknownError, |
| } |
| |
| impl<C: Into<Cow<'static, str>>> From<C> for Error { |
| fn from(c: C) -> Self { |
| Error::UnexpectedError(c.into()) |
| } |
| } |
| |
| struct TestSender { |
| sender: UnboundedSender<Event>, |
| } |
| |
| struct TestSettingHandlerBuilder { |
| id_to_send: Option<f32>, |
| always_fail: bool, |
| messenger_factory: service::message::Factory, |
| setting_type: SettingType, |
| } |
| |
| impl TestSettingHandlerBuilder { |
| fn new(messenger_factory: service::message::Factory, setting_type: SettingType) -> Self { |
| Self { messenger_factory, id_to_send: None, always_fail: false, setting_type } |
| } |
| |
| fn set_initial_id(mut self, id: f32) -> Self { |
| self.id_to_send = Some(id); |
| self |
| } |
| |
| fn set_always_fail(mut self, always_fail: bool) -> Self { |
| self.always_fail = always_fail; |
| self |
| } |
| |
| async fn build(self) -> Arc<Mutex<TestSettingHandler>> { |
| TestSettingHandler::create( |
| self.messenger_factory, |
| self.id_to_send, |
| self.always_fail, |
| self.setting_type, |
| ) |
| .await |
| } |
| } |
| |
| struct TestSettingHandler { |
| id_to_send: Option<f32>, |
| listener: Option<service::message::MessageClient>, |
| always_fail: bool, |
| } |
| |
| impl TestSettingHandler { |
| async fn create( |
| messenger_factory: service::message::Factory, |
| id_to_send: Option<f32>, |
| always_fail: bool, |
| setting_type: SettingType, |
| ) -> Arc<Mutex<TestSettingHandler>> { |
| let handler = Arc::new(Mutex::new(TestSettingHandler { |
| id_to_send, |
| listener: None, |
| always_fail: always_fail, |
| })); |
| |
| let (_, mut receptor) = messenger_factory |
| .create(MessengerType::Addressable(service::Address::Handler(setting_type))) |
| .await |
| .expect("messenger should have been created"); |
| |
| let handler_clone = handler.clone(); |
| fasync::Task::spawn(async move { |
| while let Ok((payload, client)) = receptor.next_payload().await { |
| let mut handler = handler_clone.lock().await; |
| match Payload::try_from(payload) { |
| Ok(Payload::Request(request)) => { |
| handler.request(client, request); |
| } |
| _ => { |
| panic!("unexpected payload"); |
| } |
| } |
| } |
| }) |
| .detach(); |
| |
| handler |
| } |
| |
| fn set_id(&mut self, id: f32) { |
| self.id_to_send = Some(id); |
| } |
| |
| fn notify_listener(&self, value: f32) { |
| if let Some(listener) = self.listener.clone() { |
| listener |
| .reply( |
| Payload::Response(Ok(Some(SettingInfo::Brightness(DisplayInfo::new( |
| false, |
| value, |
| DEFAULT_AUTO_BRIGHTNESS_VALUE, |
| true, |
| LowLightMode::Disable, |
| None, |
| ))))) |
| .into(), |
| ) |
| .send(); |
| return; |
| } |
| panic!("Missing listener to notify"); |
| } |
| |
| fn listen(&mut self, listener: service::message::MessageClient) { |
| self.listener = Some(listener); |
| } |
| |
| fn request(&mut self, requestor: service::message::MessageClient, request: Request) { |
| match request { |
| Request::Listen => { |
| self.listen(requestor); |
| } |
| Request::Get => { |
| let mut response = None; |
| if self.always_fail { |
| response = Some(Err(Error::from(SET_ERROR))); |
| } else if let Some(value) = self.id_to_send { |
| response = Some(Ok(Some(SettingInfo::Brightness(DisplayInfo::new( |
| false, |
| value, |
| DEFAULT_AUTO_BRIGHTNESS_VALUE, |
| true, |
| LowLightMode::Disable, |
| None, |
| ))))); |
| } |
| |
| if let Some(response) = response.take() { |
| requestor.reply(Payload::Response(response).into()).send(); |
| } |
| } |
| _ => { |
| panic!("Unexpected request type"); |
| } |
| } |
| } |
| } |
| |
| impl Sender<TestStruct> for TestSender { |
| fn send_response(self, data: TestStruct) { |
| self.sender.unbounded_send(Event::Data(data)).unwrap(); |
| } |
| |
| fn on_error(self, error: &anyhow::Error) { |
| let error = match error.root_cause().downcast_ref::<Error>() { |
| Some(request_error) => Event::Error(request_error.clone()), |
| _ => Event::UnknownError, |
| }; |
| self.sender.unbounded_send(error).unwrap(); |
| } |
| } |
| |
| impl From<SettingInfo> for TestStruct { |
| fn from(response: SettingInfo) -> Self { |
| if let SettingInfo::Brightness(info) = response { |
| return TestStruct { id: info.manual_brightness_value }; |
| } |
| panic!("bad response:{:?}", response); |
| } |
| } |
| |
| fn verify_id(event: Event, id: f32) { |
| if let Event::Data(data) = event { |
| assert_eq!(data.id, id); |
| } else { |
| panic!("Should be data {:?}", event); |
| } |
| } |
| |
| /// Ensures errors are gracefully handed back by the hanging_get |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_error_resolution() { |
| let messenger_factory = service::message::create_hub(); |
| let setting_type = SettingType::Display; |
| let _ = TestSettingHandlerBuilder::new(messenger_factory.clone(), setting_type) |
| .set_always_fail(true) |
| .build() |
| .await; |
| |
| let hanging_get_handler: Arc<Mutex<HangingGetHandler<TestStruct, TestSender, String>>> = |
| HangingGetHandler::create( |
| messenger_factory.create(MessengerType::Unbound).await.unwrap().0, |
| setting_type, |
| ) |
| .await; |
| |
| let (hanging_get_responder, mut hanging_get_listener) = |
| futures::channel::mpsc::unbounded::<Event>(); |
| |
| let (exit_tx, mut exit_rx) = futures::channel::mpsc::unbounded::<()>(); |
| |
| hanging_get_handler |
| .lock() |
| .await |
| .watch(TestSender { sender: hanging_get_responder.clone() }, Some(exit_tx)) |
| .await; |
| |
| // The responder should receive an error |
| assert_eq!( |
| hanging_get_listener.next().await.unwrap(), |
| Event::Error(Error::from(SET_ERROR)) |
| ); |
| |
| // When set, the exit sender should also be fired |
| assert_eq!(exit_rx.next().await, Some(())); |
| } |
| |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_change_after_watch() { |
| let messenger_factory = service::message::create_hub(); |
| let setting_type = SettingType::Display; |
| |
| let setting_handler_handle = |
| TestSettingHandlerBuilder::new(messenger_factory.clone(), setting_type) |
| .set_initial_id(ID1) |
| .build() |
| .await; |
| |
| let hanging_get_handler: Arc<Mutex<HangingGetHandler<TestStruct, TestSender, String>>> = |
| HangingGetHandler::create( |
| messenger_factory.create(MessengerType::Unbound).await.unwrap().0, |
| setting_type, |
| ) |
| .await; |
| |
| let (hanging_get_responder, mut hanging_get_listener) = |
| futures::channel::mpsc::unbounded::<Event>(); |
| |
| hanging_get_handler |
| .lock() |
| .await |
| .watch(TestSender { sender: hanging_get_responder.clone() }, None) |
| .await; |
| |
| // First get should return immediately |
| verify_id(hanging_get_listener.next().await.unwrap(), ID1); |
| |
| // Subsequent one should wait until new value is notified |
| hanging_get_handler |
| .lock() |
| .await |
| .watch(TestSender { sender: hanging_get_responder.clone() }, None) |
| .await; |
| |
| setting_handler_handle.lock().await.set_id(ID2); |
| |
| setting_handler_handle.lock().await.notify_listener(ID2); |
| |
| verify_id(hanging_get_listener.next().await.unwrap(), ID2); |
| } |
| |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_watch_after_change() { |
| let messenger_factory = service::message::create_hub(); |
| let setting_type = SettingType::Display; |
| |
| let setting_handler_handle = |
| TestSettingHandlerBuilder::new(messenger_factory.clone(), setting_type) |
| .set_initial_id(ID1) |
| .build() |
| .await; |
| |
| let hanging_get_handler: Arc<Mutex<HangingGetHandler<TestStruct, TestSender, String>>> = |
| HangingGetHandler::create( |
| messenger_factory.create(MessengerType::Unbound).await.unwrap().0, |
| setting_type, |
| ) |
| .await; |
| |
| let (hanging_get_responder, mut hanging_get_listener) = |
| futures::channel::mpsc::unbounded::<Event>(); |
| |
| hanging_get_handler |
| .lock() |
| .await |
| .watch(TestSender { sender: hanging_get_responder.clone() }, None) |
| .await; |
| |
| // First get should return immediately |
| verify_id(hanging_get_listener.next().await.unwrap(), ID1); |
| |
| setting_handler_handle.lock().await.set_id(ID2); |
| |
| setting_handler_handle.lock().await.notify_listener(ID2); |
| |
| // Subsequent one should wait until new value is notified |
| hanging_get_handler |
| .lock() |
| .await |
| .watch(TestSender { sender: hanging_get_responder.clone() }, None) |
| .await; |
| |
| verify_id(hanging_get_listener.next().await.unwrap(), ID2); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_watch_with_change_function() { |
| let messenger_factory = service::message::create_hub(); |
| let setting_type = SettingType::Display; |
| let setting_handler_handle = |
| TestSettingHandlerBuilder::new(messenger_factory.clone(), setting_type) |
| .set_initial_id(ID1) |
| .build() |
| .await; |
| |
| let hanging_get_handler: Arc<Mutex<HangingGetHandler<TestStruct, TestSender, String>>> = |
| HangingGetHandler::create( |
| messenger_factory.create(MessengerType::Unbound).await.unwrap().0, |
| setting_type, |
| ) |
| .await; |
| |
| let (hanging_get_responder, mut hanging_get_listener) = |
| futures::channel::mpsc::unbounded::<Event>(); |
| |
| let min_difference = 2.0; |
| let change_function = |
| move |old: &TestStruct, new: &TestStruct| -> bool { new.id - old.id > min_difference }; |
| |
| hanging_get_handler |
| .lock() |
| .await |
| .watch_with_change_fn( |
| Some(min_difference.to_string()), |
| Box::new(change_function), |
| TestSender { sender: hanging_get_responder.clone() }, |
| None, |
| ) |
| .await; |
| |
| // First get should return immediately even with change function |
| verify_id(hanging_get_listener.next().await.unwrap(), ID1); |
| |
| setting_handler_handle.lock().await.set_id(ID2); |
| |
| setting_handler_handle.lock().await.notify_listener(ID2); |
| |
| // Subsequent watch should return ignoring change function |
| hanging_get_handler |
| .lock() |
| .await |
| .watch(TestSender { sender: hanging_get_responder.clone() }, None) |
| .await; |
| |
| verify_id(hanging_get_listener.next().await.unwrap(), ID2); |
| |
| // Subsequent watch with change function should only return if change is big enough |
| setting_handler_handle.lock().await.set_id(ID2 + 1.0); |
| |
| setting_handler_handle.lock().await.notify_listener(ID2 + 1.0); |
| |
| hanging_get_handler |
| .lock() |
| .await |
| .watch_with_change_fn( |
| Some(min_difference.to_string()), |
| Box::new(change_function), |
| TestSender { sender: hanging_get_responder.clone() }, |
| None, |
| ) |
| .await; |
| |
| // Must wait for some a short time to allow change to propagate. |
| let sleep_duration = zx::Duration::from_millis(1); |
| fasync::Timer::new(sleep_duration.after_now()).await; |
| |
| setting_handler_handle.lock().await.set_id(ID2 + 3.0); |
| |
| setting_handler_handle.lock().await.notify_listener(ID2 + 3.0); |
| |
| verify_id(hanging_get_listener.next().await.unwrap(), ID2 + 3.0); |
| } |
| |
| #[fuchsia_async::run_until_stalled(test)] |
| async fn test_watch_with_change_function_multiple() { |
| let messenger_factory = service::message::create_hub(); |
| let setting_type = SettingType::Display; |
| |
| let setting_handler_handle = |
| TestSettingHandlerBuilder::new(messenger_factory.clone(), setting_type) |
| .set_initial_id(ID1) |
| .build() |
| .await; |
| |
| let hanging_get_handler: Arc<Mutex<HangingGetHandler<TestStruct, TestSender, String>>> = |
| HangingGetHandler::create( |
| messenger_factory.create(MessengerType::Unbound).await.unwrap().0, |
| setting_type, |
| ) |
| .await; |
| |
| // Register first change function with a large min difference. |
| let (hanging_get_responder, mut hanging_get_listener) = |
| futures::channel::mpsc::unbounded::<Event>(); |
| let min_difference = 10.0; |
| let change_function = |
| move |old: &TestStruct, new: &TestStruct| -> bool { new.id - old.id >= min_difference }; |
| |
| hanging_get_handler |
| .lock() |
| .await |
| .watch_with_change_fn( |
| Some(min_difference.to_string()), |
| Box::new(change_function), |
| TestSender { sender: hanging_get_responder.clone() }, |
| None, |
| ) |
| .await; |
| |
| // Register second change function with a smaller min difference. |
| let (hanging_get_responder2, mut hanging_get_listener2) = |
| futures::channel::mpsc::unbounded::<Event>(); |
| let min_difference2 = 1.0; |
| let change_function2 = move |old: &TestStruct, new: &TestStruct| -> bool { |
| new.id - old.id >= min_difference2 |
| }; |
| |
| hanging_get_handler |
| .lock() |
| .await |
| .watch_with_change_fn( |
| Some(min_difference2.to_string()), |
| Box::new(change_function2), |
| TestSender { sender: hanging_get_responder2.clone() }, |
| None, |
| ) |
| .await; |
| |
| // First get should return immediately even with change function |
| verify_id(hanging_get_listener.next().await.unwrap(), ID1); |
| verify_id(hanging_get_listener2.next().await.unwrap(), ID1); |
| |
| // Register listeners again. |
| hanging_get_handler |
| .lock() |
| .await |
| .watch_with_change_fn( |
| Some(min_difference.to_string()), |
| Box::new(change_function), |
| TestSender { sender: hanging_get_responder.clone() }, |
| None, |
| ) |
| .await; |
| hanging_get_handler |
| .lock() |
| .await |
| .watch_with_change_fn( |
| Some(min_difference2.to_string()), |
| Box::new(change_function2), |
| TestSender { sender: hanging_get_responder2.clone() }, |
| None, |
| ) |
| .await; |
| |
| // Send a value big enough to trigger the smaller change function but not the larger one. |
| setting_handler_handle.lock().await.set_id(ID2); |
| setting_handler_handle.lock().await.notify_listener(ID2); |
| |
| verify_id(hanging_get_listener2.next().await.unwrap(), ID2); |
| |
| // Re-register the listener that just finished. |
| hanging_get_handler |
| .lock() |
| .await |
| .watch_with_change_fn( |
| Some(min_difference2.to_string()), |
| Box::new(change_function2), |
| TestSender { sender: hanging_get_responder2.clone() }, |
| None, |
| ) |
| .await; |
| |
| // Send a value big enough to trigger both change functions. |
| let big_value = ID1 + min_difference; |
| setting_handler_handle.lock().await.set_id(big_value); |
| setting_handler_handle.lock().await.notify_listener(big_value); |
| |
| // Both hanging gets got the value. |
| verify_id(hanging_get_listener.next().await.unwrap(), big_value); |
| verify_id(hanging_get_listener2.next().await.unwrap(), big_value); |
| } |
| |
| #[test] |
| fn test_hanging_get_controller() { |
| let mut controller: HangingGetController<TestStruct, TestSender> = |
| HangingGetController::new(Box::new(|_old: &TestStruct, _new: &TestStruct| true)); |
| |
| // Should send change on launch |
| assert_eq!(controller.on_watch(), true); |
| assert_eq!(controller.on_change(&TestStruct { id: 1.0 }), true); |
| controller.on_send(TestStruct { id: 1.0 }); |
| |
| // After sent, without change, shouldn't send |
| assert_eq!(controller.on_watch(), false); |
| assert_eq!(controller.on_change(&TestStruct { id: 2.0 }), true); |
| controller.on_send(TestStruct { id: 2.0 }); |
| } |
| |
| #[test] |
| fn test_hanging_get_controller_with_change_function() { |
| let mut controller: HangingGetController<TestStruct, TestSender> = |
| HangingGetController::new(Box::new(Box::new(|old: &TestStruct, new: &TestStruct| { |
| old.id < new.id |
| }))); |
| |
| // Should send change on launch. |
| assert_eq!(controller.on_watch(), true); |
| assert_eq!(controller.on_change(&TestStruct { id: 1.0 }), true); |
| controller.on_send(TestStruct { id: 1.0 }); |
| |
| // Won't send if change function not triggered. |
| assert_eq!(controller.on_change(&TestStruct { id: 1.0 }), false); |
| assert_eq!(controller.on_watch(), false); |
| |
| // Will send once we get a change. |
| assert_eq!(controller.on_change(&TestStruct { id: 2.0 }), true); |
| assert_eq!(controller.on_watch(), true); |
| } |
| } |