// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <dirent.h>
#include <lib/async/cpp/task.h>
#include <lib/callback/capture.h>
#include <lib/callback/set_when_called.h>
#include <lib/fit/function.h>
#include <lib/fsl/socket/strings.h>
#include <lib/fsl/vmo/strings.h>
#include <lib/timekeeper/test_clock.h>

#include <chrono>
#include <memory>
#include <queue>
#include <set>

#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "peridot/lib/scoped_tmpfs/scoped_tmpfs.h"
#include "src/ledger/bin/encryption/fake/fake_encryption_service.h"
#include "src/ledger/bin/encryption/primitives/hash.h"
#include "src/ledger/bin/storage/impl/btree/encoding.h"
#include "src/ledger/bin/storage/impl/btree/tree_node.h"
#include "src/ledger/bin/storage/impl/commit_impl.h"
#include "src/ledger/bin/storage/impl/commit_random_impl.h"
#include "src/ledger/bin/storage/impl/constants.h"
#include "src/ledger/bin/storage/impl/journal_impl.h"
#include "src/ledger/bin/storage/impl/leveldb.h"
#include "src/ledger/bin/storage/impl/object_digest.h"
#include "src/ledger/bin/storage/impl/object_impl.h"
#include "src/ledger/bin/storage/impl/page_db_empty_impl.h"
#include "src/ledger/bin/storage/impl/page_storage_impl.h"
#include "src/ledger/bin/storage/impl/split.h"
#include "src/ledger/bin/storage/impl/storage_test_utils.h"
#include "src/ledger/bin/storage/public/commit_watcher.h"
#include "src/ledger/bin/storage/public/constants.h"
#include "src/ledger/bin/storage/public/types.h"
#include "src/ledger/bin/testing/test_with_environment.h"
#include "src/ledger/lib/coroutine/coroutine.h"
#include "src/lib/files/directory.h"
#include "src/lib/files/file.h"
#include "src/lib/files/path.h"
#include "src/lib/fxl/arraysize.h"
#include "src/lib/fxl/macros.h"
#include "src/lib/fxl/memory/ref_ptr.h"
#include "src/lib/fxl/strings/string_printf.h"

namespace storage {

class PageStorageImplAccessorForTest {
 public:
  static void AddPiece(const std::unique_ptr<PageStorageImpl>& storage,
                       std::unique_ptr<Piece> piece, ChangeSource source,
                       IsObjectSynced is_object_synced,
                       ObjectReferencesAndPriority references,
                       fit::function<void(Status)> callback) {
    storage->AddPiece(std::move(piece), source, is_object_synced,
                      std::move(references), std::move(callback));
  }

  static PageDb& GetDb(const std::unique_ptr<PageStorageImpl>& storage) {
    return *(storage->db_);
  }
};

namespace {

using ::coroutine::CoroutineHandler;
using ::testing::ElementsAre;
using ::testing::IsEmpty;
using ::testing::UnorderedElementsAreArray;

std::vector<PageStorage::CommitIdAndBytes> CommitAndBytesFromCommit(
    const Commit& commit) {
  std::vector<PageStorage::CommitIdAndBytes> result;
  result.emplace_back(commit.GetId(), commit.GetStorageBytes().ToString());
  return result;
}

// DataSource whose |Get()| always reports |DataSource::Status::ERROR|.
class FakeErrorDataSource : public DataSource {
 public:
  explicit FakeErrorDataSource(async_dispatcher_t* dispatcher)
      : dispatcher_(dispatcher) {}

  // Reports a size of 1.
  uint64_t GetSize() override { return 1; }

  // Posts a task on |dispatcher_| that calls |callback| with a null chunk and
  // the ERROR status; |callback| is not invoked synchronously.
  void Get(fit::function<void(std::unique_ptr<DataChunk>, Status)> callback)
      override {
    auto report_error = [callback = std::move(callback)] {
      callback(nullptr, DataSource::Status::ERROR);
    };
    async::PostTask(dispatcher_, std::move(report_error));
  }

  // Dispatcher on which the error is delivered.
  async_dispatcher_t* const dispatcher_;
};

// CommitWatcher that records how many notifications it received, together
// with the id of the last commit and the source of the last notification.
class FakeCommitWatcher : public CommitWatcher {
 public:
  FakeCommitWatcher() {}

  // Counts the notification and remembers its |source|. The id of the last
  // element of |commits| is recorded when |commits| is not empty.
  void OnNewCommits(const std::vector<std::unique_ptr<const Commit>>& commits,
                    ChangeSource source) override {
    ++commit_count;
    // Calling back() on an empty vector is undefined behavior; keep the
    // previously recorded id in that case.
    if (!commits.empty()) {
      last_commit_id = commits.back()->GetId();
    }
    last_source = source;
  }

  // Number of |OnNewCommits()| calls received so far.
  int commit_count = 0;
  // Id of the last commit of the most recent non-empty notification.
  CommitId last_commit_id;
  // Source of the most recent notification. Value-initialized so that reading
  // it before the first notification is well defined.
  ChangeSource last_source{};
};

class DelayingFakeSyncDelegate : public PageSyncDelegate {
 public:
  explicit DelayingFakeSyncDelegate(
      fit::function<void(fit::closure)> on_get_object)
      : on_get_object_(std::move(on_get_object)) {}

  void AddObject(ObjectIdentifier object_identifier, const std::string& value) {
    digest_to_value_[std::move(object_identifier)] = value;
  }

  void GetObject(ObjectIdentifier object_identifier,
                 fit::function<void(Status, ChangeSource, IsObjectSynced,
                                    std::unique_ptr<DataSource::DataChunk>)>
                     callback) override {
    auto value_found = digest_to_value_.find(object_identifier);
    if (value_found == digest_to_value_.end()) {
      callback(Status::INTERNAL_NOT_FOUND, ChangeSource::CLOUD,
               IsObjectSynced::NO, nullptr);
      return;
    }
    std::string& value = value_found->second;
    object_requests.insert(std::move(object_identifier));
    on_get_object_([callback = std::move(callback), value] {
      callback(Status::OK, ChangeSource::CLOUD, IsObjectSynced::YES,
               DataSource::DataChunk::Create(value));
    });
  }

  size_t GetNumberOfObjectsStored() { return digest_to_value_.size(); }

  std::set<ObjectIdentifier> object_requests;

 private:
  fit::function<void(fit::closure)> on_get_object_;
  std::map<ObjectIdentifier, std::string> digest_to_value_;
};

class FakeSyncDelegate : public DelayingFakeSyncDelegate {
 public:
  FakeSyncDelegate()
      : DelayingFakeSyncDelegate([](fit::closure callback) { callback(); }) {}
};

// PageDb fake that overrides only |StartBatch()|. Every other operation is
// inherited from |PageDbEmptyImpl| and, per the original note on this class,
// is expected to fail with a |NOT_IMPLEMENTED| error.
class FakePageDbImpl : public PageDbEmptyImpl {
 public:
  FakePageDbImpl(rng::Random* random) : random_(random) {}

  // Stores a new |FakePageDbImpl| sharing |random_| in |*batch| and reports
  // success. The coroutine handler is not used.
  Status StartBatch(CoroutineHandler* /*handler*/,
                    std::unique_ptr<PageDb::Batch>* batch) override {
    auto fresh_batch = std::make_unique<FakePageDbImpl>(random_);
    *batch = std::move(fresh_batch);
    return Status::OK;
  }

 private:
  // Random source passed on to every batch created by |StartBatch()|.
  rng::Random* const random_;
};

// Shim for LevelDB that allows tests to selectively fail some calls. All
// operations are forwarded to an owned |LevelDb|; the only behavior that can
// be altered is |Batch::Execute()| (see |SetFailBatchExecuteAfter()|).
class ControlledLevelDb : public Db {
 public:
  explicit ControlledLevelDb(async_dispatcher_t* dispatcher,
                             ledger::DetachedPath db_path)
      : leveldb_(dispatcher, db_path) {}

  // Batch wrapper that consults its |ControlledLevelDb| before executing.
  class ControlledBatch : public Batch {
   public:
    explicit ControlledBatch(ControlledLevelDb* controller,
                             std::unique_ptr<Batch> batch)
        : controller_(controller), batch_(std::move(batch)) {}

    // Batch:
    Status Put(coroutine::CoroutineHandler* handler,
               convert::ExtendedStringView key,
               fxl::StringView value) override {
      return batch_->Put(handler, key, value);
    }

    Status Delete(coroutine::CoroutineHandler* handler,
                  convert::ExtendedStringView key) override {
      return batch_->Delete(handler, key);
    }

    // Fails with IO_ERROR, without touching the wrapped batch, once the
    // controller's countdown has reached zero. While the countdown is
    // positive it is decremented and the wrapped batch is executed; a
    // negative countdown is left untouched.
    Status Execute(coroutine::CoroutineHandler* handler) override {
      if (controller_->fail_batch_execute_after_ == 0) {
        return Status::IO_ERROR;
      }
      if (controller_->fail_batch_execute_after_ > 0) {
        controller_->fail_batch_execute_after_--;
      }
      return batch_->Execute(handler);
    }

   private:
    // Not owned; holds the failure countdown shared by all batches.
    ControlledLevelDb* controller_;
    // The wrapped batch that performs the actual work.
    std::unique_ptr<Batch> batch_;
  };

  // Initializes the underlying LevelDb.
  Status Init() { return leveldb_.Init(); }

  // Sets the number of calls to |Batch::Execute()|, for batches generated by
  // this object, after which all calls would fail. It is used to simulate write
  // failures.
  // If |fail_batch_execute_after| is negative, or this method is not called,
  // |Batch::Execute()| calls will never fail.
  void SetFailBatchExecuteAfter(int fail_batch_execute_after) {
    fail_batch_execute_after_ = fail_batch_execute_after;
  }

  // Db:
  // Starts a batch on the underlying LevelDb and, on success, returns it
  // wrapped in a |ControlledBatch|. On failure the status is returned and
  // |*batch| is left untouched.
  Status StartBatch(coroutine::CoroutineHandler* handler,
                    std::unique_ptr<Batch>* batch) override {
    std::unique_ptr<Batch> inner_batch;
    Status status = leveldb_.StartBatch(handler, &inner_batch);
    if (status != Status::OK) {
      // Do not hand out a wrapper around a batch that was never created.
      return status;
    }
    *batch = std::make_unique<ControlledBatch>(this, std::move(inner_batch));
    return Status::OK;
  }

  Status Get(coroutine::CoroutineHandler* handler,
             convert::ExtendedStringView key, std::string* value) override {
    return leveldb_.Get(handler, key, value);
  }

  Status HasKey(coroutine::CoroutineHandler* handler,
                convert::ExtendedStringView key) override {
    return leveldb_.HasKey(handler, key);
  }

  Status GetObject(coroutine::CoroutineHandler* handler,
                   convert::ExtendedStringView key,
                   ObjectIdentifier object_identifier,
                   std::unique_ptr<const Piece>* piece) override {
    return leveldb_.GetObject(handler, key, std::move(object_identifier),
                              piece);
  }

  Status GetByPrefix(coroutine::CoroutineHandler* handler,
                     convert::ExtendedStringView prefix,
                     std::vector<std::string>* key_suffixes) override {
    return leveldb_.GetByPrefix(handler, prefix, key_suffixes);
  }

  Status GetEntriesByPrefix(
      coroutine::CoroutineHandler* handler, convert::ExtendedStringView prefix,
      std::vector<std::pair<std::string, std::string>>* entries) override {
    return leveldb_.GetEntriesByPrefix(handler, prefix, entries);
  }

  Status GetIteratorAtPrefix(
      coroutine::CoroutineHandler* handler, convert::ExtendedStringView prefix,
      std::unique_ptr<Iterator<const std::pair<convert::ExtendedStringView,
                                               convert::ExtendedStringView>>>*
          iterator) override {
    return leveldb_.GetIteratorAtPrefix(handler, prefix, iterator);
  }

 private:
  // Number of calls to |Batch::Execute()| before they start failing. If
  // negative, |Batch::Execute()| calls will never fail.
  int fail_batch_execute_after_ = -1;
  // The real database every call is forwarded to.
  LevelDb leveldb_;
};

// Fixture running a |PageStorageImpl| on top of a |ControlledLevelDb| that
// lives in a scoped tmpfs. Most helpers follow the same pattern: issue one
// storage request, run the loop until idle, then check that the callback ran
// and reported the expected status.
class PageStorageTest : public ledger::TestWithEnvironment {
 public:
  PageStorageTest() : encryption_service_(dispatcher()) {}

  ~PageStorageTest() override {}

  // Test:
  void SetUp() override { ResetStorage(); }

  // Discards the current storage (detaching its sync delegate first) and
  // creates and initializes a new one with a random page id, backed by a new
  // tmpfs and a new |ControlledLevelDb|.
  void ResetStorage() {
    if (storage_) {
      storage_->SetSyncDelegate(nullptr);
      storage_.reset();
    }
    tmpfs_ = std::make_unique<scoped_tmpfs::ScopedTmpFS>();
    PageId id = RandomString(environment_.random(), 10);
    auto db = std::make_unique<ControlledLevelDb>(
        dispatcher(), ledger::DetachedPath(tmpfs_->root_fd()));
    // Keep a raw pointer so tests can inject failures after ownership of the
    // database has moved to the storage.
    leveldb_ = db.get();
    ASSERT_EQ(Status::OK, db->Init());
    storage_ = std::make_unique<PageStorageImpl>(
        &environment_, &encryption_service_, std::move(db), id);

    bool called;
    Status status;
    storage_->Init(
        callback::Capture(callback::SetWhenCalled(&called), &status));
    RunLoopUntilIdle();
    ASSERT_TRUE(called);
    EXPECT_EQ(Status::OK, status);
    EXPECT_EQ(id, storage_->GetId());
  }

 protected:
  PageStorage* GetStorage() { return storage_.get(); }

