// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <iostream>
#include <memory>
#include <vector>

#include <fuchsia/ledger/cloud/cpp/fidl.h>
#include <lib/async-loop/cpp/loop.h>
#include <lib/component/cpp/startup_context.h>
#include <lib/fidl/cpp/optional.h>
#include <lib/fit/function.h>
#include <lib/fsl/vmo/strings.h>
#include <lib/fxl/command_line.h>
#include <lib/fxl/files/directory.h>
#include <lib/fxl/files/file.h>
#include <lib/fxl/files/scoped_temp_dir.h>
#include <lib/fxl/logging.h>
#include <lib/fxl/strings/string_number_conversions.h>
#include <lib/zx/time.h>
#include <trace/event.h>

#include "peridot/bin/cloud_provider_firestore/testing/cloud_provider_factory.h"
#include "peridot/bin/ledger/fidl/include/types.h"
#include "peridot/bin/ledger/testing/data_generator.h"
#include "peridot/bin/ledger/testing/get_ledger.h"
#include "peridot/bin/ledger/testing/get_page_ensure_initialized.h"
#include "peridot/bin/ledger/testing/page_data_generator.h"
#include "peridot/bin/ledger/testing/quit_on_error.h"
#include "peridot/bin/ledger/testing/run_with_tracing.h"
#include "peridot/bin/ledger/testing/sync_params.h"
#include "peridot/lib/convert/convert.h"
#include "peridot/lib/rng/test_random.h"

namespace ledger {
namespace {

// Package URL of this benchmark; printed as part of the usage message.
constexpr fxl::StringView kBinaryPath =
    "fuchsia-pkg://fuchsia.com/ledger_benchmarks#meta/fetch.cmx";
// Path handed to the writer and reader ScopedTempDir instances.
constexpr fxl::StringView kStoragePath = "/data/benchmark/ledger/fetch";
// Names of the benchmark-specific command-line flags (without leading "--").
constexpr fxl::StringView kEntryCountFlag = "entry-count";
constexpr fxl::StringView kValueSizeFlag = "value-size";
constexpr fxl::StringView kPartSizeFlag = "part-size";

// Key size passed to the key generator (presumably in bytes).
constexpr size_t kKeySize = 100;

// Suffix appended to both temporary directory paths so that the writer and
// the reader end up with the same innermost directory name. Declared as a
// character array rather than std::string so that it is a true compile-time
// constant with no dynamic initializer or destructor at namespace scope.
constexpr char kUserDirectory[] = "/fetch-user";

void PrintUsage() {
  std::cout << "Usage: trace record "
            << kBinaryPath
            // Comment to make clang format not break formatting.
            << " --" << kEntryCountFlag << "=<int>"
            << " --" << kValueSizeFlag << "=<int>"
            << " --" << kPartSizeFlag << "=<int>" << GetSyncParamsUsage()
            << std::endl;
}

// Benchmark that measures time to fetch lazy values from server.
//
// The benchmark runs as a chain of asynchronous steps: a writer ledger is
// created and populated with lazy-priority values, the benchmark waits until
// the writer upload is idle, connects a second (reader) ledger for the same
// user and page, waits until the reader download is idle and then fetches
// every value (either whole or part by part) while emitting trace events.
//
// Parameters:
//   --entry-count=<int> the number of entries to be put
//   --value-size=<int> the size of a single value in bytes
//   --part-size=<int> the size of the part to be read with one Fetch
//   call. If equal to zero, the whole value will be read.
//   --credentials-path=<file path> Firestore service account credentials
class FetchBenchmark : public SyncWatcher {
 public:
  // |loop| is not owned and is quit when the benchmark finishes or fails.
  // |entry_count| and |value_size| must be positive and |part_size| must not
  // exceed |value_size| (DCHECKed in the constructor).
  FetchBenchmark(async::Loop* loop,
                 std::unique_ptr<component::StartupContext> startup_context,
                 size_t entry_count, size_t value_size, size_t part_size,
                 SyncParams sync_params);

