blob: 8a3c96ffd687fd93c379f5126acd5542476bb7f7 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <fuchsia/ledger/cloud/cpp/fidl.h>
#include <fuchsia/modular/auth/cpp/fidl.h>
#include <lib/callback/auto_cleanable.h>
#include <lib/callback/capture.h>
#include <lib/callback/waiter.h>
#include <lib/fidl/cpp/binding.h>
#include <lib/fidl/cpp/optional.h>
#include <lib/fit/function.h>
#include <lib/fsl/vmo/vector.h>
#include <lib/fxl/memory/ref_ptr.h>
#include <lib/zx/time.h>
#include <trace/event.h>
#include "peridot/bin/ledger/storage/public/types.h"
#include "peridot/bin/ledger/testing/data_generator.h"
#include "peridot/bin/ledger/testing/get_page_ensure_initialized.h"
#include "peridot/bin/ledger/tests/integration/integration_test.h"
#include "peridot/lib/convert/convert.h"
namespace ledger {
namespace {
fidl::VectorPtr<uint8_t> DoubleToArray(double dbl) {
fidl::VectorPtr<uint8_t> array =
fidl::VectorPtr<uint8_t>::New(sizeof(double));
std::memcpy(array->data(), &dbl, sizeof(double));
return array;
}
::testing::AssertionResult VmoToDouble(const fuchsia::mem::BufferPtr& vmo,
double* dbl) {
*dbl = 0;
if (vmo->size != sizeof(double)) {
return ::testing::AssertionFailure()
<< "VMO has the wrong size: " << vmo->size << " instead of "
<< sizeof(double) << ".";
}
zx_status_t status = vmo->vmo.read(dbl, 0, sizeof(double));
if (status < 0) {
return ::testing::AssertionFailure() << "Unable to read the VMO.";
}
return ::testing::AssertionSuccess();
}
class RefCountedPageSnapshot
: public fxl::RefCountedThreadSafe<RefCountedPageSnapshot> {
public:
RefCountedPageSnapshot() {}
PageSnapshotPtr& operator->() { return snapshot_; }
PageSnapshotPtr& operator*() { return snapshot_; }
private:
PageSnapshotPtr snapshot_;
};
class PageWatcherImpl : public PageWatcher {
public:
PageWatcherImpl(fidl::InterfaceRequest<PageWatcher> request,
fxl::RefPtr<RefCountedPageSnapshot> base_snapshot)
: binding_(this, std::move(request)),
current_snapshot_(std::move(base_snapshot)) {}
int changes = 0;
void GetInlineOnLatestSnapshot(std::vector<uint8_t> key,
PageSnapshot::GetInlineCallback callback) {
// We need to make sure the PageSnapshotPtr used to make the |GetInline|
// call survives as long as the call is active, even if a new snapshot
// arrives in between.
(*current_snapshot_)
->GetInline(
std::move(key),
[snapshot = current_snapshot_.Clone(),
callback = std::move(callback)](
Status status, std::unique_ptr<InlinedValue> value) mutable {
callback(status, std::move(value));
});
}
private:
// PageWatcher:
void OnChange(PageChange /*page_change*/, ResultState /*result_state*/,
OnChangeCallback callback) override {
changes++;
current_snapshot_ = fxl::MakeRefCounted<RefCountedPageSnapshot>();
callback((**current_snapshot_).NewRequest());
}
fidl::Binding<PageWatcher> binding_;
fxl::RefPtr<RefCountedPageSnapshot> current_snapshot_;
FXL_DISALLOW_COPY_AND_ASSIGN(PageWatcherImpl);
};
class SyncWatcherImpl : public SyncWatcher {
public:
SyncWatcherImpl() : binding_(this) {}
auto NewBinding() { return binding_.NewBinding(); }
bool new_state = false;
SyncState download;
SyncState upload;
private:
// SyncWatcher
void SyncStateChanged(SyncState download, SyncState upload,
SyncStateChangedCallback callback) override {
this->download = download;
this->upload = upload;
new_state = true;
callback();
}
fidl::Binding<SyncWatcher> binding_;
FXL_DISALLOW_COPY_AND_ASSIGN(SyncWatcherImpl);
};
// NonAssociativeConflictResolverImpl uses a merge function which is neither
// associative nor commutative. This means that merging ((1, 2), 3) results in
// a different value than merging ((2, 3), 1), or ((2, 1), 3).
// This conflict resolver only works on numeric data. For values A and B, it
// produces the merged value (4*A+B)/3.
class NonAssociativeConflictResolverImpl : public ConflictResolver {
public:
explicit NonAssociativeConflictResolverImpl(
fidl::InterfaceRequest<ConflictResolver> request)
: binding_(this, std::move(request)) {}
~NonAssociativeConflictResolverImpl() override {}
private:
// ConflictResolver:
void Resolve(
fidl::InterfaceHandle<PageSnapshot> /*left_version*/,
fidl::InterfaceHandle<PageSnapshot> /*right_version*/,
fidl::InterfaceHandle<PageSnapshot> /*common_version*/,
fidl::InterfaceHandle<MergeResultProvider> result_provider) override {
auto merge_result_provider =
std::make_unique<MergeResultProviderPtr>(result_provider.Bind());
merge_result_provider->set_error_handler(
[](zx_status_t status) { EXPECT_EQ(ZX_OK, status); });
MergeResultProvider* merge_result_provider_ptr =
merge_result_provider->get();
merge_result_provider_ptr->GetFullDiffNew(
nullptr, [merge_result_provider = std::move(merge_result_provider)](
IterationStatus status, std::vector<DiffEntry> changes,
std::unique_ptr<Token> next_token) mutable {
ASSERT_EQ(IterationStatus::OK, status);
ASSERT_EQ(1u, changes.size());
double d1, d2;
EXPECT_TRUE(VmoToDouble(changes.at(0).left->value, &d1));
EXPECT_TRUE(VmoToDouble(changes.at(0).right->value, &d2));
double new_value = (4 * d1 + d2) / 3;
MergedValue merged_value;
merged_value.key = std::move(changes.at(0).key);
merged_value.source = ValueSource::NEW;
merged_value.new_value = BytesOrReference::New();
merged_value.new_value->set_bytes(DoubleToArray(new_value));
std::vector<MergedValue> merged_values;
merged_values.push_back(std::move(merged_value));
(*merge_result_provider)->MergeNew(std::move(merged_values));
(*merge_result_provider)->DoneNew();
});
}
fidl::Binding<ConflictResolver> binding_;
};
class TestConflictResolverFactory : public ConflictResolverFactory {
public:
explicit TestConflictResolverFactory(
fidl::InterfaceRequest<ConflictResolverFactory> request)
: binding_(this, std::move(request)) {}
private:
// ConflictResolverFactory:
void GetPolicy(PageId /*page_id*/, GetPolicyCallback callback) override {
callback(MergePolicy::CUSTOM);
}
void NewConflictResolver(
PageId page_id,
fidl::InterfaceRequest<ConflictResolver> resolver) override {
resolvers.emplace(std::piecewise_construct,
std::forward_as_tuple(convert::ToString(page_id.id)),
std::forward_as_tuple(std::move(resolver)));
}
std::map<storage::PageId, NonAssociativeConflictResolverImpl> resolvers;
fidl::Binding<ConflictResolverFactory> binding_;
};
enum class MergeType {
LAST_ONE_WINS,
NON_ASSOCIATIVE_CUSTOM,
};
class ConvergenceTest
: public BaseIntegrationTest,
public ::testing::WithParamInterface<
std::tuple<MergeType, int, const LedgerAppInstanceFactoryBuilder*>> {
public:
ConvergenceTest()
: BaseIntegrationTest(
std::get<const LedgerAppInstanceFactoryBuilder*>(GetParam())) {}
~ConvergenceTest() override{};
void SetUp() override {
BaseIntegrationTest::SetUp();
data_generator_ = std::make_unique<DataGenerator>(GetRandom());
std::tie(merge_function_type_, num_ledgers_, std::ignore) = GetParam();
ASSERT_GT(num_ledgers_, 1);
PageId page_id;
for (int i = 0; i < num_ledgers_; i++) {
auto ledger_instance = NewLedgerAppInstance();
ASSERT_TRUE(ledger_instance);
ledger_instances_.push_back(std::move(ledger_instance));
pages_.emplace_back();
LedgerPtr ledger_ptr = ledger_instances_[i]->GetTestLedger();
Status status = Status::UNKNOWN_ERROR;
auto loop_waiter = NewWaiter();
GetPageEnsureInitialized(
&ledger_ptr,
// The first ledger gets a random page id, the others use the
// same id for their pages.
i == 0 ? nullptr : fidl::MakeOptional(page_id), DelayCallback::NO,
[&] {
ADD_FAILURE() << "Page should not be disconnected.";
StopLoop();
},
callback::Capture(loop_waiter->GetCallback(), &status, &pages_[i],
&page_id));
ASSERT_TRUE(loop_waiter->RunUntilCalled());
ASSERT_EQ(Status::OK, status);
}
}
protected:
std::unique_ptr<PageWatcherImpl> WatchPageContents(PagePtr* page) {
PageWatcherPtr page_watcher;
auto page_snapshot = fxl::MakeRefCounted<RefCountedPageSnapshot>();
fidl::InterfaceRequest<PageSnapshot> page_snapshot_request =
(**page_snapshot).NewRequest();
std::unique_ptr<PageWatcherImpl> watcher =
std::make_unique<PageWatcherImpl>(page_watcher.NewRequest(),
std::move(page_snapshot));
Status status = Status::UNKNOWN_ERROR;
auto loop_waiter = NewWaiter();
(*page)->GetSnapshot(
std::move(page_snapshot_request), fidl::VectorPtr<uint8_t>::New(0),
std::move(page_watcher),
callback::Capture(loop_waiter->GetCallback(), &status));
EXPECT_TRUE(loop_waiter->RunUntilCalled());
EXPECT_EQ(Status::OK, status);
return watcher;
}
std::unique_ptr<SyncWatcherImpl> WatchPageSyncState(PagePtr* page) {
std::unique_ptr<SyncWatcherImpl> watcher =
std::make_unique<SyncWatcherImpl>();
Status status = Status::UNKNOWN_ERROR;
auto loop_waiter = NewWaiter();
(*page)->SetSyncStateWatcher(
watcher->NewBinding(),
callback::Capture(loop_waiter->GetCallback(), &status));
EXPECT_TRUE(loop_waiter->RunUntilCalled());
EXPECT_EQ(Status::OK, status);
return watcher;
}
// Returns true if the values for |key| on all the watchers are identical.
bool AreValuesIdentical(
const std::vector<std::unique_ptr<PageWatcherImpl>>& watchers,
std::string key) {
std::vector<std::unique_ptr<InlinedValue>> values;
for (int i = 0; i < num_ledgers_; i++) {
values.emplace_back();
Status status = Status::UNKNOWN_ERROR;
auto loop_waiter = NewWaiter();
watchers[i]->GetInlineOnLatestSnapshot(
convert::ToArray(key),
callback::Capture(loop_waiter->GetCallback(), &status, &values[i]));
EXPECT_TRUE(loop_waiter->RunUntilCalled());
EXPECT_EQ(Status::OK, status);
}
bool values_are_identical = true;
for (int i = 1; i < num_ledgers_; i++) {
values_are_identical &= convert::ExtendedStringView(values[0]->value) ==
convert::ExtendedStringView(values[i]->value);
}
return values_are_identical;
}
int num_ledgers_;
MergeType merge_function_type_;
std::vector<std::unique_ptr<LedgerAppInstanceFactory::LedgerAppInstance>>
ledger_instances_;
std::vector<PagePtr> pages_;
std::unique_ptr<DataGenerator> data_generator_;
};
// Verify that the Ledger converges over different settings of merging functions
// and number of ledger instances.
TEST_P(ConvergenceTest, NLedgersConverge) {
std::vector<std::unique_ptr<PageWatcherImpl>> watchers;
std::vector<std::unique_ptr<SyncWatcherImpl>> sync_watchers;
std::vector<std::unique_ptr<TestConflictResolverFactory>> resolver_factories;
std::independent_bits_engine<std::default_random_engine, CHAR_BIT, uint8_t>
generator;
std::uniform_real_distribution<> distribution(1, 100);
for (int i = 0; i < num_ledgers_; i++) {
Status status = Status::UNKNOWN_ERROR;
if (merge_function_type_ == MergeType::NON_ASSOCIATIVE_CUSTOM) {
ConflictResolverFactoryPtr resolver_factory_ptr;
resolver_factories.push_back(
std::make_unique<TestConflictResolverFactory>(
resolver_factory_ptr.NewRequest()));
LedgerPtr ledger = ledger_instances_[i]->GetTestLedger();
auto loop_waiter = NewWaiter();
ledger->SetConflictResolverFactory(
std::move(resolver_factory_ptr),
callback::Capture(loop_waiter->GetCallback(), &status));
ASSERT_TRUE(loop_waiter->RunUntilCalled());
EXPECT_EQ(Status::OK, status);
}
watchers.push_back(WatchPageContents(&pages_[i]));
sync_watchers.push_back(WatchPageSyncState(&pages_[i]));
auto loop_waiter = NewWaiter();
pages_[i]->StartTransaction(
callback::Capture(loop_waiter->GetCallback(), &status));
ASSERT_TRUE(loop_waiter->RunUntilCalled());
EXPECT_EQ(Status::OK, status);
loop_waiter = NewWaiter();
if (merge_function_type_ == MergeType::NON_ASSOCIATIVE_CUSTOM) {
pages_[i]->Put(convert::ToArray("value"),
DoubleToArray(distribution(generator)),
callback::Capture(loop_waiter->GetCallback(), &status));
} else {
pages_[i]->Put(convert::ToArray("value"), data_generator_->MakeValue(50),
callback::Capture(loop_waiter->GetCallback(), &status));
}
ASSERT_TRUE(loop_waiter->RunUntilCalled());
EXPECT_EQ(Status::OK, status);
}
auto commit_waiter =
fxl::MakeRefCounted<callback::StatusWaiter<Status>>(Status::OK);
Status status = Status::UNKNOWN_ERROR;
for (int i = 0; i < num_ledgers_; i++) {
pages_[i]->Commit(commit_waiter->NewCallback());
}
auto loop_waiter = NewWaiter();
commit_waiter->Finalize(
callback::Capture(loop_waiter->GetCallback(), &status));
ASSERT_TRUE(loop_waiter->RunUntilCalled());
// Function to verify if the visible Ledger state has not changed since last
// call and all values are identical.
fit::function<bool()> has_state_converged = [this, &watchers,
&sync_watchers]() {
// Counts the number of visible changes. Used to verify that the minimal
// number of changes for all Ledgers to have communicated is accounted for
// (see also below).
int num_changes = 0;
for (int i = 0; i < num_ledgers_; i++) {
num_changes += watchers[i]->changes;
}
// All ledgers should see their own change (num_ledgers_). Then, at least
// all but one should receive a change with the "final" value. There might
// be more changes seen, though.
if (num_changes < 2 * num_ledgers_ - 1) {
return false;
}
// All synchronization must be idle.
bool idle = true;
for (int i = 0; i < num_ledgers_; i++) {
if (sync_watchers[i]->download != SyncState::IDLE ||
sync_watchers[i]->upload != SyncState::IDLE ||
sync_watchers[i]->new_state) {
idle = false;
}
// With this, we make sure we can verify if the state changes during the
// next cycle. If it has changed, we are sure the convergence has not
// happened yet.
sync_watchers[i]->new_state = false;
}
return idle && AreValuesIdentical(watchers, "value");
};
bool merge_done = false;
ConflictResolutionWaitStatus wait_status =
ConflictResolutionWaitStatus::NO_CONFLICTS;
fxl::RefPtr<callback::StatusWaiter<ConflictResolutionWaitStatus>> waiter;
// In addition of verifying that the external states of the ledgers have
// converged, we also verify we are not currently performing a merge in the
// background, indicating that the convergence did not finish.
auto is_sync_and_merge_complete = [this, &has_state_converged, &merge_done,
&wait_status, &waiter] {
TRACE_DURATION("ledger", "ledger_test_is_sync_and_merge_complete");
if (has_state_converged()) {
if (merge_done &&
wait_status == ConflictResolutionWaitStatus::NO_CONFLICTS) {
return true;
}
if (!waiter) {
waiter = fxl::MakeRefCounted<
callback::StatusWaiter<ConflictResolutionWaitStatus>>(
ConflictResolutionWaitStatus::NO_CONFLICTS);
for (int i = 0; i < num_ledgers_; i++) {
pages_[i]->WaitForConflictResolution(waiter->NewCallback());
}
waiter->Finalize([&merge_done, &wait_status,
&waiter](ConflictResolutionWaitStatus status) {
merge_done = true;
wait_status = status;
waiter = nullptr;
});
}
return false;
}
merge_done = false;
if (waiter) {
waiter->Cancel();
waiter = nullptr;
}
return false;
};
// If |RunLoopUntil| returns, the condition is met, thus the ledgers have
// converged.
RunLoopUntil(std::move(is_sync_and_merge_complete));
int num_changes = 0;
for (int i = 0; i < num_ledgers_; i++) {
num_changes += watchers[i]->changes;
}
EXPECT_GE(num_changes, 2 * num_ledgers_ - 1);
// All synchronization must still be idle.
for (int i = 0; i < num_ledgers_; i++) {
EXPECT_FALSE(sync_watchers[i]->new_state);
EXPECT_EQ(SyncState::IDLE, sync_watchers[i]->download);
EXPECT_EQ(SyncState::IDLE, sync_watchers[i]->upload);
}
EXPECT_TRUE(AreValuesIdentical(watchers, "value"));
}
INSTANTIATE_TEST_CASE_P(
ManyLedgersConvergenceTest, ConvergenceTest,
::testing::Combine(
::testing::Values(MergeType::LAST_ONE_WINS,
MergeType::NON_ASSOCIATIVE_CUSTOM),
// Temporarily reduce the number of simulated Ledgers to reduce flaky
// failures on bots, see LE-624 and ZX-2613. TODO(ppi): revert back to
// (2, 6).
// ::testing::Range(2, 6),
::testing::Range(2, 3),
::testing::ValuesIn(GetLedgerAppInstanceFactoryBuilders())));
} // namespace
} // namespace ledger