blob: 55ba360e60d99da68238b167bc1e0ee1dfdd77c1 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/cloud_sync/impl/page_sync_impl.h"
#include <memory>
#include <utility>
#include <vector>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <lib/async/cpp/task.h>
#include <lib/backoff/testing/test_backoff.h>
#include <lib/callback/capture.h>
#include <lib/callback/set_when_called.h>
#include <lib/fidl/cpp/optional.h>
#include <lib/fsl/socket/strings.h>
#include <lib/fxl/macros.h>
#include <lib/gtest/test_loop_fixture.h>
#include "peridot/bin/ledger/cloud_sync/impl/constants.h"
#include "peridot/bin/ledger/cloud_sync/impl/testing/test_page_cloud.h"
#include "peridot/bin/ledger/cloud_sync/impl/testing/test_page_storage.h"
#include "peridot/bin/ledger/cloud_sync/public/sync_state_watcher.h"
#include "peridot/bin/ledger/encryption/fake/fake_encryption_service.h"
#include "peridot/bin/ledger/storage/public/page_storage.h"
#include "peridot/bin/ledger/storage/testing/commit_empty_impl.h"
#include "peridot/bin/ledger/storage/testing/page_storage_empty_impl.h"
namespace cloud_sync {
namespace {
using testing::ElementsAre;
// Creates a dummy continuation token.
cloud_provider::Token MakeToken(convert::ExtendedStringView token_id) {
cloud_provider::Token token;
token.opaque_id = convert::ToArray(token_id);
return token;
}
class TestSyncStateWatcher : public SyncStateWatcher {
public:
TestSyncStateWatcher() {}
~TestSyncStateWatcher() override{};
void Notify(SyncStateContainer sync_state) override {
if (!states.empty() && sync_state == *states.rbegin()) {
return;
}
states.push_back(sync_state);
}
std::vector<SyncStateContainer> states;
};
class PageSyncImplTest : public gtest::TestLoopFixture {
public:
PageSyncImplTest()
: storage_(dispatcher()),
encryption_service_(dispatcher()),
page_cloud_(page_cloud_ptr_.NewRequest()) {
std::unique_ptr<TestSyncStateWatcher> watcher =
std::make_unique<TestSyncStateWatcher>();
state_watcher_ = watcher.get();
auto download_backoff =
std::make_unique<backoff::TestBackoff>(zx::msec(50));
download_backoff_ptr_ = download_backoff.get();
auto upload_backoff = std::make_unique<backoff::TestBackoff>(zx::msec(50));
upload_backoff_ptr_ = upload_backoff.get();
page_sync_ = std::make_unique<PageSyncImpl>(
dispatcher(), &storage_, &storage_, &encryption_service_,
std::move(page_cloud_ptr_), std::move(download_backoff),
std::move(upload_backoff), std::move(watcher));
}
~PageSyncImplTest() override {}
protected:
enum class UploadStatus {
ENABLED,
DISABLED,
};
void StartPageSync(UploadStatus status = UploadStatus::ENABLED) {
if (status == UploadStatus::ENABLED) {
page_sync_->EnableUpload();
}
page_sync_->Start();
}
TestPageStorage storage_;
encryption::FakeEncryptionService encryption_service_;
cloud_provider::PageCloudPtr page_cloud_ptr_;
TestPageCloud page_cloud_;
backoff::TestBackoff* download_backoff_ptr_;
backoff::TestBackoff* upload_backoff_ptr_;
TestSyncStateWatcher* state_watcher_;
std::unique_ptr<PageSyncImpl> page_sync_;
private:
FXL_DISALLOW_COPY_AND_ASSIGN(PageSyncImplTest);
};
SyncStateWatcher::SyncStateContainer MakeStates(DownloadSyncState download,
UploadSyncState upload) {
return {download, upload};
}
// Verifies that the backlog of commits to upload returned from
// GetUnsyncedCommits() is uploaded to PageCloudHandler.
TEST_F(PageSyncImplTest, UploadBacklog) {
storage_.NewCommit("id1", "content1");
storage_.NewCommit("id2", "content2");
bool called;
page_sync_->SetOnIdle(callback::SetWhenCalled(&called));
StartPageSync();
RunLoopUntilIdle();
ASSERT_TRUE(called);
ASSERT_EQ(2u, page_cloud_.received_commits.size());
EXPECT_EQ("id1", page_cloud_.received_commits[0].id);
EXPECT_EQ("content1", encryption_service_.DecryptCommitSynchronous(
page_cloud_.received_commits[0].data));
EXPECT_EQ("id2", page_cloud_.received_commits[1].id);
EXPECT_EQ("content2", encryption_service_.DecryptCommitSynchronous(
page_cloud_.received_commits[1].data));
EXPECT_EQ(2u, storage_.commits_marked_as_synced.size());
EXPECT_EQ(1u, storage_.commits_marked_as_synced.count("id1"));
EXPECT_EQ(1u, storage_.commits_marked_as_synced.count("id2"));
EXPECT_THAT(
state_watcher_->states,
ElementsAre(MakeStates(DOWNLOAD_BACKLOG, UPLOAD_NOT_STARTED),
MakeStates(DOWNLOAD_BACKLOG, UPLOAD_WAIT_REMOTE_DOWNLOAD),
MakeStates(DOWNLOAD_SETTING_REMOTE_WATCHER,
UPLOAD_WAIT_REMOTE_DOWNLOAD),
MakeStates(DOWNLOAD_IDLE, UPLOAD_WAIT_REMOTE_DOWNLOAD),
MakeStates(DOWNLOAD_IDLE, UPLOAD_PENDING),
MakeStates(DOWNLOAD_IDLE, UPLOAD_IN_PROGRESS),
MakeStates(DOWNLOAD_IDLE, UPLOAD_IDLE)));
}
// Verifies that the backlog of commits to upload returned from
// GetUnsyncedCommits() is uploaded to PageCloudHandler.
TEST_F(PageSyncImplTest, PageWatcher) {
TestSyncStateWatcher watcher;
storage_.NewCommit("id1", "content1");
storage_.NewCommit("id2", "content2");
bool called;
page_sync_->SetOnIdle(callback::SetWhenCalled(&called));
page_sync_->SetSyncWatcher(&watcher);
StartPageSync();
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_THAT(
watcher.states,
ElementsAre(MakeStates(DOWNLOAD_NOT_STARTED, UPLOAD_NOT_STARTED),
MakeStates(DOWNLOAD_BACKLOG, UPLOAD_NOT_STARTED),
MakeStates(DOWNLOAD_BACKLOG, UPLOAD_WAIT_REMOTE_DOWNLOAD),
MakeStates(DOWNLOAD_SETTING_REMOTE_WATCHER,
UPLOAD_WAIT_REMOTE_DOWNLOAD),
MakeStates(DOWNLOAD_IDLE, UPLOAD_WAIT_REMOTE_DOWNLOAD),
MakeStates(DOWNLOAD_IDLE, UPLOAD_PENDING),
MakeStates(DOWNLOAD_IDLE, UPLOAD_IN_PROGRESS),
MakeStates(DOWNLOAD_IDLE, UPLOAD_IDLE)));
}
// Verifies that sync pauses uploading commits when it is downloading a commit.
TEST_F(PageSyncImplTest, NoUploadWhenDownloading) {
storage_.should_delay_add_commit_confirmation = true;
bool called;
page_sync_->SetOnIdle(callback::SetWhenCalled(&called));
StartPageSync();
RunLoopUntilIdle();
ASSERT_TRUE(called);
ASSERT_TRUE(page_cloud_.set_watcher.is_bound());
auto commit_pack =
MakeTestCommitPack(&encryption_service_, {{"id1", "content1"}});
ASSERT_TRUE(commit_pack);
page_cloud_.set_watcher->OnNewCommits(std::move(*commit_pack),
MakeToken("44"), [] {});
RunLoopUntilIdle();
EXPECT_LT(0u, storage_.add_commits_from_sync_calls);
storage_.watcher_->OnNewCommits(
storage_.NewCommit("id2", "content2")->AsList(),
storage::ChangeSource::LOCAL);
RunLoopUntilIdle();
EXPECT_FALSE(storage_.delayed_add_commit_confirmations.empty());
EXPECT_TRUE(page_cloud_.received_commits.empty());
storage_.delayed_add_commit_confirmations.front()();
RunLoopUntilIdle();
EXPECT_FALSE(page_cloud_.received_commits.empty());
}
TEST_F(PageSyncImplTest, UploadExistingCommitsOnlyAfterBacklogDownload) {
// Verify that two local commits are not uploaded when download is in
// progress.
storage_.NewCommit("local1", "content1");
storage_.NewCommit("local2", "content2");
page_cloud_.commits_to_return.push_back(
MakeTestCommit(&encryption_service_, "remote3", "content3"));
page_cloud_.commits_to_return.push_back(
MakeTestCommit(&encryption_service_, "remote4", "content4"));
page_cloud_.position_token_to_return = fidl::MakeOptional(MakeToken("43"));
bool backlog_downloaded_called = false;
page_sync_->SetOnBacklogDownloaded([this, &backlog_downloaded_called] {
EXPECT_EQ(0u, page_cloud_.received_commits.size());
EXPECT_EQ(0u, storage_.commits_marked_as_synced.size());
backlog_downloaded_called = true;
});
bool called;
page_sync_->SetOnIdle(callback::SetWhenCalled(&called));
StartPageSync();
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_TRUE(backlog_downloaded_called);
ASSERT_EQ(2u, page_cloud_.received_commits.size());
EXPECT_EQ("local1", page_cloud_.received_commits[0].id);
EXPECT_EQ("content1", encryption_service_.DecryptCommitSynchronous(
page_cloud_.received_commits[0].data));
EXPECT_EQ("local2", page_cloud_.received_commits[1].id);
EXPECT_EQ("content2", encryption_service_.DecryptCommitSynchronous(
page_cloud_.received_commits[1].data));
ASSERT_EQ(2u, storage_.commits_marked_as_synced.size());
EXPECT_EQ(1u, storage_.commits_marked_as_synced.count("local1"));
EXPECT_EQ(1u, storage_.commits_marked_as_synced.count("local2"));
}
// Verifies that existing commits are uploaded before the new ones.
TEST_F(PageSyncImplTest, UploadExistingAndNewCommits) {
storage_.NewCommit("id1", "content1");
page_sync_->SetOnBacklogDownloaded([this] {
async::PostTask(dispatcher(), [this] {
auto commit = storage_.NewCommit("id2", "content2");
storage_.new_commits_to_return["id2"] = commit->Clone();
storage_.watcher_->OnNewCommits(commit->AsList(),
storage::ChangeSource::LOCAL);
});
});
bool called;
page_sync_->SetOnIdle(callback::SetWhenCalled(&called));
StartPageSync();
RunLoopUntilIdle();
ASSERT_TRUE(called);
ASSERT_EQ(2u, page_cloud_.received_commits.size());
EXPECT_EQ("id1", page_cloud_.received_commits[0].id);
EXPECT_EQ("content1", encryption_service_.DecryptCommitSynchronous(
page_cloud_.received_commits[0].data));
EXPECT_EQ("id2", page_cloud_.received_commits[1].id);
EXPECT_EQ("content2", encryption_service_.DecryptCommitSynchronous(
page_cloud_.received_commits[1].data));
EXPECT_EQ(2u, storage_.commits_marked_as_synced.size());
EXPECT_EQ(1u, storage_.commits_marked_as_synced.count("id1"));
EXPECT_EQ(1u, storage_.commits_marked_as_synced.count("id2"));
}
// Verifies that the on idle callback is called when there is no pending upload
// tasks.
TEST_F(PageSyncImplTest, UploadIdleCallback) {
int on_idle_calls = 0;
storage_.NewCommit("id1", "content1");
storage_.NewCommit("id2", "content2");
page_sync_->SetOnIdle([&on_idle_calls] { on_idle_calls++; });
StartPageSync();
// Verify that the idle callback is called once both commits are uploaded.
RunLoopUntilIdle();
EXPECT_EQ(2u, page_cloud_.received_commits.size());
EXPECT_EQ(1, on_idle_calls);
EXPECT_TRUE(page_sync_->IsIdle());
// Notify about a new commit to upload and verify that the idle callback was
// called again on completion.
auto commit3 = storage_.NewCommit("id3", "content3");
storage_.new_commits_to_return["id3"] = commit3->Clone();
storage_.watcher_->OnNewCommits(commit3->AsList(),
storage::ChangeSource::LOCAL);
EXPECT_FALSE(page_sync_->IsIdle());
RunLoopUntilIdle();
EXPECT_EQ(3u, page_cloud_.received_commits.size());
EXPECT_EQ(2, on_idle_calls);
EXPECT_TRUE(page_sync_->IsIdle());
}
// Verifies that a failure to persist the remote commit stops syncing remote
// commits and calls the error callback.
TEST_F(PageSyncImplTest, FailToStoreRemoteCommit) {
bool called;
page_sync_->SetOnIdle(callback::SetWhenCalled(&called));
int error_callback_calls = 0;
page_sync_->SetOnUnrecoverableError(
[&error_callback_calls] { error_callback_calls++; });
StartPageSync();
RunLoopUntilIdle();
ASSERT_TRUE(called);
ASSERT_TRUE(page_cloud_.set_watcher.is_bound());
auto commit_pack =
MakeTestCommitPack(&encryption_service_, {{"id1", "content1"}});
ASSERT_TRUE(commit_pack);
storage_.should_fail_add_commit_from_sync = true;
EXPECT_EQ(0, error_callback_calls);
page_cloud_.set_watcher->OnNewCommits(std::move(*commit_pack),
MakeToken("42"), [] {});
RunLoopUntilIdle();
EXPECT_FALSE(page_cloud_.set_watcher.is_bound());
EXPECT_EQ(1, error_callback_calls);
}
// Verifies that the on idle callback is called when there is no download in
// progress.
TEST_F(PageSyncImplTest, DownloadIdleCallback) {
page_cloud_.commits_to_return.push_back(
MakeTestCommit(&encryption_service_, "id1", "content1"));
page_cloud_.commits_to_return.push_back(
MakeTestCommit(&encryption_service_, "id2", "content2"));
page_cloud_.position_token_to_return = fidl::MakeOptional(MakeToken("43"));
int on_idle_calls = 0;
page_sync_->SetOnIdle([&on_idle_calls] { on_idle_calls++; });
StartPageSync();
EXPECT_EQ(0, on_idle_calls);
EXPECT_FALSE(page_sync_->IsIdle());
// Run the message loop and verify that the sync is idle after all remote
// commits are added to storage.
RunLoopUntilIdle();
EXPECT_EQ(1, on_idle_calls);
EXPECT_TRUE(page_sync_->IsIdle());
EXPECT_EQ(2u, storage_.received_commits.size());
// Notify about a new commit to download and verify that the idle callback was
// called again on completion.
auto commit_pack =
MakeTestCommitPack(&encryption_service_, {{"id3", "content3"}});
ASSERT_TRUE(commit_pack);
page_cloud_.set_watcher->OnNewCommits(std::move(*commit_pack),
MakeToken("44"), [] {});
RunLoopUntilIdle();
EXPECT_EQ(3u, storage_.received_commits.size());
EXPECT_EQ(2, on_idle_calls);
EXPECT_TRUE(page_sync_->IsIdle());
}
// Verifies that uploads are paused until EnableUpload is called.
TEST_F(PageSyncImplTest, UploadIsPaused) {
storage_.NewCommit("id1", "content1");
storage_.NewCommit("id2", "content2");
bool called;
page_sync_->SetOnIdle(callback::SetWhenCalled(&called));
StartPageSync(UploadStatus::DISABLED);
RunLoopUntilIdle();
ASSERT_TRUE(called);
ASSERT_EQ(0u, page_cloud_.received_commits.size());
page_sync_->EnableUpload();
RunLoopUntilIdle();
ASSERT_EQ(2u, page_cloud_.received_commits.size());
}
// Merge commits are deterministic, so can already be in the cloud when we try
// to upload it. The upload will then fail. However, we should stop retrying to
// upload the commit once we received a notification for it through the cloud
// sync watcher.
TEST_F(PageSyncImplTest, UploadCommitAlreadyInCloud) {
// Complete the initial sync.
StartPageSync();
RunLoopUntilIdle();
EXPECT_EQ(1u, page_cloud_.get_commits_calls);
// Create a local commit, but make the upload fail.
page_cloud_.commit_status_to_return = cloud_provider::Status::SERVER_ERROR;
auto commit1 = storage_.NewCommit("id1", "content1");
storage_.new_commits_to_return["id1"] = commit1->Clone();
storage_.watcher_->OnNewCommits(commit1->AsList(),
storage::ChangeSource::LOCAL);
// We need to wait for the callback to be executed on the PageSync side.
RunLoopUntilIdle();
EXPECT_EQ(1u, page_cloud_.add_commits_calls);
EXPECT_EQ(1, upload_backoff_ptr_->get_next_count);
// Verify that the commit is still not marked as synced in storage.
EXPECT_TRUE(storage_.commits_marked_as_synced.empty());
// Let's receive the same commit from the remote side.
auto commit_pack =
MakeTestCommitPack(&encryption_service_, {{"id1", "content1"}});
ASSERT_TRUE(commit_pack);
page_cloud_.set_watcher->OnNewCommits(std::move(*commit_pack),
MakeToken("44"), [] {});
RunLoopUntilIdle();
EXPECT_TRUE(page_sync_->IsIdle());
// No additional calls.
EXPECT_EQ(1u, page_cloud_.add_commits_calls);
EXPECT_TRUE(page_sync_->IsIdle());
}
TEST_F(PageSyncImplTest, UnrecoverableError) {
int on_error_calls = 0;
page_sync_->SetOnUnrecoverableError([&on_error_calls] { on_error_calls++; });
// Complete the initial sync.
StartPageSync();
RunLoopUntilIdle();
EXPECT_EQ(0, on_error_calls);
page_cloud_.Unbind();
RunLoopUntilIdle();
EXPECT_EQ(1, on_error_calls);
}
} // namespace
} // namespace cloud_sync