blob: 55efd616c1ca7ac91f7706b17cb7f767964336b3 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <iostream>
#include <memory>
#include <vector>
#include <fuchsia/ledger/cloud/cpp/fidl.h>
#include <lib/async-loop/cpp/loop.h>
#include <lib/component/cpp/startup_context.h>
#include <lib/fidl/cpp/optional.h>
#include <lib/fit/function.h>
#include <lib/fsl/vmo/strings.h>
#include <lib/fxl/command_line.h>
#include <lib/fxl/files/directory.h>
#include <lib/fxl/files/file.h>
#include <lib/fxl/files/scoped_temp_dir.h>
#include <lib/fxl/logging.h>
#include <lib/fxl/strings/string_number_conversions.h>
#include <lib/zx/time.h>
#include <trace/event.h>
#include "peridot/bin/cloud_provider_firestore/testing/cloud_provider_factory.h"
#include "peridot/bin/ledger/fidl/include/types.h"
#include "peridot/bin/ledger/testing/data_generator.h"
#include "peridot/bin/ledger/testing/get_ledger.h"
#include "peridot/bin/ledger/testing/get_page_ensure_initialized.h"
#include "peridot/bin/ledger/testing/page_data_generator.h"
#include "peridot/bin/ledger/testing/quit_on_error.h"
#include "peridot/bin/ledger/testing/run_with_tracing.h"
#include "peridot/bin/ledger/testing/sync_params.h"
#include "peridot/lib/convert/convert.h"
#include "peridot/lib/rng/test_random.h"
namespace ledger {
namespace {
// Component URL of this benchmark binary; printed in the usage message.
constexpr fxl::StringView kBinaryPath =
"fuchsia-pkg://fuchsia.com/ledger_benchmarks#meta/fetch.cmx";
// Parent directory under which the writer and reader temporary storage
// directories are created.
constexpr fxl::StringView kStoragePath = "/data/benchmark/ledger/fetch";
// Names of the command-line flags understood by this benchmark (without the
// leading "--").
constexpr fxl::StringView kEntryCountFlag = "entry-count";
constexpr fxl::StringView kValueSizeFlag = "value-size";
constexpr fxl::StringView kPartSizeFlag = "part-size";
// Key size passed to DataGenerator::MakeKeys() (presumably in bytes — confirm
// against DataGenerator).
constexpr size_t kKeySize = 100;
// Suffix appended to both temporary directory paths so that the writer and the
// reader use the same most-nested directory name.
const std::string kUserDirectory = "/fetch-user";
// Writes a one-line description of the expected command line to standard
// output, including the sync-related flags reported by GetSyncParamsUsage().
void PrintUsage() {
  std::ostream& out = std::cout;
  out << "Usage: trace record " << kBinaryPath;
  // One statement per benchmark-specific flag.
  out << " --" << kEntryCountFlag << "=<int>";
  out << " --" << kValueSizeFlag << "=<int>";
  out << " --" << kPartSizeFlag << "=<int>";
  out << GetSyncParamsUsage() << std::endl;
}
// Benchmark that measures time to fetch lazy values from server.
//
// The benchmark runs as a chain of asynchronous steps: a writer Ledger
// populates a page and waits for the upload to become idle, then a reader
// Ledger for the same user opens the same page, waits for the download to
// become idle and fetches every value (whole or in parts), emitting trace
// events around each fetch.
//
// Parameters:
// --entry-count=<int> the number of entries to be put
// --value-size=<int> the size of a single value in bytes
// --part-size=<int> the size of the part to be read with one Fetch
// call. If equal to zero, the whole value will be read.
// --credentials-path=<file path> Firestore service account credentials
class FetchBenchmark : public SyncWatcher {
public:
// |loop| is not owned and is quit when the benchmark finishes or fails.
FetchBenchmark(async::Loop* loop,
std::unique_ptr<component::StartupContext> startup_context,
size_t entry_count, size_t value_size, size_t part_size,
SyncParams sync_params);
// Starts the benchmark by connecting the writer Ledger.
void Run();
// SyncWatcher:
void SyncStateChanged(SyncState download, SyncState upload,
SyncStateChangedCallback callback) override;
private:
// Benchmark steps, listed in the order in which they are executed.
void Populate();
void WaitForWriterUpload();
void ConnectReader();
void WaitForReaderDownload();
// Fetches the value of the |i|-th key, then continues with the next one.
void FetchValues(PageSnapshotPtr snapshot, size_t i);
// Fetches the |part|-th part of the value of the |i|-th key.
void FetchPart(PageSnapshotPtr snapshot, size_t i, size_t part);
// Kills both Ledger processes and quits the loop.
void ShutDown();
// Returns a closure that quits the loop; used as the error handler.
fit::closure QuitLoopClosure();
async::Loop* const loop_;
// NOTE: declaration order matters below. |generator_|,
// |page_data_generator_| and |cloud_provider_factory_| are constructed from
// the address of |random_|, and |cloud_provider_factory_| additionally from
// |startup_context_|.
rng::TestRandom random_;
DataGenerator generator_;
PageDataGenerator page_data_generator_;
std::unique_ptr<component::StartupContext> startup_context_;
cloud_provider_firestore::CloudProviderFactory cloud_provider_factory_;
// Binding through which this object is registered as a page sync watcher.
fidl::Binding<SyncWatcher> sync_watcher_binding_;
const size_t entry_count_;
const size_t value_size_;
const size_t part_size_;
// User id shared by the writer and the reader cloud providers.
const cloud_provider_firestore::CloudProviderFactory::UserId user_id_;
// Separate storage directories for the writer and the reader.
files::ScopedTempDir writer_tmp_dir_;
files::ScopedTempDir reader_tmp_dir_;
fuchsia::sys::ComponentControllerPtr writer_controller_;
fuchsia::sys::ComponentControllerPtr reader_controller_;
LedgerPtr writer_;
LedgerPtr reader_;
// Id of the page created by the writer and then opened by the reader.
PageId page_id_;
PagePtr writer_page_;
PagePtr reader_page_;
// Keys written by Populate() and later read back by the fetch steps.
std::vector<std::vector<uint8_t>> keys_;
// Handler invoked from SyncStateChanged(); empty when no step is waiting on
// a sync state change.
fit::function<void(SyncState, SyncState)> on_sync_state_changed_;
FXL_DISALLOW_COPY_AND_ASSIGN(FetchBenchmark);
};
// Stores the benchmark parameters, wires the data generators and the cloud
// provider factory to their dependencies and initializes the factory.
//
// |loop| must be non-null, |entry_count| and |value_size| must be positive and
// |part_size| must not exceed |value_size|; these preconditions are only
// enforced through FXL_DCHECK.
FetchBenchmark::FetchBenchmark(
async::Loop* loop,
std::unique_ptr<component::StartupContext> startup_context,
size_t entry_count, size_t value_size, size_t part_size,
SyncParams sync_params)
: loop_(loop),
random_(0),
// Both generators draw from the same |random_| instance.
generator_(&random_),
page_data_generator_(&random_),
startup_context_(std::move(startup_context)),
// Uses |startup_context_| (the member), which is already initialized at this
// point; the |startup_context| parameter has been moved from.
cloud_provider_factory_(startup_context_.get(), &random_,
std::move(sync_params.api_key),
std::move(sync_params.credentials)),
sync_watcher_binding_(this),
entry_count_(entry_count),
value_size_(value_size),
part_size_(part_size),
user_id_(cloud_provider_firestore::CloudProviderFactory::UserId::New()),
// Writer and reader each get their own temporary directory under the same
// parent path.
writer_tmp_dir_(kStoragePath),
reader_tmp_dir_(kStoragePath) {
FXL_DCHECK(loop_);
FXL_DCHECK(entry_count_ > 0);
FXL_DCHECK(value_size_ > 0);
FXL_DCHECK(part_size_ <= value_size);
cloud_provider_factory_.Init();
}
// SyncWatcher implementation: forwards the new sync states to the currently
// installed handler, if any, and then acknowledges the notification.
void FetchBenchmark::SyncStateChanged(SyncState download, SyncState upload,
                                      SyncStateChangedCallback callback) {
  const bool has_handler = static_cast<bool>(on_sync_state_changed_);
  if (has_handler) {
    on_sync_state_changed_(download, upload);
  }
  // Always acknowledge, whether or not a handler was installed.
  callback();
}
void FetchBenchmark::Run() {
// Name of the storage directory currently identifies the user. Ensure the
// most nested directory has the same name to make the ledgers sync.
std::string writer_path = writer_tmp_dir_.path() + kUserDirectory;
bool ret = files::CreateDirectory(writer_path);
FXL_DCHECK(ret);
cloud_provider::CloudProviderPtr cloud_provider_writer;
cloud_provider_factory_.MakeCloudProvider(user_id_,
cloud_provider_writer.NewRequest());
Status status = GetLedger(
startup_context_.get(), writer_controller_.NewRequest(),
std::move(cloud_provider_writer), user_id_.user_id(), "fetch",
DetachedPath(std::move(writer_path)), QuitLoopClosure(), &writer_);
if (QuitOnError(QuitLoopClosure(), status, "Get writer ledger")) {
return;
}
GetPageEnsureInitialized(&writer_, nullptr, DelayCallback::YES,
QuitLoopClosure(),
[this](Status status, PagePtr page, PageId id) {
if (QuitOnError(QuitLoopClosure(), status,
"Writer page initialization")) {
return;
}
writer_page_ = std::move(page);
page_id_ = id;
Populate();
});
}
void FetchBenchmark::Populate() {
auto keys = generator_.MakeKeys(entry_count_, kKeySize, entry_count_);
for (size_t i = 0; i < entry_count_; i++) {
keys_.push_back(keys[i]);
}
page_data_generator_.Populate(
&writer_page_, std::move(keys), value_size_, entry_count_,
PageDataGenerator::ReferenceStrategy::REFERENCE, Priority::LAZY,
[this](Status status) {
if (QuitOnError(QuitLoopClosure(), status, "PageGenerator::Populate")) {
return;
}
WaitForWriterUpload();
});
}
// Registers this object as the sync state watcher of the writer page and
// waits until the upload state becomes IDLE, then continues with
// ConnectReader().
void FetchBenchmark::WaitForWriterUpload() {
  on_sync_state_changed_ = [this](SyncState download, SyncState upload) {
    if (upload != SyncState::IDLE) {
      // Upload is still in progress; keep waiting.
      return;
    }
    on_sync_state_changed_ = nullptr;
    // Stop watching sync state for this page.
    sync_watcher_binding_.Unbind();
    ConnectReader();
  };

  auto watcher_handle = sync_watcher_binding_.NewBinding();
  writer_page_->SetSyncStateWatcher(
      std::move(watcher_handle),
      QuitOnErrorCallback(QuitLoopClosure(), "Page::SetSyncStateWatcher"));
}
void FetchBenchmark::ConnectReader() {
std::string reader_path = reader_tmp_dir_.path() + kUserDirectory;
bool ret = files::CreateDirectory(reader_path);
FXL_DCHECK(ret);
cloud_provider::CloudProviderPtr cloud_provider_reader;
cloud_provider_factory_.MakeCloudProvider(user_id_,
cloud_provider_reader.NewRequest());
Status status = GetLedger(
startup_context_.get(), reader_controller_.NewRequest(),
std::move(cloud_provider_reader), user_id_.user_id(), "fetch",
DetachedPath(std::move(reader_path)), QuitLoopClosure(), &reader_);
if (QuitOnError(QuitLoopClosure(), status, "ConnectReader")) {
return;
}
reader_->GetPage(fidl::MakeOptional(page_id_), reader_page_.NewRequest(),
[this](Status status) {
if (QuitOnError(QuitLoopClosure(), status, "GetPage")) {
return;
}
WaitForReaderDownload();
});
}
void FetchBenchmark::WaitForReaderDownload() {
on_sync_state_changed_ = [this](SyncState download, SyncState upload) {
if (download == SyncState::IDLE) {
on_sync_state_changed_ = nullptr;
PageSnapshotPtr snapshot;
reader_page_->GetSnapshot(
snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), nullptr,
QuitOnErrorCallback(QuitLoopClosure(), "GetSnapshot"));
FetchValues(std::move(snapshot), 0);
return;
}
};
reader_page_->SetSyncStateWatcher(
sync_watcher_binding_.NewBinding(),
QuitOnErrorCallback(QuitLoopClosure(), "Page::SetSyncStateWatcher"));
}
// Fetches the value stored under the |i|-th key and then recurses with
// |i| + 1; shuts the benchmark down once all entries have been fetched.
//
// When |part_size_| is zero the whole value is read with a single Fetch call
// traced as "Fetch"; otherwise the work is delegated to FetchPart() and the
// whole sequence of partial reads is traced as "Fetch (cumulative)".
void FetchBenchmark::FetchValues(PageSnapshotPtr snapshot, size_t i) {
  if (i >= entry_count_) {
    ShutDown();
    return;
  }

  if (part_size_ == 0) {
    TRACE_ASYNC_BEGIN("benchmark", "Fetch", i);
    // Take the raw pointer first: |snapshot| is moved into the callback, which
    // keeps the connection alive until the response arrives.
    PageSnapshot* const raw_snapshot = snapshot.get();
    raw_snapshot->Fetch(
        std::move(keys_[i]),
        [this, snapshot = std::move(snapshot), i](
            Status status, fuchsia::mem::BufferPtr value) mutable {
          if (!QuitOnError(QuitLoopClosure(), status, "PageSnapshot::Fetch")) {
            TRACE_ASYNC_END("benchmark", "Fetch", i);
            FetchValues(std::move(snapshot), i + 1);
          }
        });
    return;
  }

  TRACE_ASYNC_BEGIN("benchmark", "Fetch (cumulative)", i);
  FetchPart(std::move(snapshot), i, 0);
}
// Fetches the |part|-th chunk (of at most |part_size_| bytes) of the value
// stored under the |i|-th key and then recurses with |part| + 1. Once the
// chunk offset reaches |value_size_|, closes the cumulative trace event and
// moves on to the next key through FetchValues().
void FetchBenchmark::FetchPart(PageSnapshotPtr snapshot, size_t i,
                               size_t part) {
  const size_t offset = part * part_size_;
  if (offset >= value_size_) {
    // Every part of this value has been read.
    TRACE_ASYNC_END("benchmark", "Fetch (cumulative)", i);
    FetchValues(std::move(snapshot), i + 1);
    return;
  }

  // Take the raw pointer first: |snapshot| is moved into the callback below.
  PageSnapshot* const raw_snapshot = snapshot.get();
  auto trace_event_id = TRACE_NONCE();
  TRACE_ASYNC_BEGIN("benchmark", "FetchPartial", trace_event_id);
  raw_snapshot->FetchPartial(
      keys_[i], offset, part_size_,
      [this, snapshot = std::move(snapshot), i, part, trace_event_id](
          Status status, fuchsia::mem::BufferPtr value) mutable {
        if (!QuitOnError(QuitLoopClosure(), status,
                         "PageSnapshot::FetchPartial")) {
          TRACE_ASYNC_END("benchmark", "FetchPartial", trace_event_id);
          FetchPart(std::move(snapshot), i, part + 1);
        }
      });
}
void FetchBenchmark::ShutDown() {
KillLedgerProcess(&writer_controller_);
KillLedgerProcess(&reader_controller_);
loop_->Quit();
}
// Returns a closure that quits the message loop when invoked. It is handed to
// the helpers that report errors or lost connections.
fit::closure FetchBenchmark::QuitLoopClosure() {
  async::Loop* const loop = loop_;
  return [loop] { loop->Quit(); };
}
// Entry point of the benchmark: parses the command-line flags, constructs the
// FetchBenchmark and runs it with tracing enabled.
//
// Returns -1 after printing the usage if a flag is missing or invalid;
// otherwise returns the result of RunWithTracing().
int Main(int argc, const char** argv) {
  fxl::CommandLine command_line = fxl::CommandLineFromArgcArgv(argc, argv);
  async::Loop loop(&kAsyncLoopConfigAttachToThread);
  auto startup_context = component::StartupContext::CreateFromStartupInfo();

  std::string entry_count_str;
  size_t entry_count = 0;
  std::string value_size_str;
  size_t value_size = 0;
  std::string part_size_str;
  size_t part_size = 0;
  if (!command_line.GetOptionValue(kEntryCountFlag.ToString(),
                                   &entry_count_str) ||
      !fxl::StringToNumberWithError(entry_count_str, &entry_count) ||
      entry_count == 0 ||
      !command_line.GetOptionValue(kValueSizeFlag.ToString(),
                                   &value_size_str) ||
      !fxl::StringToNumberWithError(value_size_str, &value_size) ||
      value_size == 0 ||
      !command_line.GetOptionValue(kPartSizeFlag.ToString(), &part_size_str) ||
      !fxl::StringToNumberWithError(part_size_str, &part_size)) {
    PrintUsage();
    return -1;
  }

  // The FetchBenchmark constructor requires part_size <= value_size but only
  // enforces it with a DCHECK; reject the flag combination here so the user
  // gets an explanation instead of a debug-only abort.
  if (part_size > value_size) {
    std::cerr << "--" << kPartSizeFlag << " must not be greater than --"
              << kValueSizeFlag << std::endl;
    PrintUsage();
    return -1;
  }

  SyncParams sync_params;
  if (!ParseSyncParamsFromCommandLine(command_line, startup_context.get(),
                                      &sync_params)) {
    PrintUsage();
    return -1;
  }

  FetchBenchmark app(&loop, std::move(startup_context), entry_count, value_size,
                     part_size, std::move(sync_params));
  return RunWithTracing(&loop, [&app] { app.Run(); });
}
} // namespace
} // namespace ledger
// Process entry point; forwards the arguments to ledger::Main() and returns
// its result as the exit code.
int main(int argc, const char** argv) { return ledger::Main(argc, argv); }