// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef SRC_LEDGER_BIN_STORAGE_IMPL_PAGE_STORAGE_IMPL_H_
#define SRC_LEDGER_BIN_STORAGE_IMPL_PAGE_STORAGE_IMPL_H_

#include <lib/async/dispatcher.h>
#include <lib/fit/function.h>

#include <vector>

#include "peridot/lib/convert/convert.h"
#include "src/ledger/bin/encryption/public/encryption_service.h"
#include "src/ledger/bin/environment/environment.h"
#include "src/ledger/bin/storage/impl/commit_factory.h"
#include "src/ledger/bin/storage/impl/commit_pruner.h"
#include "src/ledger/bin/storage/impl/object_identifier_factory_impl.h"
#include "src/ledger/bin/storage/impl/page_db_impl.h"
#include "src/ledger/bin/storage/public/db.h"
#include "src/ledger/bin/storage/public/page_storage.h"
#include "src/ledger/bin/storage/public/page_sync_delegate.h"
#include "src/ledger/bin/storage/public/types.h"
#include "src/ledger/lib/coroutine/coroutine.h"
#include "src/ledger/lib/coroutine/coroutine_manager.h"
#include "src/lib/callback/managed_container.h"
#include "src/lib/callback/operation_serializer.h"
#include "src/lib/fsl/vmo/sized_vmo.h"
#include "src/lib/fxl/memory/ref_ptr.h"
#include "src/lib/fxl/memory/weak_ptr.h"
#include "src/lib/fxl/observer_list.h"
#include "src/lib/fxl/strings/string_view.h"

namespace storage {

class PageStorageImpl : public PageStorage, public CommitPruner::CommitPrunerDelegate {
 public:
  PageStorageImpl(ledger::Environment* environment,
                  encryption::EncryptionService* encryption_service, std::unique_ptr<Db> db,
                  PageId page_id, CommitPruningPolicy policy);
  PageStorageImpl(ledger::Environment* environment,
                  encryption::EncryptionService* encryption_service,
                  std::unique_ptr<PageDb> page_db, PageId page_id, CommitPruningPolicy policy);

  ~PageStorageImpl() override;

  // Initializes this PageStorageImpl. This includes initializing the underlying
  // database and adding the default page head if the page is empty.
  void Init(fit::function<void(Status)> callback);

  // Checks whether the given |object_identifier| is untracked, i.e. has been
  // created using |AddObjectFromLocal()|, but is not yet part of any commit.
  // Untracked objects are invalid after the PageStorageImpl object is
  // destroyed.
  void ObjectIsUntracked(ObjectIdentifier object_identifier,
                         fit::function<void(Status, bool)> callback);

  std::string GetEntryId();

  std::string GetEntryIdForMerge(fxl::StringView entry_name, CommitIdView left_parent_id,
                                 CommitIdView right_parent_id, fxl::StringView operation_list);

  void SetSyncDelegate(PageSyncDelegate* page_sync) override;

