[ledger] Test that watch notifications are aggregated.
LE-661 #done
Test: PageWatcherIntegrationTest.PageWatcherAggregatedNotifications
Change-Id: Icfd27e45d390c6186fe2cabad1c7f97540d47d0f
diff --git a/bin/ledger/tests/integration/page_watcher_tests.cc b/bin/ledger/tests/integration/page_watcher_tests.cc
index 355102c..935da3a 100644
--- a/bin/ledger/tests/integration/page_watcher_tests.cc
+++ b/bin/ledger/tests/integration/page_watcher_tests.cc
@@ -15,6 +15,7 @@
#include <lib/fxl/strings/string_printf.h>
#include "garnet/public/lib/callback/capture.h"
+#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "peridot/bin/ledger/app/constants.h"
#include "peridot/bin/ledger/app/fidl/serialization_size.h"
@@ -26,6 +27,8 @@
namespace ledger {
namespace {
+using ::testing::SizeIs;
+
class PageWatcherIntegrationTest : public IntegrationTest {
public:
PageWatcherIntegrationTest() {}
@@ -43,6 +46,14 @@
: binding_(this, std::move(request)),
change_callback_(std::move(change_callback)) {}
+ void DelayCallback(bool delay_callback) { delay_callback_ = delay_callback; }
+
+ void CallOnChangeCallback() {
+ FXL_CHECK(on_change_callback_);
+ on_change_callback_(last_snapshot_.NewRequest());
+ on_change_callback_ = nullptr;
+ }
+
uint changes_seen = 0;
ResultState last_result_state_;
PageSnapshotPtr last_snapshot_;
@@ -56,11 +67,17 @@
last_result_state_ = result_state;
last_page_change_ = std::move(page_change);
last_snapshot_.Unbind();
- callback(last_snapshot_.NewRequest());
+ FXL_CHECK(!on_change_callback_);
+ on_change_callback_ = std::move(callback);
+ if (!delay_callback_) {
+ CallOnChangeCallback();
+ }
change_callback_();
}
fidl::Binding<PageWatcher> binding_;
+ bool delay_callback_ = false;
+ OnChangeCallback on_change_callback_;
fit::closure change_callback_;
};
@@ -95,6 +112,65 @@
EXPECT_EQ("Alice", ToString(change.changed_entries->at(0).value));
}
+TEST_P(PageWatcherIntegrationTest, PageWatcherAggregatedNotifications) {
+ auto instance = NewLedgerAppInstance();
+ PagePtr page = instance->GetTestPage();
+ PageWatcherPtr watcher_ptr;
+ auto watcher_waiter = NewWaiter();
+ Watcher watcher(watcher_ptr.NewRequest(), watcher_waiter->GetCallback());
+
+ // Call Put and don't let the OnChange callback be called, yet.
+ watcher.DelayCallback(true);
+ PageSnapshotPtr snapshot;
+ auto waiter = NewWaiter();
+ Status status;
+ page->GetSnapshot(snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0),
+ std::move(watcher_ptr),
+ callback::Capture(waiter->GetCallback(), &status));
+ ASSERT_TRUE(waiter->RunUntilCalled());
+ EXPECT_EQ(Status::OK, status);
+
+ page->Put(convert::ToArray("key"), convert::ToArray("value1"),
+ callback::Capture(waiter->GetCallback(), &status));
+ ASSERT_TRUE(waiter->RunUntilCalled());
+ EXPECT_EQ(Status::OK, status);
+
+ ASSERT_TRUE(watcher_waiter->RunUntilCalled());
+ EXPECT_EQ(1u, watcher.changes_seen);
+ EXPECT_EQ(ResultState::COMPLETED, watcher.last_result_state_);
+ auto changed_entries = std::move(watcher.last_page_change_.changed_entries);
+ ASSERT_THAT(*changed_entries, SizeIs(1));
+ EXPECT_EQ("key", convert::ToString(changed_entries->at(0).key));
+ EXPECT_EQ("value1", ToString(changed_entries->at(0).value));
+
+ // Update the value of "key" initially to "value2" and then to "value3".
+ page->Put(convert::ToArray("key"), convert::ToArray("value2"),
+ callback::Capture(waiter->GetCallback(), &status));
+ ASSERT_TRUE(waiter->RunUntilCalled());
+ EXPECT_EQ(Status::OK, status);
+ page->Put(convert::ToArray("key"), convert::ToArray("value3"),
+ callback::Capture(waiter->GetCallback(), &status));
+ ASSERT_TRUE(waiter->RunUntilCalled());
+ EXPECT_EQ(Status::OK, status);
+
+ // Since the previous OnChange callback hasn't been called yet, the next
+ // notification should be blocked.
+ ASSERT_FALSE(watcher_waiter->RunUntilCalled());
+
+ // Call the OnChange callback and expect a new OnChange call.
+ watcher.CallOnChangeCallback();
+ watcher.DelayCallback(false);
+ ASSERT_TRUE(watcher_waiter->RunUntilCalled());
+
+ // Only the last value of "key" should be found in the changed entries set.
+ EXPECT_EQ(2u, watcher.changes_seen);
+ EXPECT_EQ(ResultState::COMPLETED, watcher.last_result_state_);
+ changed_entries = std::move(watcher.last_page_change_.changed_entries);
+ ASSERT_THAT(*changed_entries, SizeIs(1));
+ EXPECT_EQ("key", convert::ToString(changed_entries->at(0).key));
+ EXPECT_EQ("value3", ToString(changed_entries->at(0).value));
+}
+
TEST_P(PageWatcherIntegrationTest, PageWatcherDisconnectClient) {
auto instance = NewLedgerAppInstance();
Status status;