  // Returns the current head commits, expecting the lookup to succeed.
  std::vector<std::unique_ptr<const Commit>> GetHeads() {
    std::vector<std::unique_ptr<const Commit>> heads;
    Status status = storage_->GetHeadCommits(&heads);
    EXPECT_EQ(Status::OK, status);
    return heads;
  }

  // Returns the first head commit. Records a failure and returns an empty
  // pointer if there is no head.
  std::unique_ptr<const Commit> GetFirstHead() {
    std::vector<std::unique_ptr<const Commit>> heads = GetHeads();
    EXPECT_FALSE(heads.empty());
    if (heads.empty()) {
      // Indexing an empty vector would be undefined behavior.
      return nullptr;
    }
    return std::move(heads[0]);
  }

  // Fetches the commit with the given |id|, expecting success.
  std::unique_ptr<const Commit> GetCommit(const CommitId& id) {
    bool called;
    Status status;
    std::unique_ptr<const Commit> commit;
    storage_->GetCommit(id, callback::Capture(callback::SetWhenCalled(&called),
                                              &status, &commit));
    RunLoopUntilIdle();
    EXPECT_TRUE(called);
    EXPECT_EQ(Status::OK, status);
    return commit;
  }

  // Creates a commit with an empty root node on top of the first head, adds
  // it as if it came from the cloud, and returns it.
  std::unique_ptr<const Commit> TryCommitFromSync() {
    ObjectIdentifier root_identifier;
    EXPECT_TRUE(GetEmptyNodeIdentifier(&root_identifier));

    std::vector<std::unique_ptr<const Commit>> parent;
    parent.emplace_back(GetFirstHead());
    std::unique_ptr<const Commit> commit =
        CommitImpl::FromContentAndParents(environment_.clock(), storage_.get(),
                                          root_identifier, std::move(parent));

    bool called;
    Status status;
    std::vector<CommitId> missing_ids;
    storage_->AddCommitsFromSync(
        CommitAndBytesFromCommit(*commit), ChangeSource::CLOUD,
        callback::Capture(callback::SetWhenCalled(&called), &status,
                          &missing_ids));
    RunLoopUntilIdle();
    EXPECT_TRUE(called);
    EXPECT_EQ(Status::OK, status);
    return commit;
  }

  // Commits |journal| and expects |expected_status|. Records a failure and
  // returns an empty pointer if the |CommitJournal| callback has not run once
  // the loop is idle.
  FXL_WARN_UNUSED_RESULT std::unique_ptr<const Commit> TryCommitJournal(
      std::unique_ptr<Journal> journal, Status expected_status) {
    bool called;
    Status status;
    std::unique_ptr<const Commit> commit;
    storage_->CommitJournal(
        std::move(journal),
        callback::Capture(callback::SetWhenCalled(&called), &status, &commit));

    RunLoopUntilIdle();
    if (!called) {
      // |status| was never written; comparing it would read an indeterminate
      // value, so report the missing callback explicitly instead.
      ADD_FAILURE() << "CommitJournal callback was not executed.";
      return std::unique_ptr<const Commit>();
    }
    EXPECT_EQ(expected_status, status);
    return commit;
  }

  // Commits a journal holding |keys| entries named "key%05d" (padded up to
  // |min_key_size| characters) on top of the first head, then verifies the
  // keys stored in the resulting commit.
  // Returns an empty pointer if |TryCommitJournal| failed.
  FXL_WARN_UNUSED_RESULT std::unique_ptr<const Commit> TryCommitFromLocal(
      int keys, size_t min_key_size = 0) {
    std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead());

    for (int i = 0; i < keys; ++i) {
      auto key = fxl::StringPrintf("key%05d", i);
      if (key.size() < min_key_size) {
        key.resize(min_key_size);
      }
      journal->Put(key, RandomObjectIdentifier(environment_.random()),
                   KeyPriority::EAGER);
    }

    journal->Delete("key_does_not_exist");

    std::unique_ptr<const Commit> commit =
        TryCommitJournal(std::move(journal), Status::OK);
    if (!commit) {
      return commit;
    }

    // Check the contents.
    std::vector<Entry> entries = GetCommitContents(*commit);
    EXPECT_EQ(static_cast<size_t>(keys), entries.size());
    // The size check above is non-fatal, so also bound the loop by the number
    // of entries actually returned to avoid reading past the end.
    for (int i = 0; i < keys && static_cast<size_t>(i) < entries.size(); ++i) {
      auto key = fxl::StringPrintf("key%05d", i);
      if (key.size() < min_key_size) {
        key.resize(min_key_size);
      }
      EXPECT_EQ(key, entries[i].key);
    }

    return commit;
  }

  // Adds |content| as a local BLOB object and expects the resulting
  // identifier to be |expected_identifier|.
  void TryAddFromLocal(std::string content,
                       const ObjectIdentifier& expected_identifier) {
    bool called;
    Status status;
    ObjectIdentifier object_identifier;
    storage_->AddObjectFromLocal(
        ObjectType::BLOB, DataSource::Create(std::move(content)), {},
        callback::Capture(callback::SetWhenCalled(&called), &status,
                          &object_identifier));
    RunLoopUntilIdle();
    EXPECT_TRUE(called);
    EXPECT_EQ(Status::OK, status);
    EXPECT_EQ(expected_identifier, object_identifier);
  }

  // Requests the object from |location| and expects |expected_status|.
  std::unique_ptr<const Object> TryGetObject(
      const ObjectIdentifier& object_identifier, PageStorage::Location location,
      Status expected_status = Status::OK) {
    bool called;
    Status status;
    std::unique_ptr<const Object> object;
    storage_->GetObject(
        object_identifier, location,
        callback::Capture(callback::SetWhenCalled(&called), &status, &object));
    RunLoopUntilIdle();
    EXPECT_TRUE(called);
    EXPECT_EQ(expected_status, status);
    return object;
  }

  // Requests part of the object, described by |offset| and |max_size|, from
  // |location| and expects |expected_status|.
  fsl::SizedVmo TryGetObjectPart(const ObjectIdentifier& object_identifier,
                                 size_t offset, size_t max_size,
                                 PageStorage::Location location,
                                 Status expected_status = Status::OK) {
    bool called;
    Status status;
    fsl::SizedVmo vmo;
    storage_->GetObjectPart(
        object_identifier, offset, max_size, location,
        callback::Capture(callback::SetWhenCalled(&called), &status, &vmo));
    RunLoopUntilIdle();
    EXPECT_TRUE(called);
    EXPECT_EQ(expected_status, status);
    return vmo;
  }

  // Requests the piece and expects |expected_status|.
  std::unique_ptr<const Piece> TryGetPiece(
      const ObjectIdentifier& object_identifier,
      Status expected_status = Status::OK) {
    bool called;
    Status status;
    std::unique_ptr<const Piece> piece;
    storage_->GetPiece(
        object_identifier,
        callback::Capture(callback::SetWhenCalled(&called), &status, &piece));
    RunLoopUntilIdle();
    EXPECT_TRUE(called);
    EXPECT_EQ(expected_status, status);
    return piece;
  }

  // Collects every entry reported for |commit|, starting from the empty key.
  std::vector<Entry> GetCommitContents(const Commit& commit) {
    bool called;
    Status status;
    std::vector<Entry> result;
    auto on_next = [&result](Entry e) {
      result.push_back(e);
      return true;
    };
    storage_->GetCommitContents(
        commit, "", std::move(on_next),
        callback::Capture(callback::SetWhenCalled(&called), &status));
    RunLoopUntilIdle();
    EXPECT_TRUE(called);
    EXPECT_EQ(Status::OK, status);
    return result;
  }

  // Returns the commits the storage reports as unsynced, expecting success.
  std::vector<std::unique_ptr<const Commit>> GetUnsyncedCommits() {
    bool called;
    Status status;
    std::vector<std::unique_ptr<const Commit>> commits;
    storage_->GetUnsyncedCommits(
        callback::Capture(callback::SetWhenCalled(&called), &status, &commits));
    RunLoopUntilIdle();
    EXPECT_TRUE(called);
    EXPECT_EQ(Status::OK, status);
    return commits;
  }

  // Writes |data| directly into the storage's PageDb as a single chunk piece.
  Status WriteObject(
      CoroutineHandler* handler, ObjectData* data,
      PageDbObjectStatus object_status = PageDbObjectStatus::TRANSIENT,
      const ObjectReferencesAndPriority& references = {}) {
    return PageStorageImplAccessorForTest::GetDb(storage_).WriteObject(
        handler, DataChunkPiece(data->object_identifier, data->ToChunk()),
        object_status, references);
  }

  // Reads a piece directly from the storage's PageDb.
  Status ReadObject(CoroutineHandler* handler,
                    ObjectIdentifier object_identifier,
                    std::unique_ptr<const Piece>* piece) {
    return PageStorageImplAccessorForTest::GetDb(storage_).ReadObject(
        handler, object_identifier, piece);
  }

  // Checks that |object_identifier| is referenced by |expected_references|.
  void CheckInboundObjectReferences(
      CoroutineHandler* handler, ObjectIdentifier object_identifier,
      ObjectReferencesAndPriority expected_references) {
    ObjectReferencesAndPriority stored_references;
    ASSERT_EQ(Status::OK,
              PageStorageImplAccessorForTest::GetDb(storage_)
                  .GetInboundObjectReferences(handler, object_identifier,
                                              &stored_references));
    EXPECT_THAT(stored_references,
                UnorderedElementsAreArray(expected_references));
  }

  // Checks that |object_identifier| is referenced by |expected_references|.
  void CheckInboundCommitReferences(
      CoroutineHandler* handler, ObjectIdentifier object_identifier,
      const std::vector<CommitId>& expected_references) {
    std::vector<CommitId> stored_references;
    ASSERT_EQ(Status::OK,
              PageStorageImplAccessorForTest::GetDb(storage_)
                  .GetInboundCommitReferences(handler, object_identifier,
                                              &stored_references));
    EXPECT_THAT(stored_references,
                UnorderedElementsAreArray(expected_references));
  }

  // Succeeds when the storage reports the object's untracked state as
  // |expected_untracked|; otherwise explains what went wrong.
  ::testing::AssertionResult ObjectIsUntracked(
      ObjectIdentifier object_identifier, bool expected_untracked) {
    bool called;
    Status status;
    bool is_untracked;
    storage_->ObjectIsUntracked(
        object_identifier, callback::Capture(callback::SetWhenCalled(&called),
                                             &status, &is_untracked));
    RunLoopUntilIdle();

    if (!called) {
      return ::testing::AssertionFailure()
             << "ObjectIsUntracked for id " << object_identifier
             << " didn't return.";
    }
    if (status != Status::OK) {
      return ::testing::AssertionFailure()
             << "ObjectIsUntracked for id " << object_identifier
             << " returned status " << status;
    }
    if (is_untracked != expected_untracked) {
      // The expectation comes first in the message, the observed state second.
      return ::testing::AssertionFailure()
             << "For id " << object_identifier
             << " expected to find the object "
             << (expected_untracked ? "un" : "") << "tracked, but was "
             << (is_untracked ? "un" : "") << "tracked, instead.";
    }
    return ::testing::AssertionSuccess();
  }

  // Succeeds when the storage reports the piece's synced state as
  // |expected_synced|; otherwise explains what went wrong.
  ::testing::AssertionResult IsPieceSynced(ObjectIdentifier object_identifier,
                                           bool expected_synced) {
    bool called;
    Status status;
    bool is_synced;
    storage_->IsPieceSynced(object_identifier,
                            callback::Capture(callback::SetWhenCalled(&called),
                                              &status, &is_synced));
    RunLoopUntilIdle();

    if (!called) {
      return ::testing::AssertionFailure()
             << "IsPieceSynced for id " << object_identifier
             << " didn't return.";
    }
    if (status != Status::OK) {
      return ::testing::AssertionFailure()
             << "IsPieceSynced for id " << object_identifier
             << " returned status " << status;
    }
    if (is_synced != expected_synced) {
      // The expectation comes first in the message, the observed state second.
      return ::testing::AssertionFailure()
             << "For id " << object_identifier
             << " expected to find the object "
             << (expected_synced ? "" : "un") << "synced, but was "
             << (is_synced ? "" : "un") << "synced, instead.";
    }
    return ::testing::AssertionSuccess();
  }

  // Loads the tree node stored under |identifier| into |*node|. |*node| is
  // only modified on success.
  ::testing::AssertionResult CreateNodeFromIdentifier(
      ObjectIdentifier identifier,
      std::unique_ptr<const btree::TreeNode>* node) {
    bool called;
    Status status;
    std::unique_ptr<const btree::TreeNode> result;
    btree::TreeNode::FromIdentifier(
        GetStorage(), std::move(identifier),
        callback::Capture(callback::SetWhenCalled(&called), &status, &result));
    RunLoopUntilIdle();

    if (!called) {
      return ::testing::AssertionFailure()
             << "TreeNode::FromIdentifier callback was not executed.";
    }
    if (status != Status::OK) {
      return ::testing::AssertionFailure()
             << "TreeNode::FromIdentifier failed with status " << status;
    }
    node->swap(result);
    return ::testing::AssertionSuccess();
  }

  // Creates a tree node from |entries| and |children| (passing 0 as the
  // level argument), then loads it back into |*node|.
  ::testing::AssertionResult CreateNodeFromEntries(
      const std::vector<Entry>& entries,
      const std::map<size_t, ObjectIdentifier>& children,
      std::unique_ptr<const btree::TreeNode>* node) {
    bool called;
    Status status;
    ObjectIdentifier identifier;
    btree::TreeNode::FromEntries(
        GetStorage(), 0u, entries, children,
        callback::Capture(callback::SetWhenCalled(&called), &status,
                          &identifier));
    RunLoopUntilIdle();
    if (!called) {
      return ::testing::AssertionFailure()
             << "TreeNode::FromEntries callback was not executed.";
    }
    if (status != Status::OK) {
      return ::testing::AssertionFailure()
             << "TreeNode::FromEntries failed with status " << status;
    }
    return CreateNodeFromIdentifier(identifier, node);
  }