  // PageStorage:
  PageId GetId() override;
  ObjectIdentifierFactory* GetObjectIdentifierFactory() override;
  Status GetHeadCommits(std::vector<std::unique_ptr<const Commit>>* head_commits) override;
  void GetMergeCommitIds(CommitIdView parent1_id, CommitIdView parent2_id,
                         fit::function<void(Status, std::vector<CommitId>)> callback) override;
  void GetCommit(CommitIdView commit_id,
                 fit::function<void(Status, std::unique_ptr<const Commit>)> callback) override;
  void GetGenerationAndMissingParents(
      const CommitIdAndBytes& id_and_bytes,
      fit::function<void(Status, uint64_t, std::vector<CommitId>)> callback) override;
  void AddCommitsFromSync(std::vector<CommitIdAndBytes> ids_and_bytes, ChangeSource source,
                          fit::function<void(Status)> callback) override;
  std::unique_ptr<Journal> StartCommit(std::unique_ptr<const Commit> commit_id) override;
  std::unique_ptr<Journal> StartMergeCommit(std::unique_ptr<const Commit> left,
                                            std::unique_ptr<const Commit> right) override;
  void CommitJournal(std::unique_ptr<Journal> journal,
                     fit::function<void(Status, std::unique_ptr<const Commit>)> callback) override;
  void AddCommitWatcher(CommitWatcher* watcher) override;
  void RemoveCommitWatcher(CommitWatcher* watcher) override;
  void IsSynced(fit::function<void(Status, bool)> callback) override;
  bool IsOnline() override;
  void IsEmpty(fit::function<void(Status, bool)> callback) override;
  void GetUnsyncedCommits(
      fit::function<void(Status, std::vector<std::unique_ptr<const Commit>>)> callback) override;
  void MarkCommitSynced(const CommitId& commit_id, fit::function<void(Status)> callback) override;
  void GetUnsyncedPieces(
      fit::function<void(Status, std::vector<ObjectIdentifier>)> callback) override;
  void MarkPieceSynced(ObjectIdentifier object_identifier,
                       fit::function<void(Status)> callback) override;
  void IsPieceSynced(ObjectIdentifier object_identifier,
                     fit::function<void(Status, bool)> callback) override;
  void MarkSyncedToPeer(fit::function<void(Status)> callback) override;
  void AddObjectFromLocal(ObjectType object_type, std::unique_ptr<DataSource> data_source,
                          ObjectReferencesAndPriority tree_references,
                          fit::function<void(Status, ObjectIdentifier)> callback) override;
  void GetObjectPart(ObjectIdentifier object_identifier, int64_t offset, int64_t max_size,
                     Location location,
                     fit::function<void(Status, fsl::SizedVmo)> callback) override;
  void GetObject(ObjectIdentifier object_identifier, Location location,
                 fit::function<void(Status, std::unique_ptr<const Object>)> callback) override;
  void GetPiece(ObjectIdentifier object_identifier,
                fit::function<void(Status, std::unique_ptr<const Piece>)> callback) override;
  void SetSyncMetadata(fxl::StringView key, fxl::StringView value,
                       fit::function<void(Status)> callback) override;
  void GetSyncMetadata(fxl::StringView key,
                       fit::function<void(Status, std::string)> callback) override;

  // Commit contents.
  void GetCommitContents(const Commit& commit, std::string min_key,
                         fit::function<bool(Entry)> on_next,
                         fit::function<void(Status)> on_done) override;
  void GetEntryFromCommit(const Commit& commit, std::string key,
                          fit::function<void(Status, Entry)> callback) override;
  void GetDiffForCloud(
      const Commit& target_commit,
      fit::function<void(Status, CommitIdView, std::vector<EntryChange>)> callback) override;
  void GetCommitContentsDiff(const Commit& base_commit, const Commit& other_commit,
                             std::string min_key, fit::function<bool(EntryChange)> on_next_diff,
                             fit::function<void(Status)> on_done) override;
  void GetThreeWayContentsDiff(const Commit& base_commit, const Commit& left_commit,
                               const Commit& right_commit, std::string min_key,
                               fit::function<bool(ThreeWayChange)> on_next_diff,
                               fit::function<void(Status)> on_done) override;

  void GetClock(fit::function<void(Status, std::map<DeviceId, ClockEntry>)> callback) override;

  void GetCommitIdFromRemoteId(fxl::StringView remote_commit_id,
                               fit::function<void(Status, CommitId)> callback) override;

  // CommitPrunerDelegate:
  Status DeleteCommits(coroutine::CoroutineHandler* handler,
                       std::vector<std::unique_ptr<const Commit>> commits) override;
  Status UpdateSelfClockEntry(coroutine::CoroutineHandler* handler,
                              const ClockEntry& entry) override;

  CommitFactory* GetCommitFactory();

 private:
  friend class PageStorageImplAccessorForTest;

  // A function that accepts a |piece|, an |object| and a |callback|. It
  // attempts to extract references from the |piece| and the |object| (which
  // must have the same object identifier) and to add the |piece| to storage
  // with those references. On success, returns |object| to the |callback|. On
  // failure, returns the error, and a nullptr as a second parameter (ie. drops
  // the |object|). See |GetOrDownloadPiece| for details on usage.
  using WritePieceCallback =
      fit::function<void(std::unique_ptr<const Piece>, std::unique_ptr<const Object>,
                         fit::function<void(Status, std::unique_ptr<const Object>)>)>;

  // Marks all pieces needed for the given objects as local.
  FXL_WARN_UNUSED_RESULT Status
  MarkAllPiecesLocal(coroutine::CoroutineHandler* handler, PageDb::Batch* batch,
                     std::vector<ObjectIdentifier> object_identifiers);

  FXL_WARN_UNUSED_RESULT Status ContainsCommit(coroutine::CoroutineHandler* handler,
                                               CommitIdView id);

