| // Copyright 2016 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <dirent.h> |
| #include <lib/async/cpp/task.h> |
| #include <lib/fit/function.h> |
| |
| #include <algorithm> |
| #include <chrono> |
| #include <map> |
| #include <memory> |
| #include <queue> |
| #include <set> |
| |
| #include "gmock/gmock.h" |
| #include "gtest/gtest.h" |
| #include "src/ledger/bin/app/flags.h" |
| #include "src/ledger/bin/clocks/testing/device_id_manager_empty_impl.h" |
| #include "src/ledger/bin/encryption/fake/fake_encryption_service.h" |
| #include "src/ledger/bin/encryption/primitives/hash.h" |
| #include "src/ledger/bin/platform/scoped_tmp_location.h" |
| #include "src/ledger/bin/public/status.h" |
| #include "src/ledger/bin/storage/fake/fake_object_identifier_factory.h" |
| #include "src/ledger/bin/storage/impl/btree/encoding.h" |
| #include "src/ledger/bin/storage/impl/btree/iterator.h" |
| #include "src/ledger/bin/storage/impl/btree/tree_node.h" |
| #include "src/ledger/bin/storage/impl/commit_random_impl.h" |
| #include "src/ledger/bin/storage/impl/constants.h" |
| #include "src/ledger/bin/storage/impl/journal_impl.h" |
| #include "src/ledger/bin/storage/impl/leveldb.h" |
| #include "src/ledger/bin/storage/impl/object_digest.h" |
| #include "src/ledger/bin/storage/impl/object_impl.h" |
| #include "src/ledger/bin/storage/impl/page_db.h" |
| #include "src/ledger/bin/storage/impl/page_db_empty_impl.h" |
| #include "src/ledger/bin/storage/impl/page_storage_impl.h" |
| #include "src/ledger/bin/storage/impl/split.h" |
| #include "src/ledger/bin/storage/impl/storage_test_utils.h" |
| #include "src/ledger/bin/storage/public/commit_watcher.h" |
| #include "src/ledger/bin/storage/public/constants.h" |
| #include "src/ledger/bin/storage/public/types.h" |
| #include "src/ledger/bin/testing/test_with_environment.h" |
| #include "src/ledger/lib/callback/capture.h" |
| #include "src/ledger/lib/callback/set_when_called.h" |
| #include "src/ledger/lib/convert/convert.h" |
| #include "src/ledger/lib/coroutine/coroutine.h" |
| #include "src/ledger/lib/logging/logging.h" |
| #include "src/ledger/lib/socket/strings.h" |
| #include "src/ledger/lib/timekeeper/test_clock.h" |
| #include "src/ledger/lib/vmo/strings.h" |
| #include "third_party/abseil-cpp/absl/base/attributes.h" |
| #include "third_party/abseil-cpp/absl/strings/str_format.h" |
| #include "third_party/abseil-cpp/absl/strings/string_view.h" |
| |
| namespace storage { |
| |
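| // Test-only accessor exposing private members of |PageStorageImpl|. Tests below |
| // use it to reach into storage internals, e.g.: |
| // |
| //   PageDb& db = PageStorageImplAccessorForTest::GetDb(storage_); |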
| class PageStorageImplAccessorForTest { |
| public: |
| static void AddPiece(const std::unique_ptr<PageStorageImpl>& storage, |
| std::unique_ptr<Piece> piece, ChangeSource source, |
| IsObjectSynced is_object_synced, ObjectReferencesAndPriority references, |
| fit::function<void(Status)> callback) { |
| storage->AddPiece(std::move(piece), source, is_object_synced, std::move(references), |
| std::move(callback)); |
| } |
| |
| static PageDb& GetDb(const std::unique_ptr<PageStorageImpl>& storage) { return *(storage->db_); } |
| |
| static void GetCommitRootIdentifier(const std::unique_ptr<PageStorageImpl>& storage, |
| CommitIdView commit_id, |
| fit::function<void(Status, ObjectIdentifier)> callback) { |
| storage->GetCommitRootIdentifier(commit_id, std::move(callback)); |
| } |
| |
| static bool RootCommitIdentifierMapIsEmpty(const std::unique_ptr<PageStorageImpl>& storage) { |
| return storage->roots_of_commits_being_added_.empty(); |
| } |
| |
| static bool RemoteCommitIdMapIsEmpty(const std::unique_ptr<PageStorageImpl>& storage) { |
| return storage->remote_ids_of_commits_being_added_.empty(); |
| } |
| |
| static void ChooseDiffBases(const std::unique_ptr<PageStorageImpl>& storage, |
| CommitIdView target_id, |
| fit::callback<void(Status, std::vector<CommitId>)> callback) { |
| return storage->ChooseDiffBases(std::move(target_id), std::move(callback)); |
| } |
| |
| // Asynchronous version of PageStorage::DeleteObject, running it inside a coroutine. |
| static void DeleteObject( |
| const std::unique_ptr<PageStorageImpl>& storage, ObjectDigest object_digest, |
| fit::function<void(Status, ObjectReferencesAndPriority references)> callback) { |
| storage->coroutine_manager_.StartCoroutine( |
| [&storage, object_digest = std::move(object_digest), |
| callback = std::move(callback)](coroutine::CoroutineHandler* handler) mutable { |
| ObjectReferencesAndPriority references; |
| Status status = storage->DeleteObject(handler, std::move(object_digest), &references); |
| callback(status, references); |
| }); |
| } |
| |
| static long CountLiveReferences(const std::unique_ptr<PageStorageImpl>& storage, |
| const ObjectDigest& digest) { |
| return storage->object_identifier_factory_.count(digest); |
| } |
| |
| static ledger::OperationSerializer& GetCommitSerializer( |
| const std::unique_ptr<PageStorageImpl>& storage) { |
| return storage->commit_serializer_; |
| } |
| }; |
| |
| namespace { |
| |
| using ::coroutine::CoroutineHandler; |
| using ::testing::_; |
| using ::testing::AllOf; |
| using ::testing::AnyOfArray; |
| using ::testing::ContainerEq; |
| using ::testing::Contains; |
| using ::testing::ElementsAre; |
| using ::testing::Field; |
| using ::testing::IsEmpty; |
| using ::testing::IsSubsetOf; |
| using ::testing::IsSupersetOf; |
| using ::testing::Not; |
| using ::testing::Pair; |
| using ::testing::SizeIs; |
| using ::testing::UnorderedElementsAre; |
| using ::testing::UnorderedElementsAreArray; |
| using ::testing::VariantWith; |
| |
| std::vector<PageStorage::CommitIdAndBytes> CommitAndBytesFromCommit(const Commit& commit) { |
| std::vector<PageStorage::CommitIdAndBytes> result; |
| result.emplace_back(commit.GetId(), convert::ToString(commit.GetStorageBytes())); |
| return result; |
| } |
| |
| testing::Matcher<const DeviceClock&> DeviceClockMatchesCommit(const Commit& commit) { |
| return VariantWith<DeviceEntry>( |
| Field("head", &DeviceEntry::head, |
| AllOf(Field("commit_id", &ClockEntry::commit_id, commit.GetId()), |
| Field("generation", &ClockEntry::generation, commit.GetGeneration())))); |
| } |
| |
| // Makes an object identifier untracked by detaching it from its factory. |
| void UntrackIdentifier(ObjectIdentifier* identifier) { |
| *identifier = ObjectIdentifier(identifier->key_index(), identifier->object_digest(), nullptr); |
| } |
| |
| // DataSource that returns an error on the callback to Get(). |
| class FakeErrorDataSource : public DataSource { |
| public: |
| explicit FakeErrorDataSource(async_dispatcher_t* dispatcher) : dispatcher_(dispatcher) {} |
| |
| uint64_t GetSize() override { return 1; } |
| |
| void Get(fit::function<void(std::unique_ptr<DataChunk>, Status)> callback) override { |
| async::PostTask(dispatcher_, [callback = std::move(callback)] { |
| callback(nullptr, DataSource::Status::ERROR); |
| }); |
| } |
| |
| private: |
| async_dispatcher_t* const dispatcher_; |
| }; |
| |
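| // CommitWatcher that counts the commit notifications it receives and records |
| // the id and source of the most recent one. |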
| class FakeCommitWatcher : public CommitWatcher { |
| public: |
| FakeCommitWatcher() = default; |
| |
| void OnNewCommits(const std::vector<std::unique_ptr<const Commit>>& commits, |
| ChangeSource source) override { |
| ++commit_count; |
| last_commit_id = commits.back()->GetId(); |
| last_source = source; |
| } |
| |
| int commit_count = 0; |
| CommitId last_commit_id; |
| ChangeSource last_source; |
| }; |
| |
| enum class HasP2P { YES, NO }; |
| |
| enum class HasCloud { |
| // The cloud is present and supports |GetDiff|. |
| YES_WITH_DIFFS, |
| // The cloud is present but does not support |GetDiff|. |
| YES_NO_DIFFS, |
| // The cloud is not present. |
| NO |
| }; |
| |
| // Combination of sync and storage features that we want to test for. |
| struct SyncFeatures { |
| // Is P2P sync available? |
| HasP2P has_p2p; |
| // Is cloud sync available? Does the cloud support diffs? |
| HasCloud has_cloud; |
| // The diff compatibility policy used by Ledger. |
| DiffCompatibilityPolicy diff_compatibility_policy; |
| |
| static const SyncFeatures kDefault; |
| static const SyncFeatures kNoDiff; |
| }; |
| |
| const SyncFeatures SyncFeatures::kDefault = {HasP2P::YES, HasCloud::YES_WITH_DIFFS, |
| DiffCompatibilityPolicy::USE_DIFFS_AND_TREE_NODES}; |
| const SyncFeatures SyncFeatures::kNoDiff = {HasP2P::YES, HasCloud::YES_NO_DIFFS, |
| DiffCompatibilityPolicy::USE_DIFFS_AND_TREE_NODES}; |
| |
| // Where an object is available from. |
| enum ObjectAvailability { |
| // Object is only available from P2P. |
| P2P, |
| // Object is available from P2P and cloud. |
| P2P_AND_CLOUD, |
| }; |
| |
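| // PageSyncDelegate fake whose answers to |GetObject| and |GetDiff| are gated by |
| // the |on_get_object| and |on_get_diff| callbacks, letting tests control when a |
| // sync request completes. A minimal sketch of the intended use (local names are |
| // hypothetical): |
| // |
| //   fit::closure unblock; |
| //   DelayingFakeSyncDelegate sync( |
| //       [&](fit::closure done) { unblock = std::move(done); }); |
| //   storage_->SetSyncDelegate(&sync); |
| //   // ... start a GetObject() through |storage_|, then complete it with: |
| //   unblock(); |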
| class DelayingFakeSyncDelegate : public PageSyncDelegate { |
| public: |
| explicit DelayingFakeSyncDelegate(fit::function<void(fit::closure)> on_get_object, |
| fit::function<void(fit::closure)> on_get_diff = nullptr, |
| SyncFeatures sync_features = SyncFeatures::kDefault) |
| : features_(sync_features), |
| on_get_object_(std::move(on_get_object)), |
| on_get_diff_(std::move(on_get_diff)) { |
| if (!on_get_diff_) { |
| on_get_diff_ = [](fit::closure callback) { callback(); }; |
| } |
| } |
| |
| // Adds the given object to sync. |object_source| indicates whether it should be |
| // available from P2P only, or from both P2P and the cloud. |
| void AddObject(ObjectIdentifier object_identifier, const std::string& value, |
| ObjectAvailability object_source) { |
| UntrackIdentifier(&object_identifier); |
| auto [it, inserted] = digest_to_value_.emplace(std::move(object_identifier), |
| std::make_pair(object_source, value)); |
| if (!inserted && object_source == ObjectAvailability::P2P_AND_CLOUD) { |
| // |P2P_AND_CLOUD| is more permissive than |P2P|. |
| it->second.first = ObjectAvailability::P2P_AND_CLOUD; |
| } |
| } |
| |
| void GetObject(ObjectIdentifier object_identifier, RetrievedObjectType retrieved_object_type, |
| fit::function<void(Status, ChangeSource, IsObjectSynced, |
| std::unique_ptr<DataSource::DataChunk>)> |
| callback) override { |
| UntrackIdentifier(&object_identifier); |
| object_requests.emplace(object_identifier, retrieved_object_type); |
| auto value_found = digest_to_value_.find(object_identifier); |
| if (value_found == digest_to_value_.end()) { |
| callback(Status::INTERNAL_NOT_FOUND, ChangeSource::CLOUD, IsObjectSynced::NO, nullptr); |
| return; |
| } |
| |
| auto [object_source, value] = value_found->second; |
| // Decide which source, if any, can return this object. |
| if (features_.has_cloud != HasCloud::NO && retrieved_object_type == RetrievedObjectType::BLOB && |
| object_source == ObjectAvailability::P2P_AND_CLOUD) { |
| on_get_object_([callback = std::move(callback), value = value] { |
| callback(Status::OK, ChangeSource::CLOUD, IsObjectSynced::YES, |
| DataSource::DataChunk::Create(value)); |
| }); |
| } else if (features_.has_p2p == HasP2P::YES) { |
| on_get_object_([callback = std::move(callback), value = value] { |
| callback(Status::OK, ChangeSource::P2P, IsObjectSynced::NO, |
| DataSource::DataChunk::Create(value)); |
| }); |
| } else { |
| callback(Status::INTERNAL_NOT_FOUND, ChangeSource::CLOUD, IsObjectSynced::NO, nullptr); |
| } |
| } |
| |
| void AddDiff(CommitId commit_id, CommitId base_id, std::vector<EntryChange> changes) { |
| commit_to_diff_[commit_id] = std::make_pair(std::move(base_id), std::move(changes)); |
| } |
| |
| void GetDiff(CommitId commit_id, std::vector<CommitId> possible_bases, |
| fit::function<void(Status status, CommitId base_commit, |
| std::vector<EntryChange> diff_entries)> |
| callback) override { |
| diff_requests.emplace(commit_id, std::move(possible_bases)); |
| switch (features_.has_cloud) { |
| case HasCloud::NO: |
| // We don't support diffs. |
| callback(Status::INTERNAL_NOT_FOUND, {}, {}); |
| return; |
| case HasCloud::YES_NO_DIFFS: |
| // We only send diffs with base = target and empty changes. |
| callback(Status::OK, commit_id, {}); |
| return; |
| case HasCloud::YES_WITH_DIFFS: |
| auto diff_found = commit_to_diff_.find(commit_id); |
| if (diff_found == commit_to_diff_.end()) { |
| callback(Status::INTERNAL_NOT_FOUND, {}, {}); |
| return; |
| } |
| callback(Status::OK, diff_found->second.first, diff_found->second.second); |
| } |
| } |
| |
| void UpdateClock(storage::Clock /*clock*/, |
| fit::function<void(ledger::Status)> callback) override { |
| LEDGER_NOTIMPLEMENTED(); |
| callback(ledger::Status::NOT_IMPLEMENTED); |
| } |
| |
| size_t GetNumberOfObjectsStored() { return digest_to_value_.size(); } |
| |
| std::set<std::pair<ObjectIdentifier, RetrievedObjectType>> object_requests; |
| std::set<std::pair<CommitId, std::vector<CommitId>>> diff_requests; |
| |
| void set_on_get_object(fit::function<void(fit::closure)> callback) { |
| on_get_object_ = std::move(callback); |
| } |
| |
| void SetSyncFeatures(SyncFeatures features) { features_ = features; } |
| |
| private: |
| SyncFeatures features_; |
| fit::function<void(fit::closure)> on_get_object_; |
| fit::function<void(fit::closure)> on_get_diff_; |
| std::map<ObjectIdentifier, std::pair<ObjectAvailability, std::string>> digest_to_value_; |
| std::map<CommitId, std::pair<CommitId, std::vector<EntryChange>>> commit_to_diff_; |
| }; |
| |
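| // DelayingFakeSyncDelegate that completes |GetObject| and |GetDiff| requests |
| // immediately. |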
| class FakeSyncDelegate : public DelayingFakeSyncDelegate { |
| public: |
| explicit FakeSyncDelegate(SyncFeatures sync_features = SyncFeatures::kDefault) |
| : DelayingFakeSyncDelegate([](fit::closure callback) { callback(); }, |
| [](fit::closure callback) { callback(); }, sync_features) {} |
| }; |
| |
| // Shim for LevelDB that allows some calls to be selectively failed. |
| class ControlledLevelDb : public Db { |
| public: |
| explicit ControlledLevelDb(ledger::FileSystem* file_system, async_dispatcher_t* dispatcher, |
| ledger::DetachedPath db_path) |
| : leveldb_(file_system, dispatcher, db_path) {} |
| |
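| // Batch wrapper that forwards all operations to the wrapped LevelDb batch, but |
| // fails |Execute()| once the controller's failure countdown reaches zero. |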
| class ControlledBatch : public Batch { |
| public: |
| explicit ControlledBatch(ControlledLevelDb* controller, std::unique_ptr<Batch> batch) |
| : controller_(controller), batch_(std::move(batch)) {} |
| |
| // Batch: |
| Status Put(coroutine::CoroutineHandler* handler, convert::ExtendedStringView key, |
| absl::string_view value) override { |
| return batch_->Put(handler, key, value); |
| } |
| |
| Status Delete(coroutine::CoroutineHandler* handler, convert::ExtendedStringView key) override { |
| return batch_->Delete(handler, key); |
| } |
| |
| Status Execute(coroutine::CoroutineHandler* handler) override { |
| if (controller_->fail_batch_execute_after_ == 0) { |
| return Status::IO_ERROR; |
| } |
| if (controller_->fail_batch_execute_after_ > 0) { |
| controller_->fail_batch_execute_after_--; |
| } |
| if (controller_->on_execute_) { |
| controller_->on_execute_(); |
| } |
| return batch_->Execute(handler); |
| } |
| |
| private: |
| ControlledLevelDb* controller_; |
| std::unique_ptr<Batch> batch_; |
| }; |
| |
| Status Init() { return leveldb_.Init(); } |
| |
| // Sets the number of calls to |Batch::Execute()|, for batches generated by |
| // this object, after which all calls will fail. It is used to simulate write |
| // failures. |
| // If |fail_batch_execute_after| is negative, or this method is never called, |
| // |Batch::Execute()| calls will never fail. |
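| // A minimal usage sketch (assuming a test that keeps a |ControlledLevelDb*|, as |
| // the fixture below does with |leveldb_|): |
| // |
| //   leveldb_->SetFailBatchExecuteAfter(1); |
| //   // The first |Batch::Execute()| succeeds; every later call returns |
| //   // Status::IO_ERROR. |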
| void SetFailBatchExecuteAfter(int fail_batch_execute_after) { |
| fail_batch_execute_after_ = fail_batch_execute_after; |
| } |
| |
| // Db: |
| Status StartBatch(coroutine::CoroutineHandler* handler, std::unique_ptr<Batch>* batch) override { |
| std::unique_ptr<Batch> inner_batch; |
| Status status = leveldb_.StartBatch(handler, &inner_batch); |
| *batch = std::make_unique<ControlledBatch>(this, std::move(inner_batch)); |
| return status; |
| } |
| |
| Status Get(coroutine::CoroutineHandler* handler, convert::ExtendedStringView key, |
| std::string* value) override { |
| return leveldb_.Get(handler, key, value); |
| } |
| |
| Status HasKey(coroutine::CoroutineHandler* handler, convert::ExtendedStringView key) override { |
| return leveldb_.HasKey(handler, key); |
| } |
| |
| Status HasPrefix(coroutine::CoroutineHandler* handler, |
| convert::ExtendedStringView prefix) override { |
| return leveldb_.HasPrefix(handler, prefix); |
| } |
| |
| Status GetObject(coroutine::CoroutineHandler* handler, convert::ExtendedStringView key, |
| ObjectIdentifier object_identifier, |
| std::unique_ptr<const Piece>* piece) override { |
| return leveldb_.GetObject(handler, key, std::move(object_identifier), piece); |
| } |
| |
| Status GetByPrefix(coroutine::CoroutineHandler* handler, convert::ExtendedStringView prefix, |
| std::vector<std::string>* key_suffixes) override { |
| return leveldb_.GetByPrefix(handler, prefix, key_suffixes); |
| } |
| |
| Status GetEntriesByPrefix(coroutine::CoroutineHandler* handler, |
| convert::ExtendedStringView prefix, |
| std::vector<std::pair<std::string, std::string>>* entries) override { |
| return leveldb_.GetEntriesByPrefix(handler, prefix, entries); |
| } |
| |
| Status GetIteratorAtPrefix( |
| coroutine::CoroutineHandler* handler, convert::ExtendedStringView prefix, |
| std::unique_ptr< |
| Iterator<const std::pair<convert::ExtendedStringView, convert::ExtendedStringView>>>* |
| iterator) override { |
| return leveldb_.GetIteratorAtPrefix(handler, prefix, iterator); |
| } |
| |
| // Sets a callback triggered before each |Batch::Execute|. |
| void set_on_execute(fit::closure callback) { on_execute_ = std::move(callback); } |
| |
| private: |
| // Number of calls to |Batch::Execute()| before they start failing. If |
| // negative, |Batch::Execute()| calls will never fail. |
| int fail_batch_execute_after_ = -1; |
| LevelDb leveldb_; |
| fit::closure on_execute_; |
| }; |
| |
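| // Base fixture for PageStorage tests: provides a |PageStorageImpl| backed by a |
| // |ControlledLevelDb| in a temporary location, with fake encryption. |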
| class PageStorageTest : public StorageTest { |
| public: |
| PageStorageTest() : PageStorageTest(ledger::kTestingGarbageCollectionPolicy) {} |
| |
| explicit PageStorageTest(GarbageCollectionPolicy gc_policy, |
| DiffCompatibilityPolicy diff_compatibility_policy = |
| DiffCompatibilityPolicy::USE_DIFFS_AND_TREE_NODES) |
| : StorageTest(gc_policy, diff_compatibility_policy), encryption_service_(dispatcher()) {} |
| |
| PageStorageTest(const PageStorageTest&) = delete; |
| PageStorageTest& operator=(const PageStorageTest&) = delete; |
| ~PageStorageTest() override = default; |
| |
| // Test: |
| void SetUp() override { ResetStorage(); } |
| |
| void ResetStorage(CommitPruningPolicy pruning_policy = CommitPruningPolicy::NEVER) { |
| if (storage_) { |
| storage_->SetSyncDelegate(nullptr); |
| storage_.reset(); |
| } |
| tmp_location_ = environment_.file_system()->CreateScopedTmpLocation(); |
| PageId id = RandomString(environment_.random(), 10); |
| auto db = std::make_unique<ControlledLevelDb>(environment_.file_system(), dispatcher(), |
| tmp_location_->path()); |
| leveldb_ = db.get(); |
| ASSERT_EQ(db->Init(), Status::OK); |
| storage_ = std::make_unique<PageStorageImpl>(&environment_, &encryption_service_, std::move(db), |
| id, pruning_policy); |
| |
| bool called; |
| Status status; |
| clocks::DeviceIdManagerEmptyImpl device_id_manager; |
| storage_->Init(&device_id_manager, ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(storage_->GetId(), id); |
| } |
| |
| // After |UntrackIdentifier| or |ResetStorage|, |identifier| may point to an expired factory. |
| // Reallocates a fresh identifier tracked by the current storage's factory if necessary. |
| void RetrackIdentifier(ObjectIdentifier* identifier) { |
| if (identifier->factory() != storage_->GetObjectIdentifierFactory()) { |
| *identifier = storage_->GetObjectIdentifierFactory()->MakeObjectIdentifier( |
| identifier->key_index(), identifier->object_digest()); |
| } |
| } |
| |
| protected: |
| PageStorage* GetStorage() override { return storage_.get(); } |
| |
| std::vector<std::unique_ptr<const Commit>> GetHeads() { |
| std::vector<std::unique_ptr<const Commit>> heads; |
| Status status = storage_->GetHeadCommits(&heads); |
| EXPECT_EQ(status, Status::OK); |
| return heads; |
| } |
| |
| std::unique_ptr<const Commit> GetFirstHead() { |
| std::vector<std::unique_ptr<const Commit>> heads = GetHeads(); |
| EXPECT_FALSE(heads.empty()); |
| return std::move(heads[0]); |
| } |
| |
| std::unique_ptr<const Commit> GetCommit(const CommitId& id) { |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> commit; |
| storage_->GetCommit(id, ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| return commit; |
| } |
| |
| // Returns a random, tracked, non-inline object identifier. |
| // Since random identifiers do not correspond to actual stored objects, we do not need to |
| // untrack them to allow more garbage-collection opportunities (they wouldn't be collected |
| // anyway). Keeping them tracked is necessary to satisfy validity checks within PageStorage |
| // operations. |
| ObjectIdentifier RandomObjectIdentifier() { |
| return storage::RandomObjectIdentifier(environment_.random(), |
| storage_->GetObjectIdentifierFactory()); |
| } |
| |
| // Returns a random, tracked, inline object identifier. |
| ObjectIdentifier RandomInlineObjectIdentifier() { |
| ObjectIdentifier identifier = |
| MakeObject(RandomString(environment_.random(), 31), InlineBehavior::ALLOW) |
| .object_identifier; |
| EXPECT_TRUE(GetObjectDigestInfo(identifier.object_digest()).is_inlined()); |
| RetrackIdentifier(&identifier); |
| return identifier; |
| } |
| |
| // Returns an untracked ObjectData built with the provided |args|. |
| template <typename... Args> |
| ObjectData MakeObject(Args&&... args) { |
| return ObjectData(&fake_factory_, std::forward<Args>(args)...); |
| } |
| |
| std::unique_ptr<const Commit> TryCommitFromSync() { |
| ObjectIdentifier root_identifier; |
| EXPECT_TRUE(GetEmptyNodeIdentifier(&root_identifier)); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| std::unique_ptr<const Commit> commit = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), root_identifier, std::move(parent)); |
| |
| bool called; |
| Status status; |
| storage_->AddCommitsFromSync(CommitAndBytesFromCommit(*commit), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| return commit; |
| } |
| |
| // Returns an empty pointer if |CommitJournal| times out. |
| ABSL_MUST_USE_RESULT std::unique_ptr<const Commit> TryCommitJournal( |
| std::unique_ptr<Journal> journal, Status expected_status) { |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> commit; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit)); |
| |
| RunLoopUntilIdle(); |
| EXPECT_EQ(status, expected_status); |
| if (!called) { |
| return std::unique_ptr<const Commit>(); |
| } |
| return commit; |
| } |
| |
| // Returns an empty pointer if |TryCommitJournal| failed. |
| ABSL_MUST_USE_RESULT std::unique_ptr<const Commit> TryCommitFromLocal(int keys, |
| size_t min_key_size = 0) { |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| |
| for (int i = 0; i < keys; ++i) { |
| auto key = absl::StrFormat("key%05d", i); |
| if (key.size() < min_key_size) { |
| key.resize(min_key_size); |
| } |
| journal->Put(key, RandomObjectIdentifier(), KeyPriority::EAGER); |
| } |
| |
| journal->Delete("key_does_not_exist"); |
| |
| std::unique_ptr<const Commit> commit = TryCommitJournal(std::move(journal), Status::OK); |
| if (!commit) { |
| return commit; |
| } |
| |
| // Check the contents. |
| std::vector<Entry> entries = GetCommitContents(*commit); |
| EXPECT_EQ(entries.size(), static_cast<size_t>(keys)); |
| for (int i = 0; i < keys; ++i) { |
| auto key = absl::StrFormat("key%05d", i); |
| if (key.size() < min_key_size) { |
| key.resize(min_key_size); |
| } |
| EXPECT_EQ(entries[i].key, key); |
| } |
| |
| return commit; |
| } |
| |
| ObjectIdentifier TryAddFromLocal(std::string content, |
| const ObjectIdentifier& expected_identifier) { |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, DataSource::Create(std::move(content)), {}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifier)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(object_identifier, expected_identifier); |
| return object_identifier; |
| } |
| |
| std::unique_ptr<const Object> TryGetObject(ObjectIdentifier object_identifier, |
| PageStorage::Location location, |
| Status expected_status = Status::OK) { |
| RetrackIdentifier(&object_identifier); |
| bool called; |
| Status status; |
| std::unique_ptr<const Object> object; |
| storage_->GetObject(object_identifier, location, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, expected_status); |
| return object; |
| } |
| |
| ledger::SizedVmo TryGetObjectPart(ObjectIdentifier object_identifier, size_t offset, |
| size_t max_size, PageStorage::Location location, |
| Status expected_status = Status::OK) { |
| RetrackIdentifier(&object_identifier); |
| bool called; |
| Status status; |
| ledger::SizedVmo vmo; |
| storage_->GetObjectPart(object_identifier, offset, max_size, location, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &vmo)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, expected_status); |
| return vmo; |
| } |
| |
| std::unique_ptr<const Piece> TryGetPiece(ObjectIdentifier object_identifier, |
| Status expected_status = Status::OK) { |
| RetrackIdentifier(&object_identifier); |
| bool called; |
| Status status; |
| std::unique_ptr<const Piece> piece; |
| storage_->GetPiece(object_identifier, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &piece)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, expected_status) << object_identifier; |
| return piece; |
| } |
| |
| std::vector<Entry> GetCommitContents(const Commit& commit) { |
| bool called; |
| Status status; |
| std::vector<Entry> result; |
| auto on_next = [&result](Entry e) { |
| result.push_back(e); |
| return true; |
| }; |
| storage_->GetCommitContents(commit, "", std::move(on_next), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| return result; |
| } |
| |
| std::vector<std::unique_ptr<const Commit>> GetUnsyncedCommits() { |
| bool called; |
| Status status; |
| std::vector<std::unique_ptr<const Commit>> commits; |
| storage_->GetUnsyncedCommits( |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commits)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| return commits; |
| } |
| |
| Status WriteObject(CoroutineHandler* handler, ObjectData* data, |
| PageDbObjectStatus object_status = PageDbObjectStatus::TRANSIENT, |
| const ObjectReferencesAndPriority& references = {}) { |
| return PageStorageImplAccessorForTest::GetDb(storage_).WriteObject( |
| handler, DataChunkPiece(data->object_identifier, data->ToChunk()), object_status, |
| references); |
| } |
| |
| Status ReadObject(CoroutineHandler* handler, ObjectIdentifier object_identifier, |
| std::unique_ptr<const Piece>* piece) { |
| return PageStorageImplAccessorForTest::GetDb(storage_).ReadObject(handler, object_identifier, |
| piece); |
| } |
| |
| // Checks that |object_identifier|'s inbound object references match |expected_references|. |
| void CheckInboundObjectReferences(CoroutineHandler* handler, ObjectIdentifier object_identifier, |
| ObjectReferencesAndPriority expected_references) { |
| ASSERT_FALSE(GetObjectDigestInfo(object_identifier.object_digest()).is_inlined()) |
| << "Broken test: CheckInboundObjectReferences must be called on " |
| "non-inline pieces only."; |
| ObjectReferencesAndPriority stored_references; |
| ASSERT_EQ(PageStorageImplAccessorForTest::GetDb(storage_).GetInboundObjectReferences( |
| handler, object_identifier, &stored_references), |
| Status::OK); |
| EXPECT_THAT(stored_references, UnorderedElementsAreArray(expected_references)); |
| } |
| |
| // Checks that |object_identifier|'s inbound commit references match |expected_references|. |
| void CheckInboundCommitReferences(CoroutineHandler* handler, ObjectIdentifier object_identifier, |
| const std::vector<CommitId>& expected_references) { |
| ASSERT_FALSE(GetObjectDigestInfo(object_identifier.object_digest()).is_inlined()) |
| << "Broken test: CheckInboundCommitReferences must be called on " |
| "non-inline pieces only."; |
| std::vector<CommitId> stored_references; |
| ASSERT_EQ(PageStorageImplAccessorForTest::GetDb(storage_).GetInboundCommitReferences( |
| handler, object_identifier, &stored_references), |
| Status::OK); |
| EXPECT_THAT(stored_references, UnorderedElementsAreArray(expected_references)); |
| } |
| |
| ::testing::AssertionResult ObjectIsUntracked(ObjectIdentifier object_identifier, |
| bool expected_untracked) { |
| RetrackIdentifier(&object_identifier); |
| bool called; |
| Status status; |
| bool is_untracked; |
| storage_->ObjectIsUntracked( |
| object_identifier, ledger::Capture(ledger::SetWhenCalled(&called), &status, &is_untracked)); |
| RunLoopUntilIdle(); |
| |
| if (!called) { |
| return ::testing::AssertionFailure() |
| << "ObjectIsUntracked for id " << object_identifier << " didn't return."; |
| } |
| if (status != Status::OK) { |
| return ::testing::AssertionFailure() |
| << "ObjectIsUntracked for id " << object_identifier << " returned status " << status; |
| } |
| if (is_untracked != expected_untracked) { |
| return ::testing::AssertionFailure() |
| << "For id " << object_identifier << " expected to find the object " |
| << (expected_untracked ? "un" : "") << "tracked, but was " |
| << (is_untracked ? "un" : "") << "tracked, instead."; |
| } |
| return ::testing::AssertionSuccess(); |
| } |
| |
| ::testing::AssertionResult IsPieceSynced(ObjectIdentifier object_identifier, |
| bool expected_synced) { |
| RetrackIdentifier(&object_identifier); |
| bool called; |
| Status status; |
| bool is_synced; |
| storage_->IsPieceSynced(object_identifier, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &is_synced)); |
| RunLoopUntilIdle(); |
| |
| if (!called) { |
| return ::testing::AssertionFailure() |
| << "IsPieceSynced for id " << object_identifier << " didn't return."; |
| } |
| if (status != Status::OK) { |
| return ::testing::AssertionFailure() |
| << "IsPieceSynced for id " << object_identifier << " returned status " << status; |
| } |
| if (is_synced != expected_synced) { |
| return ::testing::AssertionFailure() |
| << "For id " << object_identifier << " expected to find the object " |
| << (expected_synced ? "" : "un") << "synced, but was " |
| << (is_synced ? "" : "un") << "synced, instead."; |
| } |
| return ::testing::AssertionSuccess(); |
| } |
| |
| ::testing::AssertionResult MarkCommitSynced(const CommitId& commit) { |
| bool called; |
| Status status; |
| storage_->MarkCommitSynced(commit, ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| |
| if (!called) { |
| return ::testing::AssertionFailure() << "MarkCommitSynced did not return"; |
| } |
| if (status != Status::OK) { |
| return ::testing::AssertionFailure() << "MarkCommitSynced returned status " << status; |
| } |
| return ::testing::AssertionSuccess(); |
| } |
| |
| ::testing::AssertionResult MarkPieceSynced(ObjectIdentifier object_identifier) { |
| RetrackIdentifier(&object_identifier); |
| bool called; |
| Status status; |
| storage_->MarkPieceSynced(object_identifier, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| |
| if (!called) { |
| return ::testing::AssertionFailure() << "MarkPieceSynced did not return"; |
| } |
| if (status != Status::OK) { |
| return ::testing::AssertionFailure() << "MarkPieceSynced returned status " << status; |
| } |
| return ::testing::AssertionSuccess(); |
| } |
| |
| ControlledLevelDb* leveldb_; |
| std::unique_ptr<ledger::ScopedTmpLocation> tmp_location_; |
| encryption::FakeEncryptionService encryption_service_; |
| std::unique_ptr<PageStorageImpl> storage_; |
| // A fake factory to allocate test identifiers, ensuring they are not automatically tracked by |
| // |storage_| (hence leaving more opportunities to find garbage-collection bugs). |
| fake::FakeObjectIdentifierFactory fake_factory_; |
| }; |
| |
| // A PageStorage test with garbage-collection disabled. |
| class PageStorageTestNoGc : public PageStorageTest { |
| public: |
| PageStorageTestNoGc() : PageStorageTest(GarbageCollectionPolicy::NEVER) {} |
| }; |
| |
| // A PageStorage test with EAGER_ROOT_NODES garbage-collection policy. |
| class PageStorageTestEagerRootNodesGC : public PageStorageTest { |
| public: |
| PageStorageTestEagerRootNodesGC() : PageStorageTest(GarbageCollectionPolicy::EAGER_ROOT_NODES) {} |
| }; |
| |
| // A test fixture parametrized by what kind of queries will be answered by the sync delegate. |
| class PageStorageSyncTest : public PageStorageTest, |
| public ::testing::WithParamInterface<SyncFeatures> { |
| public: |
| PageStorageSyncTest() : PageStorageSyncTest(ledger::kTestingGarbageCollectionPolicy) {} |
| explicit PageStorageSyncTest(GarbageCollectionPolicy gc_policy) |
| : PageStorageTest(gc_policy, GetParam().diff_compatibility_policy) {} |
| |
| // Where tree nodes are expected to be available. |
| ObjectAvailability TreeNodeObjectAvailability() { |
| switch (GetParam().diff_compatibility_policy) { |
| case DiffCompatibilityPolicy::USE_DIFFS_AND_TREE_NODES: |
| return ObjectAvailability::P2P_AND_CLOUD; |
| case DiffCompatibilityPolicy::USE_ONLY_DIFFS: |
| return ObjectAvailability::P2P; |
| } |
| } |
| }; |
| |
| // Sync tests need at least one of P2P and cloud sync; if only the cloud is |
| // available, they also need either |has_cloud| to be |YES_WITH_DIFFS| or |
| // |diff_compatibility_policy| to be |USE_DIFFS_AND_TREE_NODES|. |
| INSTANTIATE_TEST_SUITE_P( |
| PageStorageSyncTest, PageStorageSyncTest, |
| ::testing::Values( |
| SyncFeatures{HasP2P::YES, HasCloud::YES_WITH_DIFFS, |
| DiffCompatibilityPolicy::USE_ONLY_DIFFS}, |
| SyncFeatures{HasP2P::YES, HasCloud::YES_WITH_DIFFS, |
| DiffCompatibilityPolicy::USE_DIFFS_AND_TREE_NODES}, |
| SyncFeatures{HasP2P::YES, HasCloud::YES_NO_DIFFS, |
| DiffCompatibilityPolicy::USE_DIFFS_AND_TREE_NODES}, |
| SyncFeatures{HasP2P::YES, HasCloud::YES_NO_DIFFS, DiffCompatibilityPolicy::USE_ONLY_DIFFS}, |
| SyncFeatures{HasP2P::YES, HasCloud::NO, DiffCompatibilityPolicy::USE_DIFFS_AND_TREE_NODES}, |
| SyncFeatures{HasP2P::YES, HasCloud::NO, DiffCompatibilityPolicy::USE_ONLY_DIFFS}, |
| SyncFeatures{HasP2P::NO, HasCloud::YES_NO_DIFFS, |
| DiffCompatibilityPolicy::USE_DIFFS_AND_TREE_NODES}, |
| SyncFeatures{HasP2P::NO, HasCloud::YES_WITH_DIFFS, |
| DiffCompatibilityPolicy::USE_DIFFS_AND_TREE_NODES}, |
| SyncFeatures{HasP2P::NO, HasCloud::YES_WITH_DIFFS, |
| DiffCompatibilityPolicy::USE_ONLY_DIFFS})); |
| |
| TEST_F(PageStorageTest, AddGetLocalCommits) { |
| // Search for a commit id that doesn't exist and see the error. |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> lookup_commit; |
| storage_->GetCommit(RandomCommitId(environment_.random()), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &lookup_commit)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::INTERNAL_NOT_FOUND); |
| EXPECT_FALSE(lookup_commit); |
| |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| CommitId id = commit->GetId(); |
| std::string storage_bytes = convert::ToString(commit->GetStorageBytes()); |
| |
| // Search for a commit that exists and check the content. |
| std::unique_ptr<const Commit> found = GetCommit(id); |
| EXPECT_EQ(found->GetStorageBytes(), storage_bytes); |
| } |
| |
| TEST_F(PageStorageTest, AddLocalCommitsReferences) { |
| // Create two commits with the same root node. This requires creating an intermediate commit: |
| // - insert entry A in the empty page, commit as commit 1 |
| // - insert entry B in commit 1, commit as commit 2 |
| // - remove entry B in commit 2, commit as commit 3 |
| // Then commits 1 and 3 have the same tree with the same entry ids, so will share a root node. |
| // We then check that both commits 1 and 3 are stored as inbound references of their root node. |
| |
| std::unique_ptr<const Commit> base = GetFirstHead(); |
| const ObjectIdentifier object_id = RandomObjectIdentifier(); |
| |
| std::unique_ptr<Journal> journal = storage_->StartCommit(base->Clone()); |
| journal->Put("key", object_id, KeyPriority::EAGER); |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> commit1; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit1)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| journal = storage_->StartCommit(commit1->Clone()); |
| journal->Put("other", object_id, KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit2; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit2)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| journal = storage_->StartCommit(commit2->Clone()); |
| journal->Delete("other"); |
| std::unique_ptr<const Commit> commit3; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit3)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| ObjectIdentifier root_node1 = commit1->GetRootIdentifier(); |
| ObjectIdentifier root_node3 = commit3->GetRootIdentifier(); |
| |
| CommitId id1 = commit1->GetId(); |
| CommitId id3 = commit3->GetId(); |
| EXPECT_NE(id1, id3); |
| EXPECT_EQ(root_node1, root_node3); |
| |
| RunInCoroutine([this, root_node1, id1, id3](CoroutineHandler* handler) { |
| CheckInboundCommitReferences(handler, root_node1, {id1, id3}); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, AddCommitFromLocalDoNotMarkUnsyncedAlreadySyncedCommit) { |
| bool called; |
| Status status; |
| |
| // Create a conflict. |
| std::unique_ptr<const Commit> base = GetFirstHead(); |
| |
| std::unique_ptr<Journal> journal = storage_->StartCommit(base->Clone()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit1; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit1)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| CommitId id1 = commit1->GetId(); |
| storage_->MarkCommitSynced(id1, ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| journal = storage_->StartCommit(base->Clone()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit2; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit2)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| CommitId id2 = commit2->GetId(); |
| storage_->MarkCommitSynced(id2, ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // Make a merge commit. Merge commits only depend on their parents and |
| // contents, so we can reproduce them. |
| storage::ObjectIdentifier merged_object_id = RandomObjectIdentifier(); |
| journal = storage_->StartMergeCommit(commit1->Clone(), commit2->Clone()); |
| journal->Put("key", merged_object_id, KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_merged1; |
| storage_->CommitJournal(std::move(journal), ledger::Capture(ledger::SetWhenCalled(&called), |
| &status, &commit_merged1)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| CommitId merged_id1 = commit_merged1->GetId(); |
| |
| auto commits = GetUnsyncedCommits(); |
| EXPECT_EQ(commits.size(), 1u); |
| EXPECT_EQ(commits[0]->GetId(), merged_id1); |
| |
| storage_->MarkCommitSynced(merged_id1, ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // Add the commit again. |
| journal = storage_->StartMergeCommit(commit1->Clone(), commit2->Clone()); |
| journal->Put("key", merged_object_id, KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_merged2; |
| storage_->CommitJournal(std::move(journal), ledger::Capture(ledger::SetWhenCalled(&called), |
| &status, &commit_merged2)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| CommitId merged_id2 = commit_merged2->GetId(); |
| |
| // Check that the commit is not marked unsynced. |
| commits = GetUnsyncedCommits(); |
| EXPECT_EQ(commits.size(), 0u); |
| } |
| |
| TEST_F(PageStorageTest, AddCommitBeforeParentsError) { |
| // Try to add a commit before its parent and see the error. |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(std::make_unique<CommitRandomImpl>(environment_.random(), &fake_factory_)); |
| ObjectIdentifier empty_object_id; |
| ASSERT_TRUE(GetEmptyNodeIdentifier(&empty_object_id)); |
| std::unique_ptr<const Commit> commit = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), empty_object_id, std::move(parent)); |
| |
| bool called; |
| Status status; |
| std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes; |
| commits_and_bytes.emplace_back(commit->GetId(), convert::ToString(commit->GetStorageBytes())); |
| storage_->AddCommitsFromSync(std::move(commits_and_bytes), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::INTERNAL_NOT_FOUND); |
| } |
| |
| TEST_F(PageStorageTest, AddCommitsOutOfOrderError) { |
| std::unique_ptr<const btree::TreeNode> node; |
| ASSERT_TRUE(CreateNodeFromEntries({}, {}, &node)); |
| ObjectIdentifier root_identifier = node->GetIdentifier(); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| auto commit1 = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), root_identifier, std::move(parent)); |
| parent.clear(); |
| parent.push_back(commit1->Clone()); |
| auto commit2 = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), root_identifier, std::move(parent)); |
| |
| std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes; |
| commits_and_bytes.emplace_back(commit2->GetId(), convert::ToString(commit2->GetStorageBytes())); |
| commits_and_bytes.emplace_back(commit1->GetId(), convert::ToString(commit1->GetStorageBytes())); |
| |
| bool called; |
| Status status; |
| storage_->AddCommitsFromSync(std::move(commits_and_bytes), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::INTERNAL_NOT_FOUND); |
| } |
| |
| TEST_P(PageStorageSyncTest, AddGetSyncedCommits) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| FakeSyncDelegate sync(GetParam()); |
| storage_->SetSyncDelegate(&sync); |
| |
| // Create a node with 2 values. |
| ObjectData lazy_value = MakeObject("Some data", InlineBehavior::PREVENT); |
| ObjectData eager_value = MakeObject("More data", InlineBehavior::PREVENT); |
| std::vector<Entry> entries = { |
| Entry{"key0", lazy_value.object_identifier, KeyPriority::LAZY, EntryId("id_1")}, |
| Entry{"key1", eager_value.object_identifier, KeyPriority::EAGER, EntryId("id_2")}, |
| }; |
| std::unique_ptr<const btree::TreeNode> node; |
| ASSERT_TRUE(CreateNodeFromEntries(entries, {}, &node)); |
| ObjectIdentifier root_identifier = node->GetIdentifier(); |
| |
| // Add the three objects to FakeSyncDelegate. |
| sync.AddObject(lazy_value.object_identifier, lazy_value.value, |
| ObjectAvailability::P2P_AND_CLOUD); |
| sync.AddObject(eager_value.object_identifier, eager_value.value, |
| ObjectAvailability::P2P_AND_CLOUD); |
| |
| { |
| // Ensure root_object is not kept, as the storage it depends on will be |
| // deleted. |
| std::unique_ptr<const Object> root_object = |
| TryGetObject(root_identifier, PageStorage::Location::Local()); |
| |
| absl::string_view root_data; |
| ASSERT_EQ(root_object->GetData(&root_data), Status::OK); |
| sync.AddObject(root_identifier, convert::ToString(root_data), TreeNodeObjectAvailability()); |
| } |
| |
| // Reset and clear the storage. |
| ResetStorage(); |
| storage_->SetSyncDelegate(&sync); |
| RetrackIdentifier(&root_identifier); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| CommitId parent_id = parent[0]->GetId(); |
| std::unique_ptr<const Commit> commit = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), root_identifier, std::move(parent)); |
| CommitId id = commit->GetId(); |
| |
| // Add the diff of the commit to FakeSyncDelegate. |
| sync.AddDiff(id, parent_id, {{entries[0], false}, {entries[1], false}}); |
| |
| // Adding the commit should only request the tree node and the eager value. |
| // The diff should also be requested. |
| sync.object_requests.clear(); |
| bool called; |
| Status status; |
| storage_->AddCommitsFromSync(CommitAndBytesFromCommit(*commit), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_THAT(sync.diff_requests, ElementsAre(Pair(id, _))); |
| |
| // We only request the root object (at least as a tree node, maybe as a blob) and the eager |
| // value (only as a BLOB). |
| EXPECT_THAT(sync.object_requests, |
| IsSupersetOf({Pair(root_identifier, RetrievedObjectType::TREE_NODE), |
| Pair(eager_value.object_identifier, RetrievedObjectType::BLOB)})); |
| EXPECT_THAT(sync.object_requests, |
| IsSubsetOf({Pair(root_identifier, RetrievedObjectType::TREE_NODE), |
| Pair(root_identifier, RetrievedObjectType::BLOB), |
| Pair(eager_value.object_identifier, RetrievedObjectType::BLOB)})); |
| |
| // Adding the same commit twice should not request any objects from sync. |
| sync.object_requests.clear(); |
| sync.diff_requests.clear(); |
| storage_->AddCommitsFromSync(CommitAndBytesFromCommit(*commit), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_TRUE(sync.object_requests.empty()); |
| EXPECT_TRUE(sync.diff_requests.empty()); |
| |
| std::unique_ptr<const Commit> found = GetCommit(id); |
| EXPECT_EQ(found->GetStorageBytes(), commit->GetStorageBytes()); |
| |
| // Check that the commit is not marked as unsynced. |
| std::vector<std::unique_ptr<const Commit>> commits = GetUnsyncedCommits(); |
| EXPECT_TRUE(commits.empty()); |
| }); |
| } |
| |
| // Check that receiving a remote commit that is already present locally but not |
| // synced will mark the commit as synced. |
| TEST_F(PageStorageTest, MarkRemoteCommitSynced) { |
| FakeSyncDelegate sync; |
| storage_->SetSyncDelegate(&sync); |
| |
| bool called; |
| Status status; |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| CommitId id = commit->GetId(); |
| |
| EXPECT_EQ(GetUnsyncedCommits().size(), 1u); |
| |
| std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes; |
| commits_and_bytes.emplace_back(commit->GetId(), convert::ToString(commit->GetStorageBytes())); |
| storage_->AddCommitsFromSync(std::move(commits_and_bytes), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| |
| EXPECT_EQ(GetUnsyncedCommits().size(), 0u); |
| } |
| |
| TEST_F(PageStorageTest, SyncCommits) { |
| std::vector<std::unique_ptr<const Commit>> commits = GetUnsyncedCommits(); |
| |
| // Initially there should be no unsynced commits. |
| EXPECT_TRUE(commits.empty()); |
| |
| bool called; |
| Status status; |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| commits = GetUnsyncedCommits(); |
| EXPECT_EQ(commits.size(), 1u); |
| EXPECT_EQ(commits[0]->GetStorageBytes(), commit->GetStorageBytes()); |
| |
| // Mark it as synced. |
| storage_->MarkCommitSynced(commit->GetId(), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| commits = GetUnsyncedCommits(); |
| EXPECT_TRUE(commits.empty()); |
| } |
| |
| TEST_F(PageStorageTest, HeadCommits) { |
| // Every page should have one initial head commit. |
| std::vector<std::unique_ptr<const Commit>> heads = GetHeads(); |
| EXPECT_EQ(heads.size(), 1u); |
| |
| // Adding a new commit with the previous head as its parent should replace the |
| // old head. |
| bool called; |
| Status status; |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| heads = GetHeads(); |
| ASSERT_EQ(heads.size(), 1u); |
| EXPECT_EQ(heads[0]->GetId(), commit->GetId()); |
| } |
| |
| TEST_F(PageStorageTest, OrderHeadCommitsByTimestampThenId) { |
| ledger::TestClock test_clock; |
| // We generate a few timestamps: some random, and a few equal constants to |
| // test ID ordering. |
| std::vector<zx::time_utc> timestamps(7); |
| std::generate(timestamps.begin(), timestamps.end(), |
| [this] { return environment_.random()->Draw<zx::time_utc>(); }); |
| timestamps.insert(timestamps.end(), {zx::time_utc(1000), zx::time_utc(1000), zx::time_utc(1000)}); |
| std::shuffle(timestamps.begin(), timestamps.end(), |
| environment_.random()->NewBitGenerator<size_t>()); |
| |
| std::vector<ObjectIdentifier> object_identifiers; |
| object_identifiers.resize(timestamps.size()); |
| for (size_t i = 0; i < timestamps.size(); ++i) { |
| ObjectData value = MakeObject("value" + std::to_string(i), InlineBehavior::ALLOW); |
| std::vector<Entry> entries = {Entry{"key" + std::to_string(i), value.object_identifier, |
| KeyPriority::EAGER, EntryId("id" + std::to_string(i))}}; |
| std::unique_ptr<const btree::TreeNode> node; |
| ASSERT_TRUE(CreateNodeFromEntries(entries, {}, &node)); |
| object_identifiers[i] = node->GetIdentifier(); |
| } |
| |
| std::unique_ptr<const Commit> base = GetFirstHead(); |
| |
| // We first generate the commits; they will be shuffled before being added. |
| std::vector<PageStorage::CommitIdAndBytes> commits; |
| std::vector<std::pair<zx::time_utc, CommitId>> sorted_commits; |
| for (size_t i = 0; i < timestamps.size(); i++) { |
| test_clock.Set(timestamps[i]); |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.push_back(base->Clone()); |
| std::unique_ptr<const Commit> commit = storage_->GetCommitFactory()->FromContentAndParents( |
| &test_clock, environment_.random(), object_identifiers[i], std::move(parent)); |
| |
| commits.emplace_back(commit->GetId(), convert::ToString(commit->GetStorageBytes())); |
| sorted_commits.emplace_back(timestamps[i], commit->GetId()); |
| } |
| |
| auto rng = environment_.random()->NewBitGenerator<uint64_t>(); |
| std::shuffle(commits.begin(), commits.end(), rng); |
| bool called; |
| Status status; |
| storage_->AddCommitsFromSync(std::move(commits), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| EXPECT_TRUE(RunLoopUntilIdle()); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // Check that GetHeadCommits returns the heads sorted by timestamp, then by id. |
| std::vector<std::unique_ptr<const Commit>> heads; |
| status = storage_->GetHeadCommits(&heads); |
| EXPECT_EQ(status, Status::OK); |
| std::sort(sorted_commits.begin(), sorted_commits.end()); |
| ASSERT_EQ(heads.size(), sorted_commits.size()); |
| for (size_t i = 0; i < sorted_commits.size(); ++i) { |
| EXPECT_EQ(heads[i]->GetId(), sorted_commits[i].second); |
| } |
| } |
| |
| TEST_F(PageStorageTest, CreateJournals) { |
| // Explicit journal. |
| auto left_commit = TryCommitFromLocal(5); |
| ASSERT_TRUE(left_commit); |
| auto right_commit = TryCommitFromLocal(10); |
| ASSERT_TRUE(right_commit); |
| |
| // Journal for merge commit. |
| std::unique_ptr<Journal> journal = |
| storage_->StartMergeCommit(std::move(left_commit), std::move(right_commit)); |
| } |
| |
| TEST_F(PageStorageTest, CreateJournalHugeNode) { |
| std::unique_ptr<const Commit> commit = TryCommitFromLocal(500, 1024); |
| ASSERT_TRUE(commit); |
| std::vector<Entry> entries = GetCommitContents(*commit); |
| |
| EXPECT_EQ(entries.size(), 500u); |
| for (const auto& entry : entries) { |
| EXPECT_EQ(entry.key.size(), 1024u); |
| } |
| |
| // Check that all the node's pieces are marked as unsynced. |
| bool called; |
| Status status; |
| std::vector<ObjectIdentifier> object_identifiers; |
| storage_->GetUnsyncedPieces( |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifiers)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| |
| bool found_index = false; |
| std::set<ObjectIdentifier> unsynced_identifiers(object_identifiers.begin(), |
| object_identifiers.end()); |
| for (const auto& identifier : unsynced_identifiers) { |
| EXPECT_FALSE(GetObjectDigestInfo(identifier.object_digest()).is_inlined()); |
| if (GetObjectDigestInfo(identifier.object_digest()).piece_type == PieceType::INDEX) { |
| found_index = true; |
| std::set<ObjectIdentifier> sub_identifiers; |
| IterationStatus iteration_status = IterationStatus::ERROR; |
| CollectPieces( |
| identifier, |
| [this](ObjectIdentifier identifier, |
| fit::function<void(Status, absl::string_view)> callback) { |
| storage_->GetPiece(std::move(identifier), |
| [callback = std::move(callback)]( |
| Status status, std::unique_ptr<const Piece> piece) { |
| if (status != Status::OK) { |
| callback(status, ""); |
| return; |
| } |
| callback(status, piece->GetData()); |
| }); |
| }, |
| [&iteration_status, &sub_identifiers](IterationStatus status, |
| ObjectIdentifier identifier) { |
| iteration_status = status; |
| if (status == IterationStatus::IN_PROGRESS) { |
| EXPECT_TRUE(sub_identifiers.insert(identifier).second); |
| } |
| return true; |
| }); |
| RunLoopUntilIdle(); |
| EXPECT_EQ(iteration_status, IterationStatus::DONE); |
| for (const auto& identifier : sub_identifiers) { |
| if (!GetObjectDigestInfo(identifier.object_digest()).is_inlined()) { |
| EXPECT_EQ(unsynced_identifiers.count(identifier), 1u); |
| } |
| } |
| } |
| } |
| EXPECT_TRUE(found_index); |
| } |
| |
| TEST_F(PageStorageTest, DestroyUncommittedJournal) { |
| // It is not an error if a journal is not committed or rolled back. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| } |
| |
| TEST_F(PageStorageTest, AddObjectFromLocal) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data = MakeObject("Some data", InlineBehavior::PREVENT); |
| |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, data.ToDataSource(), {}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(object_identifier, data.object_identifier); |
| |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(ReadObject(handler, object_identifier, &piece), Status::OK); |
| EXPECT_EQ(piece->GetData(), data.value); |
| EXPECT_TRUE(ObjectIsUntracked(object_identifier, true)); |
| EXPECT_TRUE(IsPieceSynced(object_identifier, false)); |
| }); |
| } |
| |
| // This test implements its own garbage-collection to discover bugs earlier and with better |
| // error messages. |
| TEST_F(PageStorageTestNoGc, AddHugeObjectFromLocal) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| // Create data large enough to be split into pieces (and trigger potential garbage-collection |
| // bugs: the more pieces, the more likely we are to hit them). |
| ObjectData data = MakeObject(RandomString(environment_.random(), 1 << 20)); |
| ASSERT_FALSE(GetObjectDigestInfo(data.object_identifier.object_digest()).is_inlined()); |
| ASSERT_FALSE(GetObjectDigestInfo(data.object_identifier.object_digest()).is_chunk()); |
| |
| // Build a set of the pieces |data| is made of. |
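    // ForEachPiece applies the same splitting as AddObjectFromLocal below, so the digests
    // collected here are exactly the pieces that the call will write.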
| std::set<ObjectDigest> digests; |
| ForEachPiece(data.value, ObjectType::BLOB, &fake_factory_, |
| [&digests](std::unique_ptr<const Piece> piece) { |
| ObjectDigest digest = piece->GetIdentifier().object_digest(); |
| if (GetObjectDigestInfo(digest).is_inlined()) { |
| return; |
| } |
| digests.insert(digest); |
| }); |
| |
| // Trigger deletion of *all* pieces from storage immediately before *any* of them is written |
| // to disk. This is an attempt at finding bugs in the code that wouldn't hold pieces alive |
| // long enough before writing them to disk. |
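    // |set_on_execute| is a hook on the test LevelDB wrapper; the closure runs when the database
    // executes a write, so the deletions race against the first time any piece hits disk.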
| leveldb_->set_on_execute([this, digests = std::move(digests)]() { |
| for (const auto& digest : digests) { |
| PageStorageImplAccessorForTest::DeleteObject( |
| storage_, digest, |
| [digest = digest](Status status, ObjectReferencesAndPriority references) { |
| EXPECT_NE(status, Status::OK) |
| << "DeleteObject succeeded; missing a live reference for " << digest; |
| }); |
| } |
| }); |
| |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, data.ToDataSource(), {}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(object_identifier, data.object_identifier); |
| |
| std::unique_ptr<const Object> object = |
| TryGetObject(object_identifier, PageStorage::Location::Local()); |
| ASSERT_NE(object, nullptr); |
| EXPECT_EQ(object->GetIdentifier(), data.object_identifier); |
| absl::string_view object_data; |
| ASSERT_EQ(object->GetData(&object_data), Status::OK); |
| EXPECT_EQ(convert::ToString(object_data), data.value); |
| EXPECT_TRUE(ObjectIsUntracked(object_identifier, true)); |
| EXPECT_TRUE(IsPieceSynced(object_identifier, false)); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, AddSmallObjectFromLocal) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data = MakeObject("Some data"); |
| |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, data.ToDataSource(), {}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(object_identifier, data.object_identifier); |
| EXPECT_EQ(ExtractObjectDigestData(object_identifier.object_digest()), data.value); |
| |
| std::unique_ptr<const Piece> piece; |
| EXPECT_EQ(ReadObject(handler, object_identifier, &piece), Status::INTERNAL_NOT_FOUND); |
| // Inline objects do not need to ever be tracked. |
| EXPECT_TRUE(ObjectIsUntracked(object_identifier, false)); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, InterruptAddObjectFromLocal) { |
| ObjectData data = MakeObject("Some data"); |
| |
| storage_->AddObjectFromLocal(ObjectType::BLOB, data.ToDataSource(), {}, |
| [](Status returned_status, ObjectIdentifier object_identifier) {}); |
| |
  // Check that we do not crash when deleting the storage while an AddObject
  // call is in progress.
| storage_.reset(); |
| } |
| |
| TEST_F(PageStorageTest, AddObjectFromLocalError) { |
| auto data_source = std::make_unique<FakeErrorDataSource>(dispatcher()); |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, std::move(data_source), {}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::IO_ERROR); |
| } |
| |
// This test deletes objects manually and does not use automatic garbage-collection, to keep
// results predictable.
| TEST_F(PageStorageTestNoGc, DeleteObject) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| auto data = std::make_unique<ObjectData>(storage_->GetObjectIdentifierFactory(), "Some data", |
| InlineBehavior::PREVENT); |
| ObjectIdentifier object_identifier = data->object_identifier; |
| const ObjectDigest object_digest = object_identifier.object_digest(); |
| |
| // Add a local piece |data|. |
| bool called; |
| Status status; |
| PageStorageImplAccessorForTest::AddPiece( |
| storage_, data->ToPiece(), ChangeSource::LOCAL, IsObjectSynced::NO, {}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // Check that the piece can be read back. |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(ReadObject(handler, object_identifier, &piece), Status::OK); |
    // The piece is a BLOB CHUNK; it should have no references.
| ObjectReferencesAndPriority references; |
| ASSERT_EQ(piece->AppendReferences(&references), Status::OK); |
| EXPECT_THAT(references, IsEmpty()); |
| |
| // Remove live references to the identifier. |
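    // UntrackIdentifier deregisters the identifier from the tracking factory so that it stops
    // counting as a live reference; RetrackIdentifier below re-registers it.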
| data.reset(); |
| piece.reset(); |
| UntrackIdentifier(&object_identifier); |
| EXPECT_EQ(PageStorageImplAccessorForTest::CountLiveReferences(storage_, object_digest), 0); |
| |
| // Delete the piece. |
| PageStorageImplAccessorForTest::DeleteObject( |
| storage_, object_digest, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &references)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_THAT(references, IsEmpty()); |
| |
| // Check that the object is gone. |
| RetrackIdentifier(&object_identifier); |
| EXPECT_EQ(ReadObject(handler, object_identifier, &piece), Status::INTERNAL_NOT_FOUND); |
| }); |
| } |
| |
// Converts each ObjectIdentifier in |references| into an ObjectDigest and returns the result.
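// Example: {{id1, KeyPriority::EAGER}, {id2, KeyPriority::LAZY}} becomes
// {{id1.object_digest(), KeyPriority::EAGER}, {id2.object_digest(), KeyPriority::LAZY}}.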
| ObjectReferencesAndPriority MakeObjectReferencesAndPriority( |
| std::set<std::pair<ObjectIdentifier, KeyPriority>> references) { |
| ObjectReferencesAndPriority result; |
| std::transform(references.begin(), references.end(), std::inserter(result, result.begin()), |
| [](const std::pair<ObjectIdentifier, KeyPriority>& reference) |
| -> std::pair<ObjectDigest, KeyPriority> { |
| return {reference.first.object_digest(), reference.second}; |
| }); |
| return result; |
| } |
| |
| // Tests that DeleteObject deletes both piece references and tree references. |
// This test deletes objects manually and does not use automatic garbage-collection, to keep
// results predictable.
| TEST_F(PageStorageTestNoGc, DeleteObjectWithReferences) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| // Create a valid random tree node object, with tree references and large enough to be split |
| // into several pieces. |
| std::vector<Entry> entries; |
| std::map<size_t, ObjectIdentifier> children; |
| std::set<std::pair<ObjectIdentifier, KeyPriority>> references; |
| for (size_t i = 0; i < 100; ++i) { |
| // Add a random entry. |
| entries.push_back(Entry{RandomString(environment_.random(), 500), RandomObjectIdentifier(), |
| i % 2 ? KeyPriority::EAGER : KeyPriority::LAZY, |
| EntryId(RandomString(environment_.random(), 32))}); |
| references.insert({entries.back().object_identifier, entries.back().priority}); |
| // Add a random child. |
| children.emplace(i, RandomObjectIdentifier()); |
| references.insert({children[i], KeyPriority::EAGER}); |
| } |
| std::sort(entries.begin(), entries.end(), |
| [](const Entry& e1, const Entry& e2) { return e1.key < e2.key; }); |
| std::string data_str = btree::EncodeNode(0, entries, children); |
| ASSERT_TRUE(btree::CheckValidTreeNodeSerialization(data_str)); |
| |
| // Add the tree node to local storage. |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::TREE_NODE, |
| MakeObject(std::move(data_str), ObjectType::TREE_NODE, InlineBehavior::PREVENT) |
| .ToDataSource(), |
| MakeObjectReferencesAndPriority(references), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // Check that we got an index piece, hence some piece references. |
| ASSERT_EQ(GetObjectDigestInfo(object_identifier.object_digest()).piece_type, PieceType::INDEX); |
| std::unique_ptr<const Piece> piece = TryGetPiece(object_identifier); |
| ASSERT_NE(piece, nullptr); |
| |
| // Add piece references to |references| to check both tree and piece references from now on. |
| ASSERT_EQ(ForEachIndexChild( |
| piece->GetData(), &fake_factory_, |
| [object_identifier, &references](ObjectIdentifier piece_identifier) { |
| if (GetObjectDigestInfo(piece_identifier.object_digest()).is_inlined()) { |
| // References to inline pieces are not stored on disk. |
| return Status::OK; |
| } |
| references.insert({piece_identifier, KeyPriority::EAGER}); |
| return Status::OK; |
| }), |
| Status::OK); |
| |
| // Check piece and tree have been written to local storage. |
| for (const auto& [identifier, priority] : references) { |
| CheckInboundObjectReferences(handler, identifier, |
| {{object_identifier.object_digest(), priority}}); |
| } |
| |
| // Remove live references to the identifier. |
| const ObjectDigest object_digest = object_identifier.object_digest(); |
| piece.reset(); |
| UntrackIdentifier(&object_identifier); |
| EXPECT_EQ(PageStorageImplAccessorForTest::CountLiveReferences(storage_, object_digest), 0); |
| |
| // Delete the piece. |
| ObjectReferencesAndPriority delete_references; |
| PageStorageImplAccessorForTest::DeleteObject( |
| storage_, object_digest, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &delete_references)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_THAT(delete_references, ContainerEq(MakeObjectReferencesAndPriority(references))); |
| |
| // Check that the object is gone. |
| RetrackIdentifier(&object_identifier); |
| EXPECT_EQ(ReadObject(handler, object_identifier, &piece), Status::INTERNAL_NOT_FOUND); |
| |
| // Check that references are gone. |
| for (const auto& [identifier, priority] : references) { |
| CheckInboundObjectReferences(handler, identifier, {}); |
| } |
| }); |
| } |
| |
// This test creates two commits: commit1, which associates "key" with "Some data", and commit2,
// which associates "key" with a random piece.
| // It first attempts to delete the root piece of commit1 and the piece containing "Some data", |
| // which is impossible because those pieces are referenced by commit1 and its root piece |
| // respectively. It then marks the root piece of commit1 as synchronized. This makes another |
| // attempt at the same deletions succeed: the root piece is now synchronized and referenced by a |
| // non-head commit, and the piece containing "Some data" is not referenced by anything (after the |
// former deletion succeeds), making both of them garbage-collectable. This test deletes objects
// manually and does not use automatic garbage-collection, to keep results predictable.
| TEST_F(PageStorageTestNoGc, DeleteObjectAbortsWhenOnDiskReference) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| auto data = std::make_unique<ObjectData>(&fake_factory_, "Some data", InlineBehavior::PREVENT); |
| ObjectIdentifier object_identifier = data->object_identifier; |
| const ObjectDigest object_digest = object_identifier.object_digest(); |
| RetrackIdentifier(&object_identifier); |
| |
| // Add a local piece |data|. |
| bool called; |
| Status status; |
| PageStorageImplAccessorForTest::AddPiece( |
| storage_, data->ToPiece(), ChangeSource::LOCAL, IsObjectSynced::NO, {}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // Add an object-object on-disk reference, as part of a commit. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", object_identifier, KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit1; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit1)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
    // Mark the commit as synced so that we do not need to keep its parent (the root commit) alive
    // to compute a cloud diff.
| storage_->MarkCommitSynced(commit1->GetId(), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // Record the root piece of |commit1| to attempt deletion later. |
| ObjectIdentifier root_piece_identifier = commit1->GetRootIdentifier(); |
| const ObjectDigest root_piece_digest = root_piece_identifier.object_digest(); |
| |
| // Remove live references to the identifiers. |
| data.reset(); |
| commit1.reset(); |
| UntrackIdentifier(&object_identifier); |
| UntrackIdentifier(&root_piece_identifier); |
| EXPECT_EQ(PageStorageImplAccessorForTest::CountLiveReferences(storage_, object_digest), 0); |
| |
| // Add another commit so that the previous commit is not live anymore (otherwise it keeps a |
| // live reference to its root piece |root_piece_digest|). |
| EXPECT_NE(PageStorageImplAccessorForTest::CountLiveReferences(storage_, root_piece_digest), 0); |
| journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit2; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit2)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // Mark the commit as synced so that we do not need to keep its parent alive to compute a |
| // cloud diff (otherwise it will also keep a live reference to |root_piece_digest|). |
| storage_->MarkCommitSynced(commit2->GetId(), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| EXPECT_EQ(PageStorageImplAccessorForTest::CountLiveReferences(storage_, root_piece_digest), 0); |
| |
| // Attempt to delete the |data| piece. |
| ObjectReferencesAndPriority references; |
| PageStorageImplAccessorForTest::DeleteObject( |
| storage_, object_digest, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &references)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::CANCELED); |
| |
| // Attempt to delete the root piece of |commit1|. |
| PageStorageImplAccessorForTest::DeleteObject( |
| storage_, root_piece_digest, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &references)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::CANCELED); |
| |
    // Check that the pieces are still there.
| RetrackIdentifier(&object_identifier); |
| RetrackIdentifier(&root_piece_identifier); |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(ReadObject(handler, object_identifier, &piece), Status::OK); |
| ASSERT_EQ(ReadObject(handler, root_piece_identifier, &piece), Status::OK); |
| piece.reset(); |
| |
| // Mark the root piece of |commit1| as synced, to make it garbage-collectable (commit-object |
| // references are ignored for synchronized pieces). |
| RunInCoroutine([this, root_piece_identifier](CoroutineHandler* handler) { |
| EXPECT_EQ(PageStorageImplAccessorForTest::GetDb(storage_).SetObjectStatus( |
| handler, root_piece_identifier, PageDbObjectStatus::SYNCED), |
| Status::OK); |
| }); |
| |
| // Delete the root piece of |commit1|. |
| UntrackIdentifier(&root_piece_identifier); |
| EXPECT_EQ(PageStorageImplAccessorForTest::CountLiveReferences(storage_, root_piece_digest), 0); |
| PageStorageImplAccessorForTest::DeleteObject( |
| storage_, root_piece_digest, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &references)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| RetrackIdentifier(&root_piece_identifier); |
| ASSERT_EQ(ReadObject(handler, root_piece_identifier, &piece), Status::INTERNAL_NOT_FOUND); |
| |
    // Since tree references are associated with the root piece, it is now possible to delete the
    // |data| piece, which was only referenced by |commit1|'s tree.
| UntrackIdentifier(&object_identifier); |
| EXPECT_EQ(PageStorageImplAccessorForTest::CountLiveReferences(storage_, object_digest), 0); |
| PageStorageImplAccessorForTest::DeleteObject( |
| storage_, object_digest, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &references)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| RetrackIdentifier(&object_identifier); |
| ASSERT_EQ(ReadObject(handler, object_identifier, &piece), Status::INTERNAL_NOT_FOUND); |
| }); |
| } |
| |
// This test deletes objects manually and does not use automatic garbage-collection, to keep
// results predictable.
| TEST_F(PageStorageTestNoGc, DeleteObjectAbortsWhenLiveReference) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| auto data = std::make_unique<ObjectData>(&fake_factory_, "Some data", InlineBehavior::PREVENT); |
| ObjectIdentifier object_identifier = data->object_identifier; |
| const ObjectDigest object_digest = object_identifier.object_digest(); |
| |
| // Add a local piece |data|. |
| bool called; |
| Status status; |
| PageStorageImplAccessorForTest::AddPiece( |
| storage_, data->ToPiece(), ChangeSource::LOCAL, IsObjectSynced::NO, {}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // Remove live references to the identifier. |
| data.reset(); |
| UntrackIdentifier(&object_identifier); |
| EXPECT_EQ(PageStorageImplAccessorForTest::CountLiveReferences(storage_, object_digest), 0); |
| |
| // Start deletion of the piece. |
| ObjectReferencesAndPriority references; |
| PageStorageImplAccessorForTest::DeleteObject( |
| storage_, object_digest, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &references)); |
| ASSERT_FALSE(called); |
| |
| // Make the identifier live again before deletion has gone through. |
| RetrackIdentifier(&object_identifier); |
| |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::CANCELED); |
| |
| // Check that the object is still there. |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(ReadObject(handler, object_identifier, &piece), Status::OK); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, AddLocalPiece) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data = MakeObject("Some data", InlineBehavior::PREVENT); |
| const ObjectIdentifier reference = RandomObjectIdentifier(); |
| |
| bool called; |
| Status status; |
| PageStorageImplAccessorForTest::AddPiece( |
| storage_, data.ToPiece(), ChangeSource::LOCAL, IsObjectSynced::NO, |
| {{reference.object_digest(), KeyPriority::LAZY}}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(ReadObject(handler, data.object_identifier, &piece), Status::OK); |
| EXPECT_EQ(piece->GetData(), data.value); |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, true)); |
| EXPECT_TRUE(IsPieceSynced(data.object_identifier, false)); |
| |
| CheckInboundObjectReferences(handler, reference, |
| {{data.object_identifier.object_digest(), KeyPriority::LAZY}}); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, AddSyncPiece) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data = MakeObject("Some data", InlineBehavior::PREVENT); |
| const ObjectIdentifier reference = RandomObjectIdentifier(); |
| |
| bool called; |
| Status status; |
| PageStorageImplAccessorForTest::AddPiece( |
| storage_, data.ToPiece(), ChangeSource::CLOUD, IsObjectSynced::YES, |
| {{reference.object_digest(), KeyPriority::EAGER}}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(ReadObject(handler, data.object_identifier, &piece), Status::OK); |
| EXPECT_EQ(piece->GetData(), data.value); |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, false)); |
| EXPECT_TRUE(IsPieceSynced(data.object_identifier, true)); |
| |
| CheckInboundObjectReferences(handler, reference, |
| {{data.object_identifier.object_digest(), KeyPriority::EAGER}}); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, AddP2PPiece) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data = MakeObject("Some data", InlineBehavior::PREVENT); |
| |
| bool called; |
| Status status; |
| PageStorageImplAccessorForTest::AddPiece( |
| storage_, data.ToPiece(), ChangeSource::P2P, IsObjectSynced::NO, {}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(ReadObject(handler, data.object_identifier, &piece), Status::OK); |
| EXPECT_EQ(piece->GetData(), data.value); |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, false)); |
| EXPECT_TRUE(IsPieceSynced(data.object_identifier, false)); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, GetObject) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data = MakeObject("Some data", InlineBehavior::PREVENT); |
| ASSERT_EQ(WriteObject(handler, &data), Status::OK); |
| |
| std::unique_ptr<const Object> object = |
| TryGetObject(data.object_identifier, PageStorage::Location::Local()); |
| EXPECT_EQ(object->GetIdentifier(), data.object_identifier); |
| absl::string_view object_data; |
| ASSERT_EQ(object->GetData(&object_data), Status::OK); |
| EXPECT_EQ(convert::ToString(object_data), data.value); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPart) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data = MakeObject("_Some data_", InlineBehavior::PREVENT); |
| ASSERT_EQ(WriteObject(handler, &data), Status::OK); |
| |
| ledger::SizedVmo object_part = |
| TryGetObjectPart(data.object_identifier, 1, data.size - 2, PageStorage::Location::Local()); |
| std::string object_part_data; |
| ASSERT_TRUE(ledger::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(convert::ToString(object_part_data), data.value.substr(1, data.size - 2)); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPartLargeOffset) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data = MakeObject("_Some data_", InlineBehavior::PREVENT); |
| ASSERT_EQ(WriteObject(handler, &data), Status::OK); |
| |
| ledger::SizedVmo object_part = TryGetObjectPart(data.object_identifier, data.size * 2, |
| data.size, PageStorage::Location::Local()); |
| std::string object_part_data; |
| ASSERT_TRUE(ledger::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(convert::ToString(object_part_data), ""); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPartLargeMaxSize) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data = MakeObject("_Some data_", InlineBehavior::PREVENT); |
| ASSERT_EQ(WriteObject(handler, &data), Status::OK); |
| |
| ledger::SizedVmo object_part = |
| TryGetObjectPart(data.object_identifier, 0, data.size * 2, PageStorage::Location::Local()); |
| std::string object_part_data; |
| ASSERT_TRUE(ledger::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(convert::ToString(object_part_data), data.value); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPartNegativeArgs) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data = MakeObject("_Some data_", InlineBehavior::PREVENT); |
| ASSERT_EQ(WriteObject(handler, &data), Status::OK); |
| |
| ledger::SizedVmo object_part = TryGetObjectPart(data.object_identifier, -data.size + 1, -1, |
| PageStorage::Location::Local()); |
| std::string object_part_data; |
| ASSERT_TRUE(ledger::StringFromVmo(object_part, &object_part_data)); |
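    // Per the expectation below, a negative offset is interpreted relative to the end of the
    // object (|-data.size + 1| resolves to byte 1) and a negative max_size means "read to the
    // end".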
| EXPECT_EQ(convert::ToString(object_part_data), data.value.substr(1, data.size - 1)); |
| }); |
| } |
| |
| TEST_F(PageStorageTest, GetLargeObjectPart) { |
| std::string data_str = RandomString(environment_.random(), 65536); |
| size_t offset = 6144; |
| size_t size = 49152; |
| |
| ObjectData data = MakeObject(std::move(data_str), InlineBehavior::PREVENT); |
| |
| ASSERT_EQ(GetObjectDigestInfo(data.object_identifier.object_digest()).piece_type, |
| PieceType::INDEX); |
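  // The 64 KiB value is therefore split: the root is an INDEX piece, so the read below must be
  // reassembled from the underlying chunks.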
| |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, data.ToDataSource(), /*tree_references=*/{}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(object_identifier, data.object_identifier); |
| |
| ledger::SizedVmo object_part = |
| TryGetObjectPart(object_identifier, offset, size, PageStorage::Location::Local()); |
| std::string object_part_data; |
| ASSERT_TRUE(ledger::StringFromVmo(object_part, &object_part_data)); |
| std::string result_str = convert::ToString(object_part_data); |
| EXPECT_EQ(result_str.size(), size); |
| EXPECT_EQ(result_str, data.value.substr(offset, size)); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPartFromSync) { |
| ObjectData data = MakeObject("_Some data_", InlineBehavior::PREVENT); |
| FakeSyncDelegate sync; |
| sync.AddObject(data.object_identifier, data.value, ObjectAvailability::P2P_AND_CLOUD); |
| storage_->SetSyncDelegate(&sync); |
| |
| ledger::SizedVmo object_part = TryGetObjectPart(data.object_identifier, 1, data.size - 2, |
| PageStorage::Location::ValueFromNetwork()); |
| std::string object_part_data; |
| ASSERT_TRUE(ledger::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(convert::ToString(object_part_data), data.value.substr(1, data.size - 2)); |
| |
| storage_->SetSyncDelegate(nullptr); |
| ObjectData other_data = MakeObject("_Some other data_", InlineBehavior::PREVENT); |
| TryGetObjectPart(other_data.object_identifier, 1, other_data.size - 2, |
| PageStorage::Location::Local(), Status::INTERNAL_NOT_FOUND); |
| TryGetObjectPart(other_data.object_identifier, 1, other_data.size - 2, |
| PageStorage::Location::ValueFromNetwork(), Status::NETWORK_ERROR); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPartFromSyncEndOfChunk) { |
| // Test for LE-797: GetObjectPartFromSync was sometimes called to read zero |
| // bytes off a piece. |
| // Generates a read such that the end of the read is on a boundary between two |
| // chunks. |
| |
| std::string data_str = RandomString(environment_.random(), 2 * 65536 + 1); |
| |
| FakeSyncDelegate sync; |
  // Given the length of the data, there will be at least two non-inlined
  // chunks. This relies on ForEachPiece giving the chunks in order.
| std::vector<size_t> chunk_lengths; |
| std::vector<ObjectIdentifier> chunk_identifiers; |
| ObjectIdentifier object_identifier = ForEachPiece( |
| data_str, ObjectType::BLOB, &fake_factory_, |
| [&sync, &chunk_lengths, &chunk_identifiers](std::unique_ptr<const Piece> piece) { |
| ObjectIdentifier object_identifier = piece->GetIdentifier(); |
| ObjectDigestInfo digest_info = GetObjectDigestInfo(object_identifier.object_digest()); |
| if (digest_info.is_chunk()) { |
| chunk_lengths.push_back(piece->GetData().size()); |
| chunk_identifiers.push_back(object_identifier); |
| } |
| if (digest_info.is_inlined()) { |
| return; |
| } |
| sync.AddObject(std::move(object_identifier), convert::ToString(piece->GetData()), |
| ObjectAvailability::P2P_AND_CLOUD); |
| }); |
| ASSERT_EQ(GetObjectDigestInfo(object_identifier.object_digest()).piece_type, PieceType::INDEX); |
| storage_->SetSyncDelegate(&sync); |
| |
| // Read 128 bytes off the end of the first chunk. |
| uint64_t size = 128; |
| ASSERT_LT(size, chunk_lengths[0]); |
| uint64_t offset = chunk_lengths[0] - size; |
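  // The read thus ends exactly on the chunk boundary: offset + size == chunk_lengths[0].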
| |
| ledger::SizedVmo object_part = |
| TryGetObjectPart(object_identifier, offset, size, PageStorage::Location::ValueFromNetwork()); |
| std::string object_part_data; |
| ASSERT_TRUE(ledger::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(convert::ToString(object_part_data), data_str.substr(offset, size)); |
| EXPECT_LT(sync.object_requests.size(), sync.GetNumberOfObjectsStored()); |
| EXPECT_THAT(sync.object_requests, Contains(Pair(object_identifier, RetrievedObjectType::BLOB))); |
| EXPECT_THAT(sync.object_requests, |
| Contains(Pair(chunk_identifiers[0], RetrievedObjectType::BLOB))); |
| EXPECT_THAT(sync.object_requests, Not(Contains(Pair(chunk_identifiers[1], _)))); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPartFromSyncStartOfChunk) { |
| // Generates a read such that the start of the read is on a boundary between |
| // two chunks. |
| |
| std::string data_str = RandomString(environment_.random(), 2 * 65536 + 1); |
| |
| FakeSyncDelegate sync; |
  // Given the length of the data, there will be at least two non-inlined
  // chunks. This relies on ForEachPiece giving the chunks in order.
| std::vector<size_t> chunk_lengths; |
| std::vector<ObjectIdentifier> chunk_identifiers; |
| ObjectIdentifier object_identifier = ForEachPiece( |
| data_str, ObjectType::BLOB, &fake_factory_, |
| [&sync, &chunk_lengths, &chunk_identifiers](std::unique_ptr<const Piece> piece) { |
| ObjectIdentifier object_identifier = piece->GetIdentifier(); |
| ObjectDigestInfo digest_info = GetObjectDigestInfo(object_identifier.object_digest()); |
| if (digest_info.is_chunk()) { |
| chunk_lengths.push_back(piece->GetData().size()); |
| chunk_identifiers.push_back(object_identifier); |
| } |
| if (digest_info.is_inlined()) { |
| return; |
| } |
| sync.AddObject(std::move(object_identifier), convert::ToString(piece->GetData()), |
| ObjectAvailability::P2P_AND_CLOUD); |
| }); |
| ASSERT_EQ(GetObjectDigestInfo(object_identifier.object_digest()).piece_type, PieceType::INDEX); |
| storage_->SetSyncDelegate(&sync); |
| |
| // Read 128 bytes off the start of the second chunk. |
| uint64_t size = 128; |
| ASSERT_LT(size, chunk_lengths[1]); |
| uint64_t offset = chunk_lengths[0]; |
| |
| ledger::SizedVmo object_part = |
| TryGetObjectPart(object_identifier, offset, size, PageStorage::Location::ValueFromNetwork()); |
| std::string object_part_data; |
| ASSERT_TRUE(ledger::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(convert::ToString(object_part_data), data_str.substr(offset, size)); |
| EXPECT_LT(sync.object_requests.size(), sync.GetNumberOfObjectsStored()); |
| EXPECT_THAT(sync.object_requests, Contains(Pair(object_identifier, RetrievedObjectType::BLOB))); |
| EXPECT_THAT(sync.object_requests, Not(Contains(Pair(chunk_identifiers[0], _)))); |
| EXPECT_THAT(sync.object_requests, |
| Contains(Pair(chunk_identifiers[1], RetrievedObjectType::BLOB))); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPartFromSyncZeroBytes) { |
| // Generates a read that falls inside a chunk but reads zero bytes. |
| std::string data_str = RandomString(environment_.random(), 2 * 65536 + 1); |
| |
| FakeSyncDelegate sync; |
| ObjectIdentifier object_identifier = ForEachPiece( |
| data_str, ObjectType::BLOB, &fake_factory_, [&sync](std::unique_ptr<const Piece> piece) { |
| ObjectIdentifier object_identifier = piece->GetIdentifier(); |
| ObjectDigestInfo digest_info = GetObjectDigestInfo(object_identifier.object_digest()); |
| if (digest_info.is_inlined()) { |
| return; |
| } |
| sync.AddObject(std::move(object_identifier), convert::ToString(piece->GetData()), |
| ObjectAvailability::P2P_AND_CLOUD); |
| }); |
| ASSERT_EQ(GetObjectDigestInfo(object_identifier.object_digest()).piece_type, PieceType::INDEX); |
| storage_->SetSyncDelegate(&sync); |
| |
| // Read zero bytes inside a chunk. This succeeds and only reads the root |
| // piece. |
| ledger::SizedVmo object_part = |
| TryGetObjectPart(object_identifier, 12, 0, PageStorage::Location::ValueFromNetwork()); |
| std::string object_part_data; |
| ASSERT_TRUE(ledger::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(convert::ToString(object_part_data), ""); |
| EXPECT_THAT(sync.object_requests, |
| ElementsAre(Pair(object_identifier, RetrievedObjectType::BLOB))); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectPartFromSyncZeroBytesNotFound) { |
| FakeSyncDelegate sync; |
| storage_->SetSyncDelegate(&sync); |
| |
| // Reading zero bytes from non-existing objects returns an error. |
| ObjectData other_data = MakeObject("_Some other data_", InlineBehavior::PREVENT); |
| TryGetObjectPart(other_data.object_identifier, 1, 0, PageStorage::Location::ValueFromNetwork(), |
| Status::INTERNAL_NOT_FOUND); |
| } |
| |
| // This test implements its own garbage-collection to discover bugs earlier and with better |
| // error messages. |
| TEST_F(PageStorageTestNoGc, GetHugeObjectPartFromSync) { |
| std::string data_str = RandomString(environment_.random(), 2 * 65536 + 1); |
| int64_t offset = 28672; |
| int64_t size = 128; |
| |
| std::map<ObjectDigest, ObjectIdentifier> digest_to_identifier; |
| FakeSyncDelegate sync; |
| ObjectIdentifier object_identifier = ForEachPiece( |
| data_str, ObjectType::BLOB, &fake_factory_, |
| [&sync, &digest_to_identifier](std::unique_ptr<const Piece> piece) { |
| ObjectIdentifier object_identifier = piece->GetIdentifier(); |
| if (GetObjectDigestInfo(object_identifier.object_digest()).is_inlined()) { |
| return; |
| } |
| digest_to_identifier[object_identifier.object_digest()] = object_identifier; |
| sync.AddObject(std::move(object_identifier), convert::ToString(piece->GetData()), |
| ObjectAvailability::P2P_AND_CLOUD); |
| }); |
| ASSERT_EQ(GetObjectDigestInfo(object_identifier.object_digest()).piece_type, PieceType::INDEX); |
| // Trigger deletion of *all* pieces from storage immediately after *any* of them is retrieved |
| // from cloud. This is an attempt at finding bugs in the code that wouldn't hold pieces alive |
| // long enough before writing them to disk. |
| sync.set_on_get_object([this, &digest_to_identifier](fit::closure callback) { |
| callback(); |
| for (const auto& [digest, identifier] : digest_to_identifier) { |
| PageStorageImplAccessorForTest::DeleteObject( |
| storage_, digest, |
| [digest = digest](Status status, ObjectReferencesAndPriority references) { |
| EXPECT_NE(status, Status::OK) |
| << "DeleteObject succeeded; missing a live reference for " << digest; |
| }); |
| } |
| }); |
| storage_->SetSyncDelegate(&sync); |
| |
| // Add a commit lazily referencing the object to keep it alive once downloaded. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| RetrackIdentifier(&object_identifier); |
| journal->Put("key", object_identifier, KeyPriority::LAZY); |
| ASSERT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| UntrackIdentifier(&object_identifier); |
| |
| ledger::SizedVmo object_part = |
| TryGetObjectPart(object_identifier, offset, size, PageStorage::Location::ValueFromNetwork()); |
| std::string object_part_data; |
| ASSERT_TRUE(ledger::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(convert::ToString(object_part_data), data_str.substr(offset, size)); |
| EXPECT_LT(sync.object_requests.size(), sync.GetNumberOfObjectsStored()); |
| EXPECT_THAT(sync.object_requests, Contains(Pair(object_identifier, RetrievedObjectType::BLOB))); |
| // Check that the requested pieces have been added to storage, and collect |
| // their outbound references into an inbound-references map. Note that we need |
  // to collect references only from pieces actually added to storage, rather
| // than all pieces from |ForEachPiece|, since pieces not present in storage do |
| // not contribute to reference counting. |
| std::map<ObjectIdentifier, ObjectReferencesAndPriority> inbound_references; |
| for (const auto& [piece_identifier, object_type] : sync.object_requests) { |
| EXPECT_EQ(object_type, RetrievedObjectType::BLOB); |
| |
| auto piece = TryGetPiece(piece_identifier); |
| ASSERT_NE(piece, nullptr); |
| ObjectReferencesAndPriority outbound_references; |
| ASSERT_EQ(Status::OK, piece->AppendReferences(&outbound_references)); |
| for (const auto& [reference, priority] : outbound_references) { |
| auto reference_identifier = digest_to_identifier.find(reference); |
| ASSERT_NE(reference_identifier, digest_to_identifier.end()); |
| inbound_references[reference_identifier->second].emplace(piece_identifier.object_digest(), |
| priority); |
| } |
| } |
| // Check that references have been stored correctly. |
| RunInCoroutine( |
| [this, inbound_references = std::move(inbound_references)](CoroutineHandler* handler) { |
| for (const auto& [identifier, references] : inbound_references) { |
| CheckInboundObjectReferences(handler, identifier, references); |
| } |
| }); |
| } |
| |
| TEST_F(PageStorageTest, GetHugeObjectPartFromSyncNegativeOffset) { |
| std::string data_str = RandomString(environment_.random(), 2 * 65536 + 1); |
| int64_t offset = -28672; |
| int64_t size = 128; |
| |
| FakeSyncDelegate sync; |
| ObjectIdentifier object_identifier = ForEachPiece( |
| data_str, ObjectType::BLOB, &fake_factory_, [&sync](std::unique_ptr<const Piece> piece) { |
| ObjectIdentifier object_identifier = piece->GetIdentifier(); |
| if (GetObjectDigestInfo(object_identifier.object_digest()).is_inlined()) { |
| return; |
| } |
| sync.AddObject(std::move(object_identifier), convert::ToString(piece->GetData()), |
| ObjectAvailability::P2P_AND_CLOUD); |
| }); |
| ASSERT_EQ(GetObjectDigestInfo(object_identifier.object_digest()).piece_type, PieceType::INDEX); |
| storage_->SetSyncDelegate(&sync); |
| |
| ledger::SizedVmo object_part = |
| TryGetObjectPart(object_identifier, offset, size, PageStorage::Location::ValueFromNetwork()); |
| std::string object_part_data; |
| ASSERT_TRUE(ledger::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(convert::ToString(object_part_data), data_str.substr(data_str.size() + offset, size)); |
| EXPECT_LT(sync.object_requests.size(), sync.GetNumberOfObjectsStored()); |
| // Check that at least the root piece has been added to storage. |
| TryGetPiece(object_identifier); |
| } |
| |
| TEST_F(PageStorageTest, GetHugeObjectFromSyncMaxConcurrentDownloads) { |
| // In practice, a string that long yields between 30 and 60 pieces. |
| std::string data_str = RandomString(environment_.random(), 2 << 18); |
| |
| // Create a fake sync delegate that will accumulate pending calls in |sync_delegate_calls|. |
| std::vector<fit::closure> sync_delegate_calls; |
| DelayingFakeSyncDelegate sync([&sync_delegate_calls](fit::closure callback) { |
| sync_delegate_calls.push_back(std::move(callback)); |
| }); |
| storage_->SetSyncDelegate(&sync); |
| |
| // Initialize the sync delegate with the pieces of |data|. |
| std::map<ObjectDigest, ObjectIdentifier> digest_to_identifier; |
| ObjectIdentifier object_identifier = ForEachPiece( |
| data_str, ObjectType::BLOB, &fake_factory_, |
| [&sync, &digest_to_identifier](std::unique_ptr<const Piece> piece) { |
| ObjectIdentifier object_identifier = piece->GetIdentifier(); |
| if (GetObjectDigestInfo(object_identifier.object_digest()).is_inlined()) { |
| return; |
| } |
| digest_to_identifier[object_identifier.object_digest()] = object_identifier; |
| sync.AddObject(std::move(object_identifier), convert::ToString(piece->GetData()), |
| ObjectAvailability::P2P_AND_CLOUD); |
| }); |
| ASSERT_EQ(GetObjectDigestInfo(object_identifier.object_digest()).piece_type, PieceType::INDEX); |
| // Check that we created a part big enough to require at least two batches of pending calls. |
| ASSERT_GT(sync.GetNumberOfObjectsStored(), 2 * kMaxConcurrentDownloads); |
| RetrackIdentifier(&object_identifier); |
| |
  // Fetch the whole object from the delegate. This should block repeatedly whenever we have
  // accumulated the maximum number of concurrent downloads, i.e. pending calls.
| bool called; |
| Status status; |
| ledger::SizedVmo object_part; |
| storage_->GetObjectPart(object_identifier, /*offset=*/0, /*max_size=*/-1, |
| PageStorage::Location::ValueFromNetwork(), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_part)); |
| RunLoopUntilIdle(); |
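  // The request cannot have completed yet: the delegate parks every call in
  // |sync_delegate_calls|, at most kMaxConcurrentDownloads at a time.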
| |
| // Unblock the pending calls until GetObjectPart returns. |
| do { |
| EXPECT_LE(sync_delegate_calls.size(), kMaxConcurrentDownloads); |
| for (auto& sync_delegate_call : sync_delegate_calls) { |
| async::PostTask(dispatcher(), [sync_delegate_call = std::move(sync_delegate_call)]() { |
| sync_delegate_call(); |
| }); |
| } |
| sync_delegate_calls.clear(); |
| RunLoopUntilIdle(); |
| } while (!called); |
| |
| EXPECT_EQ(status, Status::OK); |
| std::string object_part_data; |
| ASSERT_TRUE(ledger::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(convert::ToString(object_part_data), data_str); |
| EXPECT_EQ(sync.object_requests.size(), sync.GetNumberOfObjectsStored()); |
| EXPECT_THAT(sync.object_requests, Contains(Pair(object_identifier, RetrievedObjectType::BLOB))); |
| } |
| |
| TEST_F(PageStorageTest, GetObjectFromSync) { |
| ObjectData data = MakeObject("Some data", InlineBehavior::PREVENT); |
| FakeSyncDelegate sync; |
| sync.AddObject(data.object_identifier, data.value, ObjectAvailability::P2P_AND_CLOUD); |
| storage_->SetSyncDelegate(&sync); |
| |
| std::unique_ptr<const Object> object = |
| TryGetObject(data.object_identifier, PageStorage::Location::ValueFromNetwork()); |
| EXPECT_EQ(object->GetIdentifier(), data.object_identifier); |
| absl::string_view object_data; |
| ASSERT_EQ(object->GetData(&object_data), Status::OK); |
| EXPECT_EQ(convert::ToString(object_data), data.value); |
| // Check that the piece has been added to storage (it is small enough that |
| // there is only one piece). |
| TryGetPiece(data.object_identifier); |
| |
| storage_->SetSyncDelegate(nullptr); |
| ObjectData other_data = MakeObject("Some other data", InlineBehavior::PREVENT); |
| TryGetObject(other_data.object_identifier, PageStorage::Location::Local(), |
| Status::INTERNAL_NOT_FOUND); |
| TryGetObject(other_data.object_identifier, PageStorage::Location::ValueFromNetwork(), |
| Status::NETWORK_ERROR); |
| } |
| |
| TEST_F(PageStorageTest, FullDownloadAfterPartial) { |
| std::string data_str = RandomString(environment_.random(), 2 * 65536 + 1); |
| int64_t offset = 0; |
| int64_t size = 128; |
| |
| FakeSyncDelegate sync; |
| ObjectIdentifier object_identifier = ForEachPiece( |
| data_str, ObjectType::BLOB, &fake_factory_, [&sync](std::unique_ptr<const Piece> piece) { |
| ObjectIdentifier object_identifier = piece->GetIdentifier(); |
| if (GetObjectDigestInfo(object_identifier.object_digest()).is_inlined()) { |
| return; |
| } |
| sync.AddObject(std::move(object_identifier), convert::ToString(piece->GetData()), |
| ObjectAvailability::P2P_AND_CLOUD); |
| }); |
| ASSERT_EQ(GetObjectDigestInfo(object_identifier.object_digest()).piece_type, PieceType::INDEX); |
| storage_->SetSyncDelegate(&sync); |
| |
| // Add a commit lazily referencing the object to keep it alive once downloaded. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| RetrackIdentifier(&object_identifier); |
| journal->Put("key", object_identifier, KeyPriority::LAZY); |
| ASSERT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| UntrackIdentifier(&object_identifier); |
| |
| ledger::SizedVmo object_part = |
| TryGetObjectPart(object_identifier, offset, size, PageStorage::Location::ValueFromNetwork()); |
| std::string object_part_data; |
| ASSERT_TRUE(ledger::StringFromVmo(object_part, &object_part_data)); |
| EXPECT_EQ(convert::ToString(object_part_data), data_str.substr(offset, size)); |
| EXPECT_LT(sync.object_requests.size(), sync.GetNumberOfObjectsStored()); |
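  // Only the pieces needed for the partial read were fetched, so the full object is not yet
  // available locally.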
| TryGetObject(object_identifier, PageStorage::Location::Local(), Status::INTERNAL_NOT_FOUND); |
| // Check that all requested pieces have been stored locally. |
| for (const auto& [piece_identifier, object_type] : sync.object_requests) { |
| ASSERT_EQ(object_type, RetrievedObjectType::BLOB); |
| TryGetPiece(piece_identifier); |
| } |
| |
| std::unique_ptr<const Object> object = |
| TryGetObject(object_identifier, PageStorage::Location::ValueFromNetwork()); |
| absl::string_view object_data; |
| ASSERT_EQ(object->GetData(&object_data), Status::OK); |
| EXPECT_EQ(convert::ToString(object_data), data_str); |
| EXPECT_EQ(sync.GetNumberOfObjectsStored(), sync.object_requests.size()); |
| TryGetObject(object_identifier, PageStorage::Location::Local(), Status::OK); |
| // Check that all pieces have been stored locally. |
| for (const auto& [piece_identifier, object_type] : sync.object_requests) { |
| ASSERT_EQ(object_type, RetrievedObjectType::BLOB); |
| TryGetPiece(piece_identifier); |
| } |
| } |
| |
| TEST_F(PageStorageTest, GetObjectFromSyncWrongId) { |
| ObjectData data = MakeObject("Some data", InlineBehavior::PREVENT); |
| ObjectData data2 = MakeObject("Some data2", InlineBehavior::PREVENT); |
| FakeSyncDelegate sync; |
| sync.AddObject(data.object_identifier, data2.value, ObjectAvailability::P2P_AND_CLOUD); |
| storage_->SetSyncDelegate(&sync); |
| |
| TryGetObject(data.object_identifier, PageStorage::Location::ValueFromNetwork(), |
| Status::DATA_INTEGRITY_ERROR); |
| } |
| |
| TEST_F(PageStorageTest, AddAndGetHugeTreenodeFromLocal) { |
| std::string data_str = RandomString(environment_.random(), 65536); |
| |
| ObjectData data = MakeObject(std::move(data_str), ObjectType::TREE_NODE, InlineBehavior::PREVENT); |
| // An identifier to another tree node pointed at by the current one. |
| const ObjectIdentifier tree_reference = RandomObjectIdentifier(); |
| ASSERT_EQ(GetObjectDigestInfo(data.object_identifier.object_digest()).object_type, |
| ObjectType::TREE_NODE); |
| ASSERT_EQ(GetObjectDigestInfo(data.object_identifier.object_digest()).piece_type, |
| PieceType::INDEX); |
| ASSERT_EQ(GetObjectDigestInfo(data.object_identifier.object_digest()).inlined, InlinedPiece::NO); |
| |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::TREE_NODE, data.ToDataSource(), |
| {{tree_reference.object_digest(), KeyPriority::LAZY}}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| |
| EXPECT_EQ(status, Status::OK); |
| // This ensures that the object is encoded with an index, as we checked the |
| // piece type of |data.object_identifier| above. |
| EXPECT_EQ(object_identifier, data.object_identifier); |
| |
| std::unique_ptr<const Object> object = |
| TryGetObject(object_identifier, PageStorage::Location::Local()); |
| absl::string_view content; |
| ASSERT_EQ(object->GetData(&content), Status::OK); |
| EXPECT_EQ(content, data.value); |
| EXPECT_TRUE(ObjectIsUntracked(object_identifier, true)); |
| EXPECT_TRUE(IsPieceSynced(object_identifier, false)); |
| |
| // Check that the index piece obtained at |object_identifier| is different |
| // from the object itself, ie. that some splitting occurred. |
| std::unique_ptr<const Piece> piece = TryGetPiece(object_identifier); |
| ASSERT_NE(piece, nullptr); |
| EXPECT_NE(content, piece->GetData()); |
| |
| RunInCoroutine([this, piece = std::move(piece), tree_reference, |
| object_identifier](CoroutineHandler* handler) { |
| // Check tree reference. |
| CheckInboundObjectReferences(handler, tree_reference, |
| {{object_identifier.object_digest(), KeyPriority::LAZY}}); |
| // Check piece references. |
| ASSERT_EQ(ForEachIndexChild( |
| piece->GetData(), &fake_factory_, |
| [this, handler, object_identifier](ObjectIdentifier piece_identifier) { |
| if (GetObjectDigestInfo(piece_identifier.object_digest()).is_inlined()) { |
| // References to inline pieces are not stored on disk. |
| return Status::OK; |
| } |
| CheckInboundObjectReferences( |
| handler, piece_identifier, |
| {{object_identifier.object_digest(), KeyPriority::EAGER}}); |
| return Status::OK; |
| }), |
| Status::OK); |
| }); |
| } |
| |
| // This test implements its own garbage-collection to discover bugs earlier and with better |
| // error messages. |
| TEST_F(PageStorageTestNoGc, AddAndGetHugeTreenodeFromSync) { |
| // Build a random, valid tree node. |
| std::vector<Entry> entries; |
| std::map<size_t, ObjectIdentifier> children; |
| for (size_t i = 0; i < 1000; ++i) { |
| entries.push_back(Entry{RandomString(environment_.random(), 50), RandomObjectIdentifier(), |
| i % 2 ? KeyPriority::EAGER : KeyPriority::LAZY, |
| EntryId(RandomString(environment_.random(), 32))}); |
| children.emplace(i, RandomObjectIdentifier()); |
| } |
| std::sort(entries.begin(), entries.end(), |
| [](const Entry& e1, const Entry& e2) { return e1.key < e2.key; }); |
| std::string data_str = btree::EncodeNode(0, entries, children); |
| ASSERT_TRUE(btree::CheckValidTreeNodeSerialization(data_str)); |
| |
| // Split the tree node content into pieces, add them to a SyncDelegate to be |
| // retrieved by GetObject, and store inbound piece references into a map to |
| // check them later. |
| std::map<ObjectDigest, ObjectIdentifier> digest_to_identifier; |
| FakeSyncDelegate sync(SyncFeatures::kNoDiff); |
| std::map<ObjectIdentifier, ObjectReferencesAndPriority> inbound_references; |
| ObjectIdentifier object_identifier = ForEachPiece( |
| data_str, ObjectType::TREE_NODE, &fake_factory_, |
| [&sync, &digest_to_identifier, &inbound_references](std::unique_ptr<const Piece> piece) { |
| ObjectIdentifier piece_identifier = piece->GetIdentifier(); |
| if (GetObjectDigestInfo(piece_identifier.object_digest()).is_inlined()) { |
| return; |
| } |
| digest_to_identifier[piece_identifier.object_digest()] = piece_identifier; |
| ObjectReferencesAndPriority outbound_references; |
| ASSERT_EQ(Status::OK, piece->AppendReferences(&outbound_references)); |
| for (const auto& [reference, priority] : outbound_references) { |
| auto reference_identifier = digest_to_identifier.find(reference); |
| // ForEachPiece returns pieces in order, so we must have already seen |
| // pieces referenced by the current one. |
| ASSERT_NE(reference_identifier, digest_to_identifier.end()); |
| inbound_references[reference_identifier->second].emplace(piece_identifier.object_digest(), |
| priority); |
| } |
| sync.AddObject(std::move(piece_identifier), convert::ToString(piece->GetData()), |
| ObjectAvailability::P2P_AND_CLOUD); |
| }); |
| ASSERT_EQ(GetObjectDigestInfo(object_identifier.object_digest()).piece_type, PieceType::INDEX); |
| // Trigger deletion of *all* pieces from storage immediately after *any* of them is retrieved |
| // from cloud. This is an attempt at finding bugs in the code that wouldn't hold pieces alive |
| // long enough before writing them to disk. |
| sync.set_on_get_object([this, &digest_to_identifier](fit::closure callback) { |
| callback(); |
| for (const auto& [digest, identifier] : digest_to_identifier) { |
| PageStorageImplAccessorForTest::DeleteObject( |
| storage_, digest, |
| [digest = digest](Status status, ObjectReferencesAndPriority references) { |
| EXPECT_NE(status, Status::OK) |
| << "DeleteObject succeeded; missing a live reference for " << digest; |
| }); |
| } |
| }); |
| storage_->SetSyncDelegate(&sync); |
| |
| // Add object references to the inbound references map. |
| for (const Entry& entry : entries) { |
| inbound_references[entry.object_identifier].emplace(object_identifier.object_digest(), |
| entry.priority); |
| } |
| for (const auto& [size, child_identifier] : children) { |
| inbound_references[child_identifier].emplace(object_identifier.object_digest(), |
| KeyPriority::EAGER); |
| } |
| |
| // Get the object from network and check that it is correct. |
| |
  // The tree node is not part of any commit, but we still need to supply the id of a known commit
  // in the location. Since diffs are disabled, this can be any commit.
| RetrackIdentifier(&object_identifier); |
| CommitId commit_id = GetFirstHead()->GetId(); |
| std::unique_ptr<const Object> object = |
| TryGetObject(object_identifier, PageStorage::Location::TreeNodeFromNetwork(commit_id)); |
| absl::string_view content; |
| ASSERT_EQ(object->GetData(&content), Status::OK); |
| EXPECT_EQ(content, data_str); |
| |
| // Check that all pieces have been stored locally. |
| EXPECT_EQ(sync.GetNumberOfObjectsStored(), sync.object_requests.size()); |
| for (auto [piece_identifier, object_type] : sync.object_requests) { |
| EXPECT_EQ(object_type, RetrievedObjectType::TREE_NODE); |
| TryGetPiece(piece_identifier); |
| } |
| |
| // Check that references have been stored correctly. |
| RunInCoroutine( |
| [this, inbound_references = std::move(inbound_references)](CoroutineHandler* handler) { |
| for (const auto& [identifier, references] : inbound_references) { |
| CheckInboundObjectReferences(handler, identifier, references); |
| } |
| }); |
| |
| // Now that the object has been retrieved from network, we should be able to |
| // retrieve it again locally. |
| auto local_object = TryGetObject(object_identifier, PageStorage::Location::Local(), Status::OK); |
  ASSERT_EQ(local_object->GetData(&content), Status::OK);
| EXPECT_EQ(content, data_str); |
| } |
| |
| TEST_F(PageStorageTest, UnsyncedPieces) { |
| ObjectData data_array[] = { |
| MakeObject("Some data", InlineBehavior::PREVENT), |
| MakeObject("Some more data", InlineBehavior::PREVENT), |
| MakeObject("Even more data", InlineBehavior::PREVENT), |
| }; |
| std::vector<ObjectIdentifier> stored_identifiers; |
| for (auto& data : data_array) { |
| auto object_identifier = TryAddFromLocal(data.value, data.object_identifier); |
| EXPECT_TRUE(ObjectIsUntracked(object_identifier, true)); |
| EXPECT_TRUE(IsPieceSynced(object_identifier, false)); |
| stored_identifiers.push_back(object_identifier); |
| } |
| |
| std::vector<CommitId> commits; |
| |
| // Add one key-value pair per commit. |
| for (const auto& data_identifier : stored_identifiers) { |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put(RandomString(environment_.random(), 10), data_identifier, KeyPriority::LAZY); |
| EXPECT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| commits.push_back(GetFirstHead()->GetId()); |
| } |
| |
| // List all objects appearing in the commits. |
| // There is not enough data in a commit for a tree node to be split, but it may still contain |
| // multiple tree nodes. The values will not be split either, so all the objects are chunks and |
| // |GetObjectIdentifiers| returns all the piece identifiers that are part of the commits. |
| bool called; |
| Status status; |
| std::set<ObjectIdentifier> object_identifiers; |
| for (size_t i = 0; i < commits.size(); i++) { |
| std::set<ObjectIdentifier> commit_object_identifiers; |
| btree::GetObjectIdentifiers( |
| environment_.coroutine_service(), storage_.get(), |
| {GetCommit(commits[i])->GetRootIdentifier(), PageStorage::Location::Local()}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit_object_identifiers)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
    // We expect all identifiers to be chunks; each is either a tree node or one of
    // the data identifiers. One of them should be the root of the commit.
| EXPECT_THAT(commit_object_identifiers, Contains(GetCommit(commits[i])->GetRootIdentifier())); |
| for (const auto& object_identifier : commit_object_identifiers) { |
| EXPECT_TRUE(GetObjectDigestInfo(object_identifier.object_digest()).is_chunk()); |
| bool is_tree = GetObjectDigestInfo(object_identifier.object_digest()).object_type == |
| ObjectType::TREE_NODE; |
| if (!is_tree) { |
| // Commit |i| contains data from data_array[i] as well as data from the previous commits |
| // since each is built on top of the previous one. |
| EXPECT_THAT(object_identifier, |
| AnyOfArray(stored_identifiers.begin(), stored_identifiers.begin() + i + 1)); |
| } |
| object_identifiers.insert(object_identifier); |
| } |
| } |
| |
| // GetUnsyncedPieces should return the ids of all the objects appearing in the tree of the 3 |
| // commits. |
| std::vector<ObjectIdentifier> returned_object_identifiers; |
| storage_->GetUnsyncedPieces( |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &returned_object_identifiers)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_THAT(returned_object_identifiers, UnorderedElementsAreArray(object_identifiers)); |
| |
| // Mark the 2nd object as synced. We now expect to still find the other unsynced objects. |
| storage_->MarkPieceSynced(stored_identifiers[1], |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| object_identifiers.erase(stored_identifiers[1]); |
| |
| returned_object_identifiers.clear(); |
| storage_->GetUnsyncedPieces( |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &returned_object_identifiers)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_THAT(returned_object_identifiers, UnorderedElementsAreArray(object_identifiers)); |
| } |
| |
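// Checks that IsSynced reports a page as synced only once all of its pieces and
// commits have been marked as synced.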
| TEST_F(PageStorageTest, PageIsSynced) { |
| ObjectData data_array[] = { |
| MakeObject("Some data", InlineBehavior::PREVENT), |
| MakeObject("Some more data", InlineBehavior::PREVENT), |
| MakeObject("Even more data", InlineBehavior::PREVENT), |
| }; |
| std::vector<ObjectIdentifier> stored_identifiers; |
| for (auto& data : data_array) { |
| auto object_identifier = TryAddFromLocal(data.value, data.object_identifier); |
| EXPECT_TRUE(ObjectIsUntracked(object_identifier, true)); |
| EXPECT_TRUE(IsPieceSynced(object_identifier, false)); |
| stored_identifiers.push_back(object_identifier); |
| } |
| |
| // The objects have not been added in a commit: there is nothing to sync and |
| // the page is considered synced. |
| bool called; |
| Status status; |
| bool is_synced; |
storage_->IsSynced(ledger::Capture(ledger::SetWhenCalled(&called), &status, &is_synced));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(status, Status::OK);
EXPECT_TRUE(is_synced);
| |
| // Add all objects in one commit. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| for (const auto& data_identifier : stored_identifiers) { |
| journal->Put(RandomString(environment_.random(), 10), data_identifier, KeyPriority::LAZY); |
| } |
| EXPECT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| CommitId commit_id = GetFirstHead()->GetId(); |
| |
// After committing, the page is unsynced.
| called = false; |
storage_->IsSynced(ledger::Capture(ledger::SetWhenCalled(&called), &status, &is_synced));
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_FALSE(is_synced); |
| |
| // Mark all objects (tree nodes and values of entries in the tree) as synced and expect that the |
| // page is still unsynced. |
| // |
| // There is not enough data in the tree for a tree node to be split, but it may still contain |
| // multiple tree nodes. The values will not be split either, so all the objects are chunks and |
| // |GetObjectIdentifiers| returns all the object identifiers we need to mark as synced. |
| std::set<ObjectIdentifier> object_identifiers; |
| btree::GetObjectIdentifiers( |
| environment_.coroutine_service(), storage_.get(), |
| {GetFirstHead()->GetRootIdentifier(), PageStorage::Location::Local()}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifiers)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| for (const auto& object_identifier : object_identifiers) { |
| ASSERT_TRUE(GetObjectDigestInfo(object_identifier.object_digest()).is_chunk()); |
| storage_->MarkPieceSynced(object_identifier, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| } |
| |
storage_->IsSynced(ledger::Capture(ledger::SetWhenCalled(&called), &status, &is_synced));
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_FALSE(is_synced); |
| |
| // Mark the commit as synced and expect that the page is synced. |
| storage_->MarkCommitSynced(commit_id, ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
storage_->IsSynced(ledger::Capture(ledger::SetWhenCalled(&called), &status, &is_synced));
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_TRUE(is_synced); |
| |
| // All objects should be synced now. |
| for (const auto& object_identifier : stored_identifiers) { |
| EXPECT_TRUE(IsPieceSynced(object_identifier, true)); |
| } |
| } |
| |
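// Checks that the page is marked as online only once a commit has been synced
// to the cloud, not when its objects alone are synced.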
| TEST_F(PageStorageTest, PageIsMarkedOnlineAfterCloudSync) { |
| // Check that the page is initially not marked as online. |
| EXPECT_FALSE(storage_->IsOnline()); |
| |
| // Create a local commit: the page is still not online. |
| int size = 10; |
| std::unique_ptr<const Commit> commit = TryCommitFromLocal(size); |
| EXPECT_FALSE(storage_->IsOnline()); |
| |
| // Mark all objects as synced. The page is still not online: other devices |
| // will only see these objects if the corresponding commit is also synced to |
| // the cloud. |
| bool called; |
| Status status; |
| std::vector<ObjectIdentifier> object_identifiers; |
| storage_->GetUnsyncedPieces( |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifiers)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| for (ObjectIdentifier& object_identifier : object_identifiers) { |
| storage_->MarkPieceSynced(object_identifier, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| } |
| EXPECT_FALSE(storage_->IsOnline()); |
| |
| // Mark the commit as synced. The page should now be marked as online. |
| storage_->MarkCommitSynced(commit->GetId(), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_TRUE(storage_->IsOnline()); |
| } |
| |
| TEST_F(PageStorageTest, PageIsMarkedOnlineSyncWithPeer) { |
| // Check that the page is initially not marked as online. |
| EXPECT_FALSE(storage_->IsOnline()); |
| |
| // Mark the page as synced to peer and expect that it is marked as online. |
| bool called; |
| Status status; |
storage_->MarkSyncedToPeer(ledger::Capture(ledger::SetWhenCalled(&called), &status));
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_TRUE(storage_->IsOnline()); |
| } |
| |
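// Checks that IsEmpty reports whether the page contains any entries, including
// after the only key has been deleted.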
| TEST_F(PageStorageTest, PageIsEmpty) { |
| bool called; |
| Status status; |
| bool is_empty; |
| |
| // Initially the page is empty. |
storage_->IsEmpty(ledger::Capture(ledger::SetWhenCalled(&called), &status, &is_empty));
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_TRUE(is_empty); |
| |
| // Add an entry and expect that the page is not empty any more. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::LAZY); |
| EXPECT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| |
storage_->IsEmpty(ledger::Capture(ledger::SetWhenCalled(&called), &status, &is_empty));
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_FALSE(is_empty); |
| |
| // Clear the page and expect it to be empty again. |
| journal = storage_->StartCommit(GetFirstHead()); |
| journal->Delete("key"); |
| EXPECT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| |
storage_->IsEmpty(ledger::Capture(ledger::SetWhenCalled(&called), &status, &is_empty));
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_TRUE(is_empty); |
| } |
| |
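// Checks that a locally added object is untracked until it is referenced by a
// commit.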
| TEST_F(PageStorageTest, UntrackedObjectsSimple) { |
| ObjectData data = MakeObject("Some data", InlineBehavior::PREVENT); |
| |
| // The object is not yet created and its id should not be marked as untracked. |
| EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, false)); |
| |
| // After creating the object it should be marked as untracked. |
| ObjectIdentifier object_identifier = TryAddFromLocal(data.value, data.object_identifier); |
| EXPECT_TRUE(ObjectIsUntracked(object_identifier, true)); |
| |
| // After adding the object in a commit it should not be untracked any more. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", object_identifier, KeyPriority::EAGER); |
| EXPECT_TRUE(ObjectIsUntracked(object_identifier, true)); |
| ASSERT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| EXPECT_TRUE(ObjectIsUntracked(object_identifier, false)); |
| } |
| |
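// Checks that objects stay untracked until a commit actually references them,
// even when a journal overwrites a previously inserted value.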
| TEST_F(PageStorageTest, UntrackedObjectsComplex) { |
| ObjectData data_array[] = { |
| MakeObject("Some data", InlineBehavior::PREVENT), |
| MakeObject("Some more data", InlineBehavior::PREVENT), |
| MakeObject("Even more data", InlineBehavior::PREVENT), |
| }; |
| std::vector<ObjectIdentifier> stored_identifiers; |
| for (auto& data : data_array) { |
| auto object_identifier = TryAddFromLocal(data.value, data.object_identifier); |
| EXPECT_TRUE(ObjectIsUntracked(object_identifier, true)); |
| stored_identifiers.push_back(object_identifier); |
| } |
| |
| // Add a first commit containing stored_identifiers[0]. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key0", stored_identifiers[0], KeyPriority::LAZY); |
| EXPECT_TRUE(ObjectIsUntracked(stored_identifiers[0], true)); |
| ASSERT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| EXPECT_TRUE(ObjectIsUntracked(stored_identifiers[0], false)); |
| EXPECT_TRUE(ObjectIsUntracked(stored_identifiers[1], true)); |
| EXPECT_TRUE(ObjectIsUntracked(stored_identifiers[2], true)); |
| |
// Create a second commit. After "key1" is overwritten by the second Put call,
// stored_identifiers[1] is no longer part of this commit: it should remain
// untracked after committing.
| journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key1", stored_identifiers[1], KeyPriority::LAZY); |
| journal->Put("key2", stored_identifiers[2], KeyPriority::LAZY); |
| journal->Put("key1", stored_identifiers[2], KeyPriority::LAZY); |
| journal->Put("key3", stored_identifiers[0], KeyPriority::LAZY); |
| ASSERT_TRUE(TryCommitJournal(std::move(journal), Status::OK)); |
| EXPECT_TRUE(ObjectIsUntracked(stored_identifiers[0], false)); |
| EXPECT_TRUE(ObjectIsUntracked(stored_identifiers[1], true)); |
| EXPECT_TRUE(ObjectIsUntracked(stored_identifiers[2], false)); |
| } |
| |
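// Checks that all registered commit watchers are notified of new commits with
// their source, and that removed watchers stop receiving notifications.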
| TEST_F(PageStorageTest, CommitWatchers) { |
| FakeCommitWatcher watcher; |
| storage_->AddCommitWatcher(&watcher); |
| |
| // Add a watcher and receive the commit. |
| auto expected = TryCommitFromLocal(10); |
| ASSERT_TRUE(expected); |
| EXPECT_EQ(watcher.commit_count, 1); |
| EXPECT_EQ(watcher.last_commit_id, expected->GetId()); |
| EXPECT_EQ(watcher.last_source, ChangeSource::LOCAL); |
| |
| // Add a second watcher. |
| FakeCommitWatcher watcher2; |
| storage_->AddCommitWatcher(&watcher2); |
| expected = TryCommitFromLocal(10); |
| ASSERT_TRUE(expected); |
| EXPECT_EQ(watcher.commit_count, 2); |
| EXPECT_EQ(watcher.last_commit_id, expected->GetId()); |
| EXPECT_EQ(watcher.last_source, ChangeSource::LOCAL); |
| EXPECT_EQ(watcher2.commit_count, 1); |
| EXPECT_EQ(watcher2.last_commit_id, expected->GetId()); |
| EXPECT_EQ(watcher2.last_source, ChangeSource::LOCAL); |
| |
| // Remove one watcher. |
| storage_->RemoveCommitWatcher(&watcher2); |
| expected = TryCommitFromSync(); |
| EXPECT_EQ(watcher.commit_count, 3); |
| EXPECT_EQ(watcher.last_commit_id, expected->GetId()); |
| EXPECT_EQ(watcher.last_source, ChangeSource::CLOUD); |
| EXPECT_EQ(watcher2.commit_count, 1); |
| } |
| |
| // If a commit fails to be persisted on disk, no notification should be sent. |
| TEST_F(PageStorageTest, CommitFailNoWatchNotification) { |
| FakeCommitWatcher watcher; |
| storage_->AddCommitWatcher(&watcher); |
| EXPECT_EQ(watcher.commit_count, 0); |
| |
| // Create the commit. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key1", RandomObjectIdentifier(), KeyPriority::EAGER); |
| |
| leveldb_->SetFailBatchExecuteAfter(1); |
| std::unique_ptr<const Commit> commit = TryCommitJournal(std::move(journal), Status::IO_ERROR); |
| |
| // The watcher is not called. |
| EXPECT_EQ(watcher.commit_count, 0); |
| } |
| |
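// Checks that sync metadata entries can be written and read back, and that
// reading a missing key returns INTERNAL_NOT_FOUND.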
| TEST_F(PageStorageTest, SyncMetadata) { |
| std::vector<std::pair<absl::string_view, absl::string_view>> keys_and_values = { |
| {"foo1", "foo2"}, {"bar1", " bar2 "}}; |
for (const auto& [key, value] : keys_and_values) {
| bool called; |
| Status status; |
| std::string returned_value; |
| storage_->GetSyncMetadata( |
| key, ledger::Capture(ledger::SetWhenCalled(&called), &status, &returned_value)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::INTERNAL_NOT_FOUND); |
| |
| storage_->SetSyncMetadata(key, value, ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| storage_->GetSyncMetadata( |
| key, ledger::Capture(ledger::SetWhenCalled(&called), &status, &returned_value)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(returned_value, value); |
| } |
| } |
| |
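// Fixture that builds a small commit graph, adds commits 0, 1 and their merge
// (commit 5) to a fresh storage from the cloud, and prepares a batch with
// commits 2, 3 and 4 so that tests can add them through different sync
// mechanisms.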
| class PageStorageTestAddMultipleCommits : public PageStorageTest { |
| public: |
| void SetUp() override { |
| PageStorageTest::SetUp(); |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| storage_->SetSyncDelegate(&sync_); |
| |
// Build the commit tree:
//      0   1
//       \ / \
//        2   3
//            |
//            4
| // 0 and 1 are present locally as commits, but their tree is not. |
| // 2, 3 and 4 are added as a single batch. |
| // A merge of 0 and 1 (commit 5) is used to make the objects of 0 and 1 garbage collectable. |
| // Each commit contains one tree node, one eager value and one lazy value. |
| tree_object_identifiers_.resize(6); |
| eager_object_identifiers_.resize(6); |
| lazy_object_identifiers_.resize(6); |
| std::vector<std::vector<Entry>> all_entries; |
| for (size_t i = 0; i < tree_object_identifiers_.size(); ++i) { |
| ObjectData eager_value = |
| MakeObject("eager value" + std::to_string(i), InlineBehavior::PREVENT); |
| ObjectData lazy_value = |
| MakeObject("lazy value" + std::to_string(i), InlineBehavior::PREVENT); |
| std::vector<Entry> entries = { |
| Entry{"key" + std::to_string(i), eager_value.object_identifier, KeyPriority::EAGER, |
| EntryId("id" + std::to_string(i))}, |
| Entry{"lazy" + std::to_string(i), lazy_value.object_identifier, KeyPriority::LAZY, |
| EntryId("id" + std::to_string(i))}}; |
| std::unique_ptr<const btree::TreeNode> node; |
| ASSERT_TRUE(CreateNodeFromEntries(entries, {}, &node)); |
| tree_object_identifiers_[i] = node->GetIdentifier(); |
| eager_object_identifiers_[i] = eager_value.object_identifier; |
| lazy_object_identifiers_[i] = lazy_value.object_identifier; |
| sync_.AddObject(eager_value.object_identifier, eager_value.value, |
| ObjectAvailability::P2P_AND_CLOUD); |
| sync_.AddObject(lazy_value.object_identifier, lazy_value.value, |
| ObjectAvailability::P2P_AND_CLOUD); |
| std::unique_ptr<const Object> root_object = |
| TryGetObject(tree_object_identifiers_[i], PageStorage::Location::Local()); |
| absl::string_view root_data; |
| ASSERT_EQ(root_object->GetData(&root_data), Status::OK); |
| sync_.AddObject(tree_object_identifiers_[i], convert::ToString(root_data), |
| ObjectAvailability::P2P_AND_CLOUD); |
| all_entries.push_back(entries); |
| } |
| |
| // Create the commits, the initial upload batch, and the second batch. |
| std::unique_ptr<const Commit> root = GetFirstHead(); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(root->Clone()); |
| std::unique_ptr<const Commit> commit0 = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), tree_object_identifiers_[0], |
| std::move(parent)); |
| parent.clear(); |
| sync_.AddDiff(commit0->GetId(), root->GetId(), |
| {{all_entries[0][0], false}, {all_entries[0][1], false}}); |
| |
| parent.emplace_back(root->Clone()); |
| std::unique_ptr<const Commit> commit1 = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), tree_object_identifiers_[1], |
| std::move(parent)); |
| parent.clear(); |
| sync_.AddDiff(commit1->GetId(), root->GetId(), |
| {{all_entries[1][0], false}, {all_entries[1][1], false}}); |
| |
| // Ensure that commit0 has a larger id than commit1. |
| if (commit0->GetId() < commit1->GetId()) { |
| std::swap(commit0, commit1); |
| std::swap(tree_object_identifiers_[0], tree_object_identifiers_[1]); |
| std::swap(eager_object_identifiers_[0], eager_object_identifiers_[1]); |
| std::swap(lazy_object_identifiers_[0], lazy_object_identifiers_[1]); |
| } |
| commit_identifiers_.push_back(commit0->GetId()); |
| commit_identifiers_.push_back(commit1->GetId()); |
| |
| parent.emplace_back(commit0->Clone()); |
| parent.emplace_back(commit1->Clone()); |
| std::unique_ptr<const Commit> commit2 = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), tree_object_identifiers_[2], |
| std::move(parent)); |
| parent.clear(); |
| commit_identifiers_.push_back(commit2->GetId()); |
| EXPECT_EQ(commit2->GetParentIds()[0], commit1->GetId()); |
| |
| parent.emplace_back(commit1->Clone()); |
| std::unique_ptr<const Commit> commit3 = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), tree_object_identifiers_[3], |
| std::move(parent)); |
| commit_identifiers_.push_back(commit3->GetId()); |
| parent.clear(); |
| |
| parent.emplace_back(commit3->Clone()); |
| std::unique_ptr<const Commit> commit4 = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), tree_object_identifiers_[4], |
| std::move(parent)); |
| commit_identifiers_.push_back(commit4->GetId()); |
| parent.clear(); |
| |
| parent.emplace_back(commit0->Clone()); |
| parent.emplace_back(commit1->Clone()); |
| std::unique_ptr<const Commit> commit5 = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), tree_object_identifiers_[5], |
| std::move(parent)); |
| commit_identifiers_.push_back(commit5->GetId()); |
| parent.clear(); |
| sync_.AddDiff(commit5->GetId(), commit1->GetId(), |
| {{all_entries[1][0], true}, |
| {all_entries[1][1], true}, |
| {all_entries[5][0], false}, |
| {all_entries[5][1], false}}); |
| |
| // Commit 5 is the only commit that is guaranteed not to be GCed after 0, 1 and 5 are added. |
| sync_.AddDiff(commit2->GetId(), commit5->GetId(), |
| {{all_entries[5][0], true}, |
| {all_entries[5][1], true}, |
| {all_entries[2][0], false}, |
| {all_entries[2][1], false}}); |
| sync_.AddDiff(commit3->GetId(), commit5->GetId(), |
| {{all_entries[5][0], true}, |
| {all_entries[5][1], true}, |
| {all_entries[3][0], false}, |
| {all_entries[3][1], false}}); |
| sync_.AddDiff(commit4->GetId(), commit5->GetId(), |
| {{all_entries[5][0], true}, |
| {all_entries[5][1], true}, |
| {all_entries[4][0], false}, |
| {all_entries[4][1], false}}); |
| |
| std::vector<PageStorage::CommitIdAndBytes> initial_batch; |
| initial_batch.emplace_back(commit0->GetId(), convert::ToString(commit0->GetStorageBytes())); |
| initial_batch.emplace_back(commit1->GetId(), convert::ToString(commit1->GetStorageBytes())); |
| initial_batch.emplace_back(commit5->GetId(), convert::ToString(commit5->GetStorageBytes())); |
| |
| test_batch_.emplace_back(commit2->GetId(), convert::ToString(commit2->GetStorageBytes())); |
| test_batch_.emplace_back(commit3->GetId(), convert::ToString(commit3->GetStorageBytes())); |
| test_batch_.emplace_back(commit4->GetId(), convert::ToString(commit4->GetStorageBytes())); |
| |
| commit0.reset(); |
| commit1.reset(); |
| commit2.reset(); |
| commit3.reset(); |
| commit4.reset(); |
| commit5.reset(); |
| |
// Reset and clear the storage. We do not retrack the identifiers immediately because we
// want to leave the opportunity for the roots of commits 0 and 1 to be collected.
| ResetStorage(); |
| storage_->SetSyncDelegate(&sync_); |
| |
| // Add commits 0, 1 and 5 from the cloud and let garbage collection kick in. |
| bool called; |
| Status status; |
| storage_->AddCommitsFromSync(std::move(initial_batch), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // Check that the objects of commits 0 and 1 have been collected. |
| for (auto& identifiers : |
| {tree_object_identifiers_, eager_object_identifiers_, lazy_object_identifiers_}) { |
| for (auto commit : {0, 1}) { |
| ObjectIdentifier identifier = identifiers[commit]; |
| RetrackIdentifier(&identifier); |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(ReadObject(handler, identifier, &piece), Status::INTERNAL_NOT_FOUND); |
| } |
| } |
| |
| // Retrack all identifiers. |
| for (auto object_identifiers : |
| {&tree_object_identifiers_, &eager_object_identifiers_, &lazy_object_identifiers_}) { |
| for (auto& identifier : *object_identifiers) { |
| RetrackIdentifier(&identifier); |
| } |
| } |
| sync_.object_requests.clear(); |
| sync_.diff_requests.clear(); |
| }); |
| } |
| |
| FakeSyncDelegate sync_; |
| std::vector<CommitId> commit_identifiers_; |
| std::vector<ObjectIdentifier> tree_object_identifiers_; |
| std::vector<ObjectIdentifier> eager_object_identifiers_; |
| std::vector<ObjectIdentifier> lazy_object_identifiers_; |
| std::vector<PageStorage::CommitIdAndBytes> test_batch_; |
| }; |
| |
| TEST_F(PageStorageTestAddMultipleCommits, FromCloudNoDiff) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| sync_.SetSyncFeatures( |
| {HasP2P::NO, HasCloud::YES_NO_DIFFS, DiffCompatibilityPolicy::USE_DIFFS_AND_TREE_NODES}); |
// Add commits 2, 3 and 4.
| bool called; |
| Status status; |
| |
| storage_->AddCommitsFromSync(std::move(test_batch_), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // Diffs for commits 2 and 4 have been requested (but not returned). |
| EXPECT_THAT(sync_.diff_requests, UnorderedElementsAre(Pair(commit_identifiers_[2], _), |
| Pair(commit_identifiers_[4], _))); |
| |
| // The tree and eager objects of commits 2 and 4 have been requested. |
| // The tree has been first requested as a tree node (and received no response), then as a |
| // blob. |
| EXPECT_THAT( |
| sync_.object_requests, |
| UnorderedElementsAre(Pair(tree_object_identifiers_[2], RetrievedObjectType::TREE_NODE), |
| Pair(tree_object_identifiers_[2], RetrievedObjectType::BLOB), |
| Pair(eager_object_identifiers_[2], RetrievedObjectType::BLOB), |
| Pair(tree_object_identifiers_[4], RetrievedObjectType::TREE_NODE), |
| Pair(tree_object_identifiers_[4], RetrievedObjectType::BLOB), |
| Pair(eager_object_identifiers_[4], RetrievedObjectType::BLOB))); |
| }); |
| } |
| |
| TEST_F(PageStorageTestAddMultipleCommits, FromCloudWithDiff) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| sync_.SetSyncFeatures( |
| {HasP2P::NO, HasCloud::YES_WITH_DIFFS, DiffCompatibilityPolicy::USE_ONLY_DIFFS}); |
// Add commits 2, 3 and 4.
| bool called; |
| Status status; |
| |
| storage_->AddCommitsFromSync(std::move(test_batch_), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
// Diffs for commits 2 and 4 have been requested.
| EXPECT_THAT(sync_.diff_requests, UnorderedElementsAre(Pair(commit_identifiers_[2], _), |
| Pair(commit_identifiers_[4], _))); |
// The tree and eager objects of commits 2 and 4 have been requested. The tree has been
// requested as a TREE_NODE, but not as a BLOB, as a diff has been received.
| EXPECT_THAT( |
| sync_.object_requests, |
| UnorderedElementsAre(Pair(tree_object_identifiers_[2], RetrievedObjectType::TREE_NODE), |
| Pair(eager_object_identifiers_[2], RetrievedObjectType::BLOB), |
| Pair(tree_object_identifiers_[4], RetrievedObjectType::TREE_NODE), |
| Pair(eager_object_identifiers_[4], RetrievedObjectType::BLOB))); |
| }); |
| } |
| |
| TEST_F(PageStorageTestAddMultipleCommits, FromP2P) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| sync_.SetSyncFeatures({HasP2P::YES, HasCloud::NO, DiffCompatibilityPolicy::USE_ONLY_DIFFS}); |
| |
// Add commits 2, 3 and 4.
| bool called; |
| Status status; |
| storage_->AddCommitsFromSync(std::move(test_batch_), ChangeSource::P2P, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
// The trees and eager objects of all new commits have been requested, as well as the
// tree of parent commit 1.
| EXPECT_THAT( |
| sync_.object_requests, |
| UnorderedElementsAre(Pair(tree_object_identifiers_[1], RetrievedObjectType::TREE_NODE), |
| Pair(tree_object_identifiers_[2], RetrievedObjectType::TREE_NODE), |
| Pair(eager_object_identifiers_[2], RetrievedObjectType::BLOB), |
| Pair(tree_object_identifiers_[3], RetrievedObjectType::TREE_NODE), |
| Pair(eager_object_identifiers_[3], RetrievedObjectType::BLOB), |
| Pair(tree_object_identifiers_[4], RetrievedObjectType::TREE_NODE), |
| Pair(eager_object_identifiers_[4], RetrievedObjectType::BLOB))); |
| |
| // They have also been requested as diffs. |
| EXPECT_THAT( |
| sync_.diff_requests, |
| UnorderedElementsAre(Pair(commit_identifiers_[1], _), Pair(commit_identifiers_[2], _), |
| Pair(commit_identifiers_[3], _), Pair(commit_identifiers_[4], _))); |
| }); |
| } |
| |
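// Checks that a commit's generation is one more than the highest generation of
// its parents.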
| TEST_F(PageStorageTest, Generation) { |
| std::unique_ptr<const Commit> commit1 = TryCommitFromLocal(3); |
| ASSERT_TRUE(commit1); |
| EXPECT_EQ(commit1->GetGeneration(), 1u); |
| |
| std::unique_ptr<const Commit> commit2 = TryCommitFromLocal(3); |
| ASSERT_TRUE(commit2); |
| EXPECT_EQ(commit2->GetGeneration(), 2u); |
| |
| std::unique_ptr<Journal> journal = |
| storage_->StartMergeCommit(std::move(commit1), std::move(commit2)); |
| |
| std::unique_ptr<const Commit> commit3 = TryCommitJournal(std::move(journal), Status::OK); |
| ASSERT_TRUE(commit3); |
| EXPECT_EQ(commit3->GetGeneration(), 3u); |
| } |
| |
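// Checks that GetEntryFromCommit returns KEY_NOT_FOUND for a missing key and
// finds every existing key.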
| TEST_F(PageStorageTest, GetEntryFromCommit) { |
| int size = 10; |
| std::unique_ptr<const Commit> commit = TryCommitFromLocal(size); |
| ASSERT_TRUE(commit); |
| |
| bool called; |
| Status status; |
| Entry entry; |
| storage_->GetEntryFromCommit(*commit, "key not found", |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &entry)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| ASSERT_EQ(status, Status::KEY_NOT_FOUND); |
| |
| for (int i = 0; i < size; ++i) { |
| std::string expected_key = absl::StrFormat("key%05d", i); |
| storage_->GetEntryFromCommit(*commit, expected_key, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &entry)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| ASSERT_EQ(status, Status::OK); |
| EXPECT_EQ(entry.key, expected_key); |
| } |
| } |
| |
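// Checks that GetDiffForCloud reports an insertion as a single non-deleted
// entry change against the parent commit.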
| TEST_F(PageStorageTest, GetDiffForCloudInsertion) { |
| // Create an initial commit with 10 keys and then another one having commit1 as a parent, |
| // inserting a new key. |
| std::unique_ptr<const Commit> commit1 = TryCommitFromLocal(10); |
| ASSERT_TRUE(commit1); |
| |
| const std::string new_key = "new_key"; |
| const ObjectIdentifier new_identifier = RandomObjectIdentifier(); |
| const KeyPriority new_priority = KeyPriority::LAZY; |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put(new_key, new_identifier, new_priority); |
| std::unique_ptr<const Commit> commit2 = TryCommitJournal(std::move(journal), Status::OK); |
| |
| bool called = false; |
| storage_->GetDiffForCloud( |
| *commit2, [&](Status status, CommitIdView base_id, std::vector<EntryChange> changes) { |
| called = true; |
| ASSERT_EQ(status, Status::OK); |
| EXPECT_EQ(base_id, commit1->GetId()); |
| |
| EXPECT_THAT(changes, SizeIs(1)); |
| EXPECT_EQ(changes[0].entry.key, new_key); |
| EXPECT_EQ(changes[0].entry.object_identifier, new_identifier); |
| EXPECT_EQ(changes[0].entry.priority, new_priority); |
| EXPECT_THAT(changes[0].entry.entry_id, Not(IsEmpty())); |
| EXPECT_EQ(changes[0].deleted, false); |
| }); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| } |
| |
| TEST_F(PageStorageTest, GetDiffForCloudDeletion) { |
| // Create an initial commit with 3 keys and then another one having commit1 as a parent, |
| // deleting a key. |
| const std::string deleted_key = "deleted_key"; |
| const ObjectIdentifier deleted_identifier = RandomObjectIdentifier(); |
| const KeyPriority deleted_priority = KeyPriority::EAGER; |
| |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("a key", RandomObjectIdentifier(), KeyPriority::LAZY); |
| journal->Put(deleted_key, deleted_identifier, deleted_priority); |
| journal->Put("last key", RandomObjectIdentifier(), KeyPriority::LAZY); |
| std::unique_ptr<const Commit> commit1 = TryCommitJournal(std::move(journal), Status::OK); |
| ASSERT_TRUE(commit1); |
| |
| journal = storage_->StartCommit(GetFirstHead()); |
| journal->Delete(deleted_key); |
| std::unique_ptr<const Commit> commit2 = TryCommitJournal(std::move(journal), Status::OK); |
| |
| bool called = false; |
| storage_->GetDiffForCloud( |
| *commit2, [&](Status status, CommitIdView base_id, std::vector<EntryChange> changes) { |
| called = true; |
| ASSERT_EQ(status, Status::OK); |
| EXPECT_EQ(base_id, commit1->GetId()); |
| |
| EXPECT_THAT(changes, SizeIs(1)); |
| EXPECT_EQ(changes[0].entry.key, deleted_key); |
| EXPECT_EQ(changes[0].entry.object_identifier, deleted_identifier); |
| EXPECT_EQ(changes[0].entry.priority, deleted_priority); |
| EXPECT_THAT(changes[0].entry.entry_id, Not(IsEmpty())); |
| EXPECT_EQ(changes[0].deleted, true); |
| }); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| } |
| |
| TEST_F(PageStorageTest, GetDiffForCloudUpdate) { |
| // Create an initial commit with 3 keys and then another one having commit1 as a parent, |
| // updating a key. |
| const std::string updated_key = "updated_key"; |
| const ObjectIdentifier old_identifier = RandomObjectIdentifier(); |
| const KeyPriority old_priority = KeyPriority::LAZY; |
| const ObjectIdentifier new_identifier = RandomObjectIdentifier(); |
| const KeyPriority new_priority = KeyPriority::EAGER; |
| |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("a key", RandomObjectIdentifier(), KeyPriority::LAZY); |
| journal->Put(updated_key, old_identifier, old_priority); |
| journal->Put("last key", RandomObjectIdentifier(), KeyPriority::LAZY); |
| std::unique_ptr<const Commit> commit1 = TryCommitJournal(std::move(journal), Status::OK); |
| ASSERT_TRUE(commit1); |
| |
| journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put(updated_key, new_identifier, new_priority); |
| std::unique_ptr<const Commit> commit2 = TryCommitJournal(std::move(journal), Status::OK); |
| |
| bool called = false; |
| storage_->GetDiffForCloud( |
| *commit2, [&](Status status, CommitIdView base_id, std::vector<EntryChange> changes) { |
| called = true; |
| ASSERT_EQ(status, Status::OK); |
| EXPECT_EQ(base_id, commit1->GetId()); |
| |
| EXPECT_THAT(changes, SizeIs(2)); |
| EXPECT_EQ(changes[0].entry.key, updated_key); |
| EXPECT_EQ(changes[0].entry.object_identifier, old_identifier); |
| EXPECT_EQ(changes[0].entry.priority, old_priority); |
| EXPECT_EQ(changes[0].deleted, true); |
| |
| EXPECT_EQ(changes[1].entry.key, updated_key); |
| EXPECT_EQ(changes[1].entry.object_identifier, new_identifier); |
| EXPECT_EQ(changes[1].entry.priority, new_priority); |
| EXPECT_EQ(changes[1].deleted, false); |
| }); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| } |
| |
| TEST_F(PageStorageTest, GetDiffForCloudEntryIdCorrectness) { |
| // Create an initial commit with 10 keys, then one having commit1 as a parent adding a key and |
| // then one having commit2 as a parent deleting the same key. |
| std::unique_ptr<const Commit> commit1 = TryCommitFromLocal(10); |
| ASSERT_TRUE(commit1); |
| |
| const std::string new_key = "new_key"; |
| const ObjectIdentifier new_identifier = RandomObjectIdentifier(); |
| const KeyPriority new_priority = KeyPriority::LAZY; |
| |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put(new_key, new_identifier, new_priority); |
| std::unique_ptr<const Commit> commit2 = TryCommitJournal(std::move(journal), Status::OK); |
| |
| journal = storage_->StartCommit(GetFirstHead()); |
| journal->Delete(new_key); |
| std::unique_ptr<const Commit> commit3 = TryCommitJournal(std::move(journal), Status::OK); |
| |
| // The entry_id of the inserted entry should be the same as the entry_id of the deleted one. |
| EntryId expected_entry_id; |
| bool called = false; |
| storage_->GetDiffForCloud( |
| *commit2, [&](Status status, CommitIdView base_id, std::vector<EntryChange> changes) { |
| called = true; |
| ASSERT_EQ(status, Status::OK); |
| EXPECT_EQ(base_id, commit1->GetId()); |
| |
| EXPECT_THAT(changes, SizeIs(1)); |
| EXPECT_EQ(changes[0].entry.key, new_key); |
| EXPECT_EQ(changes[0].entry.object_identifier, new_identifier); |
| EXPECT_EQ(changes[0].entry.priority, new_priority); |
| EXPECT_THAT(changes[0].entry.entry_id, Not(IsEmpty())); |
| EXPECT_EQ(changes[0].deleted, false); |
| expected_entry_id = changes[0].entry.entry_id; |
| }); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| |
| called = false; |
| storage_->GetDiffForCloud( |
| *commit3, [&](Status status, CommitIdView base_id, std::vector<EntryChange> changes) { |
| called = true; |
| ASSERT_EQ(status, Status::OK); |
| EXPECT_EQ(base_id, commit2->GetId()); |
| |
| EXPECT_THAT(changes, SizeIs(1)); |
| EXPECT_EQ(changes[0].entry.key, new_key); |
| EXPECT_EQ(changes[0].entry.object_identifier, new_identifier); |
| EXPECT_EQ(changes[0].entry.priority, new_priority); |
| EXPECT_THAT(changes[0].entry.entry_id, Not(IsEmpty())); |
| EXPECT_EQ(changes[0].deleted, true); |
| |
| EXPECT_EQ(expected_entry_id, changes[0].entry.entry_id); |
| }); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| } |
| |
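// Checks that a watcher is notified exactly once per commit when commits are
// created back to back.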
| TEST_F(PageStorageTest, WatcherForReEntrantCommits) { |
| FakeCommitWatcher watcher; |
| storage_->AddCommitWatcher(&watcher); |
| |
| bool called; |
| Status status; |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit1; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit1)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| journal = storage_->StartCommit(std::move(commit1)); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit2; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit2)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| EXPECT_EQ(watcher.commit_count, 2); |
| EXPECT_EQ(watcher.last_commit_id, commit2->GetId()); |
| } |
| |
| TEST_F(PageStorageTest, NoOpCommit) { |
| std::vector<std::unique_ptr<const Commit>> heads = GetHeads(); |
| ASSERT_FALSE(heads.empty()); |
| |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| |
| // Create a key, and delete it. |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| journal->Delete("key"); |
| |
| // Commit the journal. |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> commit; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| |
// Committing a no-op commit should result in a successful status, but a null
// commit.
| ASSERT_EQ(status, Status::OK); |
| ASSERT_FALSE(commit); |
| } |
| |
// Check that receiving a remote commit and committing the same commit locally at
// the same time do not prevent the commit from being marked as synced.
| TEST_F(PageStorageTest, MarkRemoteCommitSyncedRace) { |
| // We need a commit that we can add both "from sync" and locally. For this |
| // purpose, we use a merge commit: we create a conflict, then a merge. We |
| // propagate the conflicting commits through synchronization, and then both |
| // add the merge and create it locally. |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> base_commit = GetFirstHead(); |
| |
| ObjectIdentifier value_1 = RandomInlineObjectIdentifier(); |
| ObjectIdentifier value_2 = RandomInlineObjectIdentifier(); |
| ObjectIdentifier merge_value = RandomInlineObjectIdentifier(); |
| ASSERT_NE(value_1, value_2); |
| |
| std::unique_ptr<Journal> journal1 = storage_->StartCommit(base_commit->Clone()); |
| journal1->Put("key", value_1, KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit1; |
| storage_->CommitJournal(std::move(journal1), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit1)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| RunLoopFor(zx::sec(1)); |
| |
| std::unique_ptr<Journal> journal2 = storage_->StartCommit(base_commit->Clone()); |
| journal2->Put("key", value_2, KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit2; |
| storage_->CommitJournal(std::move(journal2), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit2)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // Create a merge. |
| std::unique_ptr<Journal> journal3 = |
| storage_->StartMergeCommit(commit1->Clone(), commit2->Clone()); |
| journal3->Put("key", merge_value, KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit3; |
| storage_->CommitJournal(std::move(journal3), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit3)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| CommitId id3 = commit3->GetId(); |
| std::map<ObjectIdentifier, std::string> object_data_base; |
| object_data_base[commit1->GetRootIdentifier()] = |
| convert::ToString(TryGetPiece(commit1->GetRootIdentifier())->GetData()); |
| object_data_base[commit2->GetRootIdentifier()] = |
| convert::ToString(TryGetPiece(commit2->GetRootIdentifier())->GetData()); |
| std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes_base; |
| commits_and_bytes_base.emplace_back(commit1->GetId(), |
| convert::ToString(commit1->GetStorageBytes())); |
| commits_and_bytes_base.emplace_back(commit2->GetId(), |
| convert::ToString(commit2->GetStorageBytes())); |
| |
| std::map<ObjectIdentifier, std::string> object_data_merge; |
| object_data_merge[commit3->GetRootIdentifier()] = |
| convert::ToString(TryGetPiece(commit3->GetRootIdentifier())->GetData()); |
| std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes_merge; |
| commits_and_bytes_merge.emplace_back(commit3->GetId(), |
| convert::ToString(commit3->GetStorageBytes())); |
| |
| // We have extracted the commit and object data. We now reset the state of |
| // PageStorage so we can add them again (in a controlled manner). |
| base_commit.reset(); |
| commit1.reset(); |
| commit2.reset(); |
| commit3.reset(); |
| ResetStorage(); |
| RetrackIdentifier(&merge_value); |
| |
| // This does not need diffs. |
| FakeSyncDelegate sync(SyncFeatures::kNoDiff); |
| storage_->SetSyncDelegate(&sync); |
| for (const auto& data : object_data_base) { |
| sync.AddObject(data.first, data.second, ObjectAvailability::P2P_AND_CLOUD); |
| } |
| |
| // Start adding the remote commit. |
| bool commits_from_sync_called; |
| Status commits_from_sync_status; |
| storage_->AddCommitsFromSync( |
| std::move(commits_and_bytes_base), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&commits_from_sync_called), &commits_from_sync_status)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(commits_from_sync_called); |
| EXPECT_EQ(commits_from_sync_status, Status::OK); |
| ASSERT_EQ(GetHeads().size(), 2u); |
| |
| std::vector<fit::closure> sync_delegate_calls; |
| DelayingFakeSyncDelegate sync2( |
| [&sync_delegate_calls](fit::closure callback) { |
| sync_delegate_calls.push_back(std::move(callback)); |
| }, |
| [](auto callback) { callback(); }, SyncFeatures::kNoDiff); |
| storage_->SetSyncDelegate(&sync2); |
| |
| for (const auto& data : object_data_merge) { |
| sync2.AddObject(data.first, data.second, ObjectAvailability::P2P_AND_CLOUD); |
| } |
| storage_->AddCommitsFromSync( |
| std::move(commits_and_bytes_merge), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&commits_from_sync_called), &commits_from_sync_status)); |
| |
| // Make the loop run until GetObject is called in sync, and before |
| // AddCommitsFromSync finishes. |
| RunLoopUntilIdle(); |
| EXPECT_THAT(sync_delegate_calls, Not(IsEmpty())); |
| EXPECT_FALSE(commits_from_sync_called); |
| |
| // Add the local commit. |
| auto heads = GetHeads(); |
| Status commits_from_local_status; |
| std::unique_ptr<Journal> journal = |
| storage_->StartMergeCommit(std::move(heads[0]), std::move(heads[1])); |
| journal->Put("key", merge_value, KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit; |
| storage_->CommitJournal(std::move(journal), ledger::Capture(ledger::SetWhenCalled(&called), |
| &commits_from_local_status, &commit)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(commits_from_local_status, Status::OK); |
| EXPECT_FALSE(commits_from_sync_called); |
| |
| EXPECT_EQ(commit->GetId(), id3); |
| |
// The local commit has been committed; unblock the delayed sync delegate calls.
| for (auto& callback : sync_delegate_calls) { |
| callback(); |
| } |
| |
// Let both commit additions finish.
| RunLoopUntilIdle(); |
| EXPECT_TRUE(commits_from_sync_called); |
| EXPECT_EQ(commits_from_sync_status, Status::OK); |
| EXPECT_EQ(commits_from_local_status, Status::OK); |
| |
| // Verify that the commit is added correctly. |
| storage_->GetCommit(id3, ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // The commit should be marked as synced. |
| EXPECT_EQ(GetUnsyncedCommits().size(), 0u); |
| } |
| |
// Verifies that GetUnsyncedCommits() returns commits ordered by their
// generation, and not by their timestamp.
//
// In this test the commits have the following structure:
//       (root)
//      /   |  \
//   (A)   (B) (C)
//     \   /
//    (merge)
// C is the last commit to be created. The test verifies that the unsynced
// commits are returned in generation order, with the merge commit last despite
// not being the most recent.
| TEST_F(PageStorageTest, GetUnsyncedCommits) { |
| std::unique_ptr<const Commit> root = GetFirstHead(); |
| std::unique_ptr<Journal> journal_a = storage_->StartCommit(root->Clone()); |
| journal_a->Put("a", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_a = TryCommitJournal(std::move(journal_a), Status::OK); |
| ASSERT_TRUE(commit_a); |
| EXPECT_EQ(commit_a->GetGeneration(), 1u); |
| |
| std::unique_ptr<Journal> journal_b = storage_->StartCommit(root->Clone()); |
| journal_b->Put("b", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_b = TryCommitJournal(std::move(journal_b), Status::OK); |
| ASSERT_TRUE(commit_b); |
| EXPECT_EQ(commit_b->GetGeneration(), 1u); |
| |
| std::unique_ptr<Journal> journal_merge = |
| storage_->StartMergeCommit(std::move(commit_a), std::move(commit_b)); |
| |
| std::unique_ptr<const Commit> commit_merge = |
| TryCommitJournal(std::move(journal_merge), Status::OK); |
| ASSERT_TRUE(commit_merge); |
| EXPECT_EQ(commit_merge->GetGeneration(), 2u); |
| |
| std::unique_ptr<Journal> journal_c = storage_->StartCommit(std::move(root)); |
| journal_c->Put("c", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_c = TryCommitJournal(std::move(journal_c), Status::OK); |
| ASSERT_TRUE(commit_c); |
| EXPECT_EQ(commit_c->GetGeneration(), 1u); |
| |
// Verify that the merge commit is returned last, even though commit C is more
// recent.
| std::vector<std::unique_ptr<const Commit>> unsynced_commits = GetUnsyncedCommits(); |
| EXPECT_EQ(unsynced_commits.size(), 4u); |
| EXPECT_EQ(unsynced_commits.back()->GetId(), commit_merge->GetId()); |
| EXPECT_LT(commit_merge->GetTimestamp(), commit_c->GetTimestamp()); |
| } |
| |
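// Checks that GetMergeCommitIds returns an empty list when no merge of the two
// commits exists.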
| TEST_F(PageStorageTest, GetMergeCommitIdsEmpty) { |
| std::unique_ptr<const Commit> parent1 = TryCommitFromLocal(3); |
| ASSERT_TRUE(parent1); |
| |
| std::unique_ptr<const Commit> parent2 = TryCommitFromLocal(3); |
| ASSERT_TRUE(parent2); |
| |
| // Check that there is no merge of |parent1| and |parent2|. |
| bool called; |
| Status status; |
| std::vector<CommitId> merges; |
| storage_->GetMergeCommitIds(parent1->GetId(), parent2->GetId(), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &merges)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_THAT(merges, IsEmpty()); |
| } |
| |
// Add a commit whose parent is missing locally. Verify that an error is returned.
| TEST_F(PageStorageTest, AddCommitsMissingParent) { |
| std::unique_ptr<const btree::TreeNode> node; |
| ASSERT_TRUE(CreateNodeFromEntries({}, {}, &node)); |
| ObjectIdentifier root_identifier = node->GetIdentifier(); |
| |
| std::vector<std::unique_ptr<const Commit>> parent; |
| parent.emplace_back(GetFirstHead()); |
| auto commit_parent = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), root_identifier, std::move(parent)); |
| parent.clear(); |
| parent.push_back(commit_parent->Clone()); |
| auto commit_child = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), root_identifier, std::move(parent)); |
| |
| std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes; |
| commits_and_bytes.emplace_back(commit_child->GetId(), |
| convert::ToString(commit_child->GetStorageBytes())); |
| |
| bool called; |
| Status status; |
| storage_->AddCommitsFromSync(std::move(commits_and_bytes), ChangeSource::P2P, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::INTERNAL_NOT_FOUND); |
| } |
| |
| TEST_F(PageStorageTest, GetMergeCommitIdsNonEmpty) { |
| std::unique_ptr<const Commit> parent1 = TryCommitFromLocal(3); |
| ASSERT_TRUE(parent1); |
| |
| std::unique_ptr<const Commit> parent2 = TryCommitFromLocal(3); |
| ASSERT_TRUE(parent2); |
| |
| std::unique_ptr<Journal> journal = storage_->StartMergeCommit(parent1->Clone(), parent2->Clone()); |
| |
| std::unique_ptr<const Commit> merge = TryCommitJournal(std::move(journal), Status::OK); |
| ASSERT_TRUE(merge); |
| |
| // Check that |merge| is in the list of merges. |
| bool called; |
| Status status; |
| std::vector<CommitId> merges; |
| storage_->GetMergeCommitIds(parent1->GetId(), parent2->GetId(), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &merges)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_THAT(merges, ElementsAre(merge->GetId())); |
| |
| storage_->GetMergeCommitIds(parent2->GetId(), parent1->GetId(), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &merges)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_THAT(merges, ElementsAre(merge->GetId())); |
| } |
| |
| TEST_F(PageStorageTest, AddLocalCommitsInterrupted) { |
| // Destroy PageStorage while a local commit is in progress. |
| bool called; |
| Status status; |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| |
| // Destroy the PageStorageImpl object during the first async operation of |
| // CommitJournal. |
| async::PostTask(dispatcher(), [this]() { storage_.reset(); }); |
| std::unique_ptr<const Commit> commit; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit)); |
| EXPECT_TRUE(RunLoopUntilIdle()); |
// The callback is dropped by the destruction of |storage_|, so we do not
// expect it to be called. However, we do not crash.
| } |
| |
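// Checks that a commit's root identifier can be resolved while the commit is
// being added from sync, and later from the database once the in-memory map is
// cleared.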
| TEST_F(PageStorageTest, GetCommitRootIdentifier) { |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> base_commit = GetFirstHead(); |
| std::unique_ptr<Journal> journal = storage_->StartCommit(base_commit->Clone()); |
| ObjectIdentifier value = RandomInlineObjectIdentifier(); |
| journal->Put("key", value, KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_TRUE(commit); |
| |
| ObjectIdentifier root_id = commit->GetRootIdentifier(); |
| std::string root_data = convert::ToString(TryGetPiece(root_id)->GetData()); |
| CommitId commit_id = commit->GetId(); |
| std::string commit_data = convert::ToString(commit->GetStorageBytes()); |
| auto commit_id_and_bytes = CommitAndBytesFromCommit(*commit); |
| |
| commit.reset(); |
| ResetStorage(); |
| |
| bool sync_delegate_called; |
| fit::closure sync_delegate_call; |
| DelayingFakeSyncDelegate sync( |
| ledger::Capture(ledger::SetWhenCalled(&sync_delegate_called), &sync_delegate_call), |
| [](auto callback) { callback(); }, SyncFeatures::kNoDiff); |
| storage_->SetSyncDelegate(&sync); |
| sync.AddObject(root_id, root_data, ObjectAvailability::P2P_AND_CLOUD); |
| |
| // Start adding the remote commit. |
| bool commits_from_sync_called; |
| Status commits_from_sync_status; |
| storage_->AddCommitsFromSync( |
| std::move(commit_id_and_bytes), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&commits_from_sync_called), &commits_from_sync_status)); |
| RunLoopUntilIdle(); |
| EXPECT_FALSE(commits_from_sync_called); |
| EXPECT_TRUE(sync_delegate_called); |
| ASSERT_TRUE(sync_delegate_call); |
| |
| // AddCommitsFromSync is waiting in GetObject. |
| ObjectIdentifier root_id_from_storage; |
| PageStorageImplAccessorForTest::GetCommitRootIdentifier( |
| storage_, commit_id, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &root_id_from_storage)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(root_id_from_storage, root_id); |
| |
| // Unblock AddCommitsFromSync. |
| sync_delegate_call(); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(commits_from_sync_called); |
| EXPECT_EQ(commits_from_sync_status, Status::OK); |
| |
| // The map is empty, and the root identifier is fetched from the database. |
| EXPECT_TRUE(PageStorageImplAccessorForTest::RootCommitIdentifierMapIsEmpty(storage_)); |
| PageStorageImplAccessorForTest::GetCommitRootIdentifier( |
| storage_, commit_id, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &root_id_from_storage)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(root_id_from_storage, root_id); |
| } |
| |
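// Checks that the commit id to root identifier mapping remains available after
// AddCommitsFromSync fails.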
| TEST_P(PageStorageSyncTest, GetCommitRootIdentifierFailedToAdd) { |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> base_commit = GetFirstHead(); |
| std::unique_ptr<Journal> journal = storage_->StartCommit(base_commit->Clone()); |
| ObjectIdentifier value = RandomInlineObjectIdentifier(); |
| journal->Put("key", value, KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_TRUE(commit); |
| |
| ObjectIdentifier root_id = commit->GetRootIdentifier(); |
| std::string root_data = convert::ToString(TryGetPiece(root_id)->GetData()); |
| CommitId commit_id = commit->GetId(); |
| std::string commit_data = convert::ToString(commit->GetStorageBytes()); |
| auto commit_id_and_bytes = CommitAndBytesFromCommit(*commit); |
| |
| commit.reset(); |
| ResetStorage(); |
| |
| FakeSyncDelegate sync(GetParam()); |
| storage_->SetSyncDelegate(&sync); |
// We add neither the root object nor the diff: GetObject will fail.
| |
| // Add the remote commit. |
| bool commits_from_sync_called; |
| Status commits_from_sync_status; |
| storage_->AddCommitsFromSync( |
| std::move(commit_id_and_bytes), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&commits_from_sync_called), &commits_from_sync_status)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(commits_from_sync_called); |
| EXPECT_EQ(commits_from_sync_status, Status::INTERNAL_NOT_FOUND); |
| |
| // The commit id to root identifier mapping is still available. |
| ObjectIdentifier root_id_from_storage; |
| PageStorageImplAccessorForTest::GetCommitRootIdentifier( |
| storage_, commit_id, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &root_id_from_storage)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(root_id_from_storage, root_id); |
| } |
| |
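// Checks that GetCommitIdFromRemoteId resolves the remote id while the commit
// is being added from sync, and later from the database once the in-memory map
// is cleared.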
| TEST_F(PageStorageTest, GetCommitIdFromRemoteId) { |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> base_commit = GetFirstHead(); |
| std::unique_ptr<Journal> journal = storage_->StartCommit(base_commit->Clone()); |
| ObjectIdentifier value = RandomInlineObjectIdentifier(); |
| journal->Put("key", value, KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_TRUE(commit); |
| |
| ObjectIdentifier root_id = commit->GetRootIdentifier(); |
| std::string root_data = convert::ToString(TryGetPiece(root_id)->GetData()); |
| CommitId commit_id = commit->GetId(); |
| std::string commit_data = convert::ToString(commit->GetStorageBytes()); |
| auto commit_id_and_bytes = CommitAndBytesFromCommit(*commit); |
| std::string remote_commit_id = encryption_service_.EncodeCommitId(commit_id); |
| |
| ResetStorage(); |
| EXPECT_TRUE(PageStorageImplAccessorForTest::RemoteCommitIdMapIsEmpty(storage_)); |
| |
| bool sync_delegate_called; |
| fit::closure sync_delegate_call; |
| DelayingFakeSyncDelegate sync( |
| ledger::Capture(ledger::SetWhenCalled(&sync_delegate_called), &sync_delegate_call)); |
| storage_->SetSyncDelegate(&sync); |
| sync.AddObject(root_id, root_data, ObjectAvailability::P2P_AND_CLOUD); |
| |
| // Start adding the remote commit. |
| bool commits_from_sync_called; |
| Status commits_from_sync_status; |
| storage_->AddCommitsFromSync( |
| std::move(commit_id_and_bytes), ChangeSource::CLOUD, |
| ledger::Capture(ledger::SetWhenCalled(&commits_from_sync_called), &commits_from_sync_status)); |
| RunLoopUntilIdle(); |
| EXPECT_FALSE(commits_from_sync_called); |
| EXPECT_TRUE(sync_delegate_called); |
| ASSERT_TRUE(sync_delegate_call); |
| |
| // AddCommitsFromSync is waiting in GetObject. |
| CommitId commit_id_from_storage; |
| storage_->GetCommitIdFromRemoteId( |
| remote_commit_id, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit_id_from_storage)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(commit_id_from_storage, commit_id); |
| |
| // Unblock AddCommitsFromSync. |
| sync_delegate_call(); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(commits_from_sync_called); |
| EXPECT_EQ(commits_from_sync_status, Status::OK); |
| |
// The map is empty, and the commit id is fetched from the database.
| EXPECT_TRUE(PageStorageImplAccessorForTest::RemoteCommitIdMapIsEmpty(storage_)); |
| storage_->GetCommitIdFromRemoteId( |
| remote_commit_id, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit_id_from_storage)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(commit_id_from_storage, commit_id); |
| } |
| |
| TEST_F(PageStorageTest, ChooseDiffBases) { |
| // We have the following commit tree, with the uppercase commits synced and the lowercase |
| // commits unsynced. |
| // |
| // (root) |
| // / | \ |
| // (A) (B) (C) |
| // \ / | |
| // (e) (D) |
| // | |
| // (f) |
| // |
| // The set of sync heads is {A, B, D}, and they are used as diff bases. |
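// C is not a sync head because its child D is also synced.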
| std::vector<std::unique_ptr<const Commit>> heads = GetHeads(); |
| |
| // Build the tree. |
| bool called; |
| Status status; |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_A; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit_A)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_B; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit_B)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_C; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit_C)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| journal = storage_->StartCommit(commit_C->Clone()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_D; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit_D)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| journal = storage_->StartMergeCommit(commit_A->Clone(), commit_B->Clone()); |
| std::unique_ptr<const Commit> commit_e; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit_e)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| journal = storage_->StartCommit(commit_e->Clone()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_f; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit_f)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
// Mark A, B, C and D as synced.
| for (auto& commit_id : |
| {commit_A->GetId(), commit_B->GetId(), commit_C->GetId(), commit_D->GetId()}) { |
| storage_->MarkCommitSynced(commit_id, ledger::Capture(ledger::SetWhenCalled(&called), &status)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| } |
| |
| // Check that the diff bases are the sync heads. |
| std::vector<CommitId> sync_heads; |
| // The target commit is ignored. |
| CommitId target_id = commit_f->GetId(); |
| PageStorageImplAccessorForTest::ChooseDiffBases( |
| storage_, target_id, ledger::Capture(ledger::SetWhenCalled(&called), &status, &sync_heads)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_THAT(sync_heads, |
| UnorderedElementsAre(commit_A->GetId(), commit_B->GetId(), commit_D->GetId())); |
| } |
| |
| // Checks that the RetrievedObjectType can vary for a given piece depending on why we're reading |
| // it. |
| TEST_F(PageStorageTest, GetPieceRetrievedObjectType) { |
| // Build a random, valid tree node. |
| std::vector<Entry> entries; |
| CreateEntries(1000, &entries); |
| std::string data_str = btree::EncodeNode(0, entries, {}); |
| ASSERT_TRUE(btree::CheckValidTreeNodeSerialization(data_str)); |
| |
| // Split the tree node content into pieces and add them to a SyncDelegate to be |
| // retrieved by GetObject. |
| FakeSyncDelegate sync(SyncFeatures::kNoDiff); |
| std::map<ObjectDigest, ObjectIdentifier> digest_to_identifier; |
| ObjectIdentifier object_identifier = |
| ForEachPiece(data_str, ObjectType::TREE_NODE, &fake_factory_, |
| [&sync, &digest_to_identifier](std::unique_ptr<const Piece> piece) { |
| ObjectIdentifier piece_identifier = piece->GetIdentifier(); |
| if (GetObjectDigestInfo(piece_identifier.object_digest()).is_inlined()) { |
| return; |
| } |
| digest_to_identifier[piece_identifier.object_digest()] = piece_identifier; |
| sync.AddObject(std::move(piece_identifier), |
| convert::ToString(piece->GetData()), ObjectAvailability::P2P); |
| }); |
| ASSERT_EQ(GetObjectDigestInfo(object_identifier.object_digest()).piece_type, PieceType::INDEX); |
| storage_->SetSyncDelegate(&sync); |
| |
| // Get a non-root piece as a value. |
| ObjectIdentifier non_root_piece_identifier; |
| for (const auto& [digest, identifier] : digest_to_identifier) { |
| if (identifier != object_identifier) { |
| non_root_piece_identifier = identifier; |
| break; |
| } |
| } |
| ASSERT_NE(non_root_piece_identifier, ObjectIdentifier()); |
| EXPECT_NE(non_root_piece_identifier, object_identifier); |
| // We are cheating and re-adding the object as available from both P2P and cloud without its |
| // content: this will only update the object availability. |
| sync.AddObject(non_root_piece_identifier, "", ObjectAvailability::P2P_AND_CLOUD); |
| |
| std::unique_ptr<const Object> value_object = |
| TryGetObject(non_root_piece_identifier, PageStorage::Location::ValueFromNetwork()); |
| EXPECT_THAT(sync.object_requests, |
| ElementsAre(Pair(non_root_piece_identifier, RetrievedObjectType::BLOB))); |
| |
| // Reset and fetch the whole tree. |
| sync.object_requests.clear(); |
| ResetStorage(); |
| storage_->SetSyncDelegate(&sync); |
| RetrackIdentifier(&non_root_piece_identifier); |
| RetrackIdentifier(&object_identifier); |
| |
| // Get the tree node containing the non-root piece. |
| CommitId dummy_commit = GetFirstHead()->GetId(); |
| std::unique_ptr<const Object> node_object = |
| TryGetObject(object_identifier, PageStorage::Location::TreeNodeFromNetwork(dummy_commit)); |
| EXPECT_THAT(sync.object_requests, |
| Contains(Pair(non_root_piece_identifier, RetrievedObjectType::TREE_NODE))); |
| EXPECT_THAT(sync.object_requests, |
| Not(Contains(Pair(non_root_piece_identifier, RetrievedObjectType::BLOB)))); |
| } |
| |
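// Checks that the clock follows the local head: empty at first, advancing with a single head,
// and unchanged while two heads coexist.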
| TEST_F(PageStorageTest, UpdateClock) { |
| ResetStorage(CommitPruningPolicy::LOCAL_IMMEDIATE); |
| |
// The clock should be empty.
| bool called; |
| Status status; |
| Clock clock0; |
storage_->GetClock(ledger::Capture(ledger::SetWhenCalled(&called), &status, &clock0));
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| EXPECT_THAT(clock0, IsEmpty()); |
| |
| // Build the tree. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_A; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit_A)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
// The clock should contain one element, pointing to the current head commit.
| Clock clock1; |
storage_->GetClock(ledger::Capture(ledger::SetWhenCalled(&called), &status, &clock1));
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| EXPECT_THAT(clock1, ElementsAre(Pair(_, DeviceClockMatchesCommit(*commit_A)))); |
| |
// The clock is updated after a new single head is present.
| |
| journal = storage_->StartCommit(std::move(commit_A)); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit_B; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit_B)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
// The clock should still contain one element, now pointing to the new head commit.
| Clock clock2; |
storage_->GetClock(ledger::Capture(ledger::SetWhenCalled(&called), &status, &clock2));
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| // The device ID should not have changed. |
| const clocks::DeviceId device_id = clock1.begin()->first; |
| EXPECT_THAT(clock2, ElementsAre(Pair(device_id, DeviceClockMatchesCommit(*commit_B)))); |
| |
| // If there is a conflict, no clock update should occur. |
| std::unique_ptr<Journal> journal1 = storage_->StartCommit(commit_B->Clone()); |
| journal1->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| |
| std::unique_ptr<Journal> journal2 = storage_->StartCommit(std::move(commit_B)); |
| journal2->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| |
| std::unique_ptr<const Commit> commit_C; |
| storage_->CommitJournal(std::move(journal1), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit_C)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| std::unique_ptr<const Commit> commit_D; |
| storage_->CommitJournal(std::move(journal2), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit_D)); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| Clock clock3; |
storage_->GetClock(ledger::Capture(ledger::SetWhenCalled(&called), &status, &clock3));
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| |
| EXPECT_EQ(clock3, clock2); |
| } |
| |
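// Checks that GetGenerationAndMissingParents returns the generation of a commit and the ids of
// its parents that are not available locally.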
| TEST_F(PageStorageTest, GetGenerationAndMissingParents) { |
| std::unique_ptr<const btree::TreeNode> node; |
| ASSERT_TRUE(CreateNodeFromEntries({}, {}, &node)); |
| ObjectIdentifier root_identifier = node->GetIdentifier(); |
| |
| // Send a commit with two parents, one missing and one not. |
| std::vector<std::unique_ptr<const Commit>> parents; |
| parents.push_back(GetFirstHead()); |
| std::unique_ptr<const Commit> missing_parent = |
| storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), root_identifier, std::move(parents)); |
| |
| parents.clear(); |
| parents.push_back(GetFirstHead()); |
| parents.push_back(missing_parent->Clone()); |
| std::unique_ptr<const Commit> commit_to_add = storage_->GetCommitFactory()->FromContentAndParents( |
| environment_.clock(), environment_.random(), root_identifier, std::move(parents)); |
| |
| PageStorage::CommitIdAndBytes id_and_bytes(commit_to_add->GetId(), |
| convert::ToString(commit_to_add->GetStorageBytes())); |
| bool called; |
| Status status; |
| uint64_t generation; |
| std::vector<CommitId> missing; |
| storage_->GetGenerationAndMissingParents( |
| id_and_bytes, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &generation, &missing)); |
| RunLoopUntilIdle(); |
| |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(generation, commit_to_add->GetGeneration()); |
| EXPECT_THAT(missing, ElementsAre(missing_parent->GetId())); |
| } |
| |
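// Checks that an unreferenced local object is garbage collected as soon as its live references
// are dropped.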
| TEST_F(PageStorageTest, EagerLiveReferencesGarbageCollection) { |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data = MakeObject("Some data", InlineBehavior::PREVENT); |
| |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, data.ToDataSource(), {}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(object_identifier, data.object_identifier); |
| |
| // The object is available. |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(ReadObject(handler, object_identifier, &piece), Status::OK); |
| |
| // Release the references to the object. |
| UntrackIdentifier(&object_identifier); |
| piece.reset(); |
| |
| // Give some time for the object to be collected. |
| RunLoopUntilIdle(); |
| |
| // Check that it has been collected. |
| RetrackIdentifier(&object_identifier); |
| ASSERT_EQ(ReadObject(handler, object_identifier, &piece), Status::INTERNAL_NOT_FOUND); |
| }); |
| } |
| |
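// Checks that the root node of a pruned commit is garbage collected once it is no longer
// referenced.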
| TEST_F(PageStorageTestEagerRootNodesGC, EagerRootNodesGarbageCollection) { |
| ResetStorage(CommitPruningPolicy::LOCAL_IMMEDIATE); |
| |
| RunInCoroutine([this](CoroutineHandler* handler) { |
| ObjectData data = MakeObject("Some data", InlineBehavior::PREVENT); |
| |
| bool called; |
| Status status; |
| ObjectIdentifier object_identifier; |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, data.ToDataSource(), {}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifier)); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_EQ(object_identifier, data.object_identifier); |
| |
| // The object is available. |
| std::unique_ptr<const Piece> piece; |
| ASSERT_EQ(ReadObject(handler, object_identifier, &piece), Status::OK); |
| |
| // Create 3 commits: The first one contains the object we just created. The other two are |
| // necessary so that the first one is pruned and its root identifier has no references. |
| |
| // Commit 1: Add the object in a commit. |
| std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead()); |
| journal->Put("key", object_identifier, KeyPriority::EAGER); |
| std::unique_ptr<const Commit> commit = TryCommitJournal(std::move(journal), Status::OK); |
| EXPECT_TRUE(commit); |
| ObjectIdentifier root_identifier = commit->GetRootIdentifier(); |
| |
| // Both the value object and the root node are available. |
| ASSERT_EQ(ReadObject(handler, object_identifier, &piece), Status::OK); |
| ASSERT_EQ(ReadObject(handler, root_identifier, &piece), Status::OK); |
| |
| // Commit 2: Add another object. |
| ObjectData more_data = MakeObject("Some more data", InlineBehavior::PREVENT); |
| ObjectIdentifier object_identifier2; |
| storage_->AddObjectFromLocal( |
| ObjectType::BLOB, more_data.ToDataSource(), {}, |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &object_identifier2)); |
| RunLoopUntilIdle(); |
| journal = storage_->StartCommit(std::move(commit)); |
| journal->Put("key", object_identifier2, KeyPriority::EAGER); |
| commit = TryCommitJournal(std::move(journal), Status::OK); |
| EXPECT_TRUE(commit); |
| |
| // Commit 3: Remove all contents. |
| journal = storage_->StartCommit(std::move(commit)); |
| journal->Clear(); |
| commit = TryCommitJournal(std::move(journal), Status::OK); |
| EXPECT_TRUE(commit); |
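
// With LOCAL_IMMEDIATE pruning, commit 1 is now pruned and nothing on disk references its root
// node anymore.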
| |
| // Release the references to the objects. |
| UntrackIdentifier(&object_identifier); |
| UntrackIdentifier(&root_identifier); |
| piece.reset(); |
| |
| // Give some time for the object to be collected. |
| RunLoopUntilIdle(); |
| |
| // Check that it has been collected. |
| RetrackIdentifier(&object_identifier); |
| EXPECT_EQ(ReadObject(handler, object_identifier, &piece), Status::INTERNAL_NOT_FOUND); |
| RetrackIdentifier(&root_identifier); |
| EXPECT_EQ(ReadObject(handler, root_identifier, &piece), Status::INTERNAL_NOT_FOUND); |
| }); |
| } |
| |
// Tests that the object identifiers of parents are not garbage collected between the time the
// tree is written to disk and the time the commit is written to disk.
| TEST_F(PageStorageTest, CommitJournalKeepsParents) { |
| // We create the following tree, with commits numbered by creation order: |
| // (1) |
| // / \ |
| // (2) (3) |
| // Between (2) and (3), we mark (1) and (2) and their root objects as synced: the root object of |
| // (1) becomes garbage collectable as soon as we drop our reference to (1). |
| // Create two commits, mark the first one as synced, as well as its root object. |
| bool called; |
| Status status; |
| std::unique_ptr<const Commit> commit1 = TryCommitFromLocal(10); |
| ASSERT_TRUE(commit1); |
| std::unique_ptr<const Commit> commit2 = TryCommitFromLocal(10); |
| ASSERT_TRUE(commit2); |
| |
| CommitId commit1_id = commit1->GetId(); |
| ObjectIdentifier commit1_root_identifier = commit1->GetRootIdentifier(); |
| MarkCommitSynced(commit1_id); |
| MarkCommitSynced(commit2->GetId()); |
| MarkPieceSynced(commit1_root_identifier); |
| UntrackIdentifier(&commit1_root_identifier); |
| |
| std::unique_ptr<Journal> journal = storage_->StartCommit(std::move(commit1)); |
| journal->Put("key", RandomObjectIdentifier(), KeyPriority::EAGER); |
| // Take the commit lock before committing. This will leave time for garbage collection. |
| fit::closure unlock; |
| PageStorageImplAccessorForTest::GetCommitSerializer(storage_).Serialize( |
| [] {}, ledger::Capture(ledger::SetWhenCalled(&called), &unlock)); |
| ASSERT_TRUE(called); |
| std::unique_ptr<const Commit> commit3; |
| storage_->CommitJournal(std::move(journal), |
| ledger::Capture(ledger::SetWhenCalled(&called), &status, &commit3)); |
| RunLoopUntilIdle(); |
| EXPECT_FALSE(called); |
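// CommitJournal cannot complete while the commit serializer lock is held.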
| unlock(); |
| RunLoopUntilIdle(); |
| ASSERT_TRUE(called); |
| EXPECT_EQ(status, Status::OK); |
| EXPECT_TRUE(commit3); |
| |
| // The root of |commit1| is alive, so the diff can be computed. |
| called = false; |
| storage_->GetDiffForCloud(*commit3, [&](Status status, CommitIdView base_commit_id, |
| std::vector<EntryChange> /*changes*/) { |
| called = true; |
| EXPECT_EQ(base_commit_id, commit1_id); |
| EXPECT_EQ(status, Status::OK); |
| }); |
| RunLoopUntilIdle(); |
| EXPECT_TRUE(called); |
| ASSERT_GT(PageStorageImplAccessorForTest::CountLiveReferences( |
| storage_, commit1_root_identifier.object_digest()), |
| 0); |
| |
| // Mark commit3 as synced: the root of commit1 is not needed anymore. |
| MarkCommitSynced(commit3->GetId()); |
| ASSERT_EQ(PageStorageImplAccessorForTest::CountLiveReferences( |
| storage_, commit1_root_identifier.object_digest()), |
| 0); |
| } |
| |
| } // namespace |
| |
| } // namespace storage |