| // Copyright 2017 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <iostream> |
| #include <memory> |
| #include <set> |
| #include <vector> |
| |
| #include <lib/async-loop/cpp/loop.h> |
| #include <lib/callback/waiter.h> |
| #include <lib/component/cpp/startup_context.h> |
| #include <lib/fidl/cpp/optional.h> |
| #include <lib/fit/function.h> |
| #include <lib/fxl/command_line.h> |
| #include <lib/fxl/files/directory.h> |
| #include <lib/fxl/files/file.h> |
| #include <lib/fxl/files/scoped_temp_dir.h> |
| #include <lib/fxl/logging.h> |
| #include <lib/fxl/memory/ref_ptr.h> |
| #include <lib/fxl/strings/string_number_conversions.h> |
| #include <lib/zx/time.h> |
| #include <trace/event.h> |
| |
| #include "peridot/bin/cloud_provider_firestore/testing/cloud_provider_factory.h" |
| #include "peridot/bin/ledger/fidl/include/types.h" |
| #include "peridot/bin/ledger/testing/data_generator.h" |
| #include "peridot/bin/ledger/testing/get_ledger.h" |
| #include "peridot/bin/ledger/testing/get_page_ensure_initialized.h" |
| #include "peridot/bin/ledger/testing/quit_on_error.h" |
| #include "peridot/bin/ledger/testing/run_with_tracing.h" |
| #include "peridot/bin/ledger/testing/sync_params.h" |
| #include "peridot/lib/convert/convert.h" |
| #include "peridot/lib/rng/test_random.h" |
| |
| namespace ledger { |
| namespace { |
| constexpr fxl::StringView kBinaryPath = |
| "fuchsia-pkg://fuchsia.com/ledger_benchmarks#meta/convergence.cmx"; |
| constexpr fxl::StringView kStoragePath = "/data/benchmark/ledger/convergence"; |
| constexpr fxl::StringView kEntryCountFlag = "entry-count"; |
| constexpr fxl::StringView kValueSizeFlag = "value-size"; |
| constexpr fxl::StringView kDeviceCountFlag = "device-count"; |
| |
| void PrintUsage() { |
| std::cout << "Usage: trace record " |
| << kBinaryPath |
| // Comment to make clang format not break formatting. |
| << " --" << kEntryCountFlag << "=<int>" |
| << " --" << kValueSizeFlag << "=<int>" |
| << " --" << kDeviceCountFlag << "=<int>" << GetSyncParamsUsage() |
| << std::endl; |
| } |
| |
// Size passed to DataGenerator::MakeKey() for every key written by the
// benchmark.
constexpr size_t kKeySize = 100;
| |
| // Benchmark that measures the time it takes to sync and reconcile concurrent |
| // writes. |
| // |
| // In this scenario there are specified number of (emulated) devices. At each |
| // step, every device makes a concurrent write, and we measure the time until |
| // all the changes are visible to all devices. |
| // |
| // Parameters: |
| // --entry-count=<int> the number of entries to be put by each device |
| // --value-size=<int> the size of a single value in bytes |
| // --device-count=<int> number of devices writing to the same page |
| // --credentials-path=<file path> Firestore service account credentials |
| class ConvergenceBenchmark : public PageWatcher { |
| public: |
| ConvergenceBenchmark( |
| async::Loop* loop, |
| std::unique_ptr<component::StartupContext> startup_context, |
| int entry_count, int value_size, int device_count, |
| SyncParams sync_params); |
| |
| void Run(); |
| |
| // PageWatcher: |
| void OnChange(PageChange page_change, ResultState result_state, |
| OnChangeCallback callback) override; |
| |
| private: |
| // Instances needed to control the Ledger process associated with a device and |
| // interact with it. |
| struct DeviceContext { |
| std::unique_ptr<files::ScopedTempDir> storage_directory; |
| fuchsia::sys::ComponentControllerPtr controller; |
| LedgerPtr ledger; |
| PagePtr page_connection; |
| std::unique_ptr<fidl::Binding<PageWatcher>> page_watcher; |
| }; |
| |
| void Start(int step); |
| |
| void ShutDown(); |
| fit::closure QuitLoopClosure(); |
| |
| async::Loop* const loop_; |
| rng::TestRandom random_; |
| DataGenerator generator_; |
| std::unique_ptr<component::StartupContext> startup_context_; |
| cloud_provider_firestore::CloudProviderFactory cloud_provider_factory_; |
| const int entry_count_; |
| const int value_size_; |
| const int device_count_; |
| const cloud_provider_firestore::CloudProviderFactory::UserId user_id_; |
| // Track all Ledger instances running for this test and allow to interact with |
| // it. |
| std::vector<std::unique_ptr<DeviceContext>> devices_; |
| PageId page_id_; |
| std::multiset<std::string> remaining_keys_; |
| int current_step_ = -1; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(ConvergenceBenchmark); |
| }; |
| |
| ConvergenceBenchmark::ConvergenceBenchmark( |
| async::Loop* loop, |
| std::unique_ptr<component::StartupContext> startup_context, int entry_count, |
| int value_size, int device_count, SyncParams sync_params) |
| : loop_(loop), |
| random_(0), |
| generator_(&random_), |
| startup_context_(std::move(startup_context)), |
| cloud_provider_factory_(startup_context_.get(), &random_, |
| std::move(sync_params.api_key), |
| std::move(sync_params.credentials)), |
| entry_count_(entry_count), |
| value_size_(value_size), |
| device_count_(device_count), |
| user_id_(cloud_provider_firestore::CloudProviderFactory::UserId::New()), |
| devices_(device_count) { |
| FXL_DCHECK(loop_); |
| FXL_DCHECK(entry_count_ > 0); |
| FXL_DCHECK(value_size_ > 0); |
| FXL_DCHECK(device_count_ > 1); |
| for (auto& device_context : devices_) { |
| device_context = std::make_unique<DeviceContext>(); |
| device_context->storage_directory = |
| std::make_unique<files::ScopedTempDir>(kStoragePath); |
| device_context->page_watcher = |
| std::make_unique<fidl::Binding<PageWatcher>>(this); |
| } |
| page_id_ = generator_.MakePageId(); |
| cloud_provider_factory_.Init(); |
| } |
| |
| void ConvergenceBenchmark::Run() { |
| auto waiter = fxl::MakeRefCounted<callback::StatusWaiter<Status>>(Status::OK); |
| for (auto& device_context : devices_) { |
| // Initialize ledgers in different paths to emulate separate devices, |
| // but with the same lowest-level directory name, so they correspond to the |
| // same "user". |
| std::string synced_dir_path = |
| device_context->storage_directory->path() + "/convergence_user"; |
| bool ret = files::CreateDirectory(synced_dir_path); |
| FXL_DCHECK(ret); |
| |
| cloud_provider::CloudProviderPtr cloud_provider; |
| cloud_provider_factory_.MakeCloudProvider(user_id_, |
| cloud_provider.NewRequest()); |
| |
| Status status = GetLedger(startup_context_.get(), |
| device_context->controller.NewRequest(), |
| std::move(cloud_provider), user_id_.user_id(), |
| "convergence", DetachedPath(synced_dir_path), |
| QuitLoopClosure(), &device_context->ledger); |
| if (QuitOnError(QuitLoopClosure(), status, "GetLedger")) { |
| return; |
| } |
| device_context->ledger->GetPage( |
| fidl::MakeOptional(page_id_), |
| device_context->page_connection.NewRequest(), |
| QuitOnErrorCallback(QuitLoopClosure(), "GetPage")); |
| PageSnapshotPtr snapshot; |
| // Register a watcher; we don't really need the snapshot. |
| device_context->page_connection->GetSnapshot( |
| snapshot.NewRequest(), fidl::VectorPtr<uint8_t>::New(0), |
| device_context->page_watcher->NewBinding(), waiter->NewCallback()); |
| } |
| waiter->Finalize([this](Status status) { |
| if (QuitOnError(QuitLoopClosure(), status, "GetSnapshot")) { |
| return; |
| } |
| Start(0); |
| }); |
| } |
| |
| void ConvergenceBenchmark::Start(int step) { |
| if (step == entry_count_) { |
| ShutDown(); |
| return; |
| } |
| |
| for (int device_id = 0; device_id < device_count_; device_id++) { |
| std::vector<uint8_t> key = |
| generator_.MakeKey(device_count_ * step + device_id, kKeySize); |
| // Insert each key N times, as we will receive N notifications - one for |
| // each connection, sender included. |
| for (int receiving_device = 0; receiving_device < device_count_; |
| receiving_device++) { |
| remaining_keys_.insert(convert::ToString(key)); |
| } |
| fidl::VectorPtr<uint8_t> value = generator_.MakeValue(value_size_); |
| devices_[device_id]->page_connection->Put( |
| std::move(key), std::move(value), |
| QuitOnErrorCallback(QuitLoopClosure(), "Put")); |
| } |
| |
| TRACE_ASYNC_BEGIN("benchmark", "convergence", step); |
| // Persist the current step, so that we know which dispatcher event to end in |
| // OnChange(). |
| current_step_ = step; |
| } |
| |
| void ConvergenceBenchmark::OnChange(PageChange page_change, |
| ResultState result_state, |
| OnChangeCallback callback) { |
| FXL_DCHECK(result_state == ResultState::COMPLETED); |
| for (auto& change : page_change.changed_entries) { |
| auto find_one = remaining_keys_.find(convert::ToString(change.key)); |
| remaining_keys_.erase(find_one); |
| } |
| if (remaining_keys_.empty()) { |
| TRACE_ASYNC_END("benchmark", "convergence", current_step_); |
| Start(current_step_ + 1); |
| } |
| callback(nullptr); |
| } |
| |
| void ConvergenceBenchmark::ShutDown() { |
| for (auto& device_context : devices_) { |
| KillLedgerProcess(&device_context->controller); |
| } |
| loop_->Quit(); |
| } |
| |
| fit::closure ConvergenceBenchmark::QuitLoopClosure() { |
| return [this] { loop_->Quit(); }; |
| } |
| |
| int Main(int argc, const char** argv) { |
| fxl::CommandLine command_line = fxl::CommandLineFromArgcArgv(argc, argv); |
| async::Loop loop(&kAsyncLoopConfigAttachToThread); |
| auto startup_context = component::StartupContext::CreateFromStartupInfo(); |
| |
| std::string entry_count_str; |
| int entry_count; |
| std::string value_size_str; |
| int value_size; |
| std::string device_count_str; |
| int device_count; |
| SyncParams sync_params; |
| if (!command_line.GetOptionValue(kEntryCountFlag.ToString(), |
| &entry_count_str) || |
| !fxl::StringToNumberWithError(entry_count_str, &entry_count) || |
| entry_count <= 0 || |
| !command_line.GetOptionValue(kValueSizeFlag.ToString(), |
| &value_size_str) || |
| !fxl::StringToNumberWithError(value_size_str, &value_size) || |
| value_size <= 0 || |
| !command_line.GetOptionValue(kDeviceCountFlag.ToString(), |
| &device_count_str) || |
| !fxl::StringToNumberWithError(device_count_str, &device_count) || |
| device_count <= 0 || |
| !ParseSyncParamsFromCommandLine(command_line, startup_context.get(), |
| &sync_params)) { |
| PrintUsage(); |
| return -1; |
| } |
| |
| ConvergenceBenchmark app(&loop, std::move(startup_context), entry_count, |
| value_size, device_count, std::move(sync_params)); |
| return RunWithTracing(&loop, [&app] { app.Run(); }); |
| } |
| |
| } // namespace |
| } // namespace ledger |
| |
| int main(int argc, const char** argv) { return ledger::Main(argc, argv); } |