// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "peridot/bin/ledger/app/merging/auto_merge_strategy.h"

#include <memory>
#include <string>
#include <vector>

#include <lib/callback/scoped_callback.h>
#include <lib/fit/function.h>
#include <lib/fxl/memory/ref_ptr.h>
#include <lib/fxl/memory/weak_ptr.h>

#include "peridot/bin/ledger/app/merging/conflict_resolver_client.h"
#include "peridot/bin/ledger/app/page_manager.h"
#include "peridot/bin/ledger/app/page_utils.h"

namespace ledger {
// Drives one merge of the commits |left| and |right| relative to |ancestor|.
//
// The merge proceeds in steps, each started from the completion of the
// previous one:
//   1. Start() collects the ancestor->right diff.
//   2. OnRightChangeReady() walks the ancestor->left diff and checks whether
//      any key was changed differently on both sides.
//   3. OnComparisonDone() either applies the right diff on a merge journal
//      (no conflicting key) or hands the merge over to a
//      ConflictResolverClient (at least one conflicting key).
// The outcome is reported exactly through the |callback| given at
// construction, via Done().
class AutoMergeStrategy::AutoMerger {
 public:
  // |storage|, |page_manager| and |conflict_resolver| are stored as raw
  // pointers and are never deleted by this class. |callback| must be non-empty
  // (checked with FXL_DCHECK in the constructor).
  AutoMerger(storage::PageStorage* storage, PageManager* page_manager,
             ConflictResolver* conflict_resolver,
             std::unique_ptr<const storage::Commit> left,
             std::unique_ptr<const storage::Commit> right,
             std::unique_ptr<const storage::Commit> ancestor,
             fit::function<void(Status)> callback);
  ~AutoMerger();

  // Starts the merge by requesting the ancestor->right diff from storage.
  void Start();
  // Marks the merge as cancelled and forwards the cancellation to the
  // delegated merge, if any. Steps that observe the flag finish the merge with
  // Status::INTERNAL_ERROR.
  void Cancel();
  // Releases the delegated merge and invokes the completion callback with
  // |status|. The stored callback is moved out before being invoked, so this
  // must not be called more than once.
  void Done(Status status);

 private:
  // Continuation of Start(): receives the ancestor->right diff and launches
  // the comparison against the ancestor->left diff.
  void OnRightChangeReady(
      storage::Status status,
      std::unique_ptr<std::vector<storage::EntryChange>> right_change);
  // Continuation of OnRightChangeReady(): |distinct| is false when a key was
  // changed differently on both sides.
  void OnComparisonDone(
      storage::Status status,
      std::unique_ptr<std::vector<storage::EntryChange>> right_changes,
      bool distinct);
  // Writes every change of |diff| into |journal|, then commits the journal.
  void ApplyDiffOnJournal(
      std::unique_ptr<storage::Journal> journal,
      std::unique_ptr<std::vector<storage::EntryChange>> diff);

  storage::PageStorage* const storage_;
  PageManager* const manager_;
  ConflictResolver* const conflict_resolver_;

  // The three commits involved in the merge. They are moved into
  // |delegated_merge_| when the merge is delegated.
  std::unique_ptr<const storage::Commit> left_;
  std::unique_ptr<const storage::Commit> right_;
  std::unique_ptr<const storage::Commit> ancestor_;

  // Set only when conflicting changes were found; reset by Done().
  std::unique_ptr<ConflictResolverClient> delegated_merge_;

  // Completion callback; emptied by Done() before it is invoked.
  fit::function<void(Status)> callback_;

  // Set by Cancel(); checked at the beginning of every asynchronous step.
  bool cancelled_ = false;

