blob: 98fc883a66f9728cc901695bd3c7d1dbcf6436f4 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/cloud_sync/impl/page_upload.h"
#include <lib/fidl/cpp/optional.h>
#include <lib/fit/function.h>
#include <memory>
#include <utility>
#include <vector>
#include <gmock/gmock.h>
#include "src/ledger/bin/cloud_sync/impl/clock_pack.h"
#include "src/ledger/bin/cloud_sync/impl/constants.h"
#include "src/ledger/bin/cloud_sync/impl/testing/test_page_cloud.h"
#include "src/ledger/bin/cloud_sync/impl/testing/test_page_storage.h"
#include "src/ledger/bin/cloud_sync/public/sync_state_watcher.h"
#include "src/ledger/bin/encryption/fake/fake_encryption_service.h"
#include "src/ledger/bin/fidl/include/types.h"
#include "src/ledger/bin/storage/public/page_storage.h"
#include "src/ledger/bin/storage/public/types.h"
#include "src/ledger/bin/storage/testing/commit_empty_impl.h"
#include "src/ledger/bin/storage/testing/page_storage_empty_impl.h"
#include "src/ledger/bin/testing/test_with_environment.h"
#include "src/ledger/lib/backoff/testing/test_backoff.h"
#include "src/ledger/lib/callback/capture.h"
#include "src/ledger/lib/callback/set_when_called.h"
#include "src/ledger/lib/encoding/encoding.h"
#include "src/ledger/lib/socket/strings.h"
namespace cloud_sync {
namespace {
using ::testing::Each;
using ::testing::SizeIs;
using ::testing::Truly;
constexpr zx::duration kBackoffInterval = zx::msec(10);
constexpr zx::duration kHalfBackoffInterval = zx::msec(5);
// Creates a dummy continuation token.
cloud_provider::PositionToken MakeToken(convert::ExtendedStringView token_id) {
cloud_provider::PositionToken token;
token.opaque_id = convert::ToArray(token_id);
return token;
}
class PageUploadTest : public ledger::TestWithEnvironment, public PageUpload::Delegate {
public:
PageUploadTest()
: storage_(dispatcher()),
encryption_service_(dispatcher()),
page_cloud_(page_cloud_ptr_.NewRequest()),
task_runner_(dispatcher()) {
auto test_backoff = std::make_unique<ledger::TestBackoff>(kBackoffInterval);
backoff_ = test_backoff.get();
page_upload_ = std::make_unique<PageUpload>(environment_.coroutine_service(), &task_runner_,
&storage_, &encryption_service_, &page_cloud_ptr_,
this, std::move(test_backoff));
}
PageUploadTest(const PageUploadTest&) = delete;
PageUploadTest& operator=(const PageUploadTest&) = delete;
~PageUploadTest() override = default;
protected:
void SetOnNewStateCallback(fit::closure callback) { new_state_callback_ = std::move(callback); }
void SetUploadState(UploadSyncState sync_state) override {
states_.push_back(sync_state);
if (new_state_callback_) {
new_state_callback_();
}
}
bool IsDownloadIdle() override { return is_download_idle_; }
TestPageStorage storage_;
encryption::FakeEncryptionService encryption_service_;
cloud_provider::PageCloudPtr page_cloud_ptr_;
TestPageCloud page_cloud_;
std::vector<UploadSyncState> states_;
std::unique_ptr<PageUpload> page_upload_;
ledger::TestBackoff* backoff_;
bool is_download_idle_ = true;
private:
fit::closure new_state_callback_;
ledger::ScopedTaskRunner task_runner_;
};
// Verifies that the backlog of commits to upload returned from
// GetUnsyncedCommits() is uploaded to PageCloudHandler.
TEST_F(PageUploadTest, UploadBacklog) {
storage_.NewCommit("id1", "content1");
storage_.NewCommit("id2", "content2");
bool upload_is_paused = false;
SetOnNewStateCallback([this, &upload_is_paused] { upload_is_paused = page_upload_->IsPaused(); });
page_upload_->StartOrRestartUpload();
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
ASSERT_EQ(page_cloud_.received_commits.size(), 2u);
ASSERT_THAT(page_cloud_.received_commits, Each(Truly(CommitHasIdAndData)));
EXPECT_EQ(page_cloud_.received_commits[0].id(),
convert::ToArray(encryption_service_.EncodeCommitId("id1")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[0].data()),
"content1");
EXPECT_EQ(page_cloud_.received_commits[1].id(),
convert::ToArray(encryption_service_.EncodeCommitId("id2")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[1].data()),
"content2");
EXPECT_EQ(storage_.commits_marked_as_synced.size(), 2u);
EXPECT_EQ(storage_.commits_marked_as_synced.count("id1"), 1u);
EXPECT_EQ(storage_.commits_marked_as_synced.count("id2"), 1u);
}
// Verifies that the backlog of commits to upload is not uploaded until there's
// only one local head.
TEST_F(PageUploadTest, UploadBacklogOnlyOnSingleHead) {
// Verify that two local commits are not uploaded when there is two local
// heads.
bool upload_is_paused = false;
storage_.head_count = 2;
storage_.NewCommit("id0", "content0");
storage_.NewCommit("id1", "content1");
SetOnNewStateCallback([this, &upload_is_paused] { upload_is_paused = page_upload_->IsPaused(); });
page_upload_->StartOrRestartUpload();
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
EXPECT_EQ(page_cloud_.received_commits.size(), 0u);
EXPECT_EQ(storage_.commits_marked_as_synced.size(), 0u);
// Add a new commit and reduce the number of heads to 1.
upload_is_paused = false;
storage_.head_count = 1;
auto commit = storage_.NewCommit("id2", "content2");
storage_.watcher_->OnNewCommits(commit->AsList(), storage::ChangeSource::LOCAL);
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
// Verify that all local commits were uploaded.
ASSERT_EQ(page_cloud_.received_commits.size(), 3u);
ASSERT_THAT(page_cloud_.received_commits, Each(Truly(CommitHasIdAndData)));
EXPECT_EQ(page_cloud_.received_commits[0].id(),
convert::ToArray(encryption_service_.EncodeCommitId("id0")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[0].data()),
"content0");
EXPECT_EQ(page_cloud_.received_commits[1].id(),
convert::ToArray(encryption_service_.EncodeCommitId("id1")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[1].data()),
"content1");
EXPECT_EQ(page_cloud_.received_commits[2].id(),
convert::ToArray(encryption_service_.EncodeCommitId("id2")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[2].data()),
"content2");
EXPECT_EQ(storage_.commits_marked_as_synced.size(), 3u);
EXPECT_EQ(storage_.commits_marked_as_synced.count("id0"), 1u);
EXPECT_EQ(storage_.commits_marked_as_synced.count("id1"), 1u);
EXPECT_EQ(storage_.commits_marked_as_synced.count("id2"), 1u);
}
TEST_F(PageUploadTest, UploadExistingCommitsOnlyAfterBacklogDownload) {
// Verify that two local commits are not uploaded when download is in
// progress.
storage_.NewCommit("local1", "content1");
storage_.NewCommit("local2", "content2");
page_cloud_.commits_to_return.push_back(MakeTestCommit(&encryption_service_, "content3"));
page_cloud_.commits_to_return.push_back(MakeTestCommit(&encryption_service_, "content4"));
page_cloud_.position_token_to_return = fidl::MakeOptional(MakeToken("44"));
is_download_idle_ = false;
bool upload_wait_remote_download = false;
SetOnNewStateCallback([this, &upload_wait_remote_download] {
if (states_.back() == UPLOAD_WAIT_REMOTE_DOWNLOAD) {
upload_wait_remote_download = true;
}
});
page_upload_->StartOrRestartUpload();
RunLoopUntilIdle();
ASSERT_TRUE(upload_wait_remote_download);
EXPECT_EQ(page_cloud_.received_commits.size(), 0u);
EXPECT_EQ(storage_.commits_marked_as_synced.size(), 0u);
is_download_idle_ = true;
bool upload_is_paused = false;
SetOnNewStateCallback([this, &upload_is_paused] { upload_is_paused = page_upload_->IsPaused(); });
page_upload_->StartOrRestartUpload();
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
ASSERT_EQ(page_cloud_.received_commits.size(), 2u);
ASSERT_THAT(page_cloud_.received_commits, Each(Truly(CommitHasIdAndData)));
EXPECT_EQ(page_cloud_.received_commits[0].id(),
convert::ToArray(encryption_service_.EncodeCommitId("local1")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[0].data()),
"content1");
EXPECT_EQ(page_cloud_.received_commits[1].id(),
convert::ToArray(encryption_service_.EncodeCommitId("local2")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[1].data()),
"content2");
ASSERT_EQ(storage_.commits_marked_as_synced.size(), 2u);
EXPECT_EQ(storage_.commits_marked_as_synced.count("local1"), 1u);
EXPECT_EQ(storage_.commits_marked_as_synced.count("local2"), 1u);
}
// Verfies that the new commits that PageSync is notified about through storage
// watcher are uploaded to PageCloudHandler, with the exception of commits that
// themselves come from sync.
TEST_F(PageUploadTest, UploadNewCommits) {
bool upload_is_paused = false;
SetOnNewStateCallback([this, &upload_is_paused] { upload_is_paused = page_upload_->IsPaused(); });
page_upload_->StartOrRestartUpload();
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
upload_is_paused = false;
auto commit1 = storage_.NewCommit("id1", "content1");
storage_.watcher_->OnNewCommits(commit1->AsList(), storage::ChangeSource::LOCAL);
// The commit coming from sync should be ignored.
auto commit2 = storage_.NewCommit("id2", "content2", false);
storage_.watcher_->OnNewCommits(commit2->AsList(), storage::ChangeSource::CLOUD);
auto commit3 = storage_.NewCommit("id3", "content3");
storage_.watcher_->OnNewCommits(commit3->AsList(), storage::ChangeSource::LOCAL);
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
ASSERT_EQ(page_cloud_.received_commits.size(), 2u);
ASSERT_THAT(page_cloud_.received_commits, Each(Truly(CommitHasIdAndData)));
EXPECT_EQ(page_cloud_.received_commits[0].id(),
convert::ToArray(encryption_service_.EncodeCommitId("id1")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[0].data()),
"content1");
EXPECT_EQ(page_cloud_.received_commits[1].id(),
convert::ToArray(encryption_service_.EncodeCommitId("id3")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[1].data()),
"content3");
EXPECT_EQ(storage_.commits_marked_as_synced.size(), 2u);
EXPECT_EQ(storage_.commits_marked_as_synced.count("id1"), 1u);
EXPECT_EQ(storage_.commits_marked_as_synced.count("id3"), 1u);
}
// Verifies that new commits being added to storage are only uploaded while
// there is only a single head.
TEST_F(PageUploadTest, UploadNewCommitsOnlyOnSingleHead) {
bool upload_is_paused = false;
SetOnNewStateCallback([this, &upload_is_paused] { upload_is_paused = page_upload_->IsPaused(); });
page_upload_->StartOrRestartUpload();
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
upload_is_paused = false;
// Add a new commit when there's only one head and verify that it is
// uploaded.
storage_.head_count = 1;
auto commit0 = storage_.NewCommit("id0", "content0");
storage_.watcher_->OnNewCommits(commit0->AsList(), storage::ChangeSource::LOCAL);
EXPECT_FALSE(page_upload_->IsPaused());
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
upload_is_paused = false;
ASSERT_EQ(page_cloud_.received_commits.size(), 1u);
ASSERT_THAT(page_cloud_.received_commits, Each(Truly(CommitHasIdAndData)));
EXPECT_EQ(page_cloud_.received_commits[0].id(),
convert::ToArray(encryption_service_.EncodeCommitId("id0")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[0].data()),
"content0");
EXPECT_EQ(storage_.commits_marked_as_synced.count("id0"), 1u);
// Add another commit when there's two heads and verify that it is not
// uploaded.
page_cloud_.received_commits.clear();
storage_.head_count = 2;
auto commit1 = storage_.NewCommit("id1", "content1");
storage_.watcher_->OnNewCommits(commit1->AsList(), storage::ChangeSource::LOCAL);
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
upload_is_paused = false;
ASSERT_EQ(page_cloud_.received_commits.size(), 0u);
EXPECT_EQ(storage_.commits_marked_as_synced.count("id1"), 0u);
// Add another commit bringing the number of heads down to one and verify that
// both commits are uploaded.
storage_.head_count = 1;
auto commit2 = storage_.NewCommit("id2", "content2");
storage_.watcher_->OnNewCommits(commit2->AsList(), storage::ChangeSource::LOCAL);
EXPECT_FALSE(page_upload_->IsPaused());
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
ASSERT_EQ(page_cloud_.received_commits.size(), 2u);
ASSERT_THAT(page_cloud_.received_commits, Each(Truly(CommitHasIdAndData)));
EXPECT_EQ(page_cloud_.received_commits[0].id(),
convert::ToArray(encryption_service_.EncodeCommitId("id1")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[0].data()),
"content1");
EXPECT_EQ(page_cloud_.received_commits[1].id(),
convert::ToArray(encryption_service_.EncodeCommitId("id2")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[1].data()),
"content2");
EXPECT_EQ(storage_.commits_marked_as_synced.count("id1"), 1u);
EXPECT_EQ(storage_.commits_marked_as_synced.count("id2"), 1u);
}
// Verifies that existing commits are uploaded before the new ones.
TEST_F(PageUploadTest, UploadExistingAndNewCommits) {
storage_.NewCommit("id1", "content1");
bool upload_is_paused = false;
SetOnNewStateCallback([this, &upload_is_paused] { upload_is_paused = page_upload_->IsPaused(); });
page_upload_->StartOrRestartUpload();
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
upload_is_paused = false;
auto commit = storage_.NewCommit("id2", "content2");
storage_.watcher_->OnNewCommits(commit->AsList(), storage::ChangeSource::LOCAL);
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
ASSERT_EQ(page_cloud_.received_commits.size(), 2u);
ASSERT_THAT(page_cloud_.received_commits, Each(Truly(CommitHasIdAndData)));
EXPECT_EQ(page_cloud_.received_commits[0].id(),
convert::ToArray(encryption_service_.EncodeCommitId("id1")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[0].data()),
"content1");
EXPECT_EQ(page_cloud_.received_commits[1].id(),
convert::ToArray(encryption_service_.EncodeCommitId("id2")));
EXPECT_EQ(encryption_service_.DecryptCommitSynchronous(page_cloud_.received_commits[1].data()),
"content2");
EXPECT_EQ(storage_.commits_marked_as_synced.size(), 2u);
EXPECT_EQ(storage_.commits_marked_as_synced.count("id1"), 1u);
EXPECT_EQ(storage_.commits_marked_as_synced.count("id2"), 1u);
}
// Verifies that failing uploads are retried.
TEST_F(PageUploadTest, RetryUpload) {
page_upload_->StartOrRestartUpload();
bool upload_is_paused = false;
SetOnNewStateCallback([this, &upload_is_paused] { upload_is_paused = page_upload_->IsPaused(); });
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
SetOnNewStateCallback(nullptr);
page_cloud_.commit_status_to_return = cloud_provider::Status::NETWORK_ERROR;
auto commit1 = storage_.NewCommit("id1", "content1");
storage_.watcher_->OnNewCommits(commit1->AsList(), storage::ChangeSource::LOCAL);
RunLoopFor(kHalfBackoffInterval);
EXPECT_GE(backoff_->get_next_count, 0);
RunLoopFor(kBackoffInterval);
EXPECT_GE(backoff_->get_next_count, 1);
RunLoopFor(kBackoffInterval);
EXPECT_GE(backoff_->get_next_count, 2);
// Verify that the commit is still not marked as synced in storage.
EXPECT_TRUE(storage_.commits_marked_as_synced.empty());
}
// Verifies that the idle status is returned when there is no pending upload
// task.
TEST_F(PageUploadTest, UploadIdleStatus) {
int on_idle_calls = 0;
storage_.NewCommit("id1", "content1");
storage_.NewCommit("id2", "content2");
SetOnNewStateCallback([this, &on_idle_calls] {
if (states_.back() == UPLOAD_IDLE) {
on_idle_calls++;
}
});
page_upload_->StartOrRestartUpload();
// Verify that the idle callback is called once both commits are uploaded.
RunLoopUntilIdle();
EXPECT_EQ(page_cloud_.received_commits.size(), 2u);
EXPECT_EQ(on_idle_calls, 1);
EXPECT_TRUE(page_upload_->IsPaused());
// Notify about a new commit to upload and verify that the idle callback was
// called again on completion.
auto commit3 = storage_.NewCommit("id3", "content3");
storage_.watcher_->OnNewCommits(commit3->AsList(), storage::ChangeSource::LOCAL);
EXPECT_FALSE(page_upload_->IsPaused());
RunLoopUntilIdle();
EXPECT_EQ(page_cloud_.received_commits.size(), 3u);
EXPECT_EQ(on_idle_calls, 2);
EXPECT_TRUE(page_upload_->IsPaused());
}
// Verifies that if listing the original commits to be uploaded fails, the
// client is notified about the error.
TEST_F(PageUploadTest, FailToListCommits) {
EXPECT_FALSE(storage_.watcher_set);
int error_calls = 0;
storage_.should_fail_get_unsynced_commits = true;
SetOnNewStateCallback([this, &error_calls] {
if (states_.back() == UPLOAD_PERMANENT_ERROR) {
error_calls++;
}
});
page_upload_->StartOrRestartUpload();
RunLoopUntilIdle();
EXPECT_EQ(error_calls, 1);
EXPECT_EQ(page_cloud_.received_commits.size(), 0u);
}
// Verifies that already synced commits are not re-uploaded.
TEST_F(PageUploadTest, DoNotUploadSyncedCommits) {
bool upload_is_paused = false;
SetOnNewStateCallback([this, &upload_is_paused] { upload_is_paused = page_upload_->IsPaused(); });
page_upload_->StartOrRestartUpload();
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
upload_is_paused = false;
auto commit = std::make_unique<TestCommit>("id", "content");
storage_.watcher_->OnNewCommits(commit->AsList(), storage::ChangeSource::LOCAL);
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
// Commit is already synced.
ASSERT_EQ(page_cloud_.received_commits.size(), 0u);
}
// Verifies that commits that are received between the first upload and the
// retry are not sent.
TEST_F(PageUploadTest, DoNotUploadSyncedCommitsOnRetry) {
bool upload_is_paused = false;
SetOnNewStateCallback([this, &upload_is_paused] {
upload_is_paused = page_upload_->IsPaused();
if (states_.back() == UploadSyncState::UPLOAD_TEMPORARY_ERROR) {
QuitLoop();
}
});
page_upload_->StartOrRestartUpload();
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
upload_is_paused = false;
page_cloud_.commit_status_to_return = cloud_provider::Status::NETWORK_ERROR;
auto commit = storage_.NewCommit("id", "content");
storage_.watcher_->OnNewCommits(commit->AsList(), storage::ChangeSource::LOCAL);
// The page upload should run into temporary error.
RunLoopUntilIdle();
EXPECT_EQ(states_.back(), UploadSyncState::UPLOAD_TEMPORARY_ERROR);
EXPECT_GT(page_cloud_.add_commits_calls, 0u);
// Configure the cloud to accept the next attempt to upload.
page_cloud_.commit_status_to_return = cloud_provider::Status::OK;
page_cloud_.add_commits_calls = 0u;
// Make storage report the commit as synced (not include it in the list of
// unsynced commits to return).
storage_.unsynced_commits_to_return.clear();
RunLoopFor(kHalfBackoffInterval);
EXPECT_EQ(states_.back(), UploadSyncState::UPLOAD_TEMPORARY_ERROR);
EXPECT_TRUE(upload_is_paused);
RunLoopFor(kBackoffInterval);
EXPECT_EQ(states_.back(), UploadSyncState::UPLOAD_IDLE);
EXPECT_TRUE(upload_is_paused);
// Verify that no calls were made to attempt to upload the commit.
EXPECT_EQ(page_cloud_.add_commits_calls, 0u);
}
// Verifies that concurrent new commit notifications do not crash PageUpload.
TEST_F(PageUploadTest, UploadNewCommitsConcurrentNoCrash) {
bool upload_is_paused = false;
SetOnNewStateCallback([this, &upload_is_paused] { upload_is_paused = page_upload_->IsPaused(); });
page_upload_->StartOrRestartUpload();
RunLoopUntilIdle();
ASSERT_TRUE(upload_is_paused);
upload_is_paused = false;
storage_.head_count = 2;
auto commit0 = storage_.NewCommit("id0", "content0");
storage_.watcher_->OnNewCommits(commit0->AsList(), storage::ChangeSource::LOCAL);
auto commit1 = storage_.NewCommit("id1", "content1");
storage_.watcher_->OnNewCommits(commit1->AsList(), storage::ChangeSource::LOCAL);
RunLoopUntilIdle();
}
// Verifies that if upload fails when sending the object, we don't create a new batch and call
// GetUnsyncedCommits again.
TEST_F(PageUploadTest, RetryOnError) {
storage_.NewCommit("id", "content");
auto id = encryption_service_.MakeObjectIdentifier(storage_.GetObjectIdentifierFactory(),
storage::ObjectDigest("obj_digest"));
storage_.unsynced_objects_to_return[id] =
std::make_unique<storage::fake::FakePiece>(id, "obj_data");
int failed_upload_count = 30;
page_cloud_.object_status_to_return = cloud_provider::Status::NETWORK_ERROR;
// Fail the upload |failed_upload_count| times.
SetOnNewStateCallback([this, &failed_upload_count] {
if (states_.back() == UploadSyncState::UPLOAD_TEMPORARY_ERROR) {
ASSERT_GE(failed_upload_count, 0);
failed_upload_count--;
if (failed_upload_count == 0) {
page_cloud_.object_status_to_return = cloud_provider::Status::OK;
}
}
});
page_upload_->StartOrRestartUpload();
RunLoopFor(kBackoffInterval * 30);
EXPECT_EQ(failed_upload_count, 0);
// GetUnsyncedCommits should be called twice: once when creating the batch and once just before
// uploading the commits.
EXPECT_EQ(storage_.get_unsynced_commits_calls, 2u);
EXPECT_EQ(page_cloud_.add_commits_calls, 1u);
EXPECT_EQ(states_.back(), UploadSyncState::UPLOAD_IDLE);
}
// Verifies that clocks are uploaded.
TEST_F(PageUploadTest, UploadClock) {
page_upload_->StartOrRestartUpload();
storage::Clock clock{
{clocks::DeviceId{"device_0", 1}, storage::ClockTombstone{}},
{clocks::DeviceId{"device_1", 1},
storage::DeviceEntry{storage::ClockEntry{"commit1", 1}, storage::ClockEntry{"commit0", 0}}},
{clocks::DeviceId{"device_2", 4},
storage::DeviceEntry{storage::ClockEntry{"commit4", 4}, storage::ClockEntry{"commit2", 2}}}};
bool called;
ledger::Status status;
page_upload_->UpdateClock(std::move(clock),
ledger::Capture(ledger::SetWhenCalled(&called), &status));
RunLoopUntilIdle();
EXPECT_FALSE(called);
EXPECT_THAT(page_cloud_.clocks, SizeIs(1));
page_cloud_.clocks[0].second(
cloud_provider::Status::OK,
std::make_unique<cloud_provider::ClockPack>(std::move(page_cloud_.clocks[0].first)));
RunLoopUntilIdle();
EXPECT_TRUE(called);
EXPECT_EQ(status, ledger::Status::OK);
}
// Verifies that clocks uploads are buffered while waiting for the cloud response.
TEST_F(PageUploadTest, UploadClockRateLimit) {
storage_.remote_id_to_commit_id[encryption_service_.EncodeCommitId("commit0")] = "commit0";
storage_.remote_id_to_commit_id[encryption_service_.EncodeCommitId("commit1")] = "commit1";
storage_.remote_id_to_commit_id[encryption_service_.EncodeCommitId("commit5")] = "commit5";
page_upload_->StartOrRestartUpload();
storage::Clock clock{
{clocks::DeviceId{"device_1", 1},
storage::DeviceEntry{storage::ClockEntry{"commit1", 1}, storage::ClockEntry{"commit0", 0}}}};
bool called_1;
ledger::Status status_1;
page_upload_->UpdateClock(clock, ledger::Capture(ledger::SetWhenCalled(&called_1), &status_1));
RunLoopUntilIdle();
EXPECT_FALSE(called_1);
EXPECT_THAT(page_cloud_.clocks, SizeIs(1));
bool called_2;
ledger::Status status_2;
page_upload_->UpdateClock(clock, ledger::Capture(ledger::SetWhenCalled(&called_2), &status_2));
// The second call is waiting for the first one to finish.
RunLoopUntilIdle();
EXPECT_FALSE(called_2);
EXPECT_THAT(page_cloud_.clocks, SizeIs(1));
storage::Clock clock_3{
{clocks::DeviceId{"device_1", 1},
storage::DeviceEntry{storage::ClockEntry{"commit5", 5}, storage::ClockEntry{"commit0", 0}}}};
bool called_3;
ledger::Status status_3;
page_upload_->UpdateClock(clock_3, ledger::Capture(ledger::SetWhenCalled(&called_3), &status_3));
// The second call is waiting for the first one to finish.
RunLoopUntilIdle();
EXPECT_FALSE(called_3);
EXPECT_THAT(page_cloud_.clocks, SizeIs(1));
// Respond to the first request. The second request should be sent, with the third clock.
page_cloud_.clocks[0].second(
cloud_provider::Status::OK,
std::make_unique<cloud_provider::ClockPack>(std::move(page_cloud_.clocks[0].first)));
RunLoopUntilIdle();
EXPECT_TRUE(called_1);
EXPECT_EQ(status_1, ledger::Status::OK);
EXPECT_THAT(page_cloud_.clocks, SizeIs(2));
auto golden_clock = clock_3;
// Remove the cloud entry from the clocks: it is not transmitted either way.
std::get<storage::DeviceEntry>(golden_clock.begin()->second).cloud.reset();
storage::Clock actual_clock;
ledger::Status status;
EXPECT_TRUE(RunInCoroutine([&](coroutine::CoroutineHandler* handler) {
status = DecodeClock(handler, &storage_, std::move(page_cloud_.clocks[1].first), &actual_clock);
}));
EXPECT_EQ(status, ledger::Status::OK);
std::get<storage::DeviceEntry>(actual_clock.begin()->second).cloud.reset();
EXPECT_EQ(actual_clock, golden_clock);
auto return_clock_pack = EncodeClock(&encryption_service_, actual_clock);
page_cloud_.clocks[1].second(
cloud_provider::Status::OK,
std::make_unique<cloud_provider::ClockPack>(std::move(return_clock_pack)));
RunLoopUntilIdle();
EXPECT_TRUE(called_2);
EXPECT_EQ(status_2, ledger::Status::OK);
EXPECT_TRUE(called_3);
EXPECT_EQ(status_3, ledger::Status::OK);
}
// Verifies that if we receive new commits during backoff:
// - the retried upload only uploads commits that existed when it was started.
// - a second upload starts immediately after the first (retried) upload succeeds and uploads the
// new commits.
TEST_F(PageUploadTest, NewCommitsDuringBackoff) {
storage_.NewCommit("id", "content");
auto id = encryption_service_.MakeObjectIdentifier(storage_.GetObjectIdentifierFactory(),
storage::ObjectDigest("obj_digest"));
storage_.unsynced_objects_to_return[id] =
std::make_unique<storage::fake::FakePiece>(id, "obj_data");
page_cloud_.object_status_to_return = cloud_provider::Status::NETWORK_ERROR;
// Wait for the upload to fail: the upload state becomes UPLOAD_TEMPORARY_ERROR. While the upload
// is in backoff, we ensure that the retry will succeed, we create a new local commit and notify
// the page upload object.
SetOnNewStateCallback([this] {
if (states_.back() == UploadSyncState::UPLOAD_TEMPORARY_ERROR) {
page_cloud_.object_status_to_return = cloud_provider::Status::OK;
auto commit = storage_.NewCommit("id2", "content2");
storage_.watcher_->OnNewCommits(commit->AsList(), storage::ChangeSource::LOCAL);
}
});
page_upload_->StartOrRestartUpload();
RunLoopFor(kBackoffInterval);
// Two uploads have been done.
EXPECT_EQ(page_cloud_.add_commits_calls, 2u);
EXPECT_EQ(states_.back(), UploadSyncState::UPLOAD_IDLE);
ASSERT_EQ(page_cloud_.received_commits.size(), 2u);
}
} // namespace
} // namespace cloud_sync