blob: 27985056eeaeba4e31f1301a2c97e4eb05da3ef7 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <fuchsia/ledger/cloud/cpp/fidl.h>
#include <lib/fidl/cpp/binding.h>
#include <lib/fidl/cpp/optional.h>
#include <lib/fit/function.h>
#include <lib/trace/event.h>
#include <lib/zx/time.h>
#include "fuchsia/ledger/cpp/fidl.h"
#include "src/ledger/bin/storage/public/types.h"
#include "src/ledger/bin/testing/data_generator.h"
#include "src/ledger/bin/testing/get_page_ensure_initialized.h"
#include "src/ledger/bin/testing/ledger_app_instance_factory.h"
#include "src/ledger/bin/tests/integration/integration_test.h"
#include "src/ledger/lib/callback/capture.h"
#include "src/ledger/lib/callback/waiter.h"
#include "src/ledger/lib/convert/convert.h"
#include "src/ledger/lib/memory/ref_counted.h"
#include "src/ledger/lib/memory/ref_ptr.h"
#include "src/ledger/lib/vmo/vector.h"
namespace ledger {
namespace {
std::vector<uint8_t> DoubleToArray(double dbl) {
std::vector<uint8_t> array(sizeof(double));
std::memcpy(array.data(), &dbl, sizeof(double));
return array;
}
::testing::AssertionResult VmoToDouble(const fuchsia::mem::BufferPtr& vmo, double* dbl) {
*dbl = 0;
if (vmo->size != sizeof(double)) {
return ::testing::AssertionFailure()
<< "VMO has the wrong size: " << vmo->size << " instead of " << sizeof(double) << ".";
}
zx_status_t status = vmo->vmo.read(dbl, 0, sizeof(double));
if (status < 0) {
return ::testing::AssertionFailure() << "Unable to read the VMO.";
}
return ::testing::AssertionSuccess();
}
class RefCountedPageSnapshot : public RefCountedThreadSafe<RefCountedPageSnapshot> {
public:
RefCountedPageSnapshot() = default;
PageSnapshotPtr& operator->() { return snapshot_; }
PageSnapshotPtr& operator*() { return snapshot_; }
private:
PageSnapshotPtr snapshot_;
};
class PageWatcherImpl : public PageWatcher {
public:
PageWatcherImpl(fidl::InterfaceRequest<PageWatcher> request,
RefPtr<RefCountedPageSnapshot> base_snapshot)
: binding_(this, std::move(request)), current_snapshot_(std::move(base_snapshot)) {}
PageWatcherImpl(const PageWatcherImpl&) = delete;
PageWatcherImpl& operator=(const PageWatcherImpl&) = delete;
int changes = 0;
void GetInlineOnLatestSnapshot(std::vector<uint8_t> key,
PageSnapshot::GetInlineCallback callback) {
// We need to make sure the PageSnapshotPtr used to make the |GetInline|
// call survives as long as the call is active, even if a new snapshot
// arrives in between.
(*current_snapshot_)
->GetInline(std::move(key),
[snapshot = current_snapshot_.Clone(), callback = std::move(callback)](
fuchsia::ledger::PageSnapshot_GetInline_Result result) mutable {
callback(std::move(result));
});
}
private:
// PageWatcher:
void OnChange(PageChange /*page_change*/, ResultState /*result_state*/,
OnChangeCallback callback) override {
changes++;
current_snapshot_ = MakeRefCounted<RefCountedPageSnapshot>();
callback((**current_snapshot_).NewRequest());
}
fidl::Binding<PageWatcher> binding_;
RefPtr<RefCountedPageSnapshot> current_snapshot_;
};
class SyncWatcherImpl : public SyncWatcher {
public:
SyncWatcherImpl() : binding_(this) {}
SyncWatcherImpl(const SyncWatcherImpl&) = delete;
SyncWatcherImpl& operator=(const SyncWatcherImpl&) = delete;
auto NewBinding() { return binding_.NewBinding(); }
bool new_state = false;
SyncState download;
SyncState upload;
private:
// SyncWatcher
void SyncStateChanged(SyncState download, SyncState upload,
SyncStateChangedCallback callback) override {
this->download = download;
this->upload = upload;
new_state = true;
callback();
}
fidl::Binding<SyncWatcher> binding_;
};
// NonAssociativeConflictResolverImpl uses a merge function which is neither
// associative nor commutative. This means that merging ((1, 2), 3) results in
// a different value than merging ((2, 3), 1), or ((2, 1), 3).
// This conflict resolver only works on numeric data. For values A and B, it
// produces the merged value (4*A+B)/3.
class NonAssociativeConflictResolverImpl : public ConflictResolver {
public:
explicit NonAssociativeConflictResolverImpl(fidl::InterfaceRequest<ConflictResolver> request)
: binding_(this, std::move(request)) {}
~NonAssociativeConflictResolverImpl() override = default;
private:
// ConflictResolver:
void Resolve(fidl::InterfaceHandle<PageSnapshot> /*left_version*/,
fidl::InterfaceHandle<PageSnapshot> /*right_version*/,
fidl::InterfaceHandle<PageSnapshot> /*common_version*/,
fidl::InterfaceHandle<MergeResultProvider> result_provider) override {
auto merge_result_provider = std::make_unique<MergeResultProviderPtr>(result_provider.Bind());
merge_result_provider->set_error_handler([](zx_status_t status) { EXPECT_EQ(status, ZX_OK); });
MergeResultProvider* merge_result_provider_ptr = merge_result_provider->get();
merge_result_provider_ptr->GetFullDiff(
nullptr, [merge_result_provider = std::move(merge_result_provider)](
std::vector<DiffEntry> changes, std::unique_ptr<Token> next_token) mutable {
ASSERT_FALSE(next_token);
ASSERT_EQ(changes.size(), 1u);
double d1, d2;
EXPECT_TRUE(VmoToDouble(changes.at(0).left->value, &d1));
EXPECT_TRUE(VmoToDouble(changes.at(0).right->value, &d2));
double new_value = (4 * d1 + d2) / 3;
MergedValue merged_value;
merged_value.key = std::move(changes.at(0).key);
merged_value.source = ValueSource::NEW;
merged_value.new_value = BytesOrReference::New();
merged_value.new_value->set_bytes(DoubleToArray(new_value));
std::vector<MergedValue> merged_values;
merged_values.push_back(std::move(merged_value));
(*merge_result_provider)->Merge(std::move(merged_values));
(*merge_result_provider)->Done();
});
}
fidl::Binding<ConflictResolver> binding_;
};
class TestConflictResolverFactory : public ConflictResolverFactory {
public:
explicit TestConflictResolverFactory(fidl::InterfaceRequest<ConflictResolverFactory> request)
: binding_(this, std::move(request)) {}
private:
// ConflictResolverFactory:
void GetPolicy(PageId /*page_id*/, GetPolicyCallback callback) override {
callback(MergePolicy::CUSTOM);
}
void NewConflictResolver(PageId page_id,
fidl::InterfaceRequest<ConflictResolver> resolver) override {
resolvers.try_emplace(convert::ToString(page_id.id), std::move(resolver));
}
std::map<storage::PageId, NonAssociativeConflictResolverImpl> resolvers;
fidl::Binding<ConflictResolverFactory> binding_;
};
enum class MergeType {
LAST_ONE_WINS,
NON_ASSOCIATIVE_CUSTOM,
};
class ConvergenceTest : public BaseIntegrationTest,
public ::testing::WithParamInterface<
std::tuple<MergeType, int, const LedgerAppInstanceFactoryBuilder*>> {
public:
ConvergenceTest()
: BaseIntegrationTest(std::get<const LedgerAppInstanceFactoryBuilder*>(GetParam())) {}
~ConvergenceTest() override = default;
void SetUp() override {
BaseIntegrationTest::SetUp();
data_generator_ = std::make_unique<DataGenerator>(GetRandom());
std::tie(merge_function_type_, num_ledgers_, std::ignore) = GetParam();
ASSERT_GT(num_ledgers_, 1);
PageId page_id;
for (int i = 0; i < num_ledgers_; i++) {
auto ledger_instance = NewLedgerAppInstance();
ASSERT_TRUE(ledger_instance);
ledger_instances_.push_back(std::move(ledger_instance));
pages_.emplace_back();
LedgerPtr ledger_ptr = ledger_instances_[i]->GetTestLedger();
Status status;
auto loop_waiter = NewWaiter();
GetPageEnsureInitialized(
&ledger_ptr,
// The first ledger gets a random page id, the others use the
// same id for their pages.
i == 0 ? nullptr : fidl::MakeOptional(page_id), DelayCallback::NO,
[&] {
ADD_FAILURE() << "Page should not be disconnected.";
StopLoop();
},
Capture(loop_waiter->GetCallback(), &status, &pages_[i], &page_id));
ASSERT_TRUE(loop_waiter->RunUntilCalled());
ASSERT_EQ(status, Status::OK);
}
}
protected:
std::unique_ptr<PageWatcherImpl> WatchPageContents(PagePtr* page) {
PageWatcherPtr page_watcher;
auto page_snapshot = MakeRefCounted<RefCountedPageSnapshot>();
fidl::InterfaceRequest<PageSnapshot> page_snapshot_request = (**page_snapshot).NewRequest();
std::unique_ptr<PageWatcherImpl> watcher =
std::make_unique<PageWatcherImpl>(page_watcher.NewRequest(), std::move(page_snapshot));
(*page)->GetSnapshot(std::move(page_snapshot_request), {}, std::move(page_watcher));
return watcher;
}
std::unique_ptr<SyncWatcherImpl> WatchPageSyncState(PagePtr* page) {
std::unique_ptr<SyncWatcherImpl> watcher = std::make_unique<SyncWatcherImpl>();
(*page)->SetSyncStateWatcher(watcher->NewBinding());
return watcher;
}
// Returns true if the values for |key| on all the watchers are identical.
bool AreValuesIdentical(const std::vector<std::unique_ptr<PageWatcherImpl>>& watchers,
std::string key) {
std::vector<InlinedValue> values;
for (int i = 0; i < num_ledgers_; i++) {
auto loop_waiter = NewWaiter();
fuchsia::ledger::PageSnapshot_GetInline_Result result;
watchers[i]->GetInlineOnLatestSnapshot(convert::ToArray(key),
Capture(loop_waiter->GetCallback(), &result));
EXPECT_TRUE(loop_waiter->RunUntilCalled());
EXPECT_TRUE(result.is_response());
values.emplace_back(std::move(result.response().value));
}
bool values_are_identical = true;
for (int i = 1; i < num_ledgers_; i++) {
values_are_identical &= convert::ExtendedStringView(values[0].value) ==
convert::ExtendedStringView(values[i].value);
}
return values_are_identical;
}
int num_ledgers_;
MergeType merge_function_type_;
std::vector<std::unique_ptr<LedgerAppInstanceFactory::LedgerAppInstance>> ledger_instances_;
std::vector<PagePtr> pages_;
std::unique_ptr<DataGenerator> data_generator_;
};
// Verify that the Ledger converges over different settings of merging functions
// and number of ledger instances.
TEST_P(ConvergenceTest, NLedgersConverge) {
std::vector<std::unique_ptr<PageWatcherImpl>> watchers;
std::vector<std::unique_ptr<SyncWatcherImpl>> sync_watchers;
std::vector<std::unique_ptr<TestConflictResolverFactory>> resolver_factories;
std::independent_bits_engine<std::default_random_engine, CHAR_BIT, uint8_t> generator;
std::uniform_real_distribution<> distribution(1, 100);
for (int i = 0; i < num_ledgers_; i++) {
if (merge_function_type_ == MergeType::NON_ASSOCIATIVE_CUSTOM) {
ConflictResolverFactoryPtr resolver_factory_ptr;
resolver_factories.push_back(
std::make_unique<TestConflictResolverFactory>(resolver_factory_ptr.NewRequest()));
LedgerPtr ledger = ledger_instances_[i]->GetTestLedger();
ledger->SetConflictResolverFactory(std::move(resolver_factory_ptr));
}
watchers.push_back(WatchPageContents(&pages_[i]));
sync_watchers.push_back(WatchPageSyncState(&pages_[i]));
pages_[i]->StartTransaction();
if (merge_function_type_ == MergeType::NON_ASSOCIATIVE_CUSTOM) {
pages_[i]->Put(convert::ToArray("value"), DoubleToArray(distribution(generator)));
} else {
pages_[i]->Put(convert::ToArray("value"), data_generator_->MakeValue(50));
}
}
auto sync_waiter = MakeRefCounted<CompletionWaiter>();
for (int i = 0; i < num_ledgers_; i++) {
pages_[i]->Commit();
pages_[i]->Sync(sync_waiter->NewCallback());
}
auto loop_waiter = NewWaiter();
sync_waiter->Finalize(Capture(loop_waiter->GetCallback()));
ASSERT_TRUE(loop_waiter->RunUntilCalled());
// Function to verify if the visible Ledger state has not changed since last
// call and all values are identical.
fit::function<bool()> has_state_converged = [this, &watchers, &sync_watchers]() {
// Counts the number of visible changes. Used to verify that the minimal
// number of changes for all Ledgers to have communicated is accounted for
// (see also below).
int num_changes = 0;
for (int i = 0; i < num_ledgers_; i++) {
num_changes += watchers[i]->changes;
}
// All ledgers should see their own change (num_ledgers_). Then, at least
// all but one should receive a change with the "final" value. There might
// be more changes seen, though.
if (num_changes < 2 * num_ledgers_ - 1) {
return false;
}
// All synchronization must be idle.
bool idle = true;
for (int i = 0; i < num_ledgers_; i++) {
if (sync_watchers[i]->download != SyncState::IDLE ||
sync_watchers[i]->upload != SyncState::IDLE || sync_watchers[i]->new_state) {
idle = false;
}
// With this, we make sure we can verify if the state changes during the
// next cycle. If it has changed, we are sure the convergence has not
// happened yet.
sync_watchers[i]->new_state = false;
}
return idle && AreValuesIdentical(watchers, "value");
};
bool merge_done = false;
ConflictResolutionWaitStatus wait_status = ConflictResolutionWaitStatus::NO_CONFLICTS;
RefPtr<StatusWaiter<ConflictResolutionWaitStatus>> waiter;
// In addition of verifying that the external states of the ledgers have
// converged, we also verify we are not currently performing a merge in the
// background, indicating that the convergence did not finish.
auto is_sync_and_merge_complete = [this, &has_state_converged, &merge_done, &wait_status,
&waiter] {
TRACE_DURATION("ledger", "ledger_test_is_sync_and_merge_complete");
if (has_state_converged()) {
if (merge_done && wait_status == ConflictResolutionWaitStatus::NO_CONFLICTS) {
return true;
}
if (!waiter) {
waiter = MakeRefCounted<StatusWaiter<ConflictResolutionWaitStatus>>(
ConflictResolutionWaitStatus::NO_CONFLICTS);
for (int i = 0; i < num_ledgers_; i++) {
pages_[i]->WaitForConflictResolution(waiter->NewCallback());
}
waiter->Finalize([&merge_done, &wait_status, &waiter](ConflictResolutionWaitStatus status) {
merge_done = true;
wait_status = status;
waiter = nullptr;
});
}
return false;
}
merge_done = false;
if (waiter) {
waiter->Cancel();
waiter = nullptr;
}
return false;
};
// If |RunLoopUntil| returns, the condition is met, thus the ledgers have
// converged.
EXPECT_TRUE(RunLoopUntil(std::move(is_sync_and_merge_complete)));
int num_changes = 0;
for (int i = 0; i < num_ledgers_; i++) {
num_changes += watchers[i]->changes;
}
EXPECT_GE(num_changes, 2 * num_ledgers_ - 1);
// All synchronization must still be idle.
for (int i = 0; i < num_ledgers_; i++) {
EXPECT_FALSE(sync_watchers[i]->new_state);
EXPECT_EQ(sync_watchers[i]->download, SyncState::IDLE);
EXPECT_EQ(sync_watchers[i]->upload, SyncState::IDLE);
}
EXPECT_TRUE(AreValuesIdentical(watchers, "value"));
}
INSTANTIATE_TEST_SUITE_P(
ManyLedgersConvergenceTest, ConvergenceTest,
::testing::Combine(
::testing::Values(MergeType::LAST_ONE_WINS, MergeType::NON_ASSOCIATIVE_CUSTOM),
// Temporarily reduced the number of simulated Ledgers to reduce flaky
// failures on bots, see LE-752. TODO(ppi): revert back to (2, 6).
::testing::Range(2, 3),
::testing::ValuesIn(GetLedgerAppInstanceFactoryBuilders(EnableSynchronization::SYNC_ONLY))),
[](const ::testing::TestParamInfo<ConvergenceTest::ParamType>& info) {
std::stringstream ss;
ss << (std::get<MergeType>(info.param) == MergeType::LAST_ONE_WINS ? "LastOneWins"
: "NonAssociativeCustom")
<< "With" << std::get<int>(info.param) << "Ledgers"
<< std::get<const LedgerAppInstanceFactoryBuilder*>(info.param)->TestSuffix();
return ss.str();
});
} // namespace
} // namespace ledger