  // This must be the last member of the class.
  fxl::WeakPtrFactory<AutoMergeStrategy::AutoMerger> weak_factory_;
};

// Stores the collaborators and takes ownership of the three commits and of
// the completion |callback|. No work is started here; see Start().
AutoMergeStrategy::AutoMerger::AutoMerger(
    storage::PageStorage* storage, PageManager* page_manager,
    ConflictResolver* conflict_resolver,
    std::unique_ptr<const storage::Commit> left,
    std::unique_ptr<const storage::Commit> right,
    std::unique_ptr<const storage::Commit> ancestor,
    fit::function<void(Status)> callback)
    : storage_(storage),
      manager_(page_manager),
      conflict_resolver_(conflict_resolver),
      left_(std::move(left)),
      right_(std::move(right)),
      ancestor_(std::move(ancestor)),
      callback_(std::move(callback)),
      weak_factory_(this) {
  // Done() invokes |callback_| unconditionally, so an empty callback is a
  // programming error.
  FXL_DCHECK(callback_);
}

// Nothing to release by hand: every owned resource is held by a member.
AutoMergeStrategy::AutoMerger::~AutoMerger() = default;

// Starts the merge: asks storage for the diff between |ancestor_| and
// |right_| and accumulates every reported change. When the diff is complete,
// the accumulated changes are handed to OnRightChangeReady().
void AutoMergeStrategy::AutoMerger::Start() {
  auto changes = std::make_unique<std::vector<storage::EntryChange>>();

  // Called once per change of the diff. The vector is owned by the completion
  // callback below; this lambda only borrows it through a raw pointer, which
  // is taken before ownership is moved into that callback.
  auto on_next = [weak_this = weak_factory_.GetWeakPtr(),
                  changes = changes.get()](storage::EntryChange change) {
    if (!weak_this) {
      return false;
    }

    if (weak_this->cancelled_) {
      return false;
    }

    // |change| is received by value, so it can be moved into the vector
    // instead of being copied a second time.
    changes->push_back(std::move(change));
    return true;
  };

  // Called when the diff is complete. Capturing |this| relies on
  // callback::MakeScoped only running the lambda while the weak pointer is
  // still valid.
  auto callback = callback::MakeScoped(
      weak_factory_.GetWeakPtr(),
      [this, changes = std::move(changes)](storage::Status status) mutable {
        if (cancelled_) {
          Done(Status::INTERNAL_ERROR);
          return;
        }
        OnRightChangeReady(status, std::move(changes));
      });

  storage_->GetCommitContentsDiff(*ancestor_, *right_, "", std::move(on_next),
                                  std::move(callback));
}

// Receives the ancestor->right diff in |right_change| and compares it with the
// ancestor->left diff to find out whether both sides changed a common key in
// different ways. The verdict is forwarded to OnComparisonDone().
void AutoMergeStrategy::AutoMerger::OnRightChangeReady(
    storage::Status status,
    std::unique_ptr<std::vector<storage::EntryChange>> right_change) {
  if (cancelled_) {
    Done(Status::INTERNAL_ERROR);
    return;
  }

  if (status != storage::Status::OK) {
    FXL_LOG(ERROR) << "Unable to compute right diff due to error " << status
                   << ", aborting.";
    Done(PageUtils::ConvertStatus(status));
    return;
  }

  // Nothing changed on the right side: there cannot be any conflict.
  if (right_change->empty()) {
    OnComparisonDone(storage::Status::OK, std::move(right_change), true);
    return;
  }

  // Progress of the walk over |right_change| while the left diff is streamed.
  struct ScanState {
    // Position of the right-side change currently compared against.
    size_t cursor = 0;
    // Becomes false once a key is found changed differently on both sides.
    bool distinct = true;
  };

  auto state = std::make_unique<ScanState>();

  // Called once per change of the ancestor->left diff. The walk only ever
  // moves |cursor| forward, so it assumes that both diffs are delivered in
  // increasing key order. Returning false is used here to end the walk early.
  auto on_next = [weak_this = weak_factory_.GetWeakPtr(), state = state.get(),
                  right_side =
                      right_change.get()](storage::EntryChange change) {
    if (!weak_this) {
      return false;
    }
    if (weak_this->cancelled_) {
      return false;
    }

    // Skip the right-side changes whose key sorts before the current left key.
    size_t& cursor = state->cursor;
    while (change.entry.key > (*right_side)[cursor].entry.key) {
      ++cursor;
      if (cursor >= right_side->size()) {
        // The right side is exhausted: no further overlap is possible.
        return false;
      }
    }

    // Same key on both sides is only a conflict if the changes differ.
    const storage::EntryChange& candidate = (*right_side)[cursor];
    if (change.entry.key == candidate.entry.key && !(change == candidate)) {
      state->distinct = false;
      return false;
    }
    return true;
  };

  // Called when the left diff is complete (or the walk was ended early). It
  // owns both |right_change| and the scan state borrowed by |on_next|.
  auto on_done = callback::MakeScoped(
      weak_factory_.GetWeakPtr(),
      [this, right_change = std::move(right_change),
       state = std::move(state)](storage::Status status) mutable {
        if (cancelled_) {
          Done(Status::INTERNAL_ERROR);
          return;
        }
        OnComparisonDone(status, std::move(right_change), state->distinct);
      });

  storage_->GetCommitContentsDiff(*ancestor_, *left_, "", std::move(on_next),
                                  std::move(on_done));
}

// Finishes the merge once the two diffs have been compared. When |distinct|
// is true, |right_changes| is applied on top of the left commit; otherwise
// the merge is delegated to a ConflictResolverClient.
void AutoMergeStrategy::AutoMerger::OnComparisonDone(
    storage::Status status,
    std::unique_ptr<std::vector<storage::EntryChange>> right_changes,
    bool distinct) {
  if (cancelled_) {
    Done(Status::INTERNAL_ERROR);
    return;
  }

  if (status != storage::Status::OK) {
    FXL_LOG(ERROR) << "Unable to compute left diff due to error " << status
                   << ", aborting.";
    Done(PageUtils::ConvertStatus(status));
    return;
  }

  if (distinct) {
    // No key was changed differently on both sides, so the diff computed
    // earlier can be reused to build the merge commit. StartMergeCommit uses
    // the left commit (first parameter) as its base: applying the right diff
    // on it is all that is needed.
    storage_->StartMergeCommit(
        left_->GetId(), right_->GetId(),
        callback::MakeScoped(
            weak_factory_.GetWeakPtr(),
            [this, right_changes = std::move(right_changes)](
                storage::Status journal_status,
                std::unique_ptr<storage::Journal> journal) mutable {
              if (cancelled_) {
                Done(Status::INTERNAL_ERROR);
                return;
              }
              if (journal_status != storage::Status::OK) {
                FXL_LOG(ERROR)
                    << "Unable to start merge commit: " << journal_status;
                Done(PageUtils::ConvertStatus(journal_status));
                return;
              }
              ApplyDiffOnJournal(std::move(journal), std::move(right_changes));
            }));
    return;
  }

  // Some keys are overlapping, so proceed like the CUSTOM strategy. This could
  // be more efficient if |right_changes| were reused instead of the diff being
  // re-computed inside |ConflictResolverClient|.
  delegated_merge_ = std::make_unique<ConflictResolverClient>(
      storage_, manager_, conflict_resolver_, std::move(left_),
      std::move(right_), std::move(ancestor_),
      callback::MakeScoped(
          weak_factory_.GetWeakPtr(), [this](Status merge_status) {
            // A cancellation overrides whatever the delegated merge reported.
            Done(cancelled_ ? Status::INTERNAL_ERROR : merge_status);
          }));

  delegated_merge_->Start();
}

// Replays every change of |diff| on |journal| (Delete for deletions, Put
// otherwise), waits for all of these operations to report their status, then
// commits the journal. The merge is finished through Done() with the converted
// storage status, or with Status::INTERNAL_ERROR if it was cancelled meanwhile.
void AutoMergeStrategy::AutoMerger::ApplyDiffOnJournal(
    std::unique_ptr<storage::Journal> journal,
    std::unique_ptr<std::vector<storage::EntryChange>> diff) {
  // Collects the status of every Put/Delete issued below.
  auto waiter = fxl::MakeRefCounted<callback::StatusWaiter<storage::Status>>(
      storage::Status::OK);
  for (const storage::EntryChange& change : *diff) {
    if (change.deleted) {
      journal->Delete(change.entry.key, waiter->NewCallback());
    } else {
      journal->Put(change.entry.key, change.entry.object_identifier,
                   change.entry.priority, waiter->NewCallback());
    }
  }

  // The journal is owned by the closure until it is handed to CommitJournal.
  waiter->Finalize([weak_this = weak_factory_.GetWeakPtr(),
                    journal = std::move(journal)](storage::Status s) mutable {
    if (!weak_this) {
      return;
    }
    if (weak_this->cancelled_) {
      weak_this->Done(Status::INTERNAL_ERROR);
      return;
    }
    if (s != storage::Status::OK) {
      // This failure comes from the Put/Delete operations above, not from the
      // commit itself; the message is kept distinct from the one below so the
      // two cases can be told apart in logs.
      FXL_LOG(ERROR) << "Unable to apply changes to merge journal: " << s;
      weak_this->Done(PageUtils::ConvertStatus(s));
      return;
    }

    // Read the storage pointer before |weak_this| is moved into the
    // continuation, so that correctness does not depend on the evaluation
    // order of the call expression and its arguments.
    storage::PageStorage* const storage = weak_this->storage_;
    storage->CommitJournal(
        std::move(journal),
        [weak_this = std::move(weak_this)](
            storage::Status s,
            std::unique_ptr<const storage::Commit> /*commit*/) {
          if (s != storage::Status::OK) {
            FXL_LOG(ERROR) << "Unable to commit merge journal: " << s;
          }
          if (weak_this) {
            weak_this->Done(PageUtils::ConvertStatus(s));
          }
        });
  });
}

// Flags the merge as cancelled; the asynchronous steps check the flag and end
// the merge with an error. A delegated merge, if one is running, is cancelled
// as well.
void AutoMergeStrategy::AutoMerger::Cancel() {
  cancelled_ = true;
  if (!delegated_merge_) {
    return;
  }
  delegated_merge_->Cancel();
}

// Ends the merge and reports |status| to the owner.
void AutoMergeStrategy::AutoMerger::Done(Status status) {
  delegated_merge_.reset();

  // The callback installed by AutoMergeStrategy::Merge() destroys this object,
  // so it is moved to the stack first and no member is touched after the
  // call.
  fit::function<void(Status)> on_done = std::move(callback_);
  callback_ = nullptr;
  on_done(status);
}

// Takes ownership of the |conflict_resolver| connection and installs an error
// handler on it that cancels any merge in progress and notifies the client
// registered through SetOnError().
AutoMergeStrategy::AutoMergeStrategy(ConflictResolverPtr conflict_resolver)
    : conflict_resolver_(std::move(conflict_resolver)) {
  auto on_connection_error = [this](zx_status_t /*status*/) {
    // If a merge is in progress, it must be terminated. Only the cancellation
    // is requested here: the actual cleanup of |in_progress_merge_| happens in
    // its completion callback.
    if (in_progress_merge_) {
      in_progress_merge_->Cancel();
    }
    // It is safe to call |on_error_| because the error handler waits for the
    // merges to finish before deleting this object.
    if (on_error_) {
      on_error_();
    }
  };
  conflict_resolver_.set_error_handler(std::move(on_connection_error));
}

// Members clean up after themselves; no explicit teardown is required.
AutoMergeStrategy::~AutoMergeStrategy() = default;

// Registers the closure invoked by the conflict resolver error handler (see
// the constructor). A previously registered closure is replaced.
void AutoMergeStrategy::SetOnError(fit::closure on_error) {
  on_error_ = std::move(on_error);
}

// Starts merging |head_1| and |head_2| relative to |ancestor|. Only one merge
// may be in progress at a time, and |head_1| must not have a later timestamp
// than |head_2| (both checked with FXL_DCHECK). |callback| receives the final
// status once the merge is over.
void AutoMergeStrategy::Merge(storage::PageStorage* storage,
                              PageManager* page_manager,
                              std::unique_ptr<const storage::Commit> head_1,
                              std::unique_ptr<const storage::Commit> head_2,
                              std::unique_ptr<const storage::Commit> ancestor,
                              fit::function<void(Status)> callback) {
  FXL_DCHECK(head_1->GetTimestamp() <= head_2->GetTimestamp());
  FXL_DCHECK(!in_progress_merge_);

  // Run when the merger is done: the merger is released first, then the
  // caller is notified.
  auto on_merge_done = [this, callback = std::move(callback)](Status status) {
    in_progress_merge_.reset();
    callback(status);
  };

  // Note the swap: |head_2| is handed over as the merger's left commit and
  // |head_1| as its right commit.
  in_progress_merge_ = std::make_unique<AutoMergeStrategy::AutoMerger>(
      storage, page_manager, conflict_resolver_.get(), std::move(head_2),
      std::move(head_1), std::move(ancestor), std::move(on_merge_done));

  in_progress_merge_->Start();
}

// Requests the cancellation of the merge in progress; does nothing when no
// merge is running.
void AutoMergeStrategy::Cancel() {
  if (!in_progress_merge_) {
    return;
  }
  in_progress_merge_->Cancel();
}

}  // namespace ledger