  bool IsFirstCommit(CommitIdView id);

  // Adds the given synced |piece| object.
  void AddPiece(std::unique_ptr<const Piece> piece, ChangeSource source,
                IsObjectSynced is_object_synced, ObjectReferencesAndPriority references,
                fit::function<void(Status)> callback);

  // Returns the piece identified by |object_identifier|. |location| is either LOCAL and NETWORK,
  // and defines whether the piece should be looked up remotely if not available locally.
  // When the piece has been retrieved remotely, attempts to add it to storage before returning
  // it. If this is not possible, ie. when the piece is an index tree-node that requires the full
  // object to compute its references, also returns a WritePieceCallback. It is the callers
  // responsability to invoke this callback to add the piece to storage once they have gathered
  // the full object. The WritePieceCallback is safe to call as long as this class is valid. It
  // should not outlive the returned piece (since a reference to the piece must be passed to it
  // when invoked), and in practice should be called as soon as the full object containing the
  // piece has been constructed to ensure data is persisted to disk as early as possible.
  void GetOrDownloadPiece(
      ObjectIdentifier object_identifier, Location location,
      fit::function<void(Status, std::unique_ptr<const Piece>, WritePieceCallback)> callback);

  // Reads the content of a piece into a provided VMO. Takes into account the global offset and size
  // in order to be able to read only the requested part of an object.
  // |global_offset| is the offset from the beginning of the full object in bytes. |global_size| is
  // the maximum size requested to be read into the vmo. |current_position| is the position of the
  // currently read piece (defined by |object_identifier|) in the full object. |object_size| is the
  // size of the currently read piece.
  // |location| is either LOCAL and NETWORK and defines the behavior in the case where the object is
  // not found locally.
  void FillBufferWithObjectContent(const Piece& piece, fsl::SizedVmo vmo, int64_t global_offset,
                                   int64_t global_size, int64_t current_position,
                                   int64_t object_size, Location location,
                                   fit::function<void(Status)> callback);

  // Treating the |piece| as FileIndex, initializes a VMO of a needed size and calls
  // FillBufferWithObjectContent on it.
  // |offset| and |max_size| are used to denote partial mapping (see GetObjectPart for details).
  // This method fills |child_identifiers|, if not nullptr, with the identifiers of the direct
  // children of |piece|.
  void GetIndexObject(const Piece& piece, int64_t offset, int64_t max_size, Location location,
                      std::vector<ObjectIdentifier>* child_identifiers,
                      fit::function<void(Status, fsl::SizedVmo)> callback);

  // Notifies the registered watchers of |new_commits|.
  void NotifyWatchersOfNewCommits(const std::vector<std::unique_ptr<const Commit>>& new_commits,
                                  ChangeSource source);

  // Find the root identifier of a commit, even if it is in the process of being added to the
  // storage. This breaks a circual dependency: |GetObject| needs the root identifier of commits to
  // apply and check diffs, but we need to get the objects referred to by a commit before adding it
  // in the storage.
  void GetCommitRootIdentifier(CommitIdView commit_id,
                               fit::function<void(Status, ObjectIdentifier)> callback);

  // Deletes the piece identifier by |object_digest| from local storage. On success, returns the
  // references from the deleted piece to other pieces. Aborts if there is already a pending
  // deletion of |object_digest|.
  Status DeleteObject(coroutine::CoroutineHandler* handler, ObjectDigest object_digest,
                      ObjectReferencesAndPriority* references);

  // Attempts to delete |object_digest|, and recursively schedules the deletion of the objects it
  // references upon success.
  void ScheduleObjectGarbageCollection(const ObjectDigest& object_digest);

  // Synchronous versions of API methods using coroutines.
  FXL_WARN_UNUSED_RESULT Status SynchronousInit(coroutine::CoroutineHandler* handler);

  FXL_WARN_UNUSED_RESULT Status SynchronousGetCommit(coroutine::CoroutineHandler* handler,
                                                     CommitId commit_id,
                                                     std::unique_ptr<const Commit>* commit);

  // Adds the given locally created |commit| in this |PageStorage|.
  FXL_WARN_UNUSED_RESULT Status SynchronousAddCommitFromLocal(
      coroutine::CoroutineHandler* handler, std::unique_ptr<const Commit> commit,
      std::vector<ObjectIdentifier> new_objects);