  // Stores in |*empty_node_identifier| the identifier reported by
  // |TreeNode::Empty()|.
  ::testing::AssertionResult GetEmptyNodeIdentifier(
      ObjectIdentifier* empty_node_identifier) {
    bool called;
    Status status;
    btree::TreeNode::Empty(GetStorage(),
                           callback::Capture(callback::SetWhenCalled(&called),
                                             &status, empty_node_identifier));
    RunLoopUntilIdle();
    if (!called) {
      return ::testing::AssertionFailure()
             << "TreeNode::Empty callback was not executed.";
    }
    if (status != Status::OK) {
      return ::testing::AssertionFailure()
             << "TreeNode::Empty failed with status " << status;
    }
    return ::testing::AssertionSuccess();
  }

  // Database currently used by |storage_|; not owned (the storage owns it).
  ControlledLevelDb* leveldb_;
  std::unique_ptr<scoped_tmpfs::ScopedTmpFS> tmpfs_;
  encryption::FakeEncryptionService encryption_service_;
  std::unique_ptr<PageStorageImpl> storage_;

 private:
  FXL_DISALLOW_COPY_AND_ASSIGN(PageStorageTest);
};

// Looking up an unknown commit fails with INTERNAL_NOT_FOUND; a commit added
// locally can then be retrieved with identical storage bytes.
TEST_F(PageStorageTest, AddGetLocalCommits) {
  // Search for a commit id that doesn't exist and see the error.
  bool called;
  Status status;
  std::unique_ptr<const Commit> lookup_commit;
  storage_->GetCommit(RandomCommitId(environment_.random()),
                      callback::Capture(callback::SetWhenCalled(&called),
                                        &status, &lookup_commit));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::INTERNAL_NOT_FOUND, status);
  EXPECT_FALSE(lookup_commit);

  std::vector<std::unique_ptr<const Commit>> parent;
  parent.emplace_back(GetFirstHead());
  std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents(
      environment_.clock(), storage_.get(),
      RandomObjectIdentifier(environment_.random()), std::move(parent));
  // Remember what is needed for the checks below: |commit| is moved into the
  // storage.
  CommitId id = commit->GetId();
  std::string storage_bytes = commit->GetStorageBytes().ToString();

  // Search for a commit that exists and check the content.
  storage_->AddCommitFromLocal(
      std::move(commit), {},
      callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);

  std::unique_ptr<const Commit> found = GetCommit(id);
  // GetCommit() only records non-fatal failures; stop here rather than
  // dereference a null commit.
  ASSERT_TRUE(found);
  EXPECT_EQ(storage_bytes, found->GetStorageBytes());
}

// Two commits sharing one root identifier must both show up as inbound
// commit references of that identifier.
TEST_F(PageStorageTest, AddLocalCommitsReferences) {
  ObjectIdentifier root_node = RandomObjectIdentifier(environment_.random());
  bool called;
  Status status;

  // First commit, built on the initial head.
  std::vector<std::unique_ptr<const Commit>> first_parents;
  first_parents.emplace_back(GetFirstHead());
  std::unique_ptr<const Commit> first_commit =
      CommitImpl::FromContentAndParents(environment_.clock(), storage_.get(),
                                        root_node, std::move(first_parents));
  CommitId id1 = first_commit->GetId();
  storage_->AddCommitFromLocal(
      std::move(first_commit), {},
      callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);

  // Second commit, built on whatever the first head is now, with the same
  // root identifier.
  std::vector<std::unique_ptr<const Commit>> second_parents;
  second_parents.emplace_back(GetFirstHead());
  std::unique_ptr<const Commit> second_commit =
      CommitImpl::FromContentAndParents(environment_.clock(), storage_.get(),
                                        root_node, std::move(second_parents));
  CommitId id2 = second_commit->GetId();
  storage_->AddCommitFromLocal(
      std::move(second_commit), {},
      callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);

  RunInCoroutine([this, root_node, id1, id2](CoroutineHandler* handler) {
    CheckInboundCommitReferences(handler, root_node, {id1, id2});
  });
}

// Re-adding, from local, a commit that has already been marked as synced must
// not put it back in the list of unsynced commits.
TEST_F(PageStorageTest, AddCommitFromLocalDoNotMarkUnsynedAlreadySyncedCommit) {
  std::vector<std::unique_ptr<const Commit>> parent;
  parent.emplace_back(GetFirstHead());
  std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents(
      environment_.clock(), storage_.get(),
      RandomObjectIdentifier(environment_.random()), std::move(parent));
  CommitId id = commit->GetId();

  bool called;
  Status status;
  // Add a clone so that |commit| stays available for the second addition.
  storage_->AddCommitFromLocal(
      commit->Clone(), {},
      callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);

  auto commits = GetUnsyncedCommits();
  // Fatal check: the next line indexes the first element.
  ASSERT_EQ(1u, commits.size());
  EXPECT_EQ(id, commits[0]->GetId());

  storage_->MarkCommitSynced(
      id, callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);

  // Add the commit again.
  storage_->AddCommitFromLocal(
      commit->Clone(), {},
      callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);

  // Check that the commit is not marked unsynced.
  commits = GetUnsyncedCommits();
  EXPECT_EQ(0u, commits.size());
}

// A local commit whose parent was never added to the storage must be rejected
// with INTERNAL_NOT_FOUND.
TEST_F(PageStorageTest, AddCommitBeforeParentsError) {
  // The only parent is a random commit that was never added to the storage.
  std::vector<std::unique_ptr<const Commit>> unknown_parents;
  unknown_parents.emplace_back(
      std::make_unique<CommitRandomImpl>(environment_.random()));
  std::unique_ptr<const Commit> orphan = CommitImpl::FromContentAndParents(
      environment_.clock(), storage_.get(),
      RandomObjectIdentifier(environment_.random()),
      std::move(unknown_parents));

  bool called;
  Status status;
  auto on_added = callback::Capture(callback::SetWhenCalled(&called), &status);
  storage_->AddCommitFromLocal(std::move(orphan), {}, std::move(on_added));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::INTERNAL_NOT_FOUND, status);
}

// A sync batch listing a child commit before its parent must be rejected with
// INTERNAL_NOT_FOUND.
TEST_F(PageStorageTest, AddCommitsOutOfOrderError) {
  std::unique_ptr<const btree::TreeNode> node;
  ASSERT_TRUE(CreateNodeFromEntries({}, {}, &node));
  ObjectIdentifier root_identifier = node->GetIdentifier();

  // |parent_commit| sits on the current head, |child_commit| on top of it.
  std::vector<std::unique_ptr<const Commit>> head_parents;
  head_parents.emplace_back(GetFirstHead());
  auto parent_commit = CommitImpl::FromContentAndParents(
      environment_.clock(), storage_.get(), root_identifier,
      std::move(head_parents));
  std::vector<std::unique_ptr<const Commit>> child_parents;
  child_parents.push_back(parent_commit->Clone());
  auto child_commit = CommitImpl::FromContentAndParents(
      environment_.clock(), storage_.get(), root_identifier,
      std::move(child_parents));

  // Deliberately list the child first.
  std::vector<PageStorage::CommitIdAndBytes> out_of_order;
  out_of_order.emplace_back(child_commit->GetId(),
                            child_commit->GetStorageBytes().ToString());
  out_of_order.emplace_back(parent_commit->GetId(),
                            parent_commit->GetStorageBytes().ToString());

  bool called;
  Status status;
  std::vector<CommitId> missing_ids;
  storage_->AddCommitsFromSync(
      std::move(out_of_order), ChangeSource::CLOUD,
      callback::Capture(callback::SetWhenCalled(&called), &status,
                        &missing_ids));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::INTERNAL_NOT_FOUND, status);
}

// A commit received from sync must fetch its root node and eager values (but
// not lazy ones) from the sync delegate, must not fetch anything when received
// a second time, and must not be reported as unsynced.
TEST_F(PageStorageTest, AddGetSyncedCommits) {
  RunInCoroutine([this](CoroutineHandler* handler) {
    FakeSyncDelegate sync;
    storage_->SetSyncDelegate(&sync);

    // Build a tree node referencing one lazy and one eager value.
    ObjectData lazy_value("Some data", InlineBehavior::PREVENT);
    ObjectData eager_value("More data", InlineBehavior::PREVENT);
    std::vector<Entry> entries = {
        Entry{"key0", lazy_value.object_identifier, KeyPriority::LAZY},
        Entry{"key1", eager_value.object_identifier, KeyPriority::EAGER},
    };
    std::unique_ptr<const btree::TreeNode> node;
    ASSERT_TRUE(CreateNodeFromEntries(entries, {}, &node));
    ObjectIdentifier root_identifier = node->GetIdentifier();

    // Make the two values and the root node available through the sync
    // delegate.
    sync.AddObject(lazy_value.object_identifier, lazy_value.value);
    sync.AddObject(eager_value.object_identifier, eager_value.value);

    {
      // Scope the root object so that it is released before the storage it
      // depends on is deleted below.
      std::unique_ptr<const Object> root_object =
          TryGetObject(root_identifier, PageStorage::Location::NETWORK);

      fxl::StringView root_data;
      ASSERT_EQ(Status::OK, root_object->GetData(&root_data));
      sync.AddObject(root_identifier, root_data.ToString());
    }

    // Start over with an empty storage wired to the same sync delegate.
    ResetStorage();
    storage_->SetSyncDelegate(&sync);

    std::vector<std::unique_ptr<const Commit>> parents;
    parents.emplace_back(GetFirstHead());
    std::unique_ptr<const Commit> commit =
        CommitImpl::FromContentAndParents(environment_.clock(), storage_.get(),
                                          root_identifier, std::move(parents));
    CommitId id = commit->GetId();

    // First delivery: only the tree node and the eager value are requested.
    bool called;
    Status status;
    std::vector<CommitId> missing_ids;
    sync.object_requests.clear();
    storage_->AddCommitsFromSync(
        CommitAndBytesFromCommit(*commit), ChangeSource::CLOUD,
        callback::Capture(callback::SetWhenCalled(&called), &status,
                          &missing_ids));
    RunLoopUntilIdle();
    ASSERT_TRUE(called);
    EXPECT_EQ(Status::OK, status);
    EXPECT_EQ(2u, sync.object_requests.size());
    EXPECT_TRUE(sync.object_requests.count(root_identifier) != 0);
    EXPECT_TRUE(sync.object_requests.count(eager_value.object_identifier) !=
                0);

    // Second delivery of the same commit: nothing is requested from sync.
    sync.object_requests.clear();
    storage_->AddCommitsFromSync(
        CommitAndBytesFromCommit(*commit), ChangeSource::CLOUD,
        callback::Capture(callback::SetWhenCalled(&called), &status,
                          &missing_ids));
    RunLoopUntilIdle();
    ASSERT_TRUE(called);
    EXPECT_EQ(Status::OK, status);
    EXPECT_TRUE(sync.object_requests.empty());

    std::unique_ptr<const Commit> stored_commit = GetCommit(id);
    EXPECT_EQ(commit->GetStorageBytes(), stored_commit->GetStorageBytes());

    // A commit that came from sync is not reported as unsynced.
    std::vector<std::unique_ptr<const Commit>> unsynced = GetUnsyncedCommits();
    EXPECT_TRUE(unsynced.empty());
  });
}

// Check that receiving a remote commit that is already present locally but not
// synced will mark the commit as synced.
TEST_F(PageStorageTest, MarkRemoteCommitSynced) {
  FakeSyncDelegate sync;
  storage_->SetSyncDelegate(&sync);

  // Create an empty tree node to use as the root of the new commit, and
  // register its content with the fake sync delegate.
  std::unique_ptr<const btree::TreeNode> node;
  ASSERT_TRUE(CreateNodeFromEntries({}, {}, &node));
  ObjectIdentifier root_identifier = node->GetIdentifier();

  std::unique_ptr<const Object> root_object =
      TryGetObject(root_identifier, PageStorage::Location::NETWORK);
  // Stop here rather than dereferencing a null object if the lookup failed.
  ASSERT_TRUE(root_object);

  fxl::StringView root_data;
  ASSERT_EQ(Status::OK, root_object->GetData(&root_data));
  sync.AddObject(root_identifier, root_data.ToString());

  // Add a commit locally; it is expected to be reported as unsynced.
  std::vector<std::unique_ptr<const Commit>> parent;
  parent.emplace_back(GetFirstHead());
  std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents(
      environment_.clock(), storage_.get(), root_identifier, std::move(parent));
  CommitId id = commit->GetId();

  bool called;
  Status status;
  storage_->AddCommitFromLocal(
      std::move(commit), {},
      callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);

  EXPECT_EQ(1u, GetUnsyncedCommits().size());

  // |commit| was moved into the storage above: fetch it back by id. The
  // checks are fatal because |commit| is dereferenced right after.
  storage_->GetCommit(id, callback::Capture(callback::SetWhenCalled(&called),
                                            &status, &commit));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  ASSERT_EQ(Status::OK, status);
  ASSERT_TRUE(commit);

  // Deliver the very same commit as if it came from the cloud.
  std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes;
  commits_and_bytes.emplace_back(commit->GetId(),
                                 commit->GetStorageBytes().ToString());
  std::vector<CommitId> missing_ids;
  storage_->AddCommitsFromSync(
      std::move(commits_and_bytes), ChangeSource::CLOUD,
      callback::Capture(callback::SetWhenCalled(&called), &status,
                        &missing_ids));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  // The operation itself must succeed, not merely complete.
  EXPECT_EQ(Status::OK, status);

  EXPECT_EQ(0u, GetUnsyncedCommits().size());
}

