[ledger] Test that watch notifications are aggregated.

LE-661 #done

Test: PageWatcherIntegrationTest.PageWatcherAggregatedNotifications
Change-Id: Icfd27e45d390c6186fe2cabad1c7f97540d47d0f
diff --git a/bin/ledger/tests/integration/page_watcher_tests.cc b/bin/ledger/tests/integration/page_watcher_tests.cc
index 355102c..935da3a 100644
--- a/bin/ledger/tests/integration/page_watcher_tests.cc
+++ b/bin/ledger/tests/integration/page_watcher_tests.cc
@@ -15,6 +15,7 @@
 #include <lib/fxl/strings/string_printf.h>
 
 #include "garnet/public/lib/callback/capture.h"
+#include "gmock/gmock.h"
 #include "gtest/gtest.h"
 #include "peridot/bin/ledger/app/constants.h"
 #include "peridot/bin/ledger/app/fidl/serialization_size.h"
@@ -26,6 +27,8 @@
 namespace ledger {
 namespace {
 
+using ::testing::SizeIs;
+
 class PageWatcherIntegrationTest : public IntegrationTest {
  public:
   PageWatcherIntegrationTest() {}
@@ -43,6 +46,14 @@
       : binding_(this, std::move(request)),
         change_callback_(std::move(change_callback)) {}
 
+  void DelayCallback(bool delay_callback) { delay_callback_ = delay_callback; }
+
+  void CallOnChangeCallback() {
+    FXL_CHECK(on_change_callback_);
+    on_change_callback_(last_snapshot_.NewRequest());
+    on_change_callback_ = nullptr;
+  }
+
   uint changes_seen = 0;
   ResultState last_result_state_;
   PageSnapshotPtr last_snapshot_;
@@ -56,11 +67,17 @@
     last_result_state_ = result_state;
     last_page_change_ = std::move(page_change);
     last_snapshot_.Unbind();
-    callback(last_snapshot_.NewRequest());
+    FXL_CHECK(!on_change_callback_);
+    on_change_callback_ = std::move(callback);
+    if (!delay_callback_) {
+      CallOnChangeCallback();
+    }
     change_callback_();
   }
 
   fidl::Binding<PageWatcher> binding_;
+  bool delay_callback_ = false;
+  OnChangeCallback on_change_callback_;
   fit::closure change_callback_;
 };
 
@@ -95,6 +112,65 @@
   EXPECT_EQ("Alice", ToString(change.changed_entries->at(0).value));
 }
 
+TEST_P(PageWatcherIntegrationTest, PageWatcherAggregatedNotifications) {
+  auto instance = NewLedgerAppInstance();
+  PagePtr page = instance->GetTestPage();
+  PageWatcherPtr watcher_ptr;
+  auto watcher_waiter = NewWaiter();
+  Watcher watcher(watcher_ptr.NewRequest(), watcher_waiter->GetCallback());
+
+  // Call Put and don't let the OnChange callback be called, yet.
+  watcher.DelayCallback(true);
+  PageSnapshotPtr snapshot;
+  auto waiter = NewWaiter();
+  Status status;
+  page->GetSnapshot(snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0),
+                    std::move(watcher_ptr),
+                    callback::Capture(waiter->GetCallback(), &status));
+  ASSERT_TRUE(waiter->RunUntilCalled());
+  EXPECT_EQ(Status::OK, status);
+
+  page->Put(convert::ToArray("key"), convert::ToArray("value1"),
+            callback::Capture(waiter->GetCallback(), &status));
+  ASSERT_TRUE(waiter->RunUntilCalled());
+  EXPECT_EQ(Status::OK, status);
+
+  ASSERT_TRUE(watcher_waiter->RunUntilCalled());
+  EXPECT_EQ(1u, watcher.changes_seen);
+  EXPECT_EQ(ResultState::COMPLETED, watcher.last_result_state_);
+  auto changed_entries = std::move(watcher.last_page_change_.changed_entries);
+  ASSERT_THAT(*changed_entries, SizeIs(1));
+  EXPECT_EQ("key", convert::ToString(changed_entries->at(0).key));
+  EXPECT_EQ("value1", ToString(changed_entries->at(0).value));
+
+  // Update the value of "key" initially to "value2" and then to "value3".
+  page->Put(convert::ToArray("key"), convert::ToArray("value2"),
+            callback::Capture(waiter->GetCallback(), &status));
+  ASSERT_TRUE(waiter->RunUntilCalled());
+  EXPECT_EQ(Status::OK, status);
+  page->Put(convert::ToArray("key"), convert::ToArray("value3"),
+            callback::Capture(waiter->GetCallback(), &status));
+  ASSERT_TRUE(waiter->RunUntilCalled());
+  EXPECT_EQ(Status::OK, status);
+
+  // Since the previous OnChange callback hasn't been called yet, the next
+  // notification should be blocked.
+  ASSERT_FALSE(watcher_waiter->RunUntilCalled());
+
+  // Call the OnChange callback and expect a new OnChange call.
+  watcher.CallOnChangeCallback();
+  watcher.DelayCallback(false);
+  ASSERT_TRUE(watcher_waiter->RunUntilCalled());
+
+  // Only the last value of "key" should be found in the changed entries set.
+  EXPECT_EQ(2u, watcher.changes_seen);
+  EXPECT_EQ(ResultState::COMPLETED, watcher.last_result_state_);
+  changed_entries = std::move(watcher.last_page_change_.changed_entries);
+  ASSERT_THAT(*changed_entries, SizeIs(1));
+  EXPECT_EQ("key", convert::ToString(changed_entries->at(0).key));
+  EXPECT_EQ("value3", ToString(changed_entries->at(0).value));
+}
+
 TEST_P(PageWatcherIntegrationTest, PageWatcherDisconnectClient) {
   auto instance = NewLedgerAppInstance();
   Status status;