  FXL_WARN_UNUSED_RESULT Status
  SynchronousAddCommitsFromSync(coroutine::CoroutineHandler* handler,
                                std::vector<CommitIdAndBytes> ids_and_bytes, ChangeSource source);

  FXL_WARN_UNUSED_RESULT Status
  SynchronousGetUnsyncedCommits(coroutine::CoroutineHandler* handler,
                                std::vector<std::unique_ptr<const Commit>>* unsynced_commits);

  FXL_WARN_UNUSED_RESULT Status SynchronousMarkCommitSynced(coroutine::CoroutineHandler* handler,
                                                            const CommitId& commit_id);

  FXL_WARN_UNUSED_RESULT Status SynchronousMarkCommitSyncedInBatch(
      coroutine::CoroutineHandler* handler, PageDb::Batch* batch, const CommitId& commit_id);

  FXL_WARN_UNUSED_RESULT Status SynchronousAddCommits(
      coroutine::CoroutineHandler* handler, std::vector<std::unique_ptr<const Commit>> commits,
      ChangeSource source, std::vector<ObjectIdentifier> new_objects);

  FXL_WARN_UNUSED_RESULT Status SynchronousAddPiece(coroutine::CoroutineHandler* handler,
                                                    const Piece& piece, ChangeSource source,
                                                    IsObjectSynced is_object_synced,
                                                    ObjectReferencesAndPriority references);

  // Synchronous helper methods.

  // Marks this page as online.
  FXL_WARN_UNUSED_RESULT Status SynchronousMarkPageOnline(coroutine::CoroutineHandler* handler,
                                                          PageDb::Batch* batch);

  // Updates the given |empty_node_id| to point to the empty node's
  // ObjectIdentifier.
  FXL_WARN_UNUSED_RESULT Status SynchronousGetEmptyNodeIdentifier(
      coroutine::CoroutineHandler* handler, ObjectIdentifier** empty_node_id);

  // Returns the root identifier of the base parent of |commit|.
  FXL_WARN_UNUSED_RESULT Status
  GetBaseParentRootIdentifier(coroutine::CoroutineHandler* handler, const Commit& commit,
                              ObjectIdentifier* base_parent_root_identifier);

  // Checks if a tracked object identifier is tracked by this PageStorage.
  // Returns true for all untracked object identifiers.
  bool IsTokenValid(const ObjectIdentifier& object_identifier);

  // Finds good commits to use as diff bases for |target_id|. The commits will be locally present
  // and synced.
  void ChooseDiffBases(CommitIdView target_id,
                       fit::callback<void(Status, std::vector<CommitId>)> callback);

  ledger::Environment* environment_;
  encryption::EncryptionService* const encryption_service_;
  const PageId page_id_;
  ObjectIdentifierFactoryImpl object_identifier_factory_;
  // Objects currently handled by |DeleteObject|.
  std::set<ObjectDigest> pending_garbage_collection_;
  CommitFactory commit_factory_;
  std::unique_ptr<PageDb> db_;
  // The commit pruner accesses the database, it must be destructed before |db_|.
  CommitPruner commit_pruner_;
  fxl::ObserverList<CommitWatcher> watchers_;
  callback::ManagedContainer managed_container_;
  PageSyncDelegate* page_sync_;
  bool page_is_online_ = false;
  std::unique_ptr<ObjectIdentifier> empty_node_id_ = nullptr;
  // Temporarily stores the root of commits being added from sync, so they can be used to apply
  // diffs. A commit will be removed from this set once it is successfully added to the storage.
  std::map<CommitId, ObjectIdentifier, std::less<>> roots_of_commits_being_added_;
  // Temporarily stores the mapping from remote commit id to local commit id for commits that have
  // not yet been added to the storage. A commit will be removed from this set once it is
  // successfully added to the storage.
  std::map<std::string, CommitId, std::less<>> remote_ids_of_commits_being_added_;
  // Identifier for this device on the page clock. It does not need to be consistent across pages.
  DeviceId device_id_;

  callback::OperationSerializer commit_serializer_;
  coroutine::CoroutineManager download_manager_;
  coroutine::CoroutineManager coroutine_manager_;

  // This must be the last member of the class.
  fxl::WeakPtrFactory<PageStorageImpl> weak_factory_;

  FXL_DISALLOW_COPY_AND_ASSIGN(PageStorageImpl);
};

}  // namespace storage

#endif  // SRC_LEDGER_BIN_STORAGE_IMPL_PAGE_STORAGE_IMPL_H_