// Checks that a locally added commit is listed as unsynced until
// MarkCommitSynced is called for it.
TEST_F(PageStorageTest, SyncCommits) {
  std::vector<std::unique_ptr<const Commit>> commits = GetUnsyncedCommits();

  // Initially there should be no unsynced commits.
  EXPECT_TRUE(commits.empty());

  std::vector<std::unique_ptr<const Commit>> parent;
  parent.emplace_back(GetFirstHead());
  // After adding a commit it should be marked as unsynced.
  std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents(
      environment_.clock(), storage_.get(),
      RandomObjectIdentifier(environment_.random()), std::move(parent));
  // Remember the id and bytes now: |commit| is moved into the storage below.
  CommitId id = commit->GetId();
  std::string storage_bytes = commit->GetStorageBytes().ToString();

  bool called;
  Status status;
  storage_->AddCommitFromLocal(
      std::move(commit), {},
      callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);

  commits = GetUnsyncedCommits();
  // Fatal check: |commits[0]| is accessed on the next line.
  ASSERT_EQ(1u, commits.size());
  EXPECT_EQ(storage_bytes, commits[0]->GetStorageBytes());

  // Mark it as synced.
  storage_->MarkCommitSynced(
      id, callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);

  commits = GetUnsyncedCommits();
  EXPECT_TRUE(commits.empty());
}

// Checks that a commit added on top of the initial head becomes the only
// head of the page.
TEST_F(PageStorageTest, HeadCommits) {
  // A page is expected to start with exactly one head commit.
  std::vector<std::unique_ptr<const Commit>> current_heads = GetHeads();
  EXPECT_EQ(1u, current_heads.size());

  // Build a commit whose single parent is the current head.
  std::vector<std::unique_ptr<const Commit>> parents;
  parents.push_back(GetFirstHead());
  ObjectIdentifier root = RandomObjectIdentifier(environment_.random());
  std::unique_ptr<const Commit> new_commit = CommitImpl::FromContentAndParents(
      environment_.clock(), storage_.get(), std::move(root),
      std::move(parents));
  const CommitId new_commit_id = new_commit->GetId();

  Status add_status;
  bool add_done;
  storage_->AddCommitFromLocal(
      std::move(new_commit), {},
      callback::Capture(callback::SetWhenCalled(&add_done), &add_status));
  RunLoopUntilIdle();
  ASSERT_TRUE(add_done);
  EXPECT_EQ(Status::OK, add_status);

  // The new commit must have replaced the previous head.
  current_heads = GetHeads();
  ASSERT_EQ(1u, current_heads.size());
  EXPECT_EQ(new_commit_id, current_heads[0]->GetId());
}

// Checks that GetHeadCommits returns heads ordered by timestamp first and by
// commit id second.
TEST_F(PageStorageTest, OrderHeadCommitsByTimestampThenId) {
  timekeeper::TestClock test_clock;
  // We generate a few timestamps: some random, and a few equal constants to
  // test ID ordering.
  std::vector<zx::time_utc> timestamps(7);
  std::generate(timestamps.begin(), timestamps.end(),
                [this] { return environment_.random()->Draw<zx::time_utc>(); });
  timestamps.insert(timestamps.end(), {zx::time_utc(1000), zx::time_utc(1000),
                                       zx::time_utc(1000)});

  // We first generate the commits. They will be shuffled at a later time.
  std::vector<std::unique_ptr<const Commit>> commits;
  std::vector<std::pair<zx::time_utc, CommitId>> sorted_commits;

  for (size_t i = 0; i < timestamps.size(); i++) {
    // Every commit uses the initial head as its only parent.
    std::vector<std::unique_ptr<const Commit>> parent;
    parent.emplace_back(GetFirstHead());

    // The commit is created with |test_clock| set to the chosen timestamp.
    test_clock.Set(timestamps[i]);
    std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents(
        &test_clock, storage_.get(),
        RandomObjectIdentifier(environment_.random()), std::move(parent));

    sorted_commits.emplace_back(timestamps[i], commit->GetId());
    commits.push_back(std::move(commit));
  }

  // Insert the commits in a random order.
  auto rng = environment_.random()->NewBitGenerator<uint64_t>();
  std::shuffle(commits.begin(), commits.end(), rng);
  for (size_t i = 0; i < commits.size(); i++) {
    bool called;
    Status status;
    storage_->AddCommitFromLocal(
        std::move(commits[i]), {},
        callback::Capture(callback::SetWhenCalled(&called), &status));
    RunLoopUntilIdle();
    ASSERT_TRUE(called);
    EXPECT_EQ(Status::OK, status);
  }

  // Check that GetHeadCommits returns sorted commits.
  std::vector<std::unique_ptr<const Commit>> heads;
  Status status = storage_->GetHeadCommits(&heads);
  ASSERT_EQ(Status::OK, status);
  // Fatal check: |heads| is indexed with indices of |sorted_commits| below.
  ASSERT_EQ(sorted_commits.size(), heads.size());
  // Sorting pairs orders by timestamp, then by commit id.
  std::sort(sorted_commits.begin(), sorted_commits.end());
  for (size_t i = 0; i < sorted_commits.size(); ++i) {
    EXPECT_EQ(sorted_commits[i].second, heads[i]->GetId());
  }
}

// Checks that a merge journal can be started from two local commits.
TEST_F(PageStorageTest, CreateJournals) {
  // Two commits to be used as the two sides of the merge.
  auto merge_left = TryCommitFromLocal(5);
  ASSERT_TRUE(merge_left);
  auto merge_right = TryCommitFromLocal(10);
  ASSERT_TRUE(merge_right);

  // Start the merge journal; it is dropped uncommitted at the end of the
  // test.
  std::unique_ptr<Journal> merge_journal =
      storage_->StartMergeCommit(std::move(merge_left), std::move(merge_right));
}

// Creates a commit with 500 entries whose keys are 1024 bytes long, and checks
// that an INDEX piece is among the unsynced pieces and that every piece
// collected from it is also reported as unsynced.
TEST_F(PageStorageTest, CreateJournalHugeNode) {
  std::unique_ptr<const Commit> commit = TryCommitFromLocal(500, 1024);
  ASSERT_TRUE(commit);
  std::vector<Entry> entries = GetCommitContents(*commit);

  EXPECT_EQ(500u, entries.size());
  for (const auto& entry : entries) {
    EXPECT_EQ(1024u, entry.key.size());
  }

  // Check that all node's parts are marked as unsynced.
  bool called;
  Status status;
  std::vector<ObjectIdentifier> object_identifiers;
  storage_->GetUnsyncedPieces(callback::Capture(
      callback::SetWhenCalled(&called), &status, &object_identifiers));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  // The list is only meaningful if the call succeeded.
  EXPECT_EQ(Status::OK, status);

  bool found_index = false;
  std::set<ObjectIdentifier> unsynced_identifiers(object_identifiers.begin(),
                                                  object_identifiers.end());
  for (const auto& identifier : unsynced_identifiers) {
    const auto digest_info = GetObjectDigestInfo(identifier.object_digest());
    EXPECT_FALSE(digest_info.is_inlined());

    if (digest_info.piece_type != PieceType::INDEX) {
      continue;
    }

    found_index = true;
    std::set<ObjectIdentifier> sub_identifiers;
    IterationStatus iteration_status = IterationStatus::ERROR;
    CollectPieces(
        identifier,
        // Data accessor: reads the requested piece from the storage.
        [this](ObjectIdentifier piece_identifier,
               fit::function<void(Status, fxl::StringView)> callback) {
          storage_->GetPiece(
              std::move(piece_identifier),
              [callback = std::move(callback)](
                  Status status, std::unique_ptr<const Piece> piece) {
                if (status != Status::OK) {
                  callback(status, "");
                  return;
                }
                callback(status, piece->GetData());
              });
        },
        // Collector: records each identifier reported while the iteration is
        // in progress and checks that none is reported twice.
        [&iteration_status, &sub_identifiers](
            IterationStatus status, ObjectIdentifier collected_identifier) {
          iteration_status = status;
          if (status == IterationStatus::IN_PROGRESS) {
            EXPECT_TRUE(sub_identifiers.insert(collected_identifier).second);
          }
          return true;
        });
    RunLoopUntilIdle();
    EXPECT_EQ(IterationStatus::DONE, iteration_status);
    for (const auto& sub_identifier : sub_identifiers) {
      EXPECT_EQ(1u, unsynced_identifiers.count(sub_identifier));
    }
  }
  EXPECT_TRUE(found_index);
}

// Checks that a journal can be destroyed without having been committed or
// rolled back.
TEST_F(PageStorageTest, DestroyUncommittedJournal) {
  auto pending_journal = storage_->StartCommit(GetFirstHead());
  // Record one change, then let the journal go out of scope as is.
  pending_journal->Put("key", RandomObjectIdentifier(environment_.random()),
                       KeyPriority::EAGER);
}

// Adds a blob created with InlineBehavior::PREVENT from a local data source,
// then checks its identifier, stored content, and the results of the
// ObjectIsUntracked / IsPieceSynced helpers.
TEST_F(PageStorageTest, AddObjectFromLocal) {
  RunInCoroutine([this](CoroutineHandler* handler) {
    ObjectData data("Some data", InlineBehavior::PREVENT);

    ObjectIdentifier added_identifier;
    Status add_status;
    bool add_done;
    storage_->AddObjectFromLocal(
        ObjectType::BLOB, data.ToDataSource(), {},
        callback::Capture(callback::SetWhenCalled(&add_done), &add_status,
                          &added_identifier));
    RunLoopUntilIdle();
    ASSERT_TRUE(add_done);
    EXPECT_EQ(Status::OK, add_status);
    EXPECT_EQ(data.object_identifier, added_identifier);

    // Read the piece back and compare it with what was written.
    std::unique_ptr<const Piece> stored_piece;
    ASSERT_EQ(Status::OK, ReadObject(handler, added_identifier, &stored_piece));
    EXPECT_EQ(data.value, stored_piece->GetData());
    EXPECT_TRUE(ObjectIsUntracked(added_identifier, true));
    EXPECT_TRUE(IsPieceSynced(added_identifier, false));
  });
}

// Adds a small blob using the default inline behavior and checks that its
// content can be extracted from the object digest while no piece can be read
// back through ReadObject.
TEST_F(PageStorageTest, AddSmallObjectFromLocal) {
  RunInCoroutine([this](CoroutineHandler* handler) {
    ObjectData data("Some data");

    ObjectIdentifier added_identifier;
    Status add_status;
    bool add_done;
    storage_->AddObjectFromLocal(
        ObjectType::BLOB, data.ToDataSource(), {},
        callback::Capture(callback::SetWhenCalled(&add_done), &add_status,
                          &added_identifier));
    RunLoopUntilIdle();
    ASSERT_TRUE(add_done);
    EXPECT_EQ(Status::OK, add_status);
    EXPECT_EQ(data.object_identifier, added_identifier);
    // The value is recoverable from the digest itself.
    EXPECT_EQ(data.value,
              ExtractObjectDigestData(added_identifier.object_digest()));

    std::unique_ptr<const Piece> stored_piece;
    EXPECT_EQ(Status::INTERNAL_NOT_FOUND,
              ReadObject(handler, added_identifier, &stored_piece));
    // Inline objects do not need to ever be tracked.
    EXPECT_TRUE(ObjectIsUntracked(added_identifier, false));
  });
}

// Checks that destroying the storage right after starting AddObjectFromLocal,
// without running the loop, does not crash.
TEST_F(PageStorageTest, InterruptAddObjectFromLocal) {
  ObjectData data("Some data");

  // The result is irrelevant for this test, so the callback ignores it.
  storage_->AddObjectFromLocal(
      ObjectType::BLOB, data.ToDataSource(), {},
      [](Status /*returned_status*/, ObjectIdentifier /*object_identifier*/) {
      });

  // Delete the storage while the AddObject call may still be in progress.
  storage_.reset();
}

// Checks that AddObjectFromLocal reports Status::IO_ERROR when fed a
// FakeErrorDataSource.
TEST_F(PageStorageTest, AddObjectFromLocalError) {
  ObjectIdentifier added_identifier;
  Status add_status;
  bool add_done;
  storage_->AddObjectFromLocal(
      ObjectType::BLOB, std::make_unique<FakeErrorDataSource>(dispatcher()),
      {},
      callback::Capture(callback::SetWhenCalled(&add_done), &add_status,
                        &added_identifier));
  RunLoopUntilIdle();
  ASSERT_TRUE(add_done);
  EXPECT_EQ(Status::IO_ERROR, add_status);
}

// Adds a piece with ChangeSource::LOCAL / IsObjectSynced::NO together with a
// LAZY reference, then checks the stored data, the tracking and sync helpers,
// and the inbound references of the referenced object.
TEST_F(PageStorageTest, AddLocalPiece) {
  RunInCoroutine([this](CoroutineHandler* handler) {
    ObjectData data("Some data", InlineBehavior::PREVENT);
    // Object that the added piece declares a reference to.
    const ObjectIdentifier referenced_object =
        RandomObjectIdentifier(environment_.random());

    Status add_status;
    bool add_done;
    PageStorageImplAccessorForTest::AddPiece(
        storage_, data.ToPiece(), ChangeSource::LOCAL, IsObjectSynced::NO,
        {{referenced_object.object_digest(), KeyPriority::LAZY}},
        callback::Capture(callback::SetWhenCalled(&add_done), &add_status));
    RunLoopUntilIdle();
    ASSERT_TRUE(add_done);
    EXPECT_EQ(Status::OK, add_status);

    std::unique_ptr<const Piece> stored_piece;
    ASSERT_EQ(Status::OK,
              ReadObject(handler, data.object_identifier, &stored_piece));
    EXPECT_EQ(data.value, stored_piece->GetData());
    EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, true));
    EXPECT_TRUE(IsPieceSynced(data.object_identifier, false));

    // The referenced object must list the new piece as a LAZY referrer.
    CheckInboundObjectReferences(
        handler, referenced_object,
        {{data.object_identifier.object_digest(), KeyPriority::LAZY}});
  });
}