  // Starts the benchmark by creating the writer ledger and its page.
  void Run();

  // SyncWatcher:
  // Forwards the new states to |on_sync_state_changed_| (when set) and then
  // acknowledges the notification through |callback|.
  void SyncStateChanged(SyncState download, SyncState upload,
                        SyncStateChangedCallback callback) override;

 private:
  // The steps below are listed in the order in which they are executed.

  // Generates the keys and fills the writer page with lazy values.
  void Populate();
  // Watches the writer page until its upload state becomes idle.
  void WaitForWriterUpload();
  // Creates the reader ledger and requests the page written by the writer.
  void ConnectReader();
  // Watches the reader page until its download state becomes idle, then
  // takes a snapshot and starts fetching.
  void WaitForReaderDownload();

  // Fetches the value of entry |i| and continues with entry |i + 1|; shuts
  // the benchmark down once all entries have been fetched.
  void FetchValues(PageSnapshotPtr snapshot, size_t i);
  // Fetches part number |part| of the value of entry |i| and continues with
  // the next part; returns to FetchValues() once the value is exhausted.
  void FetchPart(PageSnapshotPtr snapshot, size_t i, size_t part);

  // Kills both ledger processes and quits the loop.
  void ShutDown();
  // Returns a closure that quits |loop_|; used as the error handler.
  fit::closure QuitLoopClosure();

  // NOTE: the declaration order of the data members determines their
  // construction order; several initializers depend on earlier members.
  async::Loop* const loop_;
  rng::TestRandom random_;
  DataGenerator generator_;
  PageDataGenerator page_data_generator_;
  std::unique_ptr<component::StartupContext> startup_context_;
  cloud_provider_firestore::CloudProviderFactory cloud_provider_factory_;
  fidl::Binding<SyncWatcher> sync_watcher_binding_;
  const size_t entry_count_;
  const size_t value_size_;
  const size_t part_size_;
  // User id shared by the writer and the reader cloud providers.
  const cloud_provider_firestore::CloudProviderFactory::UserId user_id_;
  files::ScopedTempDir writer_tmp_dir_;
  files::ScopedTempDir reader_tmp_dir_;
  fuchsia::sys::ComponentControllerPtr writer_controller_;
  fuchsia::sys::ComponentControllerPtr reader_controller_;
  LedgerPtr writer_;
  LedgerPtr reader_;
  // Id of the page created by the writer and later requested by the reader.
  PageId page_id_;
  PagePtr writer_page_;
  PagePtr reader_page_;
  // Copies of the keys written by Populate(); used to address the fetches.
  std::vector<fidl::VectorPtr<uint8_t>> keys_;
  // Handler for the current waiting step; invoked from SyncStateChanged().
  fit::function<void(SyncState, SyncState)> on_sync_state_changed_;

