// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "peridot/bin/ledger/app/diff_utils.h"

#include <limits>
#include <memory>
#include <vector>

#include <lib/callback/waiter.h>
#include <lib/fidl/cpp/optional.h>
#include <lib/fit/function.h>
#include <lib/fsl/vmo/strings.h>
#include <lib/fxl/memory/ref_ptr.h>

#include "peridot/bin/ledger/app/fidl/serialization_size.h"
#include "peridot/bin/ledger/app/page_utils.h"
#include "peridot/bin/ledger/storage/public/object.h"
#include "peridot/lib/util/ptr.h"

namespace ledger {
namespace diff_utils {
namespace {
// Returns the key of a storage::ThreeWayChange object. This key is guaranteed
// to be unique.
const std::string& GetKey(const storage::ThreeWayChange& change) {
  if (change.base) {
    return change.base->key;
  }
  if (change.left) {
    return change.left->key;
  }
  return change.right->key;
}

// Constructs a ValuePtr object from an entry. The contents of the ValuePtr will
// be provided through the |waiter|.
ValuePtr GetValueFromEntry(
    storage::PageStorage* const storage,
    const std::unique_ptr<storage::Entry>& entry,
    fit::function<void(Status, fsl::SizedVmo)> callback) {
  if (!entry) {
    callback(Status::OK, fsl::SizedVmo());
    return nullptr;
  }
  ValuePtr value = Value::New();
  switch (entry->priority) {
    case storage::KeyPriority::EAGER:
      value->priority = Priority::EAGER;
      break;
    case storage::KeyPriority::LAZY:
      value->priority = Priority::LAZY;
      break;
  }
  PageUtils::ResolveObjectIdentifierAsBuffer(
      storage, entry->object_identifier, 0u,
      std::numeric_limits<int64_t>::max(),
      storage::PageStorage::Location::LOCAL, Status::OK, std::move(callback));
  return value;
}

// Returns true if the change is automatically mergeable, ie. is not
// conflicting.
bool IsMergeable(const storage::ThreeWayChange& change) {
  return util::EqualPtr(change.base, change.left) ||
         util::EqualPtr(change.base, change.right) ||
         util::EqualPtr(change.left, change.right);
}
}  // namespace

void ComputePageChange(
    storage::PageStorage* storage, const storage::Commit& base,
    const storage::Commit& other, std::string prefix_key, std::string min_key,
    PaginationBehavior pagination_behavior,
    fit::function<void(Status, std::pair<PageChangePtr, std::string>)>
        callback) {
  struct Context {
    // The PageChangePtr to be returned through the callback.
    PageChangePtr page_change = PageChange::New();
    // The serialization size of all entries.
    size_t fidl_size = fidl_serialization::kPageChangeHeaderSize;
    // The number of handles.
    size_t handles_count = 0u;
    // The next token to be returned through the callback.
    std::string next_token = "";
  };

  auto waiter =
      fxl::MakeRefCounted<callback::Waiter<Status, fsl::SizedVmo>>(Status::OK);

  auto context = std::make_unique<Context>();
  context->page_change->timestamp = other.GetTimestamp().get();
  context->page_change->changed_entries.resize(0);
  context->page_change->deleted_keys.resize(0);

  if (min_key < prefix_key) {
    min_key = prefix_key;
  }

  // |on_next| is called for each change on the diff
  auto on_next = [storage, waiter, prefix_key = std::move(prefix_key),
                  context = context.get(),
                  pagination_behavior](storage::EntryChange change) {
    if (!PageUtils::MatchesPrefix(change.entry.key, prefix_key)) {
      return false;
    }
    size_t entry_size =
        change.deleted
            ? fidl_serialization::GetByteVectorSize(change.entry.key.size())
            : fidl_serialization::GetEntrySize(change.entry.key.size());
    size_t entry_handle_count = change.deleted ? 0 : 1;
    if (pagination_behavior == PaginationBehavior::BY_SIZE &&
        (context->fidl_size + entry_size >
             fidl_serialization::kMaxInlineDataSize ||
         context->handles_count + entry_handle_count >
             fidl_serialization::kMaxMessageHandles

         )) {
      context->next_token = change.entry.key;
      return false;
    }
    context->fidl_size += entry_size;
    context->handles_count += entry_handle_count;
    if (change.deleted) {
      context->page_change->deleted_keys.push_back(
          convert::ToArray(change.entry.key));
      return true;
    }

    Entry entry;
    entry.key = convert::ToArray(change.entry.key);
    entry.priority = change.entry.priority == storage::KeyPriority::EAGER
                         ? Priority::EAGER
                         : Priority::LAZY;
    context->page_change->changed_entries.push_back(std::move(entry));
    PageUtils::ResolveObjectIdentifierAsBuffer(
        storage, change.entry.object_identifier, 0u,
        std::numeric_limits<int64_t>::max(),
        storage::PageStorage::Location::LOCAL, Status::OK,
        waiter->NewCallback());
    return true;
  };

  // |on_done| is called when the full diff is computed.
  auto on_done = [waiter = std::move(waiter), context = std::move(context),
                  callback =
                      std::move(callback)](storage::Status status) mutable {
    if (status != storage::Status::OK) {
      FXL_LOG(ERROR) << "Unable to compute diff for PageChange: "
                     << fidl::ToUnderlying(status);
      callback(PageUtils::ConvertStatus(status), std::make_pair(nullptr, ""));
      return;
    }
    if (context->page_change->changed_entries.empty()) {
      if (context->page_change->deleted_keys.empty()) {
        callback(Status::OK, std::make_pair(nullptr, ""));
      } else {
        callback(Status::OK,
                 std::make_pair(std::move(context->page_change), ""));
      }
      return;
    }

    // We need to retrieve the values for each changed key/value pair in order
    // to send it inside the PageChange object. |waiter| collates these
    // asynchronous calls and |result_callback| processes them.
    auto result_callback = [context = std::move(context),
                            callback = std::move(callback)](
                               Status status,
                               std::vector<fsl::SizedVmo> results) mutable {
      if (status != Status::OK) {
        FXL_LOG(ERROR)
            << "Error while reading changed values when computing PageChange: "
            << fidl::ToUnderlying(status);
        callback(status, std::make_pair(nullptr, ""));
        return;
      }
      FXL_DCHECK(results.size() ==
                 context->page_change->changed_entries.size());
      for (size_t i = 0; i < results.size(); i++) {
        FXL_DCHECK(results[i].vmo());
        context->page_change->changed_entries.at(i).value =
            fidl::MakeOptional(std::move(results[i]).ToTransport());
      }
      callback(Status::OK, std::make_pair(std::move(context->page_change),
                                          std::move(context->next_token)));
    };
    waiter->Finalize(std::move(result_callback));
  };
  storage->GetCommitContentsDiff(base, other, std::move(min_key),
                                 std::move(on_next), std::move(on_done));
}

void ComputeThreeWayDiff(
    storage::PageStorage* storage, const storage::Commit& base,
    const storage::Commit& left, const storage::Commit& right,
    std::string prefix_key, std::string min_key, DiffType diff_type,
    fit::function<void(Status,
                       std::pair<std::vector<DiffEntry>, std::string>)>
        callback) {
  struct Context {
    // The array to be returned through the callback.
    std::vector<DiffEntry> changes;
    // The serialization size of all entries.
    size_t fidl_size = fidl_serialization::kVectorHeaderSize;
    // The number of handles.
    size_t handles_count = 0u;
    // The next token to be returned through the callback.
    std::string next_token = "";
  };

  // This waiter collects the values (as VMOs) for all changes that will be
  // returned. As each |DiffEntry| struct has three values, we ensure that
  // values are always returned in a specific order (base, left, right). Some
  // values may be empty, to denote a lack of diff.
  auto waiter =
      fxl::MakeRefCounted<callback::Waiter<Status, fsl::SizedVmo>>(Status::OK);

  auto context = std::make_unique<Context>();

  if (min_key < prefix_key) {
    min_key = prefix_key;
  }

  // |on_next| is called for each change on the diff
  auto on_next = [storage, waiter, prefix_key = std::move(prefix_key),
                  context = context.get(),
                  diff_type](storage::ThreeWayChange change) mutable {
    const std::string& key = GetKey(change);
    if (!PageUtils::MatchesPrefix(key, prefix_key)) {
      return false;
    }
    int number_of_values =
        bool(change.base) + bool(change.left) + bool(change.right);
    size_t diffentry_size =
        fidl_serialization::GetDiffEntrySize(key.size(), number_of_values);
    if (context->fidl_size + diffentry_size >
            fidl_serialization::kMaxInlineDataSize ||
        context->handles_count + number_of_values >
            fidl_serialization::kMaxMessageHandles) {
      context->next_token = key;
      // Stop the iteration as we are over the message capacity.
      return false;
    }

    if (diff_type == DiffType::CONFLICTING && IsMergeable(change)) {
      // We are not interested in this change, continue to the next one.
      return true;
    }

    context->fidl_size += diffentry_size;
    context->handles_count += number_of_values;

    DiffEntry diff_entry;
    diff_entry.key = convert::ToArray(key);
    diff_entry.base =
        GetValueFromEntry(storage, change.base, waiter->NewCallback());
    diff_entry.left =
        GetValueFromEntry(storage, change.left, waiter->NewCallback());
    diff_entry.right =
        GetValueFromEntry(storage, change.right, waiter->NewCallback());
    context->changes.push_back(std::move(diff_entry));
    return true;
  };

  // |on_done| is called when the full diff is computed.
  auto on_done = [waiter = std::move(waiter), context = std::move(context),
                  callback =
                      std::move(callback)](storage::Status status) mutable {
    if (status != storage::Status::OK) {
      FXL_LOG(ERROR) << "Unable to compute diff for PageChange: "
                     << fidl::ToUnderlying(status);
      callback(PageUtils::ConvertStatus(status),
               std::make_pair(std::vector<DiffEntry>(), ""));
      return;
    }
    if (context->changes.empty()) {
      callback(Status::OK,
               std::make_pair(std::vector<DiffEntry>(), ""));
      return;
    }

    // We need to retrieve the values for each changed key/value pair in order
    // to send it inside the PageChange object. |waiter| collates these
    // asynchronous calls and |result_callback| processes them.
    auto result_callback = [context = std::move(context),
                            callback = std::move(callback)](
                               Status status,
                               std::vector<fsl::SizedVmo> results) mutable {
      if (status != Status::OK) {
        FXL_LOG(ERROR)
            << "Error while reading changed values when computing PageChange: "
            << fidl::ToUnderlying(status);
        callback(status,
                 std::make_pair(std::vector<DiffEntry>(), ""));
        return;
      }
      FXL_DCHECK(results.size() == 3 * context->changes.size());
      for (size_t i = 0; i < context->changes.size(); i++) {
        if (results[3 * i]) {
          context->changes.at(i).base->value =
              fidl::MakeOptional(std::move(results[3 * i]).ToTransport());
        }
        if (results[3 * i + 1]) {
          context->changes.at(i).left->value =
              fidl::MakeOptional(std::move(results[3 * i + 1]).ToTransport());
        }
        if (results[3 * i + 2]) {
          context->changes.at(i).right->value =
              fidl::MakeOptional(std::move(results[3 * i + 2]).ToTransport());
        }
      }
      callback(Status::OK, std::make_pair(std::move(context->changes),
                                          std::move(context->next_token)));
    };
    waiter->Finalize(std::move(result_callback));
  };
  storage->GetThreeWayContentsDiff(base, left, right, std::move(min_key),
                                   std::move(on_next), std::move(on_done));
}

}  // namespace diff_utils
}  // namespace ledger