// Adds a piece with ChangeSource::CLOUD / IsObjectSynced::YES together with an
// EAGER reference, then checks the stored data, the tracking and sync helpers,
// and the inbound references of the referenced object.
TEST_F(PageStorageTest, AddSyncPiece) {
  RunInCoroutine([this](CoroutineHandler* handler) {
    ObjectData data("Some data", InlineBehavior::PREVENT);
    // Object that the added piece declares a reference to.
    const ObjectIdentifier referenced_object =
        RandomObjectIdentifier(environment_.random());

    Status add_status;
    bool add_done;
    PageStorageImplAccessorForTest::AddPiece(
        storage_, data.ToPiece(), ChangeSource::CLOUD, IsObjectSynced::YES,
        {{referenced_object.object_digest(), KeyPriority::EAGER}},
        callback::Capture(callback::SetWhenCalled(&add_done), &add_status));
    RunLoopUntilIdle();
    ASSERT_TRUE(add_done);
    EXPECT_EQ(Status::OK, add_status);

    std::unique_ptr<const Piece> stored_piece;
    ASSERT_EQ(Status::OK,
              ReadObject(handler, data.object_identifier, &stored_piece));
    EXPECT_EQ(data.value, stored_piece->GetData());
    EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, false));
    EXPECT_TRUE(IsPieceSynced(data.object_identifier, true));

    // The referenced object must list the new piece as an EAGER referrer.
    CheckInboundObjectReferences(
        handler, referenced_object,
        {{data.object_identifier.object_digest(), KeyPriority::EAGER}});
  });
}

// Adds a piece with ChangeSource::P2P / IsObjectSynced::NO and no references,
// then checks the stored data and the tracking and sync helpers.
TEST_F(PageStorageTest, AddP2PPiece) {
  RunInCoroutine([this](CoroutineHandler* handler) {
    ObjectData data("Some data", InlineBehavior::PREVENT);

    Status add_status;
    bool add_done;
    PageStorageImplAccessorForTest::AddPiece(
        storage_, data.ToPiece(), ChangeSource::P2P, IsObjectSynced::NO, {},
        callback::Capture(callback::SetWhenCalled(&add_done), &add_status));
    RunLoopUntilIdle();
    ASSERT_TRUE(add_done);
    EXPECT_EQ(Status::OK, add_status);

    std::unique_ptr<const Piece> stored_piece;
    ASSERT_EQ(Status::OK,
              ReadObject(handler, data.object_identifier, &stored_piece));
    EXPECT_EQ(data.value, stored_piece->GetData());
    EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, false));
    EXPECT_TRUE(IsPieceSynced(data.object_identifier, false));
  });
}

// Writes an object and checks that reading it back with Location::LOCAL yields
// the same identifier and content.
TEST_F(PageStorageTest, GetObject) {
  RunInCoroutine([this](CoroutineHandler* handler) {
    ObjectData data("Some data");
    ASSERT_EQ(Status::OK, WriteObject(handler, &data));

    std::unique_ptr<const Object> object =
        TryGetObject(data.object_identifier, PageStorage::Location::LOCAL);
    // Stop here rather than dereferencing a null object if the lookup failed.
    ASSERT_TRUE(object);
    EXPECT_EQ(data.object_identifier, object->GetIdentifier());
    fxl::StringView object_data;
    ASSERT_EQ(Status::OK, object->GetData(&object_data));
    EXPECT_EQ(data.value, convert::ToString(object_data));
  });
}

// Reads a sub-range (offset 1, length |data.size - 2|) of a locally written
// object and compares it with the matching substring of the value.
TEST_F(PageStorageTest, GetObjectPart) {
  RunInCoroutine([this](CoroutineHandler* handler) {
    ObjectData data("_Some data_");
    ASSERT_EQ(Status::OK, WriteObject(handler, &data));

    fsl::SizedVmo part_vmo = TryGetObjectPart(
        data.object_identifier, 1, data.size - 2, PageStorage::Location::LOCAL);
    std::string part_content;
    ASSERT_TRUE(fsl::StringFromVmo(part_vmo, &part_content));

    const auto expected_content = data.value.substr(1, data.size - 2);
    EXPECT_EQ(expected_content, convert::ToString(part_content));
  });
}

// Requests a part starting at twice the object size and expects an empty
// result.
TEST_F(PageStorageTest, GetObjectPartLargeOffset) {
  RunInCoroutine([this](CoroutineHandler* handler) {
    ObjectData data("_Some data_");
    ASSERT_EQ(Status::OK, WriteObject(handler, &data));

    fsl::SizedVmo part_vmo =
        TryGetObjectPart(data.object_identifier, data.size * 2, data.size,
                         PageStorage::Location::LOCAL);
    std::string part_content;
    ASSERT_TRUE(fsl::StringFromVmo(part_vmo, &part_content));
    EXPECT_EQ("", convert::ToString(part_content));
  });
}

// Requests a part from offset 0 with a maximum size of twice the object size
// and expects the whole value back.
TEST_F(PageStorageTest, GetObjectPartLargeMaxSize) {
  RunInCoroutine([this](CoroutineHandler* handler) {
    ObjectData data("_Some data_");
    ASSERT_EQ(Status::OK, WriteObject(handler, &data));

    fsl::SizedVmo part_vmo = TryGetObjectPart(
        data.object_identifier, 0, data.size * 2, PageStorage::Location::LOCAL);
    std::string part_content;
    ASSERT_TRUE(fsl::StringFromVmo(part_vmo, &part_content));
    EXPECT_EQ(data.value, convert::ToString(part_content));
  });
}

// Requests a part with offset |-data.size + 1| and max size -1, and expects
// everything from index 1 onwards.
TEST_F(PageStorageTest, GetObjectPartNegativeArgs) {
  RunInCoroutine([this](CoroutineHandler* handler) {
    ObjectData data("_Some data_");
    ASSERT_EQ(Status::OK, WriteObject(handler, &data));

    fsl::SizedVmo part_vmo =
        TryGetObjectPart(data.object_identifier, -data.size + 1, -1,
                         PageStorage::Location::LOCAL);
    std::string part_content;
    ASSERT_TRUE(fsl::StringFromVmo(part_vmo, &part_content));

    const auto expected_content = data.value.substr(1, data.size - 1);
    EXPECT_EQ(expected_content, convert::ToString(part_content));
  });
}

// Adds a 65536-byte tree node whose identifier has piece type INDEX, then
// reads a 49152-byte part at offset 6144 and compares it with the source data.
TEST_F(PageStorageTest, GetLargeObjectPart) {
  const size_t part_offset = 6144;
  const size_t part_size = 49152;
  ObjectData data(RandomString(environment_.random(), 65536),
                  ObjectType::TREE_NODE, InlineBehavior::PREVENT);

  // Precondition of the test: the object is encoded with an index piece.
  ASSERT_EQ(
      PieceType::INDEX,
      GetObjectDigestInfo(data.object_identifier.object_digest()).piece_type);

  ObjectIdentifier added_identifier;
  Status add_status;
  bool add_done;
  storage_->AddObjectFromLocal(
      ObjectType::TREE_NODE, data.ToDataSource(), /*tree_references=*/{},
      callback::Capture(callback::SetWhenCalled(&add_done), &add_status,
                        &added_identifier));
  RunLoopUntilIdle();
  ASSERT_TRUE(add_done);

  EXPECT_EQ(Status::OK, add_status);
  EXPECT_EQ(data.object_identifier, added_identifier);

  fsl::SizedVmo part_vmo = TryGetObjectPart(
      added_identifier, part_offset, part_size, PageStorage::Location::LOCAL);
  std::string part_content;
  ASSERT_TRUE(fsl::StringFromVmo(part_vmo, &part_content));
  std::string actual = convert::ToString(part_content);
  EXPECT_EQ(part_size, actual.size());
  EXPECT_EQ(data.value.substr(part_offset, part_size), actual);
}

// Reads part of an object that is only known to the fake sync delegate, then
// checks the statuses expected once the delegate is removed.
TEST_F(PageStorageTest, GetObjectPartFromSync) {
  ObjectData data("_Some data_", InlineBehavior::PREVENT);
  FakeSyncDelegate sync;
  sync.AddObject(data.object_identifier, data.value);
  storage_->SetSyncDelegate(&sync);

  fsl::SizedVmo part_vmo = TryGetObjectPart(
      data.object_identifier, 1, data.size - 2, PageStorage::Location::NETWORK);
  std::string part_content;
  ASSERT_TRUE(fsl::StringFromVmo(part_vmo, &part_content));
  const auto expected_content = data.value.substr(1, data.size - 2);
  EXPECT_EQ(expected_content, convert::ToString(part_content));

  // Without a sync delegate, an unknown object is expected to be
  // INTERNAL_NOT_FOUND locally and a NETWORK_ERROR over the network.
  storage_->SetSyncDelegate(nullptr);
  ObjectData unknown_data("_Some other data_", InlineBehavior::PREVENT);
  TryGetObjectPart(unknown_data.object_identifier, 1, unknown_data.size - 2,
                   PageStorage::Location::LOCAL, Status::INTERNAL_NOT_FOUND);
  TryGetObjectPart(unknown_data.object_identifier, 1, unknown_data.size - 2,
                   PageStorage::Location::NETWORK, Status::NETWORK_ERROR);
}

// Reads a 128-byte part at offset 28672 of a large object served by the fake
// sync delegate, and checks that fewer objects were requested than the
// delegate stores.
TEST_F(PageStorageTest, GetHugeObjectPartFromSync) {
  const int64_t part_offset = 28672;
  const int64_t part_size = 128;
  std::string content = RandomString(environment_.random(), 2 * 65536 + 1);

  // Register every non-inlined piece of the object with the sync delegate.
  FakeSyncDelegate sync;
  ObjectIdentifier root_identifier = ForEachPiece(
      content, ObjectType::BLOB, [&sync](std::unique_ptr<const Piece> piece) {
        ObjectIdentifier piece_identifier = piece->GetIdentifier();
        const bool inlined =
            GetObjectDigestInfo(piece_identifier.object_digest()).is_inlined();
        if (!inlined) {
          sync.AddObject(std::move(piece_identifier),
                         piece->GetData().ToString());
        }
      });
  ASSERT_EQ(PieceType::INDEX,
            GetObjectDigestInfo(root_identifier.object_digest()).piece_type);
  storage_->SetSyncDelegate(&sync);

  fsl::SizedVmo part_vmo = TryGetObjectPart(
      root_identifier, part_offset, part_size, PageStorage::Location::NETWORK);
  std::string part_content;
  ASSERT_TRUE(fsl::StringFromVmo(part_vmo, &part_content));
  EXPECT_EQ(content.substr(part_offset, part_size),
            convert::ToString(part_content));
  // Only some of the stored objects should have been requested.
  EXPECT_LT(sync.object_requests.size(), sync.GetNumberOfObjectsStored());
}

// Same as the positive-offset case but with offset -28672: the expected data
// is taken at |size() + offset| in the source string.
TEST_F(PageStorageTest, GetHugeObjectPartFromSyncNegativeOffset) {
  const int64_t part_offset = -28672;
  const int64_t part_size = 128;
  std::string content = RandomString(environment_.random(), 2 * 65536 + 1);

  // Register every non-inlined piece of the object with the sync delegate.
  FakeSyncDelegate sync;
  ObjectIdentifier root_identifier = ForEachPiece(
      content, ObjectType::BLOB, [&sync](std::unique_ptr<const Piece> piece) {
        ObjectIdentifier piece_identifier = piece->GetIdentifier();
        const bool inlined =
            GetObjectDigestInfo(piece_identifier.object_digest()).is_inlined();
        if (!inlined) {
          sync.AddObject(std::move(piece_identifier),
                         piece->GetData().ToString());
        }
      });
  ASSERT_EQ(PieceType::INDEX,
            GetObjectDigestInfo(root_identifier.object_digest()).piece_type);
  storage_->SetSyncDelegate(&sync);

  fsl::SizedVmo part_vmo = TryGetObjectPart(
      root_identifier, part_offset, part_size, PageStorage::Location::NETWORK);
  std::string part_content;
  ASSERT_TRUE(fsl::StringFromVmo(part_vmo, &part_content));
  EXPECT_EQ(content.substr(content.size() + part_offset, part_size),
            convert::ToString(part_content));
  // Only some of the stored objects should have been requested.
  EXPECT_LT(sync.object_requests.size(), sync.GetNumberOfObjectsStored());
}

// Reads an object that is only known to the fake sync delegate, then checks
// the statuses expected once the delegate is removed.
TEST_F(PageStorageTest, GetObjectFromSync) {
  ObjectData data("Some data", InlineBehavior::PREVENT);
  FakeSyncDelegate sync;
  sync.AddObject(data.object_identifier, data.value);
  storage_->SetSyncDelegate(&sync);

  std::unique_ptr<const Object> object =
      TryGetObject(data.object_identifier, PageStorage::Location::NETWORK);
  // Stop here rather than dereferencing a null object if the lookup failed.
  ASSERT_TRUE(object);
  EXPECT_EQ(data.object_identifier, object->GetIdentifier());
  fxl::StringView object_data;
  ASSERT_EQ(Status::OK, object->GetData(&object_data));
  EXPECT_EQ(data.value, convert::ToString(object_data));

  // Without a sync delegate, an unknown object is expected to be
  // INTERNAL_NOT_FOUND locally and a NETWORK_ERROR over the network.
  storage_->SetSyncDelegate(nullptr);
  ObjectData other_data("Some other data", InlineBehavior::PREVENT);
  TryGetObject(other_data.object_identifier, PageStorage::Location::LOCAL,
               Status::INTERNAL_NOT_FOUND);
  TryGetObject(other_data.object_identifier, PageStorage::Location::NETWORK,
               Status::NETWORK_ERROR);
}