  FXL_DISALLOW_COPY_AND_ASSIGN(FetchBenchmark);
};

// Stores the benchmark parameters, sets up the data generators and the cloud
// provider factory, and creates the writer and reader temporary directories.
// Preconditions (enforced with FXL_DCHECK): |loop| is non-null, |entry_count|
// and |value_size| are positive and |part_size| does not exceed |value_size|.
FetchBenchmark::FetchBenchmark(
    async::Loop* loop,
    std::unique_ptr<component::StartupContext> startup_context,
    size_t entry_count, size_t value_size, size_t part_size,
    SyncParams sync_params)
    : loop_(loop),
      // Constant argument (presumably the seed), so runs use the same data.
      random_(0),
      generator_(&random_),
      page_data_generator_(&random_),
      startup_context_(std::move(startup_context)),
      // Uses |startup_context_| and |random_|, both initialized above.
      cloud_provider_factory_(startup_context_.get(), &random_,
                              std::move(sync_params.api_key),
                              std::move(sync_params.credentials)),
      sync_watcher_binding_(this),
      entry_count_(entry_count),
      value_size_(value_size),
      part_size_(part_size),
      user_id_(cloud_provider_firestore::CloudProviderFactory::UserId::New()),
      // Writer and reader each get their own temporary directory, both
      // constructed from |kStoragePath|.
      writer_tmp_dir_(kStoragePath),
      reader_tmp_dir_(kStoragePath) {
  FXL_DCHECK(loop_);
  FXL_DCHECK(entry_count_ > 0);
  FXL_DCHECK(value_size_ > 0);
  // A part size of zero is allowed: FetchValues() then reads whole values.
  FXL_DCHECK(part_size_ <= value_size);
  cloud_provider_factory_.Init();
}

// SyncWatcher implementation. Hands the new download/upload states to the
// handler of the current waiting step, if one is installed, and always
// acknowledges the notification afterwards.
void FetchBenchmark::SyncStateChanged(SyncState download, SyncState upload,
                                      SyncStateChangedCallback callback) {
  const bool has_handler = static_cast<bool>(on_sync_state_changed_);
  if (has_handler) {
    on_sync_state_changed_(download, upload);
  }

  callback();
}

void FetchBenchmark::Run() {
  // Name of the storage directory currently identifies the user. Ensure the
  // most nested directory has the same name to make the ledgers sync.
  std::string writer_path = writer_tmp_dir_.path() + kUserDirectory;
  bool ret = files::CreateDirectory(writer_path);
  FXL_DCHECK(ret);

  cloud_provider::CloudProviderPtr cloud_provider_writer;
  cloud_provider_factory_.MakeCloudProvider(user_id_,
                                            cloud_provider_writer.NewRequest());
  Status status = GetLedger(
      startup_context_.get(), writer_controller_.NewRequest(),
      std::move(cloud_provider_writer), "fetch",
      DetachedPath(std::move(writer_path)), QuitLoopClosure(), &writer_);
  if (QuitOnError(QuitLoopClosure(), status, "Get writer ledger")) {
    return;
  }

  GetPageEnsureInitialized(&writer_, nullptr, QuitLoopClosure(),
                           [this](Status status, PagePtr page, PageId id) {
                             if (QuitOnError(QuitLoopClosure(), status,
                                             "Writer page initialization")) {
                               return;
                             }
                             writer_page_ = std::move(page);
                             page_id_ = id;

                             Populate();
                           });
}

// Generates |entry_count_| keys, keeps a copy of each in |keys_| and writes
// the entries to the writer page with lazy priority. Continues with
// WaitForWriterUpload() on success and quits the loop on failure.
void FetchBenchmark::Populate() {
  auto keys = generator_.MakeKeys(entry_count_, kKeySize, entry_count_);

  // |keys| itself is handed over to the page data generator below, so clone
  // every key first; the clones are used later to address the fetches.
  size_t index = 0;
  while (index < entry_count_) {
    keys_.push_back(keys[index].Clone());
    ++index;
  }

  auto on_populated = [this](Status status) {
    if (!QuitOnError(QuitLoopClosure(), status, "PageGenerator::Populate")) {
      WaitForWriterUpload();
    }
  };
  page_data_generator_.Populate(
      &writer_page_, std::move(keys), value_size_, entry_count_,
      PageDataGenerator::ReferenceStrategy::REFERENCE, Priority::LAZY,
      std::move(on_populated));
}

void FetchBenchmark::WaitForWriterUpload() {
  on_sync_state_changed_ = [this](SyncState download, SyncState upload) {
    if (upload == SyncState::IDLE) {
      on_sync_state_changed_ = nullptr;
      // Stop watching sync state for this page.
      sync_watcher_binding_.Unbind();
      ConnectReader();
      return;
    }
  };
  writer_page_->SetSyncStateWatcher(
      sync_watcher_binding_.NewBinding(),
      QuitOnErrorCallback(QuitLoopClosure(), "Page::SetSyncStateWatcher"));
}

void FetchBenchmark::ConnectReader() {
  std::string reader_path = reader_tmp_dir_.path() + kUserDirectory;
  bool ret = files::CreateDirectory(reader_path);
  FXL_DCHECK(ret);

  cloud_provider::CloudProviderPtr cloud_provider_reader;
  cloud_provider_factory_.MakeCloudProvider(user_id_,
                                            cloud_provider_reader.NewRequest());
  Status status = GetLedger(
      startup_context_.get(), reader_controller_.NewRequest(),
      std::move(cloud_provider_reader), "fetch",
      DetachedPath(std::move(reader_path)), QuitLoopClosure(), &reader_);
  if (QuitOnError(QuitLoopClosure(), status, "ConnectReader")) {
    return;
  }

  reader_->GetPage(fidl::MakeOptional(page_id_), reader_page_.NewRequest(),
                   [this](Status status) {
                     if (QuitOnError(QuitLoopClosure(), status, "GetPage")) {
                       return;
                     }
                     WaitForReaderDownload();
                   });
}

void FetchBenchmark::WaitForReaderDownload() {
  on_sync_state_changed_ = [this](SyncState download, SyncState upload) {
    if (download == SyncState::IDLE) {
      on_sync_state_changed_ = nullptr;
      PageSnapshotPtr snapshot;
      reader_page_->GetSnapshot(
          snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), nullptr,
          QuitOnErrorCallback(QuitLoopClosure(), "GetSnapshot"));
      FetchValues(std::move(snapshot), 0);
      return;
    }
  };
  reader_page_->SetSyncStateWatcher(
      sync_watcher_binding_.NewBinding(),
      QuitOnErrorCallback(QuitLoopClosure(), "Page::SetSyncStateWatcher"));
}

// Fetches the value of entry |i| from |snapshot| and then recurses (through
// the asynchronous callback) with entry |i + 1|. Once every entry has been
// fetched the benchmark is shut down. When |part_size_| is non-zero the
// value is read part by part via FetchPart() instead of with a single Fetch.
void FetchBenchmark::FetchValues(PageSnapshotPtr snapshot, size_t i) {
  const bool all_entries_fetched = i >= entry_count_;
  if (all_entries_fetched) {
    ShutDown();
    return;
  }

  const bool read_in_parts = part_size_ > 0;
  if (read_in_parts) {
    // The matching end event is emitted by FetchPart() after the last part.
    TRACE_ASYNC_BEGIN("benchmark", "Fetch (cumulative)", i);
    FetchPart(std::move(snapshot), i, 0);
    return;
  }

  // Grab the raw interface pointer first: |snapshot| is moved into the
  // callback below, which keeps it alive until the response arrives.
  PageSnapshot* const proxy = snapshot.get();

  TRACE_ASYNC_BEGIN("benchmark", "Fetch", i);
  auto on_fetched = [this, snapshot = std::move(snapshot), i](
                        Status status,
                        fuchsia::mem::BufferPtr value) mutable {
    if (QuitOnError(QuitLoopClosure(), status, "PageSnapshot::Fetch")) {
      return;
    }
    TRACE_ASYNC_END("benchmark", "Fetch", i);
    FetchValues(std::move(snapshot), i + 1);
  };
  // Each key is used exactly once on this path, so it can be moved out.
  proxy->Fetch(std::move(keys_[i]), std::move(on_fetched));
}

// Fetches part number |part| (|part_size_| bytes starting at offset
// |part * part_size_|) of the value of entry |i| and then recurses with the
// next part. When the offset reaches |value_size_| the cumulative trace event
// for the entry is closed and FetchValues() continues with entry |i + 1|.
void FetchBenchmark::FetchPart(PageSnapshotPtr snapshot, size_t i,
                               size_t part) {
  const size_t offset = part * part_size_;
  const bool value_exhausted = offset >= value_size_;
  if (value_exhausted) {
    TRACE_ASYNC_END("benchmark", "Fetch (cumulative)", i);
    FetchValues(std::move(snapshot), i + 1);
    return;
  }

  // Grab the raw interface pointer first: |snapshot| is moved into the
  // callback below, which keeps it alive until the response arrives.
  PageSnapshot* const proxy = snapshot.get();
  auto trace_event_id = TRACE_NONCE();
  TRACE_ASYNC_BEGIN("benchmark", "FetchPartial", trace_event_id);
  auto on_part_fetched =
      [this, snapshot = std::move(snapshot), i, part, trace_event_id](
          Status status, fuchsia::mem::BufferPtr value) mutable {
        if (QuitOnError(QuitLoopClosure(), status,
                        "PageSnapshot::FetchPartial")) {
          return;
        }
        TRACE_ASYNC_END("benchmark", "FetchPartial", trace_event_id);
        FetchPart(std::move(snapshot), i, part + 1);
      };
  // The key is needed again for the following parts, hence the clone.
  proxy->FetchPartial(keys_[i].Clone(), offset, part_size_,
                      std::move(on_part_fetched));
}

void FetchBenchmark::ShutDown() {
  KillLedgerProcess(&writer_controller_);
  KillLedgerProcess(&reader_controller_);
  loop_->Quit();
}

// Returns a closure that quits the benchmark's message loop when invoked.
// It is passed as the error handler to the ledger helpers.
fit::closure FetchBenchmark::QuitLoopClosure() {
  return [loop = loop_] { loop->Quit(); };
}

// Parses the command line, then constructs and runs the benchmark under
// tracing. Prints the usage message and returns -1 when a required flag is
// missing, is not a valid number, or violates one of the preconditions of
// FetchBenchmark (entry count and value size positive, part size not larger
// than the value size). Otherwise returns the result of RunWithTracing().
int Main(int argc, const char** argv) {
  fxl::CommandLine command_line = fxl::CommandLineFromArgcArgv(argc, argv);
  async::Loop loop(&kAsyncLoopConfigAttachToThread);
  auto startup_context = component::StartupContext::CreateFromStartupInfo();

  std::string entry_count_str;
  size_t entry_count = 0;
  std::string value_size_str;
  size_t value_size = 0;
  std::string part_size_str;
  size_t part_size = 0;
  SyncParams sync_params;
  // The checks short-circuit from left to right, so every value is parsed
  // before it is validated or compared against another one.
  if (!command_line.GetOptionValue(kEntryCountFlag.ToString(),
                                   &entry_count_str) ||
      !fxl::StringToNumberWithError(entry_count_str, &entry_count) ||
      entry_count == 0 ||
      !command_line.GetOptionValue(kValueSizeFlag.ToString(),
                                   &value_size_str) ||
      !fxl::StringToNumberWithError(value_size_str, &value_size) ||
      value_size == 0 ||
      !command_line.GetOptionValue(kPartSizeFlag.ToString(), &part_size_str) ||
      !fxl::StringToNumberWithError(part_size_str, &part_size) ||
      // Reject here what the FetchBenchmark constructor would otherwise only
      // DCHECK; a part size of zero means "read whole values" and is valid.
      part_size > value_size ||
      !ParseSyncParamsFromCommandLine(command_line, startup_context.get(),
                                      &sync_params)) {
    PrintUsage();
    return -1;
  }

  FetchBenchmark app(&loop, std::move(startup_context), entry_count, value_size,
                     part_size, std::move(sync_params));
  return RunWithTracing(&loop, [&app] { app.Run(); });
}

}  // namespace
}  // namespace ledger

// Process entry point; all the work happens in ledger::Main().
int main(int argc, const char** argv) {
  const int exit_code = ledger::Main(argc, argv);
  return exit_code;
}
