blob: da93aadee38a5f962ea97513098d3c741dc9cf0f [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::base::{SettingInfo, SettingType},
crate::handler::base::{Error, Payload, Request},
crate::message::base::Audience,
crate::service,
crate::service::TryFromWithClient,
fuchsia_async as fasync,
futures::channel::mpsc::UnboundedSender,
futures::lock::Mutex,
futures::stream::StreamExt,
std::collections::HashMap,
std::convert::TryFrom,
std::hash::Hash,
std::marker::PhantomData,
std::sync::Arc,
};
type ChangeFunction<T> = Box<dyn Fn(&T, &T) -> bool + Send + Sync + 'static>;
/// Controller that determines whether or not a change should be sent to the
/// hanging get. T is the type of the value to be watched and sent to back to
/// the client via the sender ST.
struct HangingGetController<T, ST>
where
T: From<SettingInfo> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
{
/// The last value that was sent to the client
last_sent_value: Option<T>,
/// Function called on change. If function returns true, tells the
/// handler that it should send to the hanging get.
change_function: ChangeFunction<T>,
/// If true, should send value next time watch
/// is called or if there is a hanging watch.
should_send: bool,
/// List of responders to send the changed value to.
pending_responders: Vec<(ST, Option<UnboundedSender<()>>)>,
}
impl<T, ST> HangingGetController<T, ST>
where
T: From<SettingInfo> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
{
fn new(change_function: ChangeFunction<T>) -> HangingGetController<T, ST> {
let controller = HangingGetController {
last_sent_value: None,
change_function,
should_send: true,
pending_responders: Vec::new(),
};
controller
}
fn initialize(&mut self) {
self.last_sent_value = None;
self.should_send = true;
self.change_function = Box::new(|_old: &T, _new: &T| true);
}
/// Add a pending responder that wants to be notified of the next value change.
fn add_pending_responder(&mut self, responder: ST, error_sender: Option<UnboundedSender<()>>) {
self.pending_responders.push((responder, error_sender));
}
fn on_error(&mut self, error: &anyhow::Error) {
self.initialize();
// Notify responders of error.
while let Some((responder, optional_exit_tx)) = self.pending_responders.pop() {
responder.on_error(&error);
if let Some(exit_tx) = optional_exit_tx {
exit_tx.unbounded_send(()).ok();
}
}
}
/// Should be called whenever the underlying value changes.
fn on_change(&mut self, new_value: &T) -> bool {
self.should_send = match self.last_sent_value.as_ref() {
Some(last_value) => (self.change_function)(&last_value, new_value),
None => true,
};
self.should_send
}
/// Should be called to check if we should immediately return the hanging
/// get.
fn on_watch(&self) -> bool {
self.should_send
}
/// Called when receiving a notification that value has changed.
async fn send_if_needed(&mut self, response: SettingInfo) {
if !self.pending_responders.is_empty() {
while let Some((responder, _)) = self.pending_responders.pop() {
responder.send_response(T::from(response.clone()));
}
self.on_send(T::from(response));
}
}
/// Should be called whenever a value is sent to the hanging get.
fn on_send(&mut self, new_value: T) {
self.last_sent_value = Some(new_value);
self.should_send = false;
}
}
/// Handler for hanging gets.
/// We never use the data type T directly, but it is used to constrain ST as the sender
/// for that type.
/// To use, one should implement a sender, as well as a way to convert SettingInfo into
/// something that sender can use.
/// K is the type of the key for the change_function.
pub struct HangingGetHandler<T, ST, K>
where
T: From<SettingInfo> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
messenger: service::message::Messenger,
listen_exit_tx: Option<UnboundedSender<()>>,
data_type: PhantomData<T>,
setting_type: SettingType,
// This controller is used for generic watch calls without type parameters.
default_controller: HangingGetController<T, ST>,
// Controllers for watch calls with type parameters are stored here.
controllers_by_key: HashMap<K, HangingGetController<T, ST>>,
command_tx: UnboundedSender<ListenCommand>,
}
/// Trait that should be implemented to send data to the hanging get watcher.
pub trait Sender<T> {
fn send_response(self, data: T);
fn on_error(self, error: &anyhow::Error);
}
enum ListenCommand {
Change(SettingInfo),
Exit,
}
impl<T, ST, K> Drop for HangingGetHandler<T, ST, K>
where
T: From<SettingInfo> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
fn drop(&mut self) {
self.close();
}
}
impl<T, ST, K> HangingGetHandler<T, ST, K>
where
T: From<SettingInfo> + Send + Sync + 'static,
ST: Sender<T> + Send + Sync + 'static,
K: Eq + Hash + Clone + Send + Sync + 'static,
{
pub async fn create(
messenger: service::message::Messenger,
setting_type: SettingType,
) -> Arc<Mutex<HangingGetHandler<T, ST, K>>> {
let (on_command_sender, mut on_command_receiver) =
futures::channel::mpsc::unbounded::<ListenCommand>();
let hanging_get_handler = Arc::new(Mutex::new(HangingGetHandler::<T, ST, K> {
messenger: messenger,
listen_exit_tx: None,
data_type: PhantomData,
setting_type,
default_controller: HangingGetController::new(Box::new(|_old: &T, _new: &T| true)),
controllers_by_key: Default::default(),
command_tx: on_command_sender.clone(),
}));
{
let hanging_get_handler_clone = hanging_get_handler.clone();
fasync::Task::spawn(async move {
while let Some(command) = on_command_receiver.next().await {
match command {
ListenCommand::Change(setting_info) => {
let mut handler_lock = hanging_get_handler_clone.lock().await;
handler_lock.on_change(setting_info).await;
}
ListenCommand::Exit => {
return;
}
}
}
})
.detach();
}
hanging_get_handler
}
pub fn close(&mut self) {
if let Some(exit_tx) = self.listen_exit_tx.take() {
exit_tx.unbounded_send(()).ok();
}
self.command_tx.unbounded_send(ListenCommand::Exit).ok();
}
/// Park a new hanging get in the handler
pub async fn watch(&mut self, responder: ST, error_sender: Option<UnboundedSender<()>>) {
self.watch_with_change_fn(
None,
Box::new(|_old: &T, _new: &T| true),
responder,
error_sender,
)
.await;
}
/// Park a new hanging get in the handler, along with a change function.
/// The hanging get will only return when the change function evaluates
/// to true when comparing the last value sent to the client and the current
/// value obtained by the hanging_get_handler.
/// A change function is applied on change only, and not on the current state.
pub async fn watch_with_change_fn(
&mut self,
change_function_key: Option<K>,
change_function: ChangeFunction<T>,
responder: ST,
error_sender: Option<UnboundedSender<()>>,
) {
let controller = match change_function_key.clone() {
None => &mut self.default_controller,
Some(key) => self
.controllers_by_key
.entry(key)
.or_insert(HangingGetController::new(change_function)),
};
controller.add_pending_responder(responder, error_sender);
if self.listen_exit_tx.is_none() {
let command_tx_clone = self.command_tx.clone();
let receptor = self
.messenger
.message(
Payload::Request(Request::Listen).into(),
Audience::Address(service::Address::Handler(self.setting_type)),
)
.send();
let (exit_tx, mut exit_rx) = futures::channel::mpsc::unbounded::<()>();
self.listen_exit_tx = Some(exit_tx);
fasync::Task::spawn(async move {
let receptor_fuse = receptor.fuse();
futures::pin_mut!(receptor_fuse);
loop {
futures::select! {
update = receptor_fuse.select_next_some() => {
if let Ok((Payload::Response(Ok(Some(setting_info))), _)) =
Payload::try_from_with_client(update) {
command_tx_clone.unbounded_send(
ListenCommand::Change(setting_info)).ok();
}
}
_ = exit_rx.next() => {
return;
}
}
}
})
.detach();
}
if !controller.on_watch() {
// Value hasn't changed, no need to send to responder.
return;
}
match self.get_response().await {
Ok(response) => {
// We have to borrow the controllers again since
// self.get_response expects an immutable borrow, so we can't
// use it in between uses of the local variable controller.
match change_function_key {
None => self.default_controller.send_if_needed(response).await,
Some(key) => {
self.controllers_by_key
.get_mut(&key)
.unwrap()
.send_if_needed(response)
.await
}
};
}
Err(error) => {
self.on_error(&error);
}
}
}
/// Called when receiving a notification that value has changed.
async fn on_change(&mut self, setting_info: SettingInfo) {
let response: SettingInfo = setting_info.into();
for controller in self.controllers_by_key.values_mut() {
if controller.on_change(&T::from(response.clone())) {
controller.send_if_needed(response.clone()).await;
}
}
if self.default_controller.on_change(&T::from(response.clone())) {
self.default_controller.send_if_needed(response).await;
}
}
fn on_error(&mut self, error: &anyhow::Error) {
if let Some(exit_tx) = self.listen_exit_tx.take() {
exit_tx.unbounded_send(()).ok();
}
self.default_controller.on_error(&error);
for controller in self.controllers_by_key.values_mut() {
controller.on_error(&error);
}
}
async fn get_response(&self) -> Result<SettingInfo, anyhow::Error> {
let mut receptor = self
.messenger
.message(
Payload::Request(Request::Get).into(),
Audience::Address(service::Address::Handler(self.setting_type)),
)
.send();
match Payload::try_from(
receptor
.next_payload()
.await
.map_err(|_| anyhow::Error::new(Error::UnhandledType(self.setting_type)))?
.0,
)
.map_err(|_| {
anyhow::Error::new(Error::UnexpectedError("Could not convert payload".into()))
})? {
Payload::Response(Ok(Some(setting_response))) => Ok(setting_response),
Payload::Response(Err(err)) => Err(anyhow::Error::new(err)),
_ => Err(anyhow::Error::new(Error::UnexpectedError("Unexpected payload".into()))),
}
}
}
#[cfg(test)]
mod tests {
use fuchsia_async::DurationExt;
use fuchsia_zircon as zx;
use futures::channel::mpsc::UnboundedSender;
use std::borrow::Cow;
use crate::base::SettingInfo;
use crate::display::types::{DisplayInfo, LowLightMode};
use crate::message::base::MessengerType;
use super::*;
const ID1: f32 = 1.0;
const ID2: f32 = 2.0;
const SET_ERROR: &str = "set failure";
const DEFAULT_AUTO_BRIGHTNESS_VALUE: f32 = 0.5;
#[derive(PartialEq, Debug, Clone)]
struct TestStruct {
id: f32,
}
#[derive(PartialEq, Debug, Clone)]
enum Event {
Data(TestStruct),
Error(Error),
UnknownError,
}
impl<C: Into<Cow<'static, str>>> From<C> for Error {
fn from(c: C) -> Self {
Error::UnexpectedError(c.into())
}
}
struct TestSender {
sender: UnboundedSender<Event>,
}
struct TestSettingHandlerBuilder {
id_to_send: Option<f32>,
always_fail: bool,
messenger_factory: service::message::Factory,
setting_type: SettingType,
}
impl TestSettingHandlerBuilder {
fn new(messenger_factory: service::message::Factory, setting_type: SettingType) -> Self {
Self { messenger_factory, id_to_send: None, always_fail: false, setting_type }
}
fn set_initial_id(mut self, id: f32) -> Self {
self.id_to_send = Some(id);
self
}
fn set_always_fail(mut self, always_fail: bool) -> Self {
self.always_fail = always_fail;
self
}
async fn build(self) -> Arc<Mutex<TestSettingHandler>> {
TestSettingHandler::create(
self.messenger_factory,
self.id_to_send,
self.always_fail,
self.setting_type,
)
.await
}
}
struct TestSettingHandler {
id_to_send: Option<f32>,
listener: Option<service::message::MessageClient>,
always_fail: bool,
}
impl TestSettingHandler {
async fn create(
messenger_factory: service::message::Factory,
id_to_send: Option<f32>,
always_fail: bool,
setting_type: SettingType,
) -> Arc<Mutex<TestSettingHandler>> {
let handler = Arc::new(Mutex::new(TestSettingHandler {
id_to_send,
listener: None,
always_fail: always_fail,
}));
let (_, mut receptor) = messenger_factory
.create(MessengerType::Addressable(service::Address::Handler(setting_type)))
.await
.expect("messenger should have been created");
let handler_clone = handler.clone();
fasync::Task::spawn(async move {
while let Ok((payload, client)) = receptor.next_payload().await {
let mut handler = handler_clone.lock().await;
match Payload::try_from(payload) {
Ok(Payload::Request(request)) => {
handler.request(client, request);
}
_ => {
panic!("unexpected payload");
}
}
}
})
.detach();
handler
}
fn set_id(&mut self, id: f32) {
self.id_to_send = Some(id);
}
fn notify_listener(&self, value: f32) {
if let Some(listener) = self.listener.clone() {
listener
.reply(
Payload::Response(Ok(Some(SettingInfo::Brightness(DisplayInfo::new(
false,
value,
DEFAULT_AUTO_BRIGHTNESS_VALUE,
true,
LowLightMode::Disable,
None,
)))))
.into(),
)
.send();
return;
}
panic!("Missing listener to notify");
}
fn listen(&mut self, listener: service::message::MessageClient) {
self.listener = Some(listener);
}
fn request(&mut self, requestor: service::message::MessageClient, request: Request) {
match request {
Request::Listen => {
self.listen(requestor);
}
Request::Get => {
let mut response = None;
if self.always_fail {
response = Some(Err(Error::from(SET_ERROR)));
} else if let Some(value) = self.id_to_send {
response = Some(Ok(Some(SettingInfo::Brightness(DisplayInfo::new(
false,
value,
DEFAULT_AUTO_BRIGHTNESS_VALUE,
true,
LowLightMode::Disable,
None,
)))));
}
if let Some(response) = response.take() {
requestor.reply(Payload::Response(response).into()).send();
}
}
_ => {
panic!("Unexpected request type");
}
}
}
}
impl Sender<TestStruct> for TestSender {
fn send_response(self, data: TestStruct) {
self.sender.unbounded_send(Event::Data(data)).unwrap();
}
fn on_error(self, error: &anyhow::Error) {
let error = match error.root_cause().downcast_ref::<Error>() {
Some(request_error) => Event::Error(request_error.clone()),
_ => Event::UnknownError,
};
self.sender.unbounded_send(error).unwrap();
}
}
impl From<SettingInfo> for TestStruct {
fn from(response: SettingInfo) -> Self {
if let SettingInfo::Brightness(info) = response {
return TestStruct { id: info.manual_brightness_value };
}
panic!("bad response:{:?}", response);
}
}
fn verify_id(event: Event, id: f32) {
if let Event::Data(data) = event {
assert_eq!(data.id, id);
} else {
panic!("Should be data {:?}", event);
}
}
/// Ensures errors are gracefully handed back by the hanging_get
#[fuchsia_async::run_until_stalled(test)]
async fn test_error_resolution() {
let messenger_factory = service::message::create_hub();
let setting_type = SettingType::Display;
let _ = TestSettingHandlerBuilder::new(messenger_factory.clone(), setting_type)
.set_always_fail(true)
.build()
.await;
let hanging_get_handler: Arc<Mutex<HangingGetHandler<TestStruct, TestSender, String>>> =
HangingGetHandler::create(
messenger_factory.create(MessengerType::Unbound).await.unwrap().0,
setting_type,
)
.await;
let (hanging_get_responder, mut hanging_get_listener) =
futures::channel::mpsc::unbounded::<Event>();
let (exit_tx, mut exit_rx) = futures::channel::mpsc::unbounded::<()>();
hanging_get_handler
.lock()
.await
.watch(TestSender { sender: hanging_get_responder.clone() }, Some(exit_tx))
.await;
// The responder should receive an error
assert_eq!(
hanging_get_listener.next().await.unwrap(),
Event::Error(Error::from(SET_ERROR))
);
// When set, the exit sender should also be fired
assert_eq!(exit_rx.next().await, Some(()));
}
#[fuchsia_async::run_until_stalled(test)]
async fn test_change_after_watch() {
let messenger_factory = service::message::create_hub();
let setting_type = SettingType::Display;
let setting_handler_handle =
TestSettingHandlerBuilder::new(messenger_factory.clone(), setting_type)
.set_initial_id(ID1)
.build()
.await;
let hanging_get_handler: Arc<Mutex<HangingGetHandler<TestStruct, TestSender, String>>> =
HangingGetHandler::create(
messenger_factory.create(MessengerType::Unbound).await.unwrap().0,
setting_type,
)
.await;
let (hanging_get_responder, mut hanging_get_listener) =
futures::channel::mpsc::unbounded::<Event>();
hanging_get_handler
.lock()
.await
.watch(TestSender { sender: hanging_get_responder.clone() }, None)
.await;
// First get should return immediately
verify_id(hanging_get_listener.next().await.unwrap(), ID1);
// Subsequent one should wait until new value is notified
hanging_get_handler
.lock()
.await
.watch(TestSender { sender: hanging_get_responder.clone() }, None)
.await;
setting_handler_handle.lock().await.set_id(ID2);
setting_handler_handle.lock().await.notify_listener(ID2);
verify_id(hanging_get_listener.next().await.unwrap(), ID2);
}
#[fuchsia_async::run_until_stalled(test)]
async fn test_watch_after_change() {
let messenger_factory = service::message::create_hub();
let setting_type = SettingType::Display;
let setting_handler_handle =
TestSettingHandlerBuilder::new(messenger_factory.clone(), setting_type)
.set_initial_id(ID1)
.build()
.await;
let hanging_get_handler: Arc<Mutex<HangingGetHandler<TestStruct, TestSender, String>>> =
HangingGetHandler::create(
messenger_factory.create(MessengerType::Unbound).await.unwrap().0,
setting_type,
)
.await;
let (hanging_get_responder, mut hanging_get_listener) =
futures::channel::mpsc::unbounded::<Event>();
hanging_get_handler
.lock()
.await
.watch(TestSender { sender: hanging_get_responder.clone() }, None)
.await;
// First get should return immediately
verify_id(hanging_get_listener.next().await.unwrap(), ID1);
setting_handler_handle.lock().await.set_id(ID2);
setting_handler_handle.lock().await.notify_listener(ID2);
// Subsequent one should wait until new value is notified
hanging_get_handler
.lock()
.await
.watch(TestSender { sender: hanging_get_responder.clone() }, None)
.await;
verify_id(hanging_get_listener.next().await.unwrap(), ID2);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_watch_with_change_function() {
let messenger_factory = service::message::create_hub();
let setting_type = SettingType::Display;
let setting_handler_handle =
TestSettingHandlerBuilder::new(messenger_factory.clone(), setting_type)
.set_initial_id(ID1)
.build()
.await;
let hanging_get_handler: Arc<Mutex<HangingGetHandler<TestStruct, TestSender, String>>> =
HangingGetHandler::create(
messenger_factory.create(MessengerType::Unbound).await.unwrap().0,
setting_type,
)
.await;
let (hanging_get_responder, mut hanging_get_listener) =
futures::channel::mpsc::unbounded::<Event>();
let min_difference = 2.0;
let change_function =
move |old: &TestStruct, new: &TestStruct| -> bool { new.id - old.id > min_difference };
hanging_get_handler
.lock()
.await
.watch_with_change_fn(
Some(min_difference.to_string()),
Box::new(change_function),
TestSender { sender: hanging_get_responder.clone() },
None,
)
.await;
// First get should return immediately even with change function
verify_id(hanging_get_listener.next().await.unwrap(), ID1);
setting_handler_handle.lock().await.set_id(ID2);
setting_handler_handle.lock().await.notify_listener(ID2);
// Subsequent watch should return ignoring change function
hanging_get_handler
.lock()
.await
.watch(TestSender { sender: hanging_get_responder.clone() }, None)
.await;
verify_id(hanging_get_listener.next().await.unwrap(), ID2);
// Subsequent watch with change function should only return if change is big enough
setting_handler_handle.lock().await.set_id(ID2 + 1.0);
setting_handler_handle.lock().await.notify_listener(ID2 + 1.0);
hanging_get_handler
.lock()
.await
.watch_with_change_fn(
Some(min_difference.to_string()),
Box::new(change_function),
TestSender { sender: hanging_get_responder.clone() },
None,
)
.await;
// Must wait for some a short time to allow change to propagate.
let sleep_duration = zx::Duration::from_millis(1);
fasync::Timer::new(sleep_duration.after_now()).await;
setting_handler_handle.lock().await.set_id(ID2 + 3.0);
setting_handler_handle.lock().await.notify_listener(ID2 + 3.0);
verify_id(hanging_get_listener.next().await.unwrap(), ID2 + 3.0);
}
#[fuchsia_async::run_until_stalled(test)]
async fn test_watch_with_change_function_multiple() {
let messenger_factory = service::message::create_hub();
let setting_type = SettingType::Display;
let setting_handler_handle =
TestSettingHandlerBuilder::new(messenger_factory.clone(), setting_type)
.set_initial_id(ID1)
.build()
.await;
let hanging_get_handler: Arc<Mutex<HangingGetHandler<TestStruct, TestSender, String>>> =
HangingGetHandler::create(
messenger_factory.create(MessengerType::Unbound).await.unwrap().0,
setting_type,
)
.await;
// Register first change function with a large min difference.
let (hanging_get_responder, mut hanging_get_listener) =
futures::channel::mpsc::unbounded::<Event>();
let min_difference = 10.0;
let change_function =
move |old: &TestStruct, new: &TestStruct| -> bool { new.id - old.id >= min_difference };
hanging_get_handler
.lock()
.await
.watch_with_change_fn(
Some(min_difference.to_string()),
Box::new(change_function),
TestSender { sender: hanging_get_responder.clone() },
None,
)
.await;
// Register second change function with a smaller min difference.
let (hanging_get_responder2, mut hanging_get_listener2) =
futures::channel::mpsc::unbounded::<Event>();
let min_difference2 = 1.0;
let change_function2 = move |old: &TestStruct, new: &TestStruct| -> bool {
new.id - old.id >= min_difference2
};
hanging_get_handler
.lock()
.await
.watch_with_change_fn(
Some(min_difference2.to_string()),
Box::new(change_function2),
TestSender { sender: hanging_get_responder2.clone() },
None,
)
.await;
// First get should return immediately even with change function
verify_id(hanging_get_listener.next().await.unwrap(), ID1);
verify_id(hanging_get_listener2.next().await.unwrap(), ID1);
// Register listeners again.
hanging_get_handler
.lock()
.await
.watch_with_change_fn(
Some(min_difference.to_string()),
Box::new(change_function),
TestSender { sender: hanging_get_responder.clone() },
None,
)
.await;
hanging_get_handler
.lock()
.await
.watch_with_change_fn(
Some(min_difference2.to_string()),
Box::new(change_function2),
TestSender { sender: hanging_get_responder2.clone() },
None,
)
.await;
// Send a value big enough to trigger the smaller change function but not the larger one.
setting_handler_handle.lock().await.set_id(ID2);
setting_handler_handle.lock().await.notify_listener(ID2);
verify_id(hanging_get_listener2.next().await.unwrap(), ID2);
// Re-register the listener that just finished.
hanging_get_handler
.lock()
.await
.watch_with_change_fn(
Some(min_difference2.to_string()),
Box::new(change_function2),
TestSender { sender: hanging_get_responder2.clone() },
None,
)
.await;
// Send a value big enough to trigger both change functions.
let big_value = ID1 + min_difference;
setting_handler_handle.lock().await.set_id(big_value);
setting_handler_handle.lock().await.notify_listener(big_value);
// Both hanging gets got the value.
verify_id(hanging_get_listener.next().await.unwrap(), big_value);
verify_id(hanging_get_listener2.next().await.unwrap(), big_value);
}
#[test]
fn test_hanging_get_controller() {
let mut controller: HangingGetController<TestStruct, TestSender> =
HangingGetController::new(Box::new(|_old: &TestStruct, _new: &TestStruct| true));
// Should send change on launch
assert_eq!(controller.on_watch(), true);
assert_eq!(controller.on_change(&TestStruct { id: 1.0 }), true);
controller.on_send(TestStruct { id: 1.0 });
// After sent, without change, shouldn't send
assert_eq!(controller.on_watch(), false);
assert_eq!(controller.on_change(&TestStruct { id: 2.0 }), true);
controller.on_send(TestStruct { id: 2.0 });
}
#[test]
fn test_hanging_get_controller_with_change_function() {
let mut controller: HangingGetController<TestStruct, TestSender> =
HangingGetController::new(Box::new(Box::new(|old: &TestStruct, new: &TestStruct| {
old.id < new.id
})));
// Should send change on launch.
assert_eq!(controller.on_watch(), true);
assert_eq!(controller.on_change(&TestStruct { id: 1.0 }), true);
controller.on_send(TestStruct { id: 1.0 });
// Won't send if change function not triggered.
assert_eq!(controller.on_change(&TestStruct { id: 1.0 }), false);
assert_eq!(controller.on_watch(), false);
// Will send once we get a change.
assert_eq!(controller.on_change(&TestStruct { id: 2.0 }), true);
assert_eq!(controller.on_watch(), true);
}
}