// Reads a small part of a large object from the network, checks that the
// object is then not available locally as a whole, and finally downloads it
// entirely and checks that it becomes available locally.
TEST_F(PageStorageTest, FullDownloadAfterPartial) {
  std::string data_str = RandomString(environment_.random(), 2 * 65536 + 1);
  int64_t offset = 0;
  int64_t size = 128;

  // Register every non-inlined piece of the object with the sync delegate.
  FakeSyncDelegate sync;
  ObjectIdentifier object_identifier = ForEachPiece(
      data_str, ObjectType::BLOB, [&sync](std::unique_ptr<const Piece> piece) {
        ObjectIdentifier piece_identifier = piece->GetIdentifier();
        if (GetObjectDigestInfo(piece_identifier.object_digest())
                .is_inlined()) {
          return;
        }
        sync.AddObject(std::move(piece_identifier),
                       piece->GetData().ToString());
      });
  ASSERT_EQ(PieceType::INDEX,
            GetObjectDigestInfo(object_identifier.object_digest()).piece_type);
  storage_->SetSyncDelegate(&sync);

  // Partial read: only some of the stored objects should be requested, and
  // the full object must not be readable locally afterwards.
  fsl::SizedVmo object_part = TryGetObjectPart(object_identifier, offset, size,
                                               PageStorage::Location::NETWORK);
  std::string object_part_data;
  ASSERT_TRUE(fsl::StringFromVmo(object_part, &object_part_data));
  EXPECT_EQ(data_str.substr(offset, size), convert::ToString(object_part_data));
  EXPECT_LT(sync.object_requests.size(), sync.GetNumberOfObjectsStored());
  TryGetObject(object_identifier, PageStorage::Location::LOCAL,
               Status::INTERNAL_NOT_FOUND);

  // Full read: every stored object ends up requested, and the object is then
  // readable locally.
  std::unique_ptr<const Object> object =
      TryGetObject(object_identifier, PageStorage::Location::NETWORK);
  // Stop here rather than dereferencing a null object if the lookup failed.
  ASSERT_TRUE(object);
  fxl::StringView object_data;
  ASSERT_EQ(Status::OK, object->GetData(&object_data));
  EXPECT_EQ(data_str, convert::ToString(object_data));
  EXPECT_EQ(sync.object_requests.size(), sync.GetNumberOfObjectsStored());
  TryGetObject(object_identifier, PageStorage::Location::LOCAL, Status::OK);
}

// Checks that a network read reports OBJECT_DIGEST_MISMATCH when the sync
// delegate serves content belonging to another object for the requested
// identifier.
TEST_F(PageStorageTest, GetObjectFromSyncWrongId) {
  ObjectData requested("Some data", InlineBehavior::PREVENT);
  ObjectData served("Some data2", InlineBehavior::PREVENT);

  // Deliberately associate the requested identifier with the wrong content.
  FakeSyncDelegate sync;
  sync.AddObject(requested.object_identifier, served.value);
  storage_->SetSyncDelegate(&sync);

  TryGetObject(requested.object_identifier, PageStorage::Location::NETWORK,
               Status::OBJECT_DIGEST_MISMATCH);
}

// Adds a 65536-byte tree node (encoded with an index piece) together with a
// LAZY tree reference, reads it back, and checks the inbound references of
// both the referenced tree node and the children of the index piece.
TEST_F(PageStorageTest, AddAndGetHugeTreenodeFromLocal) {
  std::string data_str = RandomString(environment_.random(), 65536);

  ObjectData data(std::move(data_str), ObjectType::TREE_NODE,
                  InlineBehavior::PREVENT);
  // An identifier to another tree node pointed at by the current one.
  const ObjectIdentifier tree_reference =
      RandomObjectIdentifier(environment_.random());
  ASSERT_EQ(
      ObjectType::TREE_NODE,
      GetObjectDigestInfo(data.object_identifier.object_digest()).object_type);
  ASSERT_EQ(
      PieceType::INDEX,
      GetObjectDigestInfo(data.object_identifier.object_digest()).piece_type);
  ASSERT_EQ(
      InlinedPiece::NO,
      GetObjectDigestInfo(data.object_identifier.object_digest()).inlined);

  bool called;
  Status status;
  ObjectIdentifier object_identifier;
  storage_->AddObjectFromLocal(
      ObjectType::TREE_NODE, data.ToDataSource(),
      {{tree_reference.object_digest(), KeyPriority::LAZY}},
      callback::Capture(callback::SetWhenCalled(&called), &status,
                        &object_identifier));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);

  EXPECT_EQ(Status::OK, status);
  // This ensures that the object is encoded with an index, as we checked the
  // piece type of |data.object_identifier| above.
  EXPECT_EQ(data.object_identifier, object_identifier);

  std::unique_ptr<const Object> object =
      TryGetObject(object_identifier, PageStorage::Location::LOCAL);
  // Stop here rather than dereferencing a null object if the lookup failed.
  ASSERT_TRUE(object);
  fxl::StringView content;
  ASSERT_EQ(Status::OK, object->GetData(&content));
  EXPECT_EQ(data.value, content);
  EXPECT_TRUE(ObjectIsUntracked(object_identifier, true));
  EXPECT_TRUE(IsPieceSynced(object_identifier, false));

  // Check that the index piece obtained at |object_identifier| is different
  // from the object itself, ie. that some splitting occurred.
  std::unique_ptr<const Piece> piece = TryGetPiece(object_identifier);
  // Same as above: |piece| is dereferenced here and in the coroutine below.
  ASSERT_TRUE(piece);
  EXPECT_NE(content, piece->GetData());

  RunInCoroutine([this, piece = std::move(piece), tree_reference,
                  object_identifier](CoroutineHandler* handler) {
    // Check tree reference.
    CheckInboundObjectReferences(
        handler, tree_reference,
        {{object_identifier.object_digest(), KeyPriority::LAZY}});
    // Check piece references: each child of the index piece is expected to be
    // referenced by the index with EAGER priority.
    ASSERT_EQ(
        Status::OK,
        ForEachIndexChild(
            piece->GetData(), [this, handler, object_identifier](
                                  ObjectIdentifier piece_identifier) {
              CheckInboundObjectReferences(
                  handler, piece_identifier,
                  {{object_identifier.object_digest(), KeyPriority::EAGER}});
              return Status::OK;
            }));
  });
}

// Adds three values, commits each one in its own commit, and checks the list
// returned by GetUnsyncedPieces before and after marking one value as synced.
TEST_F(PageStorageTest, UnsyncedPieces) {
  ObjectData data_array[] = {
      ObjectData("Some data", InlineBehavior::PREVENT),
      ObjectData("Some more data", InlineBehavior::PREVENT),
      ObjectData("Even more data", InlineBehavior::PREVENT),
  };
  constexpr size_t size = arraysize(data_array);
  for (auto& data : data_array) {
    TryAddFromLocal(data.value, data.object_identifier);
    EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, true));
    EXPECT_TRUE(IsPieceSynced(data.object_identifier, false));
  }

  std::vector<CommitId> commits;

  // Add one key-value pair per commit.
  for (size_t i = 0; i < size; ++i) {
    std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead());

    // %zu is the portable conversion specifier for size_t.
    journal->Put(fxl::StringPrintf("key%zu", i),
                 data_array[i].object_identifier, KeyPriority::LAZY);
    EXPECT_TRUE(TryCommitJournal(std::move(journal), Status::OK));
    commits.push_back(GetFirstHead()->GetId());
  }

  // GetUnsyncedPieces should return the ids of all objects: 3 values and
  // the 3 root nodes of the 3 commits.
  bool called;
  Status status;
  std::vector<ObjectIdentifier> object_identifiers;
  storage_->GetUnsyncedPieces(callback::Capture(
      callback::SetWhenCalled(&called), &status, &object_identifiers));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);
  EXPECT_EQ(6u, object_identifiers.size());
  for (size_t i = 0; i < size; ++i) {
    std::unique_ptr<const Commit> commit = GetCommit(commits[i]);
    EXPECT_TRUE(std::find_if(object_identifiers.begin(),
                             object_identifiers.end(),
                             [&](const auto& identifier) {
                               return identifier == commit->GetRootIdentifier();
                             }) != object_identifiers.end());
  }
  for (auto& data : data_array) {
    EXPECT_TRUE(std::find(object_identifiers.begin(), object_identifiers.end(),
                          data.object_identifier) != object_identifiers.end());
  }

  // Mark the 2nd object as synced. We now expect 5 unsynced pieces: the 2
  // remaining values and the 3 (also unsynced) root nodes. Only the root node
  // of the last commit is looked up explicitly below.
  storage_->MarkPieceSynced(
      data_array[1].object_identifier,
      callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);
  std::vector<ObjectIdentifier> objects;
  storage_->GetUnsyncedPieces(
      callback::Capture(callback::SetWhenCalled(&called), &status, &objects));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);
  EXPECT_EQ(5u, objects.size());
  std::unique_ptr<const Commit> commit = GetCommit(commits[2]);
  EXPECT_TRUE(std::find(objects.begin(), objects.end(),
                        commit->GetRootIdentifier()) != objects.end());
  EXPECT_TRUE(std::find(objects.begin(), objects.end(),
                        data_array[0].object_identifier) != objects.end());
  EXPECT_TRUE(std::find(objects.begin(), objects.end(),
                        data_array[2].object_identifier) != objects.end());
}

// Walks a page through the sync life cycle and checks IsSynced() after each
// step: untracked objects only, an unsynced commit, synced pieces with a
// still-unsynced commit, and finally a synced commit.
TEST_F(PageStorageTest, PageIsSynced) {
  ObjectData data_array[] = {
      ObjectData("Some data", InlineBehavior::PREVENT),
      ObjectData("Some more data", InlineBehavior::PREVENT),
      ObjectData("Even more data", InlineBehavior::PREVENT),
  };
  constexpr size_t size = arraysize(data_array);
  for (auto& data : data_array) {
    TryAddFromLocal(data.value, data.object_identifier);
    EXPECT_TRUE(ObjectIsUntracked(data.object_identifier, true));
    EXPECT_TRUE(IsPieceSynced(data.object_identifier, false));
  }

  // The objects have not been added in a commit: there is nothing to sync and
  // the page is considered synced.
  bool called;
  Status status;
  bool is_synced;
  storage_->IsSynced(
      callback::Capture(callback::SetWhenCalled(&called), &status, &is_synced));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);
  EXPECT_TRUE(is_synced);

  // Add all objects in one commit.
  std::unique_ptr<Journal> journal = storage_->StartCommit(GetFirstHead());
  for (size_t i = 0; i < size; ++i) {
    // |i| is a size_t, which must be formatted with %zu to stay portable.
    journal->Put(fxl::StringPrintf("key%zu", i),
                 data_array[i].object_identifier, KeyPriority::LAZY);
  }
  EXPECT_TRUE(TryCommitJournal(std::move(journal), Status::OK));
  CommitId commit_id = GetFirstHead()->GetId();

  // After committing, the page is unsynced.
  called = false;
  storage_->IsSynced(
      callback::Capture(callback::SetWhenCalled(&called), &status, &is_synced));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);
  EXPECT_FALSE(is_synced);

  // Mark objects (and the root tree node) as synced and expect that the page is
  // still unsynced.
  for (const auto& data : data_array) {
    called = false;
    storage_->MarkPieceSynced(
        data.object_identifier,
        callback::Capture(callback::SetWhenCalled(&called), &status));
    RunLoopUntilIdle();
    ASSERT_TRUE(called);
    EXPECT_EQ(Status::OK, status);
  }

  called = false;
  storage_->MarkPieceSynced(
      GetFirstHead()->GetRootIdentifier(),
      callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);

  called = false;
  storage_->IsSynced(
      callback::Capture(callback::SetWhenCalled(&called), &status, &is_synced));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);
  EXPECT_FALSE(is_synced);

  // Mark the commit as synced and expect that the page is synced.
  called = false;
  storage_->MarkCommitSynced(
      commit_id, callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);
  called = false;
  storage_->IsSynced(
      callback::Capture(callback::SetWhenCalled(&called), &status, &is_synced));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);
  EXPECT_TRUE(is_synced);

  // All objects should be synced now.
  for (auto& data : data_array) {
    EXPECT_TRUE(IsPieceSynced(data.object_identifier, true));
  }
}

// Checks that a page only becomes online once a commit has been marked as
// synced; marking the pieces alone is not enough.
TEST_F(PageStorageTest, PageIsMarkedOnlineAfterCloudSync) {
  // Check that the page is initially not marked as online.
  EXPECT_FALSE(storage_->IsOnline());

  // Create a local commit: the page is still not online.
  int size = 10;
  std::unique_ptr<const Commit> commit = TryCommitFromLocal(size);
  // The commit is dereferenced below, so stop here if it could not be created.
  ASSERT_TRUE(commit);
  EXPECT_FALSE(storage_->IsOnline());

  // Mark all objects as synced. The page is still not online: other devices
  // will only see these objects if the corresponding commit is also synced to
  // the cloud.
  bool called;
  Status status;
  std::vector<ObjectIdentifier> object_identifiers;
  storage_->GetUnsyncedPieces(callback::Capture(
      callback::SetWhenCalled(&called), &status, &object_identifiers));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);
  for (ObjectIdentifier& object_identifier : object_identifiers) {
    storage_->MarkPieceSynced(
        object_identifier,
        callback::Capture(callback::SetWhenCalled(&called), &status));
    RunLoopUntilIdle();
    ASSERT_TRUE(called);
    EXPECT_EQ(Status::OK, status);
  }
  EXPECT_FALSE(storage_->IsOnline());

  // Mark the commit as synced. The page should now be marked as online.
  storage_->MarkCommitSynced(
      commit->GetId(),
      callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);
  EXPECT_TRUE(storage_->IsOnline());
}

// Checks that marking a page as synced to a peer flips it to online.
TEST_F(PageStorageTest, PageIsMarkedOnlineSyncWithPeer) {
  // A fresh page starts out offline.
  EXPECT_FALSE(storage_->IsOnline());

  // Record a peer sync and wait for the storage to acknowledge it.
  bool done;
  Status result;
  storage_->MarkSyncedToPeer(
      callback::Capture(callback::SetWhenCalled(&done), &result));
  RunLoopUntilIdle();
  ASSERT_TRUE(done);
  EXPECT_EQ(Status::OK, result);

  // The page must now report itself as online.
  EXPECT_TRUE(storage_->IsOnline());
}

