| // Copyright 2016 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <dirent.h> |
| #include <lib/async/cpp/task.h> |
| #include <lib/callback/capture.h> |
| #include <lib/callback/set_when_called.h> |
| #include <lib/fit/function.h> |
| #include <lib/fsl/socket/strings.h> |
| #include <lib/fsl/vmo/strings.h> |
| #include <lib/timekeeper/test_clock.h> |
| |
| #include <chrono> |
| #include <memory> |
| #include <queue> |
| #include <set> |
| |
| #include "gmock/gmock.h" |
| #include "gtest/gtest.h" |
| #include "peridot/lib/scoped_tmpfs/scoped_tmpfs.h" |
| #include "src/ledger/bin/encryption/fake/fake_encryption_service.h" |
| #include "src/ledger/bin/encryption/primitives/hash.h" |
| #include "src/ledger/bin/storage/impl/btree/encoding.h" |
| #include "src/ledger/bin/storage/impl/btree/tree_node.h" |
| #include "src/ledger/bin/storage/impl/commit_impl.h" |
| #include "src/ledger/bin/storage/impl/commit_random_impl.h" |
| #include "src/ledger/bin/storage/impl/constants.h" |
| #include "src/ledger/bin/storage/impl/journal_impl.h" |
| #include "src/ledger/bin/storage/impl/leveldb.h" |
| #include "src/ledger/bin/storage/impl/object_digest.h" |
| #include "src/ledger/bin/storage/impl/object_impl.h" |
| #include "src/ledger/bin/storage/impl/page_db_empty_impl.h" |
| #include "src/ledger/bin/storage/impl/page_storage_impl.h" |
| #include "src/ledger/bin/storage/impl/split.h" |
| #include "src/ledger/bin/storage/impl/storage_test_utils.h" |
| #include "src/ledger/bin/storage/public/commit_watcher.h" |
| #include "src/ledger/bin/storage/public/constants.h" |
| #include "src/ledger/bin/storage/public/types.h" |
| #include "src/ledger/bin/testing/test_with_environment.h" |
| #include "src/ledger/lib/coroutine/coroutine.h" |
| #include "src/lib/files/directory.h" |
| #include "src/lib/files/file.h" |
| #include "src/lib/files/path.h" |
| #include "src/lib/fxl/arraysize.h" |
| #include "src/lib/fxl/macros.h" |
| #include "src/lib/fxl/memory/ref_ptr.h" |
| #include "src/lib/fxl/strings/string_printf.h" |
| |
| namespace storage { |
| |
| class PageStorageImplAccessorForTest { |
| public: |
| static void AddPiece(const std::unique_ptr<PageStorageImpl>& storage, |
| std::unique_ptr<Piece> piece, ChangeSource source, |
| IsObjectSynced is_object_synced, |
| ObjectReferencesAndPriority references, |
| fit::function<void(Status)> callback) { |
| storage->AddPiece(std::move(piece), source, is_object_synced, |
| std::move(references), std::move(callback)); |
| } |
| |
| static PageDb& GetDb(const std::unique_ptr<PageStorageImpl>& storage) { |
| return *(storage->db_); |
| } |
| }; |
| |
| namespace { |
| |
| using ::coroutine::CoroutineHandler; |
| using ::testing::ElementsAre; |
| using ::testing::IsEmpty; |
| using ::testing::UnorderedElementsAreArray; |
| |
| std::vector<PageStorage::CommitIdAndBytes> CommitAndBytesFromCommit( |
| const Commit& commit) { |
| std::vector<PageStorage::CommitIdAndBytes> result; |
| result.emplace_back(commit.GetId(), commit.GetStorageBytes().ToString()); |
| return result; |
| } |
| |
| // DataSource that returns an error on the callback to Get(). |
| class FakeErrorDataSource : public DataSource { |
| public: |
| explicit FakeErrorDataSource(async_dispatcher_t* dispatcher) |
| : dispatcher_(dispatcher) {} |
| |
| uint64_t GetSize() override { return 1; } |
| |
| void Get(fit::function<void(std::unique_ptr<DataChunk>, Status)> callback) |
| override { |
| async::PostTask(dispatcher_, [callback = std::move(callback)] { |
| callback(nullptr, DataSource::Status::ERROR); |
| }); |
| } |
| |
| async_dispatcher_t* const dispatcher_; |
| }; |
| |
| class FakeCommitWatcher : public CommitWatcher { |
| public: |
| FakeCommitWatcher() {} |
| |
| void OnNewCommits(const std::vector<std::unique_ptr<const Commit>>& commits, |
| ChangeSource source) override { |
| ++commit_count; |
| last_commit_id = commits.back()->GetId(); |
| last_source = source; |
| } |
| |
| int commit_count = 0; |
| CommitId last_commit_id; |
| ChangeSource last_source; |
| }; |
| |
| class DelayingFakeSyncDelegate : public PageSyncDelegate { |
| public: |
| explicit DelayingFakeSyncDelegate( |
| fit::function<void(fit::closure)> on_get_object) |
| : on_get_object_(std::move(on_get_object)) {} |
| |
| void AddObject(ObjectIdentifier object_identifier, const std::string& value) { |
| digest_to_value_[std::move(object_identifier)] = value; |
| } |
| |
| void GetObject(ObjectIdentifier object_identifier, |
| fit::function<void(Status, ChangeSource, IsObjectSynced, |
| std::unique_ptr<DataSource::DataChunk>)> |
| callback) override { |
| auto value_found = digest_to_value_.find(object_identifier); |
| if (value_found == digest_to_value_.end()) { |
| callback(Status::INTERNAL_NOT_FOUND, ChangeSource::CLOUD, |
| IsObjectSynced::NO, nullptr); |
| return; |
| } |
| std::string& value = value_found->second; |
| object_requests.insert(std::move(object_identifier)); |
| on_get_object_([callback = std::move(callback), value] { |
| callback(Status::OK, ChangeSource::CLOUD, IsObjectSynced::YES, |
| DataSource::DataChunk::Create(value)); |
| }); |
| } |
| |
| size_t GetNumberOfObjectsStored() { return digest_to_value_.size(); } |
| |
| std::set<ObjectIdentifier> object_requests; |
| |
| private: |
| fit::function<void(fit::closure)> on_get_object_; |
| std::map<ObjectIdentifier, std::string> digest_to_value_; |
| }; |
| |
| class FakeSyncDelegate : public DelayingFakeSyncDelegate { |
| public: |
| FakeSyncDelegate() |
| : DelayingFakeSyncDelegate([](fit::closure callback) { callback(); }) {} |
| }; |
| |
| // Implements |Init()|, |CreateJournalId() and |StartBatch()| and fails with a |
| // |NOT_IMPLEMENTED| error in all other cases. |
| class FakePageDbImpl : public PageDbEmptyImpl { |
| public: |
| FakePageDbImpl(rng::Random* random) : random_(random) {} |
| |
| Status StartBatch(CoroutineHandler* /*handler*/, |
| std::unique_ptr<PageDb::Batch>* batch) override { |
| *batch = std::make_unique<FakePageDbImpl>(random_); |
| return Status::OK; |
| } |
| |
| private: |
| rng::Random* const random_; |
| }; |
| |
| // Shim for LevelDB that allows to selectively fail some calls. |
| class ControlledLevelDb : public Db { |
| public: |
| explicit ControlledLevelDb(async_dispatcher_t* dispatcher, |
| ledger::DetachedPath db_path) |
| : leveldb_(dispatcher, db_path) {} |
| |
| class ControlledBatch : public Batch { |
| public: |
| explicit ControlledBatch(ControlledLevelDb* controller, |
| std::unique_ptr<Batch> batch) |
| : controller_(controller), batch_(std::move(batch)) {} |
| |
| // Batch: |
| Status Put(coroutine::CoroutineHandler* handler, |
| convert::ExtendedStringView key, |
| fxl::StringView value) override { |
| return batch_->Put(handler, key, value); |
| } |
| |
| Status Delete(coroutine::CoroutineHandler* handler, |
| convert::ExtendedStringView key) override { |
| return batch_->Delete(handler, key); |
| } |
| |
| Status Execute(coroutine::CoroutineHandler* handler) override { |
| if (controller_->fail_batch_execute_after_ == 0) { |
| return Status::IO_ERROR; |
| } |
| if (controller_->fail_batch_execute_after_ > 0) { |
| controller_->fail_batch_execute_after_--; |
| } |
| return batch_->Execute(handler); |
| } |
| |
| private: |
| ControlledLevelDb* controller_; |
| std::unique_ptr<Batch> batch_; |
| }; |
| |
| Status Init() { return leveldb_.Init(); } |
| |
| // Sets the number of calls to |Batch::Execute()|, for batches generated by |
| // this object, after which all calls would fail. It is used to simulate write |
| // failures. |
| // If |fail_batch_execute_after| is negative, or this method is not called, |
| // |Batch::Execute()| calls will nevef fail. |
| void SetFailBatchExecuteAfter(int fail_batch_execute_after) { |
| fail_batch_execute_after_ = fail_batch_execute_after; |
| } |
| |
| // Db: |
| Status StartBatch(coroutine::CoroutineHandler* handler, |
| std::unique_ptr<Batch>* batch) override { |
| std::unique_ptr<Batch> inner_batch; |
| Status status = leveldb_.StartBatch(handler, &inner_batch); |
| *batch = std::make_unique<ControlledBatch>(this, std::move(inner_batch)); |
| return status; |
| } |
| |
| Status Get(coroutine::CoroutineHandler* handler, |
| convert::ExtendedStringView key, std::string* value) override { |
| return leveldb_.Get(handler, key, value); |
| } |
| |
| Status HasKey(coroutine::CoroutineHandler* handler, |
| convert::ExtendedStringView key) override { |
| return leveldb_.HasKey(handler, key); |
| } |
| |
| Status GetObject(coroutine::CoroutineHandler* handler, |
| convert::ExtendedStringView key, |
| ObjectIdentifier object_identifier, |
| std::unique_ptr<const Piece>* piece) override { |
| return leveldb_.GetObject(handler, key, std::move(object_identifier), |
| piece); |
| } |
| |
| Status GetByPrefix(coroutine::CoroutineHandler* handler, |
| convert::ExtendedStringView prefix, |
| std::vector<std::string>* key_suffixes) override { |
| return leveldb_.GetByPrefix(handler, prefix, key_suffixes); |
| } |
| |
| Status GetEntriesByPrefix( |
| coroutine::CoroutineHandler* handler, convert::ExtendedStringView prefix, |
| std::vector<std::pair<std::string, std::string>>* entries) override { |
| return leveldb_.GetEntriesByPrefix(handler, prefix, entries); |
| } |
| |
| Status GetIteratorAtPrefix( |
| coroutine::CoroutineHandler* handler, convert::ExtendedStringView prefix, |
| std::unique_ptr<Iterator<const std::pair<convert::ExtendedStringView, |
| convert::ExtendedStringView>>>* |
| iterator) override { |
| return leveldb_.GetIteratorAtPrefix(handler, prefix, iterator); |
| } |
| |
| private: |
| // Number of calls to |Batch::Execute()| before they start failing. If |
| // negative, |Batch::Execute()| calls will never fail. |
| int fail_batch_execute_after_ = -1; |
| LevelDb leveldb_; |
| }; |
| |
| class PageStorageTest : public ledger::TestWithEnvironment { |
| public: |
| PageStorageTest() : encryption_service_(dispatcher()) {} |
| |
| ~PageStorageTest() override {} |
| |
| // Test: |
| void SetUp() override { ResetStorage(); } |
| |
| void ResetStorage() { |
| if (storage_) { |
| storage_->SetSyncDelegate(nullptr); |
| storage_.reset(); |
| } |
| tmpfs_ = std::make_unique<scoped_tmpfs::ScopedTmpFS>(); |
| PageId id = RandomString(environment_.random(), 10); |
| auto db = std::make_unique<ControlledLevelDb>( |
| dispatcher(), ledger::DetachedPath(tmpfs_->root_fd())); |
| leveldb_ = db.get(); |
| ASSERT_EQ(Status::OK, db->Init()); |
| storage_ = std::make_unique<PageStorageImpl>( |
| &environment_, &encryption_service_, std::move(db), id); |
| |
| bool called; |
| Status status; |
| storage_->Init( |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_EQ(id, storage_->GetId()); |
| } |
| |
| protected: |
| PageStorage* GetStorage() { return storage_.get(); } |
| |
| std::vector<std::unique_ptr<const Commit>> GetHeads() { |
| std::vector<std::unique_ptr<const Commit>> heads; |
| Status status = storage_->GetHeadCommits(&heads); |
| EXPECT_EQ(Status::OK, status); |
| return heads; |
| } |
| |
| std::unique_ptr<const Commit> GetFirstHead() { |
| std::vector<std::unique_ptr<const Commit>> heads = GetHeads(); |
| EXPECT_FALSE(heads.empty()); |
| return std::move(heads[0]); |
| } |
| |
| std::unique_ptr<const Commit> GetCommit(const CommitId& id) { |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> commit; |
| storage_->GetCommit(id, callback::Capture(callback::SetWhenCalled(&called), |
| &status, &commit)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| return commit; |
| } |
| |
| std::unique_ptr<const Commit> TryCommitFromSync() { |
| ObjectIdentifier root_identifier; |
| EXPECT_TRUE(GetEmptyNodeIdentifier(&root_identifier)); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| std::unique_ptr<const Commit> commit = |
| CommitImpl::FromContentAndParents(environment_.clock(), storage_.get(), |
| root_identifier, std::move(parent)); |
| |
| bool called; |
| Status status; |
| std::vector<CommitId> missing_ids; |
| storage_->AddCommitsFromSync( |
| CommitAndBytesFromCommit(*commit), ChangeSource::CLOUD, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &missing_ids)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| return commit; |
| } |
| |
| // Returns an empty pointer if |CommitJournal| times out. |
| FXL_WARN_UNUSED_RESULT std::unique_ptr<const Commit> TryCommitJournal( |
| std::unique_ptr<Journal> journal, Status expected_status) { |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> commit; |
| storage_->CommitJournal( |
| std::move(journal), |
| callback::Capture(callback::SetWhenCalled(&called), &status, &commit)); |
| |
| RunLoopUntilIdle(); |
| EXPECT_EQ(expected_status, status); |
| if (!called) { |
| return std::unique_ptr<const Commit>(); |
| } |
| return commit; |
| } |
| |
| // Returns an empty pointer if |TryCommitJournal| failed. |
| FXL_WARN_UNUSED_RESULT std::unique_ptr<const Commit> TryCommitFromLocal( |
| int keys, size_t min_key_size = 0) { |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| |
| for (int i = 0; i < keys; ++i) { |
| auto key = fxl::StringPrintf("key%05d", i); |
| if (key.size() < min_key_size) { |
| key.resize(min_key_size); |
| } |
| journal->Put(key, RandomObjectIdentifier(environment_.random()), |
| KeyPriority::EAGER); |
| } |
| |
| journal->Delete("key_does_not_exist"); |
| |
| std::unique_ptr<const Commit> commit = |
| TryCommitJournal(std::move(journal), Status::OK); |
| if (!commit) { |
| return commit; |
| } |
| |
| // Check the contents. |
| std::vector<Entry> entries = GetCommitContents(*commit); |
| EXPECT_EQ(static_cast<size_t>(keys), entries.size()); |
| for (int i = 0; i < keys; ++i) { |
| auto key = fxl::StringPrintf("key%05d", i); |
| if (key.size() < min_key_size) { |
| key.resize(min_key_size); |
| } |
| EXPECT_EQ(key, entries[i].key); |
| } |
| |
| return commit; |
| } |
| |
| void TryAddFromLocal(std::string content, |
| const ObjectIdentifier& expected_identifier) { |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, DataSource::Create(std::move(content)), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &object_identifier)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_EQ(expected_identifier, object_identifier); |
| } |
| |
| std::unique_ptr<const Object> TryGetObject( |
| const ObjectIdentifier& object_identifier, PageStorage::Location location, |
| Status expected_status = Status::OK) { |
| bool called; |
| Status status; |
| std::unique_ptr<const Object> object; |
| storage_->GetObject( |
| object_identifier, location, |
| callback::Capture(callback::SetWhenCalled(&called), &status, &object)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(expected_status, status); |
| return object; |
| } |
| |
| fsl::SizedVmo TryGetObjectPart(const ObjectIdentifier& object_identifier, |
| size_t offset, size_t max_size, |
| PageStorage::Location location, |
| Status expected_status = Status::OK) { |
| bool called; |
| Status status; |
| fsl::SizedVmo vmo; |
| storage_->GetObjectPart( |
| object_identifier, offset, max_size, location, |
| callback::Capture(callback::SetWhenCalled(&called), &status, &vmo)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(expected_status, status); |
| return vmo; |
| } |
| |
| std::unique_ptr<const Piece> TryGetPiece( |
| const ObjectIdentifier& object_identifier, |
| Status expected_status = Status::OK) { |
| bool called; |
| Status status; |
| std::unique_ptr<const Piece> piece; |
| storage_->GetPiece( |
| object_identifier, |
| callback::Capture(callback::SetWhenCalled(&called), &status, &piece)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(expected_status, status); |
| return piece; |
| } |
| |
| std::vector<Entry> GetCommitContents(const Commit& commit) { |
| bool called; |
| Status status; |
| std::vector<Entry> result; |
| auto on_next = [&result](Entry e) { |
| result.push_back(e); |
| return true; |
| }; |
| storage_->GetCommitContents( |
| commit, "", std::move(on_next), |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| return result; |
| } |
| |
| std::vector<std::unique_ptr<const Commit>> GetUnsyncedCommits() { |
| bool called; |
| Status status; |
| std::vector<std::unique_ptr<const Commit>> commits; |
| storage_->GetUnsyncedCommits( |
| callback::Capture(callback::SetWhenCalled(&called), &status, &commits)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| return commits; |
| } |
| |
| Status WriteObject( |
| CoroutineHandler* handler, ObjectData* data, |
| PageDbObjectStatus object_status = PageDbObjectStatus::TRANSIENT, |
| const ObjectReferencesAndPriority& references = {}) { |
| return PageStorageImplAccessorForTest::GetDb(storage_).WriteObject( |
| handler, DataChunkPiece(data->object_identifier, data->ToChunk()), |
| object_status, references); |
| } |
| |
| Status ReadObject(CoroutineHandler* handler, |
| ObjectIdentifier object_identifier, |
| std::unique_ptr<const Piece>* piece) { |
| return PageStorageImplAccessorForTest::GetDb(storage_).ReadObject( |
| handler, object_identifier, piece); |
| } |
| |
| // Checks that |object_identifier| is referenced by |expected_references|. |
| void CheckInboundObjectReferences( |
| CoroutineHandler* handler, ObjectIdentifier object_identifier, |
| ObjectReferencesAndPriority expected_references) { |
| ObjectReferencesAndPriority stored_references; |
| ASSERT_EQ(Status::OK, |
| PageStorageImplAccessorForTest::GetDb(storage_) |
| .GetInboundObjectReferences(handler, object_identifier, |
| &stored_references)); |
| EXPECT_THAT(stored_references, |
| UnorderedElementsAreArray(expected_references)); |
| } |
| |
| // Checks that |object_identifier| is referenced by |expected_references|. |
| void CheckInboundCommitReferences( |
| CoroutineHandler* handler, ObjectIdentifier object_identifier, |
| const std::vector<CommitId>& expected_references) { |
| std::vector<CommitId> stored_references; |
| ASSERT_EQ(Status::OK, |
| PageStorageImplAccessorForTest::GetDb(storage_) |
| .GetInboundCommitReferences(handler, object_identifier, |
| &stored_references)); |
| EXPECT_THAT(stored_references, |
| UnorderedElementsAreArray(expected_references)); |
| } |
| |
| ::testing::AssertionResult ObjectIsUntracked( |
| ObjectIdentifier object_identifier, bool expected_untracked) { |
| bool called; |
| Status status; |
| bool is_untracked; |
| storage_->ObjectIsUntracked( |
| object_identifier, callback::Capture(callback::SetWhenCalled(&called), |
| &status, &is_untracked)); |
| RunLoopUntilIdle(); |
| |
| if (!called) { |
| return ::testing::AssertionFailure() |
| << "ObjectIsUntracked for id " << object_identifier |
| << " didn't return."; |
| } |
| if (status != Status::OK) { |
| return ::testing::AssertionFailure() |
| << "ObjectIsUntracked for id " << object_identifier |
| << " returned status " << status; |
| } |
| if (is_untracked != expected_untracked) { |
| return ::testing::AssertionFailure() |
| << "For id " << object_identifier |
| << " expected to find the object " << (is_untracked ? "un" : "") |
| << "tracked, but was " << (expected_untracked ? "un" : "") |
| << "tracked, instead."; |
| } |
| return ::testing::AssertionSuccess(); |
| } |
| |
| ::testing::AssertionResult IsPieceSynced(ObjectIdentifier object_identifier, |
| bool expected_synced) { |
| bool called; |
| Status status; |
| bool is_synced; |
| storage_->IsPieceSynced(object_identifier, |
| callback::Capture(callback::SetWhenCalled(&called), |
| &status, &is_synced)); |
| RunLoopUntilIdle(); |
| |
| if (!called) { |
| return ::testing::AssertionFailure() |
| << "IsPieceSynced for id " << object_identifier |
| << " didn't return."; |
| } |
| if (status != Status::OK) { |
| return ::testing::AssertionFailure() |
| << "IsPieceSynced for id " << object_identifier |
| << " returned status " << status; |
| } |
| if (is_synced != expected_synced) { |
| return ::testing::AssertionFailure() |
| << "For id " << object_identifier |
| << " expected to find the object " << (is_synced ? "un" : "") |
| << "synced, but was " << (expected_synced ? "un" : "") |
| << "synced, instead."; |
| } |
| return ::testing::AssertionSuccess(); |
| } |
| |
| ::testing::AssertionResult CreateNodeFromIdentifier( |
| ObjectIdentifier identifier, |
| std::unique_ptr<const btree::TreeNode>* node) { |
| bool called; |
| Status status; |
| std::unique_ptr<const btree::TreeNode> result; |
| btree::TreeNode::FromIdentifier( |
| GetStorage(), std::move(identifier), |
| callback::Capture(callback::SetWhenCalled(&called), &status, &result)); |
| RunLoopUntilIdle(); |
| |
| if (!called) { |
| return ::testing::AssertionFailure() |
| << "TreeNode::FromIdentifier callback was not executed."; |
| } |
| if (status != Status::OK) { |
| return ::testing::AssertionFailure() |
| << "TreeNode::FromIdentifier failed with status " << status; |
| } |
| node->swap(result); |
| return ::testing::AssertionSuccess(); |
| } |
| |
| ::testing::AssertionResult CreateNodeFromEntries( |
| const std::vector<Entry>& entries, |
| const std::map<size_t, ObjectIdentifier>& children, |
| std::unique_ptr<const btree::TreeNode>* node) { |
| bool called; |
| Status status; |
| ObjectIdentifier identifier; |
| btree::TreeNode::FromEntries( |
| GetStorage(), 0u, entries, children, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &identifier)); |
| RunLoopUntilIdle(); |
| if (!called) { |
| return ::testing::AssertionFailure() |
| << "TreeNode::FromEntries callback was not executed."; |
| } |
| if (status != Status::OK) { |
| return ::testing::AssertionFailure() |
| << "TreeNode::FromEntries failed with status " << status; |
| } |
| return CreateNodeFromIdentifier(identifier, node); |
| } |
| |
| ::testing::AssertionResult GetEmptyNodeIdentifier( |
| ObjectIdentifier* empty_node_identifier) { |
| bool called; |
| Status status; |
| btree::TreeNode::Empty(GetStorage(), |
| callback::Capture(callback::SetWhenCalled(&called), |
| &status, empty_node_identifier)); |
| RunLoopUntilIdle(); |
| if (!called) { |
| return ::testing::AssertionFailure() |
| << "TreeNode::Empty callback was not executed."; |
| } |
| if (status != Status::OK) { |
| return ::testing::AssertionFailure() |
| << "TreeNode::Empty failed with status " << status; |
| } |
| return ::testing::AssertionSuccess(); |
| } |
| |
| ControlledLevelDb* leveldb_; |
| std::unique_ptr<scoped_tmpfs::ScopedTmpFS> tmpfs_; |
| encryption::FakeEncryptionService encryption_service_; |
| std::unique_ptr<PageStorageImpl> storage_; |
| |
| private: |
| FXL_DISALLOW_COPY_AND_ASSIGN(PageStorageTest); |
| }; |
| |
| TEST_F(PageStorageTest, AddGetLocalCommits) { |
| // Search for a commit id that doesn't exist and see the error. |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> lookup_commit; |
| storage_->GetCommit(RandomCommitId(environment_.random()), |
| callback::Capture(callback::SetWhenCalled(&called), |
| &status, &lookup_commit)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::INTERNAL_NOT_FOUND, status); |
| EXPECT_FALSE(lookup_commit); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), |
| RandomObjectIdentifier(environment_.random()), std::move(parent)); |
| CommitId id = commit->GetId(); |
| ObjectIdentifier root_node = commit->GetRootIdentifier(); |
| std::string storage_bytes = commit->GetStorageBytes().ToString(); |
| |
| // Search for a commit that exists and check the content. |
| storage_->AddCommitFromLocal( |
| std::move(commit), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| std::unique_ptr<const Commit> found = GetCommit(id); |
| EXPECT_EQ(storage_bytes, found->GetStorageBytes()); |
| } |
| |
| TEST_F(PageStorageTest, AddLocalCommitsReferences) { |
| // Create two commits pointing to the same object identifier and check that |
| // both are stored as inbound references of said object. |
| ObjectIdentifier root_node = RandomObjectIdentifier(environment_.random()); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| std::unique_ptr<const Commit> commit1 = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), root_node, std::move(parent)); |
| CommitId id1 = commit1->GetId(); |
| bool called; |
| Status status; |
| storage_->AddCommitFromLocal( |
| std::move(commit1), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| parent.clear(); |
| parent.emplace_back(GetFirstHead()); |
| std::unique_ptr<const Commit> commit2 = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), root_node, std::move(parent)); |
| CommitId id2 = commit2->GetId(); |
| storage_->AddCommitFromLocal( |
| std::move(commit2), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| RunInCoroutine([this, root_node, id1, id2](CoroutineHandler* handler) { |
| CheckInboundCommitReferences(handler, root_node, {id1, id2}); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, AddCommitFromLocalDoNotMarkUnsynedAlreadySyncedCommit) { |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), |
| RandomObjectIdentifier(environment_.random()), std::move(parent)); |
| CommitId id = commit->GetId(); |
| std::string storage_bytes = commit->GetStorageBytes().ToString(); |
| |
| bool called; |
| Status status; |
| storage_->AddCommitFromLocal( |
| commit->Clone(), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| auto commits = GetUnsyncedCommits(); |
| EXPECT_EQ(1u, commits.size()); |
| EXPECT_EQ(id, commits[0]->GetId()); |
| |
| storage_->MarkCommitSynced( |
| id, callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| // Add the commit again. |
| storage_->AddCommitFromLocal( |
| commit->Clone(), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| // Check that the commit is not marked unsynced. |
| commits = GetUnsyncedCommits(); |
| EXPECT_EQ(0u, commits.size()); |
| } |
| |
| TEST_F(PageStorageTest, AddCommitBeforeParentsError) { |
| // Try to add a commit before its parent and see the error. |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back( |
| std::make_unique<CommitRandomImpl>(environment_.random())); |
| std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), |
| RandomObjectIdentifier(environment_.random()), std::move(parent)); |
| |
| bool called; |
| Status status; |
| storage_->AddCommitFromLocal( |
| std::move(commit), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::INTERNAL_NOT_FOUND, status); |
| } |
| |
| TEST_F(PageStorageTest, AddCommitsOutOfOrderError) { |
| std::unique_ptr<const btree::TreeNode> node; |
| ASSERT_TRUE(CreateNodeFromEntries({}, {}, &node)); |
| ObjectIdentifier root_identifier = node->GetIdentifier(); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| auto commit1 = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), root_identifier, std::move(parent)); |
| parent.clear(); |
| parent.push_back(commit1->Clone()); |
| auto commit2 = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), root_identifier, std::move(parent)); |
| |
| std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes; |
| commits_and_bytes.emplace_back(commit2->GetId(), |
| commit2->GetStorageBytes().ToString()); |
| commits_and_bytes.emplace_back(commit1->GetId(), |
| commit1->GetStorageBytes().ToString()); |
| |
| bool called; |
| Status status; |
| std::vector<CommitId> missing_ids; |
| storage_->AddCommitsFromSync( |
| std::move(commits_and_bytes), ChangeSource::CLOUD, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &missing_ids)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::INTERNAL_NOT_FOUND, status); |
| } |
| |
| TEST_F(PageStorageTest, AddGetSyncedCommits) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| FakeSyncDelegate sync; |
| storage_->SetSyncDelegate(&sync); |
| |
| // Create a node with 2 values. |
| ObjectData lazy_value("Some data", InlineBehavior::PREVENT); |
| ObjectData eager_value("More data", InlineBehavior::PREVENT); |
| std::vector<Entry> entries = { |
| Entry{"key0", lazy_value.object_identifier, KeyPriority::LAZY}, |
| Entry{"key1", eager_value.object_identifier, KeyPriority::EAGER}, |
| }; |
| std::unique_ptr<const btree::TreeNode> node; |
| ASSERT_TRUE(CreateNodeFromEntries(entries, {}, &node)); |
| ObjectIdentifier root_identifier = node->GetIdentifier(); |
| |
| // Add the three objects to FakeSyncDelegate. |
| sync.AddObject(lazy_value.object_identifier, lazy_value.value); |
| sync.AddObject(eager_value.object_identifier, eager_value.value); |
| |
| { |
| // Ensure root_object is not kept, as the storage it depends on will be |
| // deleted. |
| std::unique_ptr<const Object> root_object = |
| TryGetObject(root_identifier, PageStorage::Location::NETWORK); |
| |
| fxl::StringView root_data; |
| ASSERT_EQ(Status::OK, root_object->GetData(&root_data)); |
| sync.AddObject(root_identifier, root_data.ToString()); |
| } |
| |
| // Reset and clear the storage. |
| ResetStorage(); |
| storage_->SetSyncDelegate(&sync); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| std::unique_ptr<const Commit> commit = |
| CommitImpl::FromContentAndParents(environment_.clock(), storage_.get(), |
| root_identifier, std::move(parent)); |
| CommitId id = commit->GetId(); |
| |
| // Adding the commit should only request the tree node and the eager value. |
| sync.object_requests.clear(); |
| bool called; |
| Status status; |
| std::vector<CommitId> missing_ids; |
| storage_->AddCommitsFromSync( |
| CommitAndBytesFromCommit(*commit), ChangeSource::CLOUD, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &missing_ids)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_EQ(2u, sync.object_requests.size()); |
| EXPECT_TRUE(sync.object_requests.find(root_identifier) != |
| sync.object_requests.end()); |
| EXPECT_TRUE(sync.object_requests.find(eager_value.object_identifier) != |
| sync.object_requests.end()); |
| |
| // Adding the same commit twice should not request any objects from sync. |
| sync.object_requests.clear(); |
| storage_->AddCommitsFromSync( |
| CommitAndBytesFromCommit(*commit), ChangeSource::CLOUD, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &missing_ids)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_TRUE(sync.object_requests.empty()); |
| |
| std::unique_ptr<const Commit> found = GetCommit(id); |
| EXPECT_EQ(commit->GetStorageBytes(), found->GetStorageBytes()); |
| |
| // Check that the commit is not marked as unsynced. |
| std::vector<std::unique_ptr<const Commit>> commits = GetUnsyncedCommits(); |
| EXPECT_TRUE(commits.empty()); |
| }); |
| } |
| |
| // Check that receiving a remote commit that is already present locally but not |
| // synced will mark the commit as synced. |
| TEST_F(PageStorageTest, MarkRemoteCommitSynced) { |
| FakeSyncDelegate sync; |
| storage_->SetSyncDelegate(&sync); |
| |
| std::unique_ptr<const btree::TreeNode> node; |
| ASSERT_TRUE(CreateNodeFromEntries({}, {}, &node)); |
| ObjectIdentifier root_identifier = node->GetIdentifier(); |
| |
| std::unique_ptr<const Object> root_object = |
| TryGetObject(root_identifier, PageStorage::Location::NETWORK); |
| |
| fxl::StringView root_data; |
| ASSERT_EQ(Status::OK, root_object->GetData(&root_data)); |
| sync.AddObject(root_identifier, root_data.ToString()); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), root_identifier, std::move(parent)); |
| CommitId id = commit->GetId(); |
| |
| bool called; |
| Status status; |
| storage_->AddCommitFromLocal( |
| std::move(commit), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| EXPECT_EQ(1u, GetUnsyncedCommits().size()); |
| storage_->GetCommit(id, callback::Capture(callback::SetWhenCalled(&called), |
| &status, &commit)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes; |
| commits_and_bytes.emplace_back(commit->GetId(), |
| commit->GetStorageBytes().ToString()); |
| std::vector<CommitId> missing_ids; |
| storage_->AddCommitsFromSync( |
| std::move(commits_and_bytes), ChangeSource::CLOUD, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &missing_ids)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| |
| EXPECT_EQ(0u, GetUnsyncedCommits().size()); |
| } |
| |
| TEST_F(PageStorageTest, SyncCommits) { |
| std::vector<std::unique_ptr<const Commit>> commits = GetUnsyncedCommits(); |
| |
| // Initially there should be no unsynced commits. |
| EXPECT_TRUE(commits.empty()); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| // After adding a commit it should marked as unsynced. |
| std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), |
| RandomObjectIdentifier(environment_.random()), std::move(parent)); |
| CommitId id = commit->GetId(); |
| std::string storage_bytes = commit->GetStorageBytes().ToString(); |
| |
| bool called; |
| Status status; |
| storage_->AddCommitFromLocal( |
| std::move(commit), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| commits = GetUnsyncedCommits(); |
| EXPECT_EQ(1u, commits.size()); |
| EXPECT_EQ(storage_bytes, commits[0]->GetStorageBytes()); |
| |
| // Mark it as synced. |
| storage_->MarkCommitSynced( |
| id, callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| commits = GetUnsyncedCommits(); |
| EXPECT_TRUE(commits.empty()); |
| } |
| |
| TEST_F(PageStorageTest, HeadCommits) { |
| // Every page should have one initial head commit. |
| std::vector<std::unique_ptr<const Commit>> heads = GetHeads(); |
| EXPECT_EQ(1u, heads.size()); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| // Adding a new commit with the previous head as its parent should replace the |
| // old head. |
| std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), |
| RandomObjectIdentifier(environment_.random()), std::move(parent)); |
| CommitId id = commit->GetId(); |
| |
| bool called; |
| Status status; |
| storage_->AddCommitFromLocal( |
| std::move(commit), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| heads = GetHeads(); |
| ASSERT_EQ(1u, heads.size()); |
| EXPECT_EQ(id, heads[0]->GetId()); |
| } |
| |
| TEST_F(PageStorageTest, OrderHeadCommitsByTimestampThenId) { |
| timekeeper::TestClock test_clock; |
| // We generate a few timestamps: some random, and a few equal constants to |
| // test ID ordering. |
| std::vector<zx::time_utc> timestamps(7); |
| std::generate(timestamps.begin(), timestamps.end(), |
| [this] { return environment_.random()->Draw<zx::time_utc>(); }); |
| timestamps.insert(timestamps.end(), {zx::time_utc(1000), zx::time_utc(1000), |
| zx::time_utc(1000)}); |
| |
| // We first generate the commits. The will be shuffled at a later time. |
| std::vector<std::unique_ptr<const Commit>> commits; |
| std::vector<std::pair<zx::time_utc, CommitId>> sorted_commits; |
| |
| for (size_t i = 0; i < timestamps.size(); i++) { |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| |
| test_clock.Set(timestamps[i]); |
| // Adding a new commit with the previous head as its parent should replace |
| // the old head. |
| std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents( |
| &test_clock, storage_.get(), |
| RandomObjectIdentifier(environment_.random()), std::move(parent)); |
| |
| sorted_commits.emplace_back(timestamps[i], commit->GetId()); |
| commits.push_back(std::move(commit)); |
| } |
| |
| // Insert the commits in a random order. |
| auto rng = environment_.random()->NewBitGenerator<uint64_t>(); |
| std::shuffle(commits.begin(), commits.end(), rng); |
| for (size_t i = 0; i < commits.size(); i++) { |
| bool called; |
| Status status; |
| storage_->AddCommitFromLocal( |
| std::move(commits[i]), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| } |
| |
| // Check that GetHeadCommitIds returns sorted commits. |
| std::vector<std::unique_ptr<const Commit>> heads; |
| Status status = storage_->GetHeadCommits(&heads); |
| EXPECT_EQ(Status::OK, status); |
| std::sort(sorted_commits.begin(), sorted_commits.end()); |
| for (size_t i = 0; i < sorted_commits.size(); ++i) { |
| EXPECT_EQ(sorted_commits[i].second, heads[i]->GetId()); |
| } |
| } |
| |
| TEST_F(PageStorageTest, CreateJournals) { |
| // Explicit journal. |
| auto left_commit = TryCommitFromLocal(5); |
| ASSERT_TRUE(left_commit); |
| auto right_commit = TryCommitFromLocal(10); |
| ASSERT_TRUE(right_commit); |
| |
| // Journal for merge commit. |
| std::unique_ptr<Journal> journal = storage_->StartMergeCommit( |
| std::move(left_commit), std::move(right_commit)); |
| } |
| |
| TEST_F(PageStorageTest, CreateJournalHugeNode) { |
| std::unique_ptr<const Commit> commit = TryCommitFromLocal(500, 1024); |
| ASSERT_TRUE(commit); |
| std::vector<Entry> entries = GetCommitContents(*commit); |
| |
| EXPECT_EQ(500u, entries.size()); |
| for (const auto& entry : entries) { |
| EXPECT_EQ(1024u, entry.key.size()); |
| } |
| |
| // Check that all node's parts are marked as unsynced. |
| bool called; |
| Status status; |
| std::vector<ObjectIdentifier> object_identifiers; |
| storage_->GetUnsyncedPieces(callback::Capture( |
| callback::SetWhenCalled(&called), &status, &object_identifiers)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| |
| bool found_index = false; |
| std::set<ObjectIdentifier> unsynced_identifiers(object_identifiers.begin(), |
| object_identifiers.end()); |
| for (const auto& identifier : unsynced_identifiers) { |
| EXPECT_FALSE(GetObjectDigestInfo(identifier.object_digest()).is_inlined()); |
| |
| if (GetObjectDigestInfo(identifier.object_digest()).piece_type == |
| PieceType::INDEX) { |
| found_index = true; |
| std::set<ObjectIdentifier> sub_identifiers; |
| IterationStatus iteration_status = IterationStatus::ERROR; |
| CollectPieces( |
| identifier, |
| [this](ObjectIdentifier identifier, |
| fit::function<void(Status, fxl::StringView)> callback) { |
| storage_->GetPiece( |
| std::move(identifier), |
| [callback = std::move(callback)]( |
| Status status, std::unique_ptr<const Piece> piece) { |
| if (status != Status::OK) { |
| callback(status, ""); |
| return; |
| } |
| callback(status, piece->GetData()); |
| }); |
| }, |
| [&iteration_status, &sub_identifiers](IterationStatus status, |
| ObjectIdentifier identifier) { |
| iteration_status = status; |
| if (status == IterationStatus::IN_PROGRESS) { |
| EXPECT_TRUE(sub_identifiers.insert(identifier).second); |
| } |
| return true; |
| }); |
| RunLoopUntilIdle(); |
| EXPECT_EQ(IterationStatus::DONE, iteration_status); |
| for (const auto& identifier : sub_identifiers) { |
| EXPECT_EQ(1u, unsynced_identifiers.count(identifier)); |
| } |
| } |
| } |
| EXPECT_TRUE(found_index); |
| } |
| |
| TEST_F(PageStorageTest, DestroyUncommittedJournal) { |
| // It is not an error if a journal is not committed or rolled back. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(environment_.random()), |
| KeyPriority::EAGER); |
| } |
| |
| TEST_F(PageStorageTest, AddObjectFromLocal) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data("Some data", InlineBehavior::PREVENT); |
| |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, data.ToDataSource(), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_EQ(data.object_identifier, object_identifier); |
| |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(Status::OK, ReadObject(handler, object_identifier, &piece)); |
| EXPECT_EQ(data.value, piece->GetData()); |
| EXPECT_TRUE(ObjectIsUntracked(object_identifier, true)); |
| EXPECT_TRUE(IsPieceSynced(object_identifier, false)); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, AddSmallObjectFromLocal) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data("Some data"); |
| |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, data.ToDataSource(), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_EQ(data.object_identifier, object_identifier); |
| EXPECT_EQ(data.value, |
| ExtractObjectDigestData(object_identifier.object_digest())); |
| |
| std::unique_ptr<const Piece> piece; |
| EXPECT_EQ(Status::INTERNAL_NOT_FOUND, |
| ReadObject(handler, object_identifier, &piece)); |
| // Inline objects do not need to ever be tracked. |
| EXPECT_TRUE(ObjectIsUntracked(object_identifier, false)); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, InterruptAddObjectFromLocal) { |
| ObjectData data("Some data"); |
| |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, data.ToDataSource(), {}, |
| [](Status returned_status, ObjectIdentifier object_identifier) {}); |
| |
| // Checking that we do not crash when deleting the storage while an AddObject |
| // call is in progress. |
| storage_.reset(); |
| } |
| |
| TEST_F(PageStorageTest, AddObjectFromLocalError) { |
| auto data_source = std::make_unique<FakeErrorDataSource>(dispatcher()); |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, std::move(data_source), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::IO_ERROR, status); |
| } |
| |
| TEST_F(PageStorageTest, AddLocalPiece) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data("Some data", InlineBehavior::PREVENT); |
| const ObjectIdentifier reference = |
| RandomObjectIdentifier(environment_.random()); |
| |
| bool called; |
| Status status; |
| PageStorageImplAccessorForTest::AddPiece( |
| storage_, data.ToPiece(), ChangeSource::LOCAL, IsObjectSynced::NO, |
| {{reference.object_digest(), KeyPriority::LAZY}}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(Status::OK, ReadObject(handler, data.object_identifier, &piece)); |
| EXPECT_EQ(data.value, piece->GetData()); |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, true)); |
| EXPECT_TRUE(IsPieceSynced(data.object_identifier, false)); |
| |
| CheckInboundObjectReferences( |
| handler, reference, |
| {{data.object_identifier.object_digest(), KeyPriority::LAZY}}); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, AddSyncPiece) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data("Some data", InlineBehavior::PREVENT); |
| const ObjectIdentifier reference = |
| RandomObjectIdentifier(environment_.random()); |
| |
| bool called; |
| Status status; |
| PageStorageImplAccessorForTest::AddPiece( |
| storage_, data.ToPiece(), ChangeSource::CLOUD, IsObjectSynced::YES, |
| {{reference.object_digest(), KeyPriority::EAGER}}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(Status::OK, ReadObject(handler, data.object_identifier, &piece)); |
| EXPECT_EQ(data.value, piece->GetData()); |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, false)); |
| EXPECT_TRUE(IsPieceSynced(data.object_identifier, true)); |
| |
| CheckInboundObjectReferences( |
| handler, reference, |
| {{data.object_identifier.object_digest(), KeyPriority::EAGER}}); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, AddP2PPiece) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data("Some data", InlineBehavior::PREVENT); |
| |
| bool called; |
| Status status; |
| PageStorageImplAccessorForTest::AddPiece( |
| storage_, data.ToPiece(), ChangeSource::P2P, IsObjectSynced::NO, {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(Status::OK, ReadObject(handler, data.object_identifier, &piece)); |
| EXPECT_EQ(data.value, piece->GetData()); |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, false)); |
| EXPECT_TRUE(IsPieceSynced(data.object_identifier, false)); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, GetObject) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data("Some data"); |
| ASSERT_EQ(Status::OK, WriteObject(handler, &data)); |
| |
| std::unique_ptr<const Object> object = |
| TryGetObject(data.object_identifier, PageStorage::Location::LOCAL); |
| EXPECT_EQ(data.object_identifier, object->GetIdentifier()); |
| fxl::StringView object_data; |
| ASSERT_EQ(Status::OK, object->GetData(&object_data)); |
| EXPECT_EQ(data.value, convert::ToString(object_data)); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPart) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data("_Some data_"); |
| ASSERT_EQ(Status::OK, WriteObject(handler, &data)); |
| |
| fsl::SizedVmo object_part = TryGetObjectPart( |
| data.object_identifier, 1, data.size - 2, PageStorage::Location::LOCAL); |
| std::string object_part_data; |
| ASSERT_TRUE(fsl::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(data.value.substr(1, data.size - 2), |
| convert::ToString(object_part_data)); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPartLargeOffset) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data("_Some data_"); |
| ASSERT_EQ(Status::OK, WriteObject(handler, &data)); |
| |
| fsl::SizedVmo object_part = |
| TryGetObjectPart(data.object_identifier, data.size * 2, data.size, |
| PageStorage::Location::LOCAL); |
| std::string object_part_data; |
| ASSERT_TRUE(fsl::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ("", convert::ToString(object_part_data)); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPartLargeMaxSize) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data("_Some data_"); |
| ASSERT_EQ(Status::OK, WriteObject(handler, &data)); |
| |
| fsl::SizedVmo object_part = TryGetObjectPart( |
| data.object_identifier, 0, data.size * 2, PageStorage::Location::LOCAL); |
| std::string object_part_data; |
| ASSERT_TRUE(fsl::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(data.value, convert::ToString(object_part_data)); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPartNegativeArgs) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data("_Some data_"); |
| ASSERT_EQ(Status::OK, WriteObject(handler, &data)); |
| |
| fsl::SizedVmo object_part = |
| TryGetObjectPart(data.object_identifier, -data.size + 1, -1, |
| PageStorage::Location::LOCAL); |
| std::string object_part_data; |
| ASSERT_TRUE(fsl::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(data.value.substr(1, data.size - 1), |
| convert::ToString(object_part_data)); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, GetLargeObjectPart) { |
| std::string data_str = RandomString(environment_.random(), 65536); |
| size_t offset = 6144; |
| size_t size = 49152; |
| |
| ObjectData data(std::move(data_str), ObjectType::TREE_NODE, |
| InlineBehavior::PREVENT); |
| |
| ASSERT_EQ( |
| PieceType::INDEX, |
| GetObjectDigestInfo(data.object_identifier.object_digest()).piece_type); |
| |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::TREE_NODE, data.ToDataSource(), /*tree_references=*/{}, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_EQ(data.object_identifier, object_identifier); |
| |
| fsl::SizedVmo object_part = TryGetObjectPart(object_identifier, offset, size, |
| PageStorage::Location::LOCAL); |
| std::string object_part_data; |
| ASSERT_TRUE(fsl::StringFromVmo(object_part, &object_part_data)); |
| std::string result_str = convert::ToString(object_part_data); |
| EXPECT_EQ(size, result_str.size()); |
| EXPECT_EQ(data.value.substr(offset, size), result_str); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPartFromSync) { |
| ObjectData data("_Some data_", InlineBehavior::PREVENT); |
| FakeSyncDelegate sync; |
| sync.AddObject(data.object_identifier, data.value); |
| storage_->SetSyncDelegate(&sync); |
| |
| fsl::SizedVmo object_part = TryGetObjectPart( |
| data.object_identifier, 1, data.size - 2, PageStorage::Location::NETWORK); |
| std::string object_part_data; |
| ASSERT_TRUE(fsl::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(data.value.substr(1, data.size - 2), |
| convert::ToString(object_part_data)); |
| |
| storage_->SetSyncDelegate(nullptr); |
| ObjectData other_data("_Some other data_", InlineBehavior::PREVENT); |
| TryGetObjectPart(other_data.object_identifier, 1, other_data.size - 2, |
| PageStorage::Location::LOCAL, Status::INTERNAL_NOT_FOUND); |
| TryGetObjectPart(other_data.object_identifier, 1, other_data.size - 2, |
| PageStorage::Location::NETWORK, Status::NETWORK_ERROR); |
| } |
| |
| TEST_F(PageStorageTest, GetHugeObjectPartFromSync) { |
| std::string data_str = RandomString(environment_.random(), 2 * 65536 + 1); |
| int64_t offset = 28672; |
| int64_t size = 128; |
| |
| FakeSyncDelegate sync; |
| ObjectIdentifier object_identifier = ForEachPiece( |
| data_str, ObjectType::BLOB, [&sync](std::unique_ptr<const Piece> piece) { |
| ObjectIdentifier object_identifier = piece->GetIdentifier(); |
| if (GetObjectDigestInfo(object_identifier.object_digest()) |
| .is_inlined()) { |
| return; |
| } |
| sync.AddObject(std::move(object_identifier), |
| piece->GetData().ToString()); |
| }); |
| ASSERT_EQ(PieceType::INDEX, |
| GetObjectDigestInfo(object_identifier.object_digest()).piece_type); |
| storage_->SetSyncDelegate(&sync); |
| |
| fsl::SizedVmo object_part = TryGetObjectPart(object_identifier, offset, size, |
| PageStorage::Location::NETWORK); |
| std::string object_part_data; |
| ASSERT_TRUE(fsl::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(data_str.substr(offset, size), convert::ToString(object_part_data)); |
| EXPECT_LT(sync.object_requests.size(), sync.GetNumberOfObjectsStored()); |
| } |
| |
| TEST_F(PageStorageTest, GetHugeObjectPartFromSyncNegativeOffset) { |
| std::string data_str = RandomString(environment_.random(), 2 * 65536 + 1); |
| int64_t offset = -28672; |
| int64_t size = 128; |
| |
| FakeSyncDelegate sync; |
| ObjectIdentifier object_identifier = ForEachPiece( |
| data_str, ObjectType::BLOB, [&sync](std::unique_ptr<const Piece> piece) { |
| ObjectIdentifier object_identifier = piece->GetIdentifier(); |
| if (GetObjectDigestInfo(object_identifier.object_digest()) |
| .is_inlined()) { |
| return; |
| } |
| sync.AddObject(std::move(object_identifier), |
| piece->GetData().ToString()); |
| }); |
| ASSERT_EQ(PieceType::INDEX, |
| GetObjectDigestInfo(object_identifier.object_digest()).piece_type); |
| storage_->SetSyncDelegate(&sync); |
| |
| fsl::SizedVmo object_part = TryGetObjectPart(object_identifier, offset, size, |
| PageStorage::Location::NETWORK); |
| std::string object_part_data; |
| ASSERT_TRUE(fsl::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(data_str.substr(data_str.size() + offset, size), |
| convert::ToString(object_part_data)); |
| EXPECT_LT(sync.object_requests.size(), sync.GetNumberOfObjectsStored()); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectFromSync) { |
| ObjectData data("Some data", InlineBehavior::PREVENT); |
| FakeSyncDelegate sync; |
| sync.AddObject(data.object_identifier, data.value); |
| storage_->SetSyncDelegate(&sync); |
| |
| std::unique_ptr<const Object> object = |
| TryGetObject(data.object_identifier, PageStorage::Location::NETWORK); |
| EXPECT_EQ(data.object_identifier, object->GetIdentifier()); |
| fxl::StringView object_data; |
| ASSERT_EQ(Status::OK, object->GetData(&object_data)); |
| EXPECT_EQ(data.value, convert::ToString(object_data)); |
| |
| storage_->SetSyncDelegate(nullptr); |
| ObjectData other_data("Some other data", InlineBehavior::PREVENT); |
| TryGetObject(other_data.object_identifier, PageStorage::Location::LOCAL, |
| Status::INTERNAL_NOT_FOUND); |
| TryGetObject(other_data.object_identifier, PageStorage::Location::NETWORK, |
| Status::NETWORK_ERROR); |
| } |
| |
| TEST_F(PageStorageTest, FullDownloadAfterPartial) { |
| std::string data_str = RandomString(environment_.random(), 2 * 65536 + 1); |
| int64_t offset = 0; |
| int64_t size = 128; |
| |
| FakeSyncDelegate sync; |
| ObjectIdentifier object_identifier = ForEachPiece( |
| data_str, ObjectType::BLOB, [&sync](std::unique_ptr<const Piece> piece) { |
| ObjectIdentifier object_identifier = piece->GetIdentifier(); |
| if (GetObjectDigestInfo(object_identifier.object_digest()) |
| .is_inlined()) { |
| return; |
| } |
| sync.AddObject(std::move(object_identifier), |
| piece->GetData().ToString()); |
| }); |
| ASSERT_EQ(PieceType::INDEX, |
| GetObjectDigestInfo(object_identifier.object_digest()).piece_type); |
| storage_->SetSyncDelegate(&sync); |
| |
| fsl::SizedVmo object_part = TryGetObjectPart(object_identifier, offset, size, |
| PageStorage::Location::NETWORK); |
| std::string object_part_data; |
| ASSERT_TRUE(fsl::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(data_str.substr(offset, size), convert::ToString(object_part_data)); |
| EXPECT_LT(sync.object_requests.size(), sync.GetNumberOfObjectsStored()); |
| TryGetObject(object_identifier, PageStorage::LOCAL, |
| Status::INTERNAL_NOT_FOUND); |
| |
| std::unique_ptr<const Object> object = |
| TryGetObject(object_identifier, PageStorage::Location::NETWORK); |
| fxl::StringView object_data; |
| ASSERT_EQ(Status::OK, object->GetData(&object_data)); |
| EXPECT_EQ(data_str, convert::ToString(object_data)); |
| EXPECT_EQ(sync.object_requests.size(), sync.GetNumberOfObjectsStored()); |
| TryGetObject(object_identifier, PageStorage::LOCAL, Status::OK); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectFromSyncWrongId) { |
| ObjectData data("Some data", InlineBehavior::PREVENT); |
| ObjectData data2("Some data2", InlineBehavior::PREVENT); |
| FakeSyncDelegate sync; |
| sync.AddObject(data.object_identifier, data2.value); |
| storage_->SetSyncDelegate(&sync); |
| |
| TryGetObject(data.object_identifier, PageStorage::Location::NETWORK, |
| Status::OBJECT_DIGEST_MISMATCH); |
| } |
| |
| TEST_F(PageStorageTest, AddAndGetHugeTreenodeFromLocal) { |
| std::string data_str = RandomString(environment_.random(), 65536); |
| |
| ObjectData data(std::move(data_str), ObjectType::TREE_NODE, |
| InlineBehavior::PREVENT); |
| // An identifier to another tree node pointed at by the current one. |
| const ObjectIdentifier tree_reference = |
| RandomObjectIdentifier(environment_.random()); |
| ASSERT_EQ( |
| ObjectType::TREE_NODE, |
| GetObjectDigestInfo(data.object_identifier.object_digest()).object_type); |
| ASSERT_EQ( |
| PieceType::INDEX, |
| GetObjectDigestInfo(data.object_identifier.object_digest()).piece_type); |
| ASSERT_EQ( |
| InlinedPiece::NO, |
| GetObjectDigestInfo(data.object_identifier.object_digest()).inlined); |
| |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::TREE_NODE, data.ToDataSource(), |
| {{tree_reference.object_digest(), KeyPriority::LAZY}}, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| |
| EXPECT_EQ(Status::OK, status); |
| // This ensures that the object is encoded with an index, as we checked the |
| // piece type of |data.object_identifier| above. |
| EXPECT_EQ(data.object_identifier, object_identifier); |
| |
| std::unique_ptr<const Object> object = |
| TryGetObject(object_identifier, PageStorage::Location::LOCAL); |
| fxl::StringView content; |
| ASSERT_EQ(Status::OK, object->GetData(&content)); |
| EXPECT_EQ(data.value, content); |
| EXPECT_TRUE(ObjectIsUntracked(object_identifier, true)); |
| EXPECT_TRUE(IsPieceSynced(object_identifier, false)); |
| |
| // Check that the index piece obtained at |object_identifier| is different |
| // from the object itself, ie. that some splitting occurred. |
| std::unique_ptr<const Piece> piece = TryGetPiece(object_identifier); |
| EXPECT_NE(content, piece->GetData()); |
| |
| RunInCoroutine([this, piece = std::move(piece), tree_reference, |
| object_identifier](CoroutineHandler* handler) { |
| // Check tree reference. |
| CheckInboundObjectReferences( |
| handler, tree_reference, |
| {{object_identifier.object_digest(), KeyPriority::LAZY}}); |
| // Check piece references. |
| ASSERT_EQ( |
| Status::OK, |
| ForEachIndexChild( |
| piece->GetData(), [this, handler, object_identifier]( |
| ObjectIdentifier piece_identifier) { |
| CheckInboundObjectReferences( |
| handler, piece_identifier, |
| {{object_identifier.object_digest(), KeyPriority::EAGER}}); |
| return Status::OK; |
| })); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, UnsyncedPieces) { |
| ObjectData data_array[] = { |
| ObjectData("Some data", InlineBehavior::PREVENT), |
| ObjectData("Some more data", InlineBehavior::PREVENT), |
| ObjectData("Even more data", InlineBehavior::PREVENT), |
| }; |
| constexpr size_t size = arraysize(data_array); |
| for (auto& data : data_array) { |
| TryAddFromLocal(data.value, data.object_identifier); |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, true)); |
| EXPECT_TRUE(IsPieceSynced(data.object_identifier, false)); |
| } |
| |
| std::vector<CommitId> commits; |
| |
| // Add one key-value pair per commit. |
| for (size_t i = 0; i < size; ++i) { |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| |
| journal->Put(fxl::StringPrintf("key%lu", i), |
| data_array[i].object_identifier, KeyPriority::LAZY); |
| EXPECT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| commits.push_back(GetFirstHead()->GetId()); |
| } |
| |
| // GetUnsyncedPieces should return the ids of all objects: 3 values and |
| // the 3 root nodes of the 3 commits. |
| bool called; |
| Status status; |
| std::vector<ObjectIdentifier> object_identifiers; |
| storage_->GetUnsyncedPieces(callback::Capture( |
| callback::SetWhenCalled(&called), &status, &object_identifiers)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_EQ(6u, object_identifiers.size()); |
| for (size_t i = 0; i < size; ++i) { |
| std::unique_ptr<const Commit> commit = GetCommit(commits[i]); |
| EXPECT_TRUE(std::find_if(object_identifiers.begin(), |
| object_identifiers.end(), |
| [&](const auto& identifier) { |
| return identifier == commit->GetRootIdentifier(); |
| }) != object_identifiers.end()); |
| } |
| for (auto& data : data_array) { |
| EXPECT_TRUE(std::find(object_identifiers.begin(), object_identifiers.end(), |
| data.object_identifier) != object_identifiers.end()); |
| } |
| |
| // Mark the 2nd object as synced. We now expect to still find the 2 unsynced |
| // values and the (also unsynced) root node. |
| storage_->MarkPieceSynced( |
| data_array[1].object_identifier, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| std::vector<ObjectIdentifier> objects; |
| storage_->GetUnsyncedPieces( |
| callback::Capture(callback::SetWhenCalled(&called), &status, &objects)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_EQ(5u, objects.size()); |
| std::unique_ptr<const Commit> commit = GetCommit(commits[2]); |
| EXPECT_TRUE(std::find(objects.begin(), objects.end(), |
| commit->GetRootIdentifier()) != objects.end()); |
| EXPECT_TRUE(std::find(objects.begin(), objects.end(), |
| data_array[0].object_identifier) != objects.end()); |
| EXPECT_TRUE(std::find(objects.begin(), objects.end(), |
| data_array[2].object_identifier) != objects.end()); |
| } |
| |
| TEST_F(PageStorageTest, PageIsSynced) { |
| ObjectData data_array[] = { |
| ObjectData("Some data", InlineBehavior::PREVENT), |
| ObjectData("Some more data", InlineBehavior::PREVENT), |
| ObjectData("Even more data", InlineBehavior::PREVENT), |
| }; |
| constexpr size_t size = arraysize(data_array); |
| for (auto& data : data_array) { |
| TryAddFromLocal(data.value, data.object_identifier); |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, true)); |
| EXPECT_TRUE(IsPieceSynced(data.object_identifier, false)); |
| } |
| |
| // The objects have not been added in a commit: there is nothing to sync and |
| // the page is considered synced. |
| bool called; |
| Status status; |
| bool is_synced; |
| storage_->IsSynced( |
| callback::Capture(callback::SetWhenCalled(&called), &status, &is_synced)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_EQ(true, is_synced); |
| |
| // Add all objects in one commit. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| for (size_t i = 0; i < size; ++i) { |
| journal->Put(fxl::StringPrintf("key%lu", i), |
| data_array[i].object_identifier, KeyPriority::LAZY); |
| } |
| EXPECT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| CommitId commit_id = GetFirstHead()->GetId(); |
| |
| // After commiting, the page is unsynced. |
| called = false; |
| storage_->IsSynced( |
| callback::Capture(callback::SetWhenCalled(&called), &status, &is_synced)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_FALSE(is_synced); |
| // Mark objects (and the root tree node) as synced and expect that the page is |
| // still unsynced. |
| for (const auto& data : data_array) { |
| called = false; |
| storage_->MarkPieceSynced( |
| data.object_identifier, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| } |
| |
| called = false; |
| storage_->MarkPieceSynced( |
| GetFirstHead()->GetRootIdentifier(), |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| called = false; |
| storage_->IsSynced( |
| callback::Capture(callback::SetWhenCalled(&called), &status, &is_synced)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_FALSE(is_synced); |
| |
| // Mark the commit as synced and expect that the page is synced. |
| called = false; |
| storage_->MarkCommitSynced( |
| commit_id, callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| called = false; |
| storage_->IsSynced( |
| callback::Capture(callback::SetWhenCalled(&called), &status, &is_synced)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_TRUE(is_synced); |
| |
| // All objects should be synced now. |
| for (auto& data : data_array) { |
| EXPECT_TRUE(IsPieceSynced(data.object_identifier, true)); |
| } |
| } |
| |
| TEST_F(PageStorageTest, PageIsMarkedOnlineAfterCloudSync) { |
| // Check that the page is initially not marked as online. |
| EXPECT_FALSE(storage_->IsOnline()); |
| |
| // Create a local commit: the page is still not online. |
| int size = 10; |
| std::unique_ptr<const Commit> commit = TryCommitFromLocal(size); |
| EXPECT_FALSE(storage_->IsOnline()); |
| |
| // Mark all objects as synced. The page is still not online: other devices |
| // will only see these objects if the corresponding commit is also synced to |
| // the cloud. |
| bool called; |
| Status status; |
| std::vector<ObjectIdentifier> object_identifiers; |
| storage_->GetUnsyncedPieces(callback::Capture( |
| callback::SetWhenCalled(&called), &status, &object_identifiers)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| for (ObjectIdentifier& object_identifier : object_identifiers) { |
| storage_->MarkPieceSynced( |
| object_identifier, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| } |
| EXPECT_FALSE(storage_->IsOnline()); |
| |
| // Mark the commit as synced. The page should now be marked as online. |
| storage_->MarkCommitSynced( |
| commit->GetId(), |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_TRUE(storage_->IsOnline()); |
| } |
| |
| TEST_F(PageStorageTest, PageIsMarkedOnlineSyncWithPeer) { |
| // Check that the page is initially not marked as online. |
| EXPECT_FALSE(storage_->IsOnline()); |
| |
| // Mark the page as synced to peer and expect that it is marked as online. |
| bool called; |
| Status status; |
| storage_->MarkSyncedToPeer( |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_TRUE(storage_->IsOnline()); |
| } |
| |
| TEST_F(PageStorageTest, PageIsEmpty) { |
| ObjectData value("Some value", InlineBehavior::PREVENT); |
| bool called; |
| Status status; |
| bool is_empty; |
| |
| // Initially the page is empty. |
| storage_->IsEmpty( |
| callback::Capture(callback::SetWhenCalled(&called), &status, &is_empty)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_TRUE(is_empty); |
| |
| // Add an entry and expect that the page is not empty any more. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", value.object_identifier, KeyPriority::LAZY); |
| EXPECT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| |
| storage_->IsEmpty( |
| callback::Capture(callback::SetWhenCalled(&called), &status, &is_empty)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_FALSE(is_empty); |
| |
| // Clear the page and expect it to be empty again. |
| journal = storage_->StartCommit(GetFirstHead()); |
| journal->Delete("key"); |
| EXPECT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| |
| storage_->IsEmpty( |
| callback::Capture(callback::SetWhenCalled(&called), &status, &is_empty)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_TRUE(is_empty); |
| } |
| |
| TEST_F(PageStorageTest, UntrackedObjectsSimple) { |
| ObjectData data("Some data", InlineBehavior::PREVENT); |
| |
| // The object is not yet created and its id should not be marked as untracked. |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, false)); |
| |
| // After creating the object it should be marked as untracked. |
| TryAddFromLocal(data.value, data.object_identifier); |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, true)); |
| |
| // After adding the object in a commit it should not be untracked any more. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", data.object_identifier, KeyPriority::EAGER); |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, true)); |
| ASSERT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, false)); |
| } |
| |
| TEST_F(PageStorageTest, UntrackedObjectsComplex) { |
| ObjectData data_array[] = { |
| ObjectData("Some data", InlineBehavior::PREVENT), |
| ObjectData("Some more data", InlineBehavior::PREVENT), |
| ObjectData("Even more data", InlineBehavior::PREVENT), |
| }; |
| for (auto& data : data_array) { |
| TryAddFromLocal(data.value, data.object_identifier); |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, true)); |
| } |
| |
| // Add a first commit containing data_array[0]. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key0", data_array[0].object_identifier, KeyPriority::LAZY); |
| EXPECT_TRUE(ObjectIsUntracked(data_array[0].object_identifier, true)); |
| ASSERT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| EXPECT_TRUE(ObjectIsUntracked(data_array[0].object_identifier, false)); |
| EXPECT_TRUE(ObjectIsUntracked(data_array[1].object_identifier, true)); |
| EXPECT_TRUE(ObjectIsUntracked(data_array[2].object_identifier, true)); |
| |
| // Create a second commit. After calling Put for "key1" for the second time |
| // data_array[1] is no longer part of this commit: it should remain |
| // untracked after committing. |
| journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key1", data_array[1].object_identifier, KeyPriority::LAZY); |
| journal->Put("key2", data_array[2].object_identifier, KeyPriority::LAZY); |
| journal->Put("key1", data_array[2].object_identifier, KeyPriority::LAZY); |
| journal->Put("key3", data_array[0].object_identifier, KeyPriority::LAZY); |
| ASSERT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| EXPECT_TRUE(ObjectIsUntracked(data_array[0].object_identifier, false)); |
| EXPECT_TRUE(ObjectIsUntracked(data_array[1].object_identifier, true)); |
| EXPECT_TRUE(ObjectIsUntracked(data_array[2].object_identifier, false)); |
| } |
| |
| TEST_F(PageStorageTest, CommitWatchers) { |
| FakeCommitWatcher watcher; |
| storage_->AddCommitWatcher(&watcher); |
| |
| // Add a watcher and receive the commit. |
| auto expected = TryCommitFromLocal(10); |
| ASSERT_TRUE(expected); |
| EXPECT_EQ(1, watcher.commit_count); |
| EXPECT_EQ(expected->GetId(), watcher.last_commit_id); |
| EXPECT_EQ(ChangeSource::LOCAL, watcher.last_source); |
| |
| // Add a second watcher. |
| FakeCommitWatcher watcher2; |
| storage_->AddCommitWatcher(&watcher2); |
| expected = TryCommitFromLocal(10); |
| ASSERT_TRUE(expected); |
| EXPECT_EQ(2, watcher.commit_count); |
| EXPECT_EQ(expected->GetId(), watcher.last_commit_id); |
| EXPECT_EQ(ChangeSource::LOCAL, watcher.last_source); |
| EXPECT_EQ(1, watcher2.commit_count); |
| EXPECT_EQ(expected->GetId(), watcher2.last_commit_id); |
| EXPECT_EQ(ChangeSource::LOCAL, watcher2.last_source); |
| |
| // Remove one watcher. |
| storage_->RemoveCommitWatcher(&watcher2); |
| expected = TryCommitFromSync(); |
| EXPECT_EQ(3, watcher.commit_count); |
| EXPECT_EQ(expected->GetId(), watcher.last_commit_id); |
| EXPECT_EQ(ChangeSource::CLOUD, watcher.last_source); |
| EXPECT_EQ(1, watcher2.commit_count); |
| } |
| |
| // If a commit fails to be persisted on disk, no notification should be sent. |
| TEST_F(PageStorageTest, CommitFailNoWatchNotification) { |
| FakeCommitWatcher watcher; |
| storage_->AddCommitWatcher(&watcher); |
| EXPECT_EQ(0, watcher.commit_count); |
| |
| // Create the commit. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key1", RandomObjectIdentifier(environment_.random()), |
| KeyPriority::EAGER); |
| |
| leveldb_->SetFailBatchExecuteAfter(1); |
| std::unique_ptr<const Commit> commit = |
| TryCommitJournal(std::move(journal), Status::IO_ERROR); |
| |
| // The watcher is not called. |
| EXPECT_EQ(0, watcher.commit_count); |
| } |
| |
| TEST_F(PageStorageTest, SyncMetadata) { |
| std::vector<std::pair<fxl::StringView, fxl::StringView>> keys_and_values = { |
| {"foo1", "foo2"}, {"bar1", " bar2 "}}; |
| for (const auto& key_and_value : keys_and_values) { |
| auto key = key_and_value.first; |
| auto value = key_and_value.second; |
| bool called; |
| Status status; |
| std::string returned_value; |
| storage_->GetSyncMetadata( |
| key, callback::Capture(callback::SetWhenCalled(&called), &status, |
| &returned_value)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::INTERNAL_NOT_FOUND, status); |
| |
| storage_->SetSyncMetadata( |
| key, value, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| storage_->GetSyncMetadata( |
| key, callback::Capture(callback::SetWhenCalled(&called), &status, |
| &returned_value)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| EXPECT_EQ(value, returned_value); |
| } |
| } |
| |
| TEST_F(PageStorageTest, AddMultipleCommitsFromSync) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| FakeSyncDelegate sync; |
| storage_->SetSyncDelegate(&sync); |
| |
| // Build the commit Tree with: |
| // 0 |
| // | |
| // 1 2 |
| std::vector<ObjectIdentifier> object_identifiers; |
| object_identifiers.resize(3); |
| for (size_t i = 0; i < object_identifiers.size(); ++i) { |
| ObjectData value("value" + std::to_string(i), InlineBehavior::PREVENT); |
| std::vector<Entry> entries = {Entry{"key" + std::to_string(i), |
| value.object_identifier, |
| KeyPriority::EAGER}}; |
| std::unique_ptr<const btree::TreeNode> node; |
| ASSERT_TRUE(CreateNodeFromEntries(entries, {}, &node)); |
| object_identifiers[i] = node->GetIdentifier(); |
| sync.AddObject(value.object_identifier, value.value); |
| std::unique_ptr<const Object> root_object = |
| TryGetObject(object_identifiers[i], PageStorage::Location::NETWORK); |
| fxl::StringView root_data; |
| ASSERT_EQ(Status::OK, root_object->GetData(&root_data)); |
| sync.AddObject(object_identifiers[i], root_data.ToString()); |
| } |
| |
| // Reset and clear the storage. |
| ResetStorage(); |
| storage_->SetSyncDelegate(&sync); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| std::unique_ptr<const Commit> commit0 = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), object_identifiers[0], |
| std::move(parent)); |
| parent.clear(); |
| |
| parent.emplace_back(GetFirstHead()); |
| std::unique_ptr<const Commit> commit1 = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), object_identifiers[1], |
| std::move(parent)); |
| parent.clear(); |
| |
| parent.emplace_back(commit1->Clone()); |
| std::unique_ptr<const Commit> commit2 = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), object_identifiers[2], |
| std::move(parent)); |
| |
| std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes; |
| commits_and_bytes.emplace_back(commit0->GetId(), |
| commit0->GetStorageBytes().ToString()); |
| commits_and_bytes.emplace_back(commit1->GetId(), |
| commit1->GetStorageBytes().ToString()); |
| commits_and_bytes.emplace_back(commit2->GetId(), |
| commit2->GetStorageBytes().ToString()); |
| |
| bool called; |
| Status status; |
| std::vector<CommitId> missing_ids; |
| storage_->AddCommitsFromSync( |
| std::move(commits_and_bytes), ChangeSource::CLOUD, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &missing_ids)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| EXPECT_EQ(4u, sync.object_requests.size()); |
| EXPECT_NE(sync.object_requests.find(object_identifiers[0]), |
| sync.object_requests.end()); |
| EXPECT_EQ(sync.object_requests.find(object_identifiers[1]), |
| sync.object_requests.end()); |
| EXPECT_NE(sync.object_requests.find(object_identifiers[2]), |
| sync.object_requests.end()); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, Generation) { |
| std::unique_ptr<const Commit> commit1 = TryCommitFromLocal(3); |
| ASSERT_TRUE(commit1); |
| EXPECT_EQ(1u, commit1->GetGeneration()); |
| |
| std::unique_ptr<const Commit> commit2 = TryCommitFromLocal(3); |
| ASSERT_TRUE(commit2); |
| EXPECT_EQ(2u, commit2->GetGeneration()); |
| |
| std::unique_ptr<Journal> journal = |
| storage_->StartMergeCommit(std::move(commit1), std::move(commit2)); |
| |
| std::unique_ptr<const Commit> commit3 = |
| TryCommitJournal(std::move(journal), Status::OK); |
| ASSERT_TRUE(commit3); |
| EXPECT_EQ(3u, commit3->GetGeneration()); |
| } |
| |
| TEST_F(PageStorageTest, GetEntryFromCommit) { |
| int size = 10; |
| std::unique_ptr<const Commit> commit = TryCommitFromLocal(size); |
| ASSERT_TRUE(commit); |
| |
| bool called; |
| Status status; |
| Entry entry; |
| storage_->GetEntryFromCommit( |
| *commit, "key not found", |
| callback::Capture(callback::SetWhenCalled(&called), &status, &entry)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| ASSERT_EQ(Status::KEY_NOT_FOUND, status); |
| |
| for (int i = 0; i < size; ++i) { |
| std::string expected_key = fxl::StringPrintf("key%05d", i); |
| storage_->GetEntryFromCommit( |
| *commit, expected_key, |
| callback::Capture(callback::SetWhenCalled(&called), &status, &entry)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| ASSERT_EQ(Status::OK, status); |
| EXPECT_EQ(expected_key, entry.key); |
| } |
| } |
| |
| TEST_F(PageStorageTest, WatcherForReEntrantCommits) { |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| |
| std::unique_ptr<const Commit> commit1 = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), |
| RandomObjectIdentifier(environment_.random()), std::move(parent)); |
| CommitId id1 = commit1->GetId(); |
| |
| parent.clear(); |
| parent.emplace_back(commit1->Clone()); |
| |
| std::unique_ptr<const Commit> commit2 = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), |
| RandomObjectIdentifier(environment_.random()), std::move(parent)); |
| CommitId id2 = commit2->GetId(); |
| |
| FakeCommitWatcher watcher; |
| storage_->AddCommitWatcher(&watcher); |
| |
| bool called; |
| Status status; |
| storage_->AddCommitFromLocal( |
| std::move(commit1), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| storage_->AddCommitFromLocal( |
| std::move(commit2), {}, |
| callback::Capture(callback::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| EXPECT_EQ(2, watcher.commit_count); |
| EXPECT_EQ(id2, watcher.last_commit_id); |
| } |
| |
| TEST_F(PageStorageTest, NoOpCommit) { |
| std::vector<std::unique_ptr<const Commit>> heads = GetHeads(); |
| ASSERT_FALSE(heads.empty()); |
| |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| |
| // Create a key, and delete it. |
| journal->Put("key", RandomObjectIdentifier(environment_.random()), |
| KeyPriority::EAGER); |
| journal->Delete("key"); |
| |
| // Commit the journal. |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> commit; |
| storage_->CommitJournal( |
| std::move(journal), |
| callback::Capture(callback::SetWhenCalled(&called), &status, &commit)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| |
| // Commiting a no-op commit should result in a successful status, but a null |
| // commit. |
| ASSERT_EQ(Status::OK, status); |
| ASSERT_FALSE(commit); |
| } |
| |
| // Check that receiving a remote commit and commiting locally at the same time |
| // do not prevent the commit to be marked as unsynced. |
| TEST_F(PageStorageTest, MarkRemoteCommitSyncedRace) { |
| bool sync_delegate_called; |
| fit::closure sync_delegate_call; |
| DelayingFakeSyncDelegate sync(callback::Capture( |
| callback::SetWhenCalled(&sync_delegate_called), &sync_delegate_call)); |
| storage_->SetSyncDelegate(&sync); |
| |
| // We need to create new nodes for the storage to be asynchronous. The empty |
| // node is already there, so we create two (child, which is empty, and root, |
| // which contains child). |
| std::string child_data = btree::EncodeNode(0u, std::vector<Entry>(), {}); |
| ObjectIdentifier child_identifier = encryption_service_.MakeObjectIdentifier( |
| ComputeObjectDigest(PieceType::CHUNK, ObjectType::TREE_NODE, child_data)); |
| sync.AddObject(child_identifier, child_data); |
| |
| std::string root_data = |
| btree::EncodeNode(0u, std::vector<Entry>(), {{0u, child_identifier}}); |
| ObjectIdentifier root_identifier = encryption_service_.MakeObjectIdentifier( |
| ComputeObjectDigest(PieceType::CHUNK, ObjectType::TREE_NODE, root_data)); |
| sync.AddObject(root_identifier, root_data); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| |
| std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), root_identifier, std::move(parent)); |
| CommitId id = commit->GetId(); |
| |
| // Start adding the remote commit. |
| bool commits_from_sync_called; |
| Status commits_from_sync_status; |
| std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes; |
| commits_and_bytes.emplace_back(commit->GetId(), |
| commit->GetStorageBytes().ToString()); |
| std::vector<CommitId> missing_ids; |
| storage_->AddCommitsFromSync( |
| std::move(commits_and_bytes), ChangeSource::CLOUD, |
| callback::Capture(callback::SetWhenCalled(&commits_from_sync_called), |
| &commits_from_sync_status, &missing_ids)); |
| |
| // Make the loop run until GetObject is called in sync, and before |
| // AddCommitsFromSync finishes. |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(sync_delegate_called); |
| EXPECT_FALSE(commits_from_sync_called); |
| |
| // Add the local commit. |
| bool commits_from_local_called; |
| Status commits_from_local_status; |
| storage_->AddCommitFromLocal( |
| std::move(commit), {}, |
| callback::Capture(callback::SetWhenCalled(&commits_from_local_called), |
| &commits_from_local_status)); |
| |
| RunLoopUntilIdle(); |
| EXPECT_FALSE(commits_from_sync_called); |
| // The local commit should be commited. |
| EXPECT_TRUE(commits_from_local_called); |
| ASSERT_TRUE(sync_delegate_call); |
| sync_delegate_call(); |
| |
| // Let the two AddCommit finish. |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(commits_from_sync_called); |
| EXPECT_TRUE(commits_from_local_called); |
| EXPECT_EQ(Status::OK, commits_from_sync_status); |
| EXPECT_EQ(Status::OK, commits_from_local_status); |
| |
| // Verify that the commit is added correctly. |
| bool called; |
| Status status; |
| storage_->GetCommit(id, callback::Capture(callback::SetWhenCalled(&called), |
| &status, &commit)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::OK, status); |
| |
| // The commit should be marked as synced. |
| EXPECT_EQ(0u, GetUnsyncedCommits().size()); |
| } |
| |
| // Verifies that GetUnsyncedCommits() returns commits ordered by their |
| // generation, and not by the timestamp. |
| // |
| // In this test the commits have the following structure: |
| // (root) |
| // / | \ |
| // (A) (B) (C) |
| // \ / |
| // (merge) |
| // C is the last commit to be created. The test verifies that the unsynced |
| // commits are returned in the generation order, with the merge commit being the |
| // last despite not being the most recent. |
| TEST_F(PageStorageTest, GetUnsyncedCommits) { |
| std::unique_ptr<const Commit> root = GetFirstHead(); |
| std::unique_ptr<Journal> journal_a = storage_->StartCommit(root->Clone()); |
| journal_a->Put("a", RandomObjectIdentifier(environment_.random()), |
| KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_a = |
| TryCommitJournal(std::move(journal_a), Status::OK); |
| ASSERT_TRUE(commit_a); |
| EXPECT_EQ(1u, commit_a->GetGeneration()); |
| |
| std::unique_ptr<Journal> journal_b = storage_->StartCommit(root->Clone()); |
| journal_b->Put("b", RandomObjectIdentifier(environment_.random()), |
| KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_b = |
| TryCommitJournal(std::move(journal_b), Status::OK); |
| ASSERT_TRUE(commit_b); |
| EXPECT_EQ(1u, commit_b->GetGeneration()); |
| |
| std::unique_ptr<Journal> journal_merge = |
| storage_->StartMergeCommit(std::move(commit_a), std::move(commit_b)); |
| |
| std::unique_ptr<const Commit> commit_merge = |
| TryCommitJournal(std::move(journal_merge), Status::OK); |
| ASSERT_TRUE(commit_merge); |
| EXPECT_EQ(2u, commit_merge->GetGeneration()); |
| |
| std::unique_ptr<Journal> journal_c = storage_->StartCommit(std::move(root)); |
| journal_c->Put("c", RandomObjectIdentifier(environment_.random()), |
| KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_c = |
| TryCommitJournal(std::move(journal_c), Status::OK); |
| ASSERT_TRUE(commit_c); |
| EXPECT_EQ(1u, commit_c->GetGeneration()); |
| |
| // Verify that the merge commit is returned as last, even though commit C is |
| // older. |
| std::vector<std::unique_ptr<const Commit>> unsynced_commits = |
| GetUnsyncedCommits(); |
| EXPECT_EQ(4u, unsynced_commits.size()); |
| EXPECT_EQ(commit_merge->GetId(), unsynced_commits.back()->GetId()); |
| EXPECT_LT(commit_merge->GetTimestamp(), commit_c->GetTimestamp()); |
| } |
| |
| // Add a commit for which we don't have its parent. Verify that an error is |
| // returned, along with the id of the missing parent. |
| TEST_F(PageStorageTest, AddCommitsMissingParent) { |
| std::unique_ptr<const btree::TreeNode> node; |
| ASSERT_TRUE(CreateNodeFromEntries({}, {}, &node)); |
| ObjectIdentifier root_identifier = node->GetIdentifier(); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| auto commit_parent = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), root_identifier, std::move(parent)); |
| parent.clear(); |
| parent.push_back(commit_parent->Clone()); |
| auto commit_child = CommitImpl::FromContentAndParents( |
| environment_.clock(), storage_.get(), root_identifier, std::move(parent)); |
| |
| std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes; |
| commits_and_bytes.emplace_back(commit_child->GetId(), |
| commit_child->GetStorageBytes().ToString()); |
| |
| bool called; |
| Status status; |
| std::vector<CommitId> missing_ids; |
| storage_->AddCommitsFromSync( |
| std::move(commits_and_bytes), ChangeSource::P2P, |
| callback::Capture(callback::SetWhenCalled(&called), &status, |
| &missing_ids)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(Status::INTERNAL_NOT_FOUND, status); |
| EXPECT_THAT(missing_ids, ElementsAre(commit_parent->GetId())); |
| } |
| |
| TEST_F(PageStorageTest, GetMergeCommitIdsEmpty) { |
| std::unique_ptr<const Commit> parent1 = TryCommitFromLocal(3); |
| ASSERT_TRUE(parent1); |
| |
| std::unique_ptr<const Commit> parent2 = TryCommitFromLocal(3); |
| ASSERT_TRUE(parent2); |
| |
| // Check that there is no merge of |parent1| and |parent2|. |
| bool called; |
| Status status; |
| std::vector<CommitId> merges; |
| storage_->GetMergeCommitIds( |
| parent1->GetId(), parent2->GetId(), |
| callback::Capture(callback::SetWhenCalled(&called), &status, &merges)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_THAT(merges, IsEmpty()); |
| } |
| |
| TEST_F(PageStorageTest, GetMergeCommitIdsNonEmpty) { |
| std::unique_ptr<const Commit> parent1 = TryCommitFromLocal(3); |
| ASSERT_TRUE(parent1); |
| |
| std::unique_ptr<const Commit> parent2 = TryCommitFromLocal(3); |
| ASSERT_TRUE(parent2); |
| |
| std::unique_ptr<Journal> journal = |
| storage_->StartMergeCommit(parent1->Clone(), parent2->Clone()); |
| |
| std::unique_ptr<const Commit> merge = |
| TryCommitJournal(std::move(journal), Status::OK); |
| ASSERT_TRUE(merge); |
| |
| // Check that |merge| is in the list of merges. |
| bool called; |
| Status status; |
| std::vector<CommitId> merges; |
| storage_->GetMergeCommitIds( |
| parent1->GetId(), parent2->GetId(), |
| callback::Capture(callback::SetWhenCalled(&called), &status, &merges)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_THAT(merges, ElementsAre(merge->GetId())); |
| |
| storage_->GetMergeCommitIds( |
| parent2->GetId(), parent1->GetId(), |
| callback::Capture(callback::SetWhenCalled(&called), &status, &merges)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_THAT(merges, ElementsAre(merge->GetId())); |
| } |
| |
| } // namespace |
| |
| } // namespace storage |