| // Copyright 2016 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <utility> |
| #include <vector> |
| |
| #include <lib/async/cpp/task.h> |
| #include <lib/async/default.h> |
| #include <lib/callback/capture.h> |
| #include <lib/fidl/cpp/binding.h> |
| #include <lib/fidl/cpp/optional.h> |
| #include <lib/fit/function.h> |
| #include <lib/fxl/macros.h> |
| #include <lib/fxl/strings/string_printf.h> |
| |
| #include "garnet/public/lib/callback/capture.h" |
| #include "gtest/gtest.h" |
| #include "peridot/bin/ledger/app/constants.h" |
| #include "peridot/bin/ledger/app/fidl/serialization_size.h" |
| #include "peridot/bin/ledger/fidl/include/types.h" |
| #include "peridot/bin/ledger/tests/integration/integration_test.h" |
| #include "peridot/bin/ledger/tests/integration/test_utils.h" |
| #include "peridot/lib/convert/convert.h" |
| |
| namespace ledger { |
| namespace { |
| |
| class PageWatcherIntegrationTest : public IntegrationTest { |
| public: |
| PageWatcherIntegrationTest() {} |
| ~PageWatcherIntegrationTest() override {} |
| |
| private: |
| FXL_DISALLOW_COPY_AND_ASSIGN(PageWatcherIntegrationTest); |
| }; |
| |
| class Watcher : public PageWatcher { |
| public: |
| explicit Watcher( |
| fidl::InterfaceRequest<PageWatcher> request, |
| fit::closure change_callback = [] {}) |
| : binding_(this, std::move(request)), |
| change_callback_(std::move(change_callback)) {} |
| |
| uint changes_seen = 0; |
| ResultState last_result_state_; |
| PageSnapshotPtr last_snapshot_; |
| PageChange last_page_change_; |
| |
| private: |
| // PageWatcher: |
| void OnChange(PageChange page_change, ResultState result_state, |
| OnChangeCallback callback) override { |
| changes_seen++; |
| last_result_state_ = result_state; |
| last_page_change_ = std::move(page_change); |
| last_snapshot_.Unbind(); |
| callback(last_snapshot_.NewRequest()); |
| change_callback_(); |
| } |
| |
| fidl::Binding<PageWatcher> binding_; |
| fit::closure change_callback_; |
| }; |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcherSimple) { |
| auto instance = NewLedgerAppInstance(); |
| PagePtr page = instance->GetTestPage(); |
| PageWatcherPtr watcher_ptr; |
| auto watcher_waiter = NewWaiter(); |
| Watcher watcher(watcher_ptr.NewRequest(), watcher_waiter->GetCallback()); |
| |
| PageSnapshotPtr snapshot; |
| auto waiter = NewWaiter(); |
| Status status; |
| page->GetSnapshot(snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page->Put(convert::ToArray("name"), convert::ToArray("Alice"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| ASSERT_TRUE(watcher_waiter->RunUntilCalled()); |
| EXPECT_EQ(1u, watcher.changes_seen); |
| EXPECT_EQ(ResultState::COMPLETED, watcher.last_result_state_); |
| PageChange change = std::move(watcher.last_page_change_); |
| ASSERT_EQ(1u, change.changed_entries->size()); |
| EXPECT_EQ("name", convert::ToString(change.changed_entries->at(0).key)); |
| EXPECT_EQ("Alice", ToString(change.changed_entries->at(0).value)); |
| } |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcherDisconnectClient) { |
| auto instance = NewLedgerAppInstance(); |
| Status status; |
| PagePtr page = instance->GetTestPage(); |
| PageWatcherPtr watcher_ptr; |
| auto watcher_waiter = NewWaiter(); |
| auto watcher = std::make_unique<Watcher>(watcher_ptr.NewRequest(), |
| watcher_waiter->GetCallback()); |
| |
| PageSnapshotPtr snapshot; |
| auto waiter = NewWaiter(); |
| page->GetSnapshot(snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| // Make a change on the page and verify that it was received. |
| waiter = NewWaiter(); |
| page->Put(convert::ToArray("name"), convert::ToArray("Alice"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| ASSERT_TRUE(watcher_waiter->RunUntilCalled()); |
| EXPECT_EQ(1u, watcher->changes_seen); |
| |
| // Make another change and disconnect the watcher immediately. |
| waiter = NewWaiter(); |
| page->Put(convert::ToArray("name"), convert::ToArray("Bob"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| watcher.reset(); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| } |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcherDisconnectPage) { |
| auto instance = NewLedgerAppInstance(); |
| PageWatcherPtr watcher_ptr; |
| auto watcher_waiter = NewWaiter(); |
| Watcher watcher(watcher_ptr.NewRequest(), watcher_waiter->GetCallback()); |
| |
| { |
| PagePtr page = instance->GetTestPage(); |
| PageSnapshotPtr snapshot; |
| Status status; |
| auto waiter = NewWaiter(); |
| page->GetSnapshot(snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| // Queue many put operations on the page. |
| for (int i = 0; i < 1000; i++) { |
| page->Put(convert::ToArray("name"), convert::ToArray(std::to_string(i)), |
| [](Status status) { EXPECT_EQ(Status::OK, status); }); |
| } |
| } |
| // Page is out of scope now, but watcher is not. Verify that we don't crash |
| // and a change notification is still delivered. |
| ASSERT_TRUE(watcher_waiter->RunUntilCalled()); |
| EXPECT_EQ(1u, watcher.changes_seen); |
| } |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcherDelete) { |
| auto instance = NewLedgerAppInstance(); |
| PagePtr page = instance->GetTestPage(); |
| auto waiter = NewWaiter(); |
| Status status; |
| page->Put(convert::ToArray("foo"), convert::ToArray("bar"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| auto watcher_waiter = NewWaiter(); |
| PageWatcherPtr watcher_ptr; |
| Watcher watcher(watcher_ptr.NewRequest(), watcher_waiter->GetCallback()); |
| |
| PageSnapshotPtr snapshot; |
| waiter = NewWaiter(); |
| page->GetSnapshot(snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page->Delete(convert::ToArray("foo"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| ASSERT_TRUE(watcher_waiter->RunUntilCalled()); |
| ASSERT_EQ(1u, watcher.changes_seen); |
| EXPECT_EQ(ResultState::COMPLETED, watcher.last_result_state_); |
| PageChange change = std::move(watcher.last_page_change_); |
| EXPECT_EQ(0u, change.changed_entries->size()); |
| ASSERT_EQ(1u, change.deleted_keys->size()); |
| EXPECT_EQ("foo", convert::ToString(change.deleted_keys->at(0))); |
| } |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcherBigChangeSize) { |
| auto instance = NewLedgerAppInstance(); |
| // Put enough entries to ensure we will need more than one query to retrieve |
| // them. The number of entries that can be retrieved in one query is bound by |
| // |kMaxMessageHandles| and by size of the fidl message (determined by |
| // |kMaxInlineDataSize|), so we insert one entry more than that. |
| const size_t key_size = kMaxKeySize; |
| const size_t entry_size = fidl_serialization::GetEntrySize(key_size); |
| const size_t entry_count = |
| std::min(fidl_serialization::kMaxMessageHandles, |
| fidl_serialization::kMaxInlineDataSize / entry_size) + |
| 1; |
| const auto key_generator = [](size_t i) { |
| std::string prefix = fxl::StringPrintf("key%03" PRIuMAX, i); |
| std::string filler(key_size - prefix.size(), 'k'); |
| return prefix + filler; |
| }; |
| PagePtr page = instance->GetTestPage(); |
| PageWatcherPtr watcher_ptr; |
| auto watcher_waiter = NewWaiter(); |
| Watcher watcher(watcher_ptr.NewRequest(), watcher_waiter->GetCallback()); |
| |
| PageSnapshotPtr snapshot; |
| auto waiter = NewWaiter(); |
| Status status; |
| page->GetSnapshot(snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page->StartTransaction(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| for (size_t i = 0; i < entry_count; ++i) { |
| waiter = NewWaiter(); |
| page->Put(convert::ToArray(key_generator(i)), convert::ToArray("value"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| } |
| |
| RunLoopFor(zx::msec(100)); |
| EXPECT_EQ(0u, watcher.changes_seen); |
| |
| waiter = NewWaiter(); |
| page->Commit(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| // Get the first OnChagne call. |
| ASSERT_TRUE(watcher_waiter->RunUntilCalled()); |
| EXPECT_EQ(1u, watcher.changes_seen); |
| EXPECT_EQ(watcher.last_result_state_, ResultState::PARTIAL_STARTED); |
| PageChange change = std::move(watcher.last_page_change_); |
| size_t initial_size = change.changed_entries->size(); |
| for (size_t i = 0; i < initial_size; ++i) { |
| EXPECT_EQ(key_generator(i), |
| convert::ToString(change.changed_entries->at(i).key)); |
| EXPECT_EQ("value", ToString(change.changed_entries->at(i).value)); |
| EXPECT_EQ(Priority::EAGER, change.changed_entries->at(i).priority); |
| } |
| |
| // Get the second OnChagne call. |
| ASSERT_TRUE(watcher_waiter->RunUntilCalled()); |
| EXPECT_EQ(2u, watcher.changes_seen); |
| EXPECT_EQ(ResultState::PARTIAL_COMPLETED, watcher.last_result_state_); |
| change = std::move(watcher.last_page_change_); |
| |
| ASSERT_EQ(entry_count, initial_size + change.changed_entries->size()); |
| for (size_t i = 0; i < change.changed_entries->size(); ++i) { |
| EXPECT_EQ(key_generator(i + initial_size), |
| convert::ToString(change.changed_entries->at(i).key)); |
| EXPECT_EQ("value", ToString(change.changed_entries->at(i).value)); |
| EXPECT_EQ(Priority::EAGER, change.changed_entries->at(i).priority); |
| } |
| } |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcherBigChangeHandles) { |
| auto instance = NewLedgerAppInstance(); |
| size_t entry_count = 70; |
| PagePtr page = instance->GetTestPage(); |
| PageWatcherPtr watcher_ptr; |
| auto watcher_waiter = NewWaiter(); |
| Watcher watcher(watcher_ptr.NewRequest(), watcher_waiter->GetCallback()); |
| |
| PageSnapshotPtr snapshot; |
| auto waiter = NewWaiter(); |
| Status status; |
| page->GetSnapshot(snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page->StartTransaction(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| for (size_t i = 0; i < entry_count; ++i) { |
| waiter = NewWaiter(); |
| page->Put(convert::ToArray(fxl::StringPrintf("key%02" PRIuMAX, i)), |
| convert::ToArray("value"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| } |
| |
| RunLoopFor(zx::msec(100)); |
| EXPECT_EQ(0u, watcher.changes_seen); |
| |
| waiter = NewWaiter(); |
| page->Commit(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| // Get the first OnChagne call. |
| ASSERT_TRUE(watcher_waiter->RunUntilCalled()); |
| EXPECT_EQ(1u, watcher.changes_seen); |
| EXPECT_EQ(watcher.last_result_state_, ResultState::PARTIAL_STARTED); |
| PageChange change = std::move(watcher.last_page_change_); |
| size_t initial_size = change.changed_entries->size(); |
| for (size_t i = 0; i < initial_size; ++i) { |
| EXPECT_EQ(fxl::StringPrintf("key%02" PRIuMAX, i), |
| convert::ToString(change.changed_entries->at(i).key)); |
| EXPECT_EQ("value", ToString(change.changed_entries->at(i).value)); |
| EXPECT_EQ(Priority::EAGER, change.changed_entries->at(i).priority); |
| } |
| |
| // Get the second OnChagne call. |
| ASSERT_TRUE(watcher_waiter->RunUntilCalled()); |
| EXPECT_EQ(2u, watcher.changes_seen); |
| EXPECT_EQ(ResultState::PARTIAL_COMPLETED, watcher.last_result_state_); |
| change = std::move(watcher.last_page_change_); |
| |
| ASSERT_EQ(entry_count, initial_size + change.changed_entries->size()); |
| for (size_t i = 0; i < change.changed_entries->size(); ++i) { |
| EXPECT_EQ(fxl::StringPrintf("key%02" PRIuMAX, i + initial_size), |
| convert::ToString(change.changed_entries->at(i).key)); |
| EXPECT_EQ("value", ToString(change.changed_entries->at(i).value)); |
| EXPECT_EQ(Priority::EAGER, change.changed_entries->at(i).priority); |
| } |
| } |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcherSnapshot) { |
| auto instance = NewLedgerAppInstance(); |
| PagePtr page = instance->GetTestPage(); |
| PageWatcherPtr watcher_ptr; |
| auto watcher_waiter = NewWaiter(); |
| Watcher watcher(watcher_ptr.NewRequest(), watcher_waiter->GetCallback()); |
| |
| PageSnapshotPtr snapshot; |
| Status status; |
| auto waiter = NewWaiter(); |
| page->GetSnapshot(snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page->Put(convert::ToArray("name"), convert::ToArray("Alice"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| ASSERT_TRUE(watcher_waiter->RunUntilCalled()); |
| EXPECT_EQ(1u, watcher.changes_seen); |
| EXPECT_EQ(ResultState::COMPLETED, watcher.last_result_state_); |
| auto entries = SnapshotGetEntries(this, &(watcher.last_snapshot_)); |
| ASSERT_EQ(1u, entries.size()); |
| EXPECT_EQ("name", convert::ToString(entries[0].key)); |
| EXPECT_EQ("Alice", ToString(entries[0].value)); |
| EXPECT_EQ(Priority::EAGER, entries[0].priority); |
| } |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcherTransaction) { |
| auto instance = NewLedgerAppInstance(); |
| PagePtr page = instance->GetTestPage(); |
| PageWatcherPtr watcher_ptr; |
| auto watcher_waiter = NewWaiter(); |
| Watcher watcher(watcher_ptr.NewRequest(), watcher_waiter->GetCallback()); |
| |
| PageSnapshotPtr snapshot; |
| Status status; |
| auto waiter = NewWaiter(); |
| page->GetSnapshot(snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page->StartTransaction(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| waiter = NewWaiter(); |
| page->Put(convert::ToArray("name"), convert::ToArray("Alice"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| RunLoopFor(zx::msec(100)); |
| EXPECT_EQ(0u, watcher.changes_seen); |
| |
| waiter = NewWaiter(); |
| page->Commit(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| ASSERT_TRUE(watcher_waiter->RunUntilCalled()); |
| EXPECT_EQ(1u, watcher.changes_seen); |
| EXPECT_EQ(ResultState::COMPLETED, watcher.last_result_state_); |
| PageChange change = std::move(watcher.last_page_change_); |
| ASSERT_EQ(1u, change.changed_entries->size()); |
| EXPECT_EQ("name", convert::ToString(change.changed_entries->at(0).key)); |
| EXPECT_EQ("Alice", ToString(change.changed_entries->at(0).value)); |
| } |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcherParallel) { |
| auto instance = NewLedgerAppInstance(); |
| PagePtr page1 = instance->GetTestPage(); |
| auto waiter = NewWaiter(); |
| PageId test_page_id; |
| page1->GetId(callback::Capture(waiter->GetCallback(), &test_page_id)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| |
| PagePtr page2 = |
| instance->GetPage(fidl::MakeOptional(test_page_id), Status::OK); |
| |
| PageWatcherPtr watcher1_ptr; |
| auto watcher_waiter1 = NewWaiter(); |
| Watcher watcher1(watcher1_ptr.NewRequest(), watcher_waiter1->GetCallback()); |
| PageSnapshotPtr snapshot1; |
| Status status; |
| waiter = NewWaiter(); |
| page1->GetSnapshot(snapshot1.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher1_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| PageWatcherPtr watcher2_ptr; |
| auto watcher_waiter2 = NewWaiter(); |
| Watcher watcher2(watcher2_ptr.NewRequest(), watcher_waiter2->GetCallback()); |
| PageSnapshotPtr snapshot2; |
| waiter = NewWaiter(); |
| page2->GetSnapshot(snapshot2.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher2_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page1->StartTransaction(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| waiter = NewWaiter(); |
| page1->Put(convert::ToArray("name"), convert::ToArray("Alice"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page2->StartTransaction(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| waiter = NewWaiter(); |
| page2->Put(convert::ToArray("name"), convert::ToArray("Bob"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| // Verify that each change is seen by the right watcher. |
| waiter = NewWaiter(); |
| page1->Commit(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| ASSERT_TRUE(watcher_waiter1->RunUntilCalled()); |
| EXPECT_EQ(1u, watcher1.changes_seen); |
| EXPECT_EQ(ResultState::COMPLETED, watcher1.last_result_state_); |
| PageChange change = std::move(watcher1.last_page_change_); |
| ASSERT_EQ(1u, change.changed_entries->size()); |
| EXPECT_EQ("name", convert::ToString(change.changed_entries->at(0).key)); |
| EXPECT_EQ("Alice", ToString(change.changed_entries->at(0).value)); |
| |
| waiter = NewWaiter(); |
| page2->Commit(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| ASSERT_TRUE(watcher_waiter2->RunUntilCalled()); |
| EXPECT_EQ(1u, watcher2.changes_seen); |
| EXPECT_EQ(ResultState::COMPLETED, watcher2.last_result_state_); |
| change = std::move(watcher2.last_page_change_); |
| ASSERT_EQ(1u, change.changed_entries->size()); |
| EXPECT_EQ("name", convert::ToString(change.changed_entries->at(0).key)); |
| EXPECT_EQ("Bob", ToString(change.changed_entries->at(0).value)); |
| |
| RunLoopFor(zx::msec(100)); |
| |
| // A merge happens now. Only the first watcher should see a change. |
| ASSERT_TRUE(watcher_waiter1->RunUntilCalled()); |
| EXPECT_EQ(2u, watcher1.changes_seen); |
| EXPECT_EQ(ResultState::COMPLETED, watcher2.last_result_state_); |
| EXPECT_EQ(1u, watcher2.changes_seen); |
| |
| change = std::move(watcher1.last_page_change_); |
| ASSERT_EQ(1u, change.changed_entries->size()); |
| EXPECT_EQ("name", convert::ToString(change.changed_entries->at(0).key)); |
| EXPECT_EQ("Bob", ToString(change.changed_entries->at(0).value)); |
| } |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcherEmptyTransaction) { |
| auto instance = NewLedgerAppInstance(); |
| PagePtr page = instance->GetTestPage(); |
| PageWatcherPtr watcher_ptr; |
| Watcher watcher(watcher_ptr.NewRequest()); |
| |
| PageSnapshotPtr snapshot; |
| auto waiter = NewWaiter(); |
| Status status; |
| page->GetSnapshot(snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page->StartTransaction(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page->Commit(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| RunLoopFor(zx::msec(100)); |
| EXPECT_EQ(0u, watcher.changes_seen); |
| } |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcher1Change2Pages) { |
| auto instance = NewLedgerAppInstance(); |
| PagePtr page1 = instance->GetTestPage(); |
| auto waiter = NewWaiter(); |
| PageId test_page_id; |
| page1->GetId(callback::Capture(waiter->GetCallback(), &test_page_id)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| |
| PagePtr page2 = |
| instance->GetPage(fidl::MakeOptional(test_page_id), Status::OK); |
| |
| PageWatcherPtr watcher1_ptr; |
| auto watcher1_waiter = NewWaiter(); |
| Watcher watcher1(watcher1_ptr.NewRequest(), watcher1_waiter->GetCallback()); |
| PageSnapshotPtr snapshot1; |
| waiter = NewWaiter(); |
| Status status; |
| page1->GetSnapshot(snapshot1.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher1_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| auto watcher2_waiter = NewWaiter(); |
| PageWatcherPtr watcher2_ptr; |
| Watcher watcher2(watcher2_ptr.NewRequest(), watcher2_waiter->GetCallback()); |
| PageSnapshotPtr snapshot2; |
| waiter = NewWaiter(); |
| page2->GetSnapshot(snapshot2.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher2_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page1->Put(convert::ToArray("name"), convert::ToArray("Alice"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| ASSERT_TRUE(watcher1_waiter->RunUntilCalled()); |
| ASSERT_TRUE(watcher2_waiter->RunUntilCalled()); |
| |
| ASSERT_EQ(1u, watcher1.changes_seen); |
| EXPECT_EQ(ResultState::COMPLETED, watcher1.last_result_state_); |
| PageChange change = std::move(watcher1.last_page_change_); |
| ASSERT_EQ(1u, change.changed_entries->size()); |
| EXPECT_EQ("name", convert::ToString(change.changed_entries->at(0).key)); |
| EXPECT_EQ("Alice", ToString(change.changed_entries->at(0).value)); |
| |
| ASSERT_EQ(1u, watcher2.changes_seen); |
| EXPECT_EQ(ResultState::COMPLETED, watcher2.last_result_state_); |
| change = std::move(watcher2.last_page_change_); |
| ASSERT_EQ(1u, change.changed_entries->size()); |
| EXPECT_EQ("name", convert::ToString(change.changed_entries->at(0).key)); |
| EXPECT_EQ("Alice", ToString(change.changed_entries->at(0).value)); |
| } |
| |
| class WaitingWatcher : public PageWatcher { |
| public: |
| WaitingWatcher(fidl::InterfaceRequest<PageWatcher> request, |
| fit::closure change_callback) |
| : binding_(this, std::move(request)), |
| change_callback_(std::move(change_callback)) {} |
| |
| struct Change { |
| PageChange change; |
| OnChangeCallback callback; |
| |
| Change(PageChange change, OnChangeCallback callback) |
| : change(std::move(change)), callback(std::move(callback)) {} |
| }; |
| |
| std::vector<Change> changes; |
| |
| private: |
| // PageWatcher: |
| void OnChange(PageChange page_change, ResultState result_state, |
| OnChangeCallback callback) override { |
| FXL_DCHECK(result_state == ResultState::COMPLETED) |
| << "Handling OnChange pagination not implemented yet"; |
| changes.emplace_back(std::move(page_change), std::move(callback)); |
| change_callback_(); |
| } |
| |
| fidl::Binding<PageWatcher> binding_; |
| fit::closure change_callback_; |
| }; |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcherConcurrentTransaction) { |
| auto instance = NewLedgerAppInstance(); |
| PagePtr page = instance->GetTestPage(); |
| PageWatcherPtr watcher_ptr; |
| auto watcher_waiter = NewWaiter(); |
| WaitingWatcher watcher(watcher_ptr.NewRequest(), |
| watcher_waiter->GetCallback()); |
| |
| PageSnapshotPtr snapshot; |
| auto waiter = NewWaiter(); |
| Status status; |
| page->GetSnapshot(snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| std::move(watcher_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page->Put(convert::ToArray("name"), convert::ToArray("Alice"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| ASSERT_TRUE(watcher_waiter->RunUntilCalled()); |
| EXPECT_EQ(1u, watcher.changes.size()); |
| |
| waiter = NewWaiter(); |
| page->Put(convert::ToArray("foo"), convert::ToArray("bar"), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| auto transaction_waiter = NewWaiter(); |
| Status start_transaction_status; |
| page->StartTransaction(callback::Capture(transaction_waiter->GetCallback(), |
| &start_transaction_status)); |
| |
| RunLoopFor(zx::msec(100)); |
| |
| // We haven't sent the callback of the first change, so nothing should have |
| // happened. |
| EXPECT_EQ(1u, watcher.changes.size()); |
| EXPECT_TRUE(transaction_waiter->NotCalledYet()); |
| |
| watcher.changes[0].callback(nullptr); |
| |
| ASSERT_TRUE(watcher_waiter->RunUntilCalled()); |
| EXPECT_EQ(2u, watcher.changes.size()); |
| EXPECT_TRUE(transaction_waiter->NotCalledYet()); |
| |
| RunLoopFor(zx::msec(100)); |
| |
| // We haven't sent the callback of the first change, so nothing should have |
| // happened. |
| EXPECT_EQ(2u, watcher.changes.size()); |
| EXPECT_TRUE(transaction_waiter->NotCalledYet()); |
| |
| watcher.changes[1].callback(nullptr); |
| |
| ASSERT_TRUE(transaction_waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, start_transaction_status); |
| } |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcherPrefix) { |
| auto instance = NewLedgerAppInstance(); |
| PagePtr page = instance->GetTestPage(); |
| PageWatcherPtr watcher_ptr; |
| auto watcher_waiter = NewWaiter(); |
| Watcher watcher(watcher_ptr.NewRequest(), watcher_waiter->GetCallback()); |
| |
| PageSnapshotPtr snapshot; |
| auto waiter = NewWaiter(); |
| Status status; |
| page->GetSnapshot(snapshot.NewRequest(), convert::ToArray("01"), |
| std::move(watcher_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page->StartTransaction(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| waiter = NewWaiter(); |
| page->Put(convert::ToArray("00-key"), convert::ToArray("value-00"), |
| |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| waiter = NewWaiter(); |
| page->Put(convert::ToArray("01-key"), convert::ToArray("value-01"), |
| |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| waiter = NewWaiter(); |
| page->Put(convert::ToArray("02-key"), convert::ToArray("value-02"), |
| |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| waiter = NewWaiter(); |
| page->Commit(callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| ASSERT_TRUE(watcher_waiter->RunUntilCalled()); |
| EXPECT_EQ(1u, watcher.changes_seen); |
| EXPECT_EQ(ResultState::COMPLETED, watcher.last_result_state_); |
| PageChange change = std::move(watcher.last_page_change_); |
| ASSERT_EQ(1u, change.changed_entries->size()); |
| EXPECT_EQ("01-key", convert::ToString(change.changed_entries->at(0).key)); |
| } |
| |
| TEST_P(PageWatcherIntegrationTest, PageWatcherPrefixNoChange) { |
| auto instance = NewLedgerAppInstance(); |
| PagePtr page = instance->GetTestPage(); |
| PageWatcherPtr watcher_ptr; |
| auto watcher_waiter = NewWaiter(); |
| Watcher watcher(watcher_ptr.NewRequest(), watcher_waiter->GetCallback()); |
| |
| PageSnapshotPtr snapshot; |
| auto waiter = NewWaiter(); |
| Status status; |
| page->GetSnapshot(snapshot.NewRequest(), convert::ToArray("01"), |
| std::move(watcher_ptr), |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page->Put(convert::ToArray("00-key"), convert::ToArray("value-00"), |
| |
| callback::Capture(waiter->GetCallback(), &status)); |
| ASSERT_TRUE(waiter->RunUntilCalled()); |
| EXPECT_EQ(Status::OK, status); |
| |
| waiter = NewWaiter(); |
| page->StartTransaction(callback::Capture(waiter->GetCallback(), &status)); |
| EXPECT_EQ(Status::OK, status); |
| |
| // Starting a transaction drains all watcher notifications, so if we were to |
| // be called, we would know at this point. |
| EXPECT_EQ(0u, watcher.changes_seen); |
| } |
| |
| INSTANTIATE_TEST_CASE_P( |
| PageWatcherIntegrationTest, PageWatcherIntegrationTest, |
| ::testing::ValuesIn(GetLedgerAppInstanceFactoryBuilders())); |
| |
| } // namespace |
| } // namespace ledger |