// Checks IsEmpty() on a fresh page, after inserting one entry, and after
// deleting that entry again.
TEST_F(PageStorageTest, PageIsEmpty) {
  ObjectData entry_value("Some value", InlineBehavior::PREVENT);
  bool done;
  Status result;
  bool empty;

  // A fresh page has no entries.
  storage_->IsEmpty(
      callback::Capture(callback::SetWhenCalled(&done), &result, &empty));
  RunLoopUntilIdle();
  ASSERT_TRUE(done);
  EXPECT_EQ(Status::OK, result);
  EXPECT_TRUE(empty);

  // Committing a single entry makes the page non-empty.
  std::unique_ptr<Journal> pending = storage_->StartCommit(GetFirstHead());
  pending->Put("key", entry_value.object_identifier, KeyPriority::LAZY);
  EXPECT_TRUE(TryCommitJournal(std::move(pending), Status::OK));

  storage_->IsEmpty(
      callback::Capture(callback::SetWhenCalled(&done), &result, &empty));
  RunLoopUntilIdle();
  ASSERT_TRUE(done);
  EXPECT_EQ(Status::OK, result);
  EXPECT_FALSE(empty);

  // Deleting that entry in a follow-up commit empties the page again.
  pending = storage_->StartCommit(GetFirstHead());
  pending->Delete("key");
  EXPECT_TRUE(TryCommitJournal(std::move(pending), Status::OK));

  storage_->IsEmpty(
      callback::Capture(callback::SetWhenCalled(&done), &result, &empty));
  RunLoopUntilIdle();
  ASSERT_TRUE(done);
  EXPECT_EQ(Status::OK, result);
  EXPECT_TRUE(empty);
}

// Follows a single object from "unknown" to "untracked" (added locally) to
// "tracked" (part of a commit).
TEST_F(PageStorageTest, UntrackedObjectsSimple) {
  ObjectData object("Some data", InlineBehavior::PREVENT);

  // Before the object exists in storage, its id is not reported as untracked.
  EXPECT_TRUE(ObjectIsUntracked(object.object_identifier, false));

  // Adding the object locally makes it untracked.
  TryAddFromLocal(object.value, object.object_identifier);
  EXPECT_TRUE(ObjectIsUntracked(object.object_identifier, true));

  // Putting it in a journal does not change that; only committing the journal
  // stops the object from being untracked.
  std::unique_ptr<Journal> pending = storage_->StartCommit(GetFirstHead());
  pending->Put("key", object.object_identifier, KeyPriority::EAGER);
  EXPECT_TRUE(ObjectIsUntracked(object.object_identifier, true));
  ASSERT_TRUE(TryCommitJournal(std::move(pending), Status::OK));
  EXPECT_TRUE(ObjectIsUntracked(object.object_identifier, false));
}

// Checks the untracked state of several objects across two commits, including
// an object whose key is overwritten before its journal is committed.
TEST_F(PageStorageTest, UntrackedObjectsComplex) {
  ObjectData objects[] = {
      ObjectData("Some data", InlineBehavior::PREVENT),
      ObjectData("Some more data", InlineBehavior::PREVENT),
      ObjectData("Even more data", InlineBehavior::PREVENT),
  };
  for (auto& object : objects) {
    TryAddFromLocal(object.value, object.object_identifier);
    EXPECT_TRUE(ObjectIsUntracked(object.object_identifier, true));
  }
  auto& id0 = objects[0].object_identifier;
  auto& id1 = objects[1].object_identifier;
  auto& id2 = objects[2].object_identifier;

  // First commit: only the first object is referenced, so it is the only one
  // that stops being untracked.
  std::unique_ptr<Journal> pending = storage_->StartCommit(GetFirstHead());
  pending->Put("key0", id0, KeyPriority::LAZY);
  EXPECT_TRUE(ObjectIsUntracked(id0, true));
  ASSERT_TRUE(TryCommitJournal(std::move(pending), Status::OK));
  EXPECT_TRUE(ObjectIsUntracked(id0, false));
  EXPECT_TRUE(ObjectIsUntracked(id1, true));
  EXPECT_TRUE(ObjectIsUntracked(id2, true));

  // Second commit: "key1" is first bound to the second object and then
  // rebound to the third one, so the second object is not part of the commit
  // and has to stay untracked.
  pending = storage_->StartCommit(GetFirstHead());
  pending->Put("key1", id1, KeyPriority::LAZY);
  pending->Put("key2", id2, KeyPriority::LAZY);
  pending->Put("key1", id2, KeyPriority::LAZY);
  pending->Put("key3", id0, KeyPriority::LAZY);
  ASSERT_TRUE(TryCommitJournal(std::move(pending), Status::OK));
  EXPECT_TRUE(ObjectIsUntracked(id0, false));
  EXPECT_TRUE(ObjectIsUntracked(id1, true));
  EXPECT_TRUE(ObjectIsUntracked(id2, false));
}

// Checks that registered commit watchers are notified of local and
// sync-originated commits, and that a removed watcher is no longer notified.
TEST_F(PageStorageTest, CommitWatchers) {
  FakeCommitWatcher watcher;
  storage_->AddCommitWatcher(&watcher);

  // Add a watcher and receive the commit.
  auto expected = TryCommitFromLocal(10);
  ASSERT_TRUE(expected);
  EXPECT_EQ(1, watcher.commit_count);
  EXPECT_EQ(expected->GetId(), watcher.last_commit_id);
  EXPECT_EQ(ChangeSource::LOCAL, watcher.last_source);

  // Add a second watcher.
  FakeCommitWatcher watcher2;
  storage_->AddCommitWatcher(&watcher2);
  expected = TryCommitFromLocal(10);
  ASSERT_TRUE(expected);
  EXPECT_EQ(2, watcher.commit_count);
  EXPECT_EQ(expected->GetId(), watcher.last_commit_id);
  EXPECT_EQ(ChangeSource::LOCAL, watcher.last_source);
  EXPECT_EQ(1, watcher2.commit_count);
  EXPECT_EQ(expected->GetId(), watcher2.last_commit_id);
  EXPECT_EQ(ChangeSource::LOCAL, watcher2.last_source);

  // Remove one watcher.
  storage_->RemoveCommitWatcher(&watcher2);
  expected = TryCommitFromSync();
  // The commit is dereferenced below, so stop here if it could not be created.
  ASSERT_TRUE(expected);
  EXPECT_EQ(3, watcher.commit_count);
  EXPECT_EQ(expected->GetId(), watcher.last_commit_id);
  EXPECT_EQ(ChangeSource::CLOUD, watcher.last_source);
  // The removed watcher must not have seen the last commit.
  EXPECT_EQ(1, watcher2.commit_count);
}

// Watchers must stay silent when a commit cannot be written to disk.
TEST_F(PageStorageTest, CommitFailNoWatchNotification) {
  FakeCommitWatcher observer;
  storage_->AddCommitWatcher(&observer);
  EXPECT_EQ(0, observer.commit_count);

  // Prepare a journal holding a single entry.
  std::unique_ptr<Journal> pending = storage_->StartCommit(GetFirstHead());
  pending->Put("key1", RandomObjectIdentifier(environment_.random()),
               KeyPriority::EAGER);

  // Configure the database to fail batch execution and check that committing
  // the journal reports an I/O error.
  leveldb_->SetFailBatchExecuteAfter(1);
  std::unique_ptr<const Commit> failed_commit =
      TryCommitJournal(std::move(pending), Status::IO_ERROR);

  // No notification may have reached the watcher.
  EXPECT_EQ(0, observer.commit_count);
}

// Checks that a sync metadata key is reported as not found until it is set,
// and that the stored value is read back unchanged afterwards.
TEST_F(PageStorageTest, SyncMetadata) {
  std::vector<std::pair<fxl::StringView, fxl::StringView>> metadata = {
      {"foo1", "foo2"}, {"bar1", " bar2 "}};
  for (const auto& item : metadata) {
    const fxl::StringView key = item.first;
    const fxl::StringView value = item.second;
    bool done;
    Status result;
    std::string stored;

    // The key has not been written yet.
    storage_->GetSyncMetadata(
        key,
        callback::Capture(callback::SetWhenCalled(&done), &result, &stored));
    RunLoopUntilIdle();
    ASSERT_TRUE(done);
    EXPECT_EQ(Status::INTERNAL_NOT_FOUND, result);

    // Write the value.
    storage_->SetSyncMetadata(
        key, value, callback::Capture(callback::SetWhenCalled(&done), &result));
    RunLoopUntilIdle();
    ASSERT_TRUE(done);
    EXPECT_EQ(Status::OK, result);

    // Reading the key now yields exactly what was written.
    storage_->GetSyncMetadata(
        key,
        callback::Capture(callback::SetWhenCalled(&done), &result, &stored));
    RunLoopUntilIdle();
    ASSERT_TRUE(done);
    EXPECT_EQ(Status::OK, result);
    EXPECT_EQ(value, stored);
  }
}

// Adds three commits received from sync in a single batch and checks which
// objects the storage requests from the sync delegate while doing so.
TEST_F(PageStorageTest, AddMultipleCommitsFromSync) {
  RunInCoroutine([this](CoroutineHandler* handler) {
    FakeSyncDelegate sync;
    storage_->SetSyncDelegate(&sync);

    // Build the commit tree with:
    //   (first head)
    //     |     |
    //     0     1
    //           |
    //           2
    // For each commit, create a root node holding a single entry and register
    // both the entry value and the serialized root node with the sync
    // delegate.
    std::vector<ObjectIdentifier> object_identifiers;
    object_identifiers.resize(3);
    for (size_t i = 0; i < object_identifiers.size(); ++i) {
      ObjectData value("value" + std::to_string(i), InlineBehavior::PREVENT);
      std::vector<Entry> entries = {Entry{"key" + std::to_string(i),
                                          value.object_identifier,
                                          KeyPriority::EAGER}};
      std::unique_ptr<const btree::TreeNode> node;
      ASSERT_TRUE(CreateNodeFromEntries(entries, {}, &node));
      object_identifiers[i] = node->GetIdentifier();
      sync.AddObject(value.object_identifier, value.value);
      std::unique_ptr<const Object> root_object =
          TryGetObject(object_identifiers[i], PageStorage::Location::NETWORK);
      // The object is dereferenced below, so stop here if it was not found.
      ASSERT_TRUE(root_object);
      fxl::StringView root_data;
      ASSERT_EQ(Status::OK, root_object->GetData(&root_data));
      sync.AddObject(object_identifiers[i], root_data.ToString());
    }

    // Reset and clear the storage.
    ResetStorage();
    storage_->SetSyncDelegate(&sync);

    // Commits 0 and 1 are both children of the current head.
    std::vector<std::unique_ptr<const Commit>> parent;
    parent.emplace_back(GetFirstHead());
    std::unique_ptr<const Commit> commit0 = CommitImpl::FromContentAndParents(
        environment_.clock(), storage_.get(), object_identifiers[0],
        std::move(parent));
    parent.clear();

    parent.emplace_back(GetFirstHead());
    std::unique_ptr<const Commit> commit1 = CommitImpl::FromContentAndParents(
        environment_.clock(), storage_.get(), object_identifiers[1],
        std::move(parent));
    parent.clear();

    // Commit 2 is a child of commit 1.
    parent.emplace_back(commit1->Clone());
    std::unique_ptr<const Commit> commit2 = CommitImpl::FromContentAndParents(
        environment_.clock(), storage_.get(), object_identifiers[2],
        std::move(parent));

    std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes;
    commits_and_bytes.emplace_back(commit0->GetId(),
                                   commit0->GetStorageBytes().ToString());
    commits_and_bytes.emplace_back(commit1->GetId(),
                                   commit1->GetStorageBytes().ToString());
    commits_and_bytes.emplace_back(commit2->GetId(),
                                   commit2->GetStorageBytes().ToString());

    bool called;
    Status status;
    std::vector<CommitId> missing_ids;
    storage_->AddCommitsFromSync(
        std::move(commits_and_bytes), ChangeSource::CLOUD,
        callback::Capture(callback::SetWhenCalled(&called), &status,
                          &missing_ids));
    RunLoopUntilIdle();
    ASSERT_TRUE(called);
    EXPECT_EQ(Status::OK, status);

    // The root nodes of commits 0 and 2 must have been requested from sync,
    // while the root node of commit 1 must not.
    EXPECT_EQ(4u, sync.object_requests.size());
    EXPECT_NE(sync.object_requests.find(object_identifiers[0]),
              sync.object_requests.end());
    EXPECT_EQ(sync.object_requests.find(object_identifiers[1]),
              sync.object_requests.end());
    EXPECT_NE(sync.object_requests.find(object_identifiers[2]),
              sync.object_requests.end());
  });
}

// Checks the generation numbers of two successive local commits and of their
// merge.
TEST_F(PageStorageTest, Generation) {
  // The first local commit has generation 1.
  std::unique_ptr<const Commit> first = TryCommitFromLocal(3);
  ASSERT_TRUE(first);
  EXPECT_EQ(1u, first->GetGeneration());

  // The next local commit has generation 2.
  std::unique_ptr<const Commit> second = TryCommitFromLocal(3);
  ASSERT_TRUE(second);
  EXPECT_EQ(2u, second->GetGeneration());

  // Merging both commits produces a commit of generation 3.
  std::unique_ptr<Journal> merge_journal =
      storage_->StartMergeCommit(std::move(first), std::move(second));
  std::unique_ptr<const Commit> merged =
      TryCommitJournal(std::move(merge_journal), Status::OK);
  ASSERT_TRUE(merged);
  EXPECT_EQ(3u, merged->GetGeneration());
}

// Looks up entries of a commit by key: an unknown key must be reported as not
// found, and every key written by the commit must be retrievable.
TEST_F(PageStorageTest, GetEntryFromCommit) {
  const int entry_count = 10;
  std::unique_ptr<const Commit> commit = TryCommitFromLocal(entry_count);
  ASSERT_TRUE(commit);

  bool done;
  Status result;
  Entry found;

  // A key that was never written.
  storage_->GetEntryFromCommit(
      *commit, "key not found",
      callback::Capture(callback::SetWhenCalled(&done), &result, &found));
  RunLoopUntilIdle();
  ASSERT_TRUE(done);
  ASSERT_EQ(Status::KEY_NOT_FOUND, result);

  // Every key "key00000" .. "key00009" must be present in the commit.
  for (int index = 0; index < entry_count; ++index) {
    const std::string wanted_key = fxl::StringPrintf("key%05d", index);
    storage_->GetEntryFromCommit(
        *commit, wanted_key,
        callback::Capture(callback::SetWhenCalled(&done), &result, &found));
    RunLoopUntilIdle();
    ASSERT_TRUE(done);
    ASSERT_EQ(Status::OK, result);
    EXPECT_EQ(wanted_key, found.key);
  }
}

// Adds two chained local commits one after the other and checks that the
// watcher is notified of both, the second one last.
TEST_F(PageStorageTest, WatcherForReEntrantCommits) {
  std::vector<std::unique_ptr<const Commit>> parent;
  parent.emplace_back(GetFirstHead());

  // First commit, child of the current head.
  std::unique_ptr<const Commit> commit1 = CommitImpl::FromContentAndParents(
      environment_.clock(), storage_.get(),
      RandomObjectIdentifier(environment_.random()), std::move(parent));

  parent.clear();
  parent.emplace_back(commit1->Clone());

  // Second commit, child of the first one. Its id is recorded before the
  // commit is moved into the storage.
  std::unique_ptr<const Commit> commit2 = CommitImpl::FromContentAndParents(
      environment_.clock(), storage_.get(),
      RandomObjectIdentifier(environment_.random()), std::move(parent));
  CommitId id2 = commit2->GetId();

  FakeCommitWatcher watcher;
  storage_->AddCommitWatcher(&watcher);

  bool called;
  Status status;
  storage_->AddCommitFromLocal(
      std::move(commit1), {},
      callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);

  storage_->AddCommitFromLocal(
      std::move(commit2), {},
      callback::Capture(callback::SetWhenCalled(&called), &status));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);

  // Both commits were notified, and the second one was notified last.
  EXPECT_EQ(2, watcher.commit_count);
  EXPECT_EQ(id2, watcher.last_commit_id);
}

// Committing a journal whose operations cancel each other out must succeed
// without producing a commit.
TEST_F(PageStorageTest, NoOpCommit) {
  std::vector<std::unique_ptr<const Commit>> current_heads = GetHeads();
  ASSERT_FALSE(current_heads.empty());

  // Insert a key and remove it again within the same journal.
  std::unique_ptr<Journal> pending = storage_->StartCommit(GetFirstHead());
  pending->Put("key", RandomObjectIdentifier(environment_.random()),
               KeyPriority::EAGER);
  pending->Delete("key");

  // Hand the journal over to the storage.
  bool done;
  Status result;
  std::unique_ptr<const Commit> produced;
  storage_->CommitJournal(
      std::move(pending),
      callback::Capture(callback::SetWhenCalled(&done), &result, &produced));
  RunLoopUntilIdle();
  ASSERT_TRUE(done);

  // The operation reports success, but no commit object is returned.
  ASSERT_EQ(Status::OK, result);
  ASSERT_FALSE(produced);
}

// Check that receiving a commit from sync while the same commit is being
// added locally does not prevent the commit from being marked as synced.
//
// The test interleaves AddCommitsFromSync() and AddCommitFromLocal() for one
// and the same commit and finally expects that no unsynced commit is left.
TEST_F(PageStorageTest, MarkRemoteCommitSyncedRace) {
  // |sync_delegate_call| receives a closure from the delaying delegate; the
  // test runs it later to let the pending sync operation proceed.
  // NOTE(review): presumably the closure resumes the delayed object fetch —
  // confirm against DelayingFakeSyncDelegate.
  bool sync_delegate_called;
  fit::closure sync_delegate_call;
  DelayingFakeSyncDelegate sync(callback::Capture(
      callback::SetWhenCalled(&sync_delegate_called), &sync_delegate_call));
  storage_->SetSyncDelegate(&sync);

  // We need to create new nodes for the storage to be asynchronous. The empty
  // node is already there, so we create two (child, which is empty, and root,
  // which contains child).
  std::string child_data = btree::EncodeNode(0u, std::vector<Entry>(), {});
  ObjectIdentifier child_identifier = encryption_service_.MakeObjectIdentifier(
      ComputeObjectDigest(PieceType::CHUNK, ObjectType::TREE_NODE, child_data));
  sync.AddObject(child_identifier, child_data);

  std::string root_data =
      btree::EncodeNode(0u, std::vector<Entry>(), {{0u, child_identifier}});
  ObjectIdentifier root_identifier = encryption_service_.MakeObjectIdentifier(
      ComputeObjectDigest(PieceType::CHUNK, ObjectType::TREE_NODE, root_data));
  sync.AddObject(root_identifier, root_data);

  // Build a commit on top of the current head whose content is |root|.
  std::vector<std::unique_ptr<const Commit>> parent;
  parent.emplace_back(GetFirstHead());

  std::unique_ptr<const Commit> commit = CommitImpl::FromContentAndParents(
      environment_.clock(), storage_.get(), root_identifier, std::move(parent));
  // Remember the id: |commit| itself is moved into the storage below.
  CommitId id = commit->GetId();

  // Start adding the remote commit.
  bool commits_from_sync_called;
  Status commits_from_sync_status;
  std::vector<PageStorage::CommitIdAndBytes> commits_and_bytes;
  commits_and_bytes.emplace_back(commit->GetId(),
                                 commit->GetStorageBytes().ToString());
  std::vector<CommitId> missing_ids;
  storage_->AddCommitsFromSync(
      std::move(commits_and_bytes), ChangeSource::CLOUD,
      callback::Capture(callback::SetWhenCalled(&commits_from_sync_called),
                        &commits_from_sync_status, &missing_ids));

  // Make the loop run until GetObject is called in sync, and before
  // AddCommitsFromSync finishes.
  RunLoopUntilIdle();
  EXPECT_TRUE(sync_delegate_called);
  EXPECT_FALSE(commits_from_sync_called);

  // Add the local commit.
  bool commits_from_local_called;
  Status commits_from_local_status;
  storage_->AddCommitFromLocal(
      std::move(commit), {},
      callback::Capture(callback::SetWhenCalled(&commits_from_local_called),
                        &commits_from_local_status));

  RunLoopUntilIdle();
  // The sync operation is still pending at this point.
  EXPECT_FALSE(commits_from_sync_called);
  // The local commit should be committed.
  EXPECT_TRUE(commits_from_local_called);
  // Run the closure stored by the delegate; the assertion guards against
  // invoking an empty closure.
  ASSERT_TRUE(sync_delegate_call);
  sync_delegate_call();

  // Let the two AddCommit finish.
  RunLoopUntilIdle();
  EXPECT_TRUE(commits_from_sync_called);
  EXPECT_TRUE(commits_from_local_called);
  EXPECT_EQ(Status::OK, commits_from_sync_status);
  EXPECT_EQ(Status::OK, commits_from_local_status);

  // Verify that the commit is added correctly. |commit| was moved from above
  // and is reused here as the output of GetCommit().
  bool called;
  Status status;
  storage_->GetCommit(id, callback::Capture(callback::SetWhenCalled(&called),
                                            &status, &commit));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);

  // The commit should be marked as synced.
  EXPECT_EQ(0u, GetUnsyncedCommits().size());
}

// Verifies that GetUnsyncedCommits() returns commits ordered by their
// generation, and not by the timestamp.
//
// In this test the commits have the following structure:
//              (root)
//             /   |   \
//           (A)  (B)  (C)
//             \  /
//           (merge)
// C is the last commit to be created. The test verifies that the unsynced
// commits are returned in the generation order, with the merge commit being the
// last despite not being the most recent.
TEST_F(PageStorageTest, GetUnsyncedCommits) {
  std::unique_ptr<const Commit> root = GetFirstHead();
  std::unique_ptr<Journal> journal_a = storage_->StartCommit(root->Clone());
  journal_a->Put("a", RandomObjectIdentifier(environment_.random()),
                 KeyPriority::EAGER);
  std::unique_ptr<const Commit> commit_a =
      TryCommitJournal(std::move(journal_a), Status::OK);
  ASSERT_TRUE(commit_a);
  EXPECT_EQ(1u, commit_a->GetGeneration());

  std::unique_ptr<Journal> journal_b = storage_->StartCommit(root->Clone());
  journal_b->Put("b", RandomObjectIdentifier(environment_.random()),
                 KeyPriority::EAGER);
  std::unique_ptr<const Commit> commit_b =
      TryCommitJournal(std::move(journal_b), Status::OK);
  ASSERT_TRUE(commit_b);
  EXPECT_EQ(1u, commit_b->GetGeneration());

  std::unique_ptr<Journal> journal_merge =
      storage_->StartMergeCommit(std::move(commit_a), std::move(commit_b));

  std::unique_ptr<const Commit> commit_merge =
      TryCommitJournal(std::move(journal_merge), Status::OK);
  ASSERT_TRUE(commit_merge);
  EXPECT_EQ(2u, commit_merge->GetGeneration());

  // C is created after the merge, directly on top of the root.
  std::unique_ptr<Journal> journal_c = storage_->StartCommit(std::move(root));
  journal_c->Put("c", RandomObjectIdentifier(environment_.random()),
                 KeyPriority::EAGER);
  std::unique_ptr<const Commit> commit_c =
      TryCommitJournal(std::move(journal_c), Status::OK);
  ASSERT_TRUE(commit_c);
  EXPECT_EQ(1u, commit_c->GetGeneration());

  // Verify that the merge commit is returned as last, even though commit C is
  // newer.
  std::vector<std::unique_ptr<const Commit>> unsynced_commits =
      GetUnsyncedCommits();
  // Fatal assertion: back() below must not be called on an empty vector.
  ASSERT_EQ(4u, unsynced_commits.size());
  EXPECT_EQ(commit_merge->GetId(), unsynced_commits.back()->GetId());
  EXPECT_LT(commit_merge->GetTimestamp(), commit_c->GetTimestamp());
}

// Hands the storage a commit whose parent it has never seen and checks that
// the call fails and reports the id of that parent as missing.
TEST_F(PageStorageTest, AddCommitsMissingParent) {
  // Both commits share the same root node, built from no entries.
  std::unique_ptr<const btree::TreeNode> empty_node;
  ASSERT_TRUE(CreateNodeFromEntries({}, {}, &empty_node));
  ObjectIdentifier root_identifier = empty_node->GetIdentifier();

  // Create the parent commit without ever adding it to the storage.
  std::vector<std::unique_ptr<const Commit>> parents;
  parents.emplace_back(GetFirstHead());
  auto missing_parent = CommitImpl::FromContentAndParents(
      environment_.clock(), storage_.get(), root_identifier,
      std::move(parents));

  // Create the child commit on top of that parent.
  parents.clear();
  parents.push_back(missing_parent->Clone());
  auto orphan = CommitImpl::FromContentAndParents(
      environment_.clock(), storage_.get(), root_identifier,
      std::move(parents));

  // Only the child is sent to the storage.
  std::vector<PageStorage::CommitIdAndBytes> batch;
  batch.emplace_back(orphan->GetId(), orphan->GetStorageBytes().ToString());

  bool done;
  Status result;
  std::vector<CommitId> missing;
  storage_->AddCommitsFromSync(
      std::move(batch), ChangeSource::P2P,
      callback::Capture(callback::SetWhenCalled(&done), &result, &missing));
  RunLoopUntilIdle();
  ASSERT_TRUE(done);
  EXPECT_EQ(Status::INTERNAL_NOT_FOUND, result);
  EXPECT_THAT(missing, ElementsAre(missing_parent->GetId()));
}

// Checks that GetMergeCommitIds() succeeds and returns no ids for two commits
// that have never been merged.
TEST_F(PageStorageTest, GetMergeCommitIdsEmpty) {
  std::unique_ptr<const Commit> parent1 = TryCommitFromLocal(3);
  ASSERT_TRUE(parent1);

  std::unique_ptr<const Commit> parent2 = TryCommitFromLocal(3);
  ASSERT_TRUE(parent2);

  // Check that there is no merge of |parent1| and |parent2|.
  bool called;
  Status status;
  std::vector<CommitId> merges;
  storage_->GetMergeCommitIds(
      parent1->GetId(), parent2->GetId(),
      callback::Capture(callback::SetWhenCalled(&called), &status, &merges));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  // The query itself must succeed; an empty result is only meaningful then.
  EXPECT_EQ(Status::OK, status);
  EXPECT_THAT(merges, IsEmpty());
}

// Checks that GetMergeCommitIds() returns the merge of two commits, whichever
// order the parent ids are passed in.
TEST_F(PageStorageTest, GetMergeCommitIdsNonEmpty) {
  std::unique_ptr<const Commit> parent1 = TryCommitFromLocal(3);
  ASSERT_TRUE(parent1);

  std::unique_ptr<const Commit> parent2 = TryCommitFromLocal(3);
  ASSERT_TRUE(parent2);

  // Clones are handed to the journal because the parents are still needed
  // below to query their ids.
  std::unique_ptr<Journal> journal =
      storage_->StartMergeCommit(parent1->Clone(), parent2->Clone());

  std::unique_ptr<const Commit> merge =
      TryCommitJournal(std::move(journal), Status::OK);
  ASSERT_TRUE(merge);

  // Check that |merge| is in the list of merges.
  bool called;
  Status status;
  std::vector<CommitId> merges;
  storage_->GetMergeCommitIds(
      parent1->GetId(), parent2->GetId(),
      callback::Capture(callback::SetWhenCalled(&called), &status, &merges));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);
  EXPECT_THAT(merges, ElementsAre(merge->GetId()));

  // The result must be the same with the parents swapped.
  storage_->GetMergeCommitIds(
      parent2->GetId(), parent1->GetId(),
      callback::Capture(callback::SetWhenCalled(&called), &status, &merges));
  RunLoopUntilIdle();
  ASSERT_TRUE(called);
  EXPECT_EQ(Status::OK, status);
  EXPECT_THAT(merges, ElementsAre(merge->GetId()));
}

}  // namespace

}  // namespace storage
