blob: fa7c9ae1e77a9bbdc41178bc536dbaf0a9d0458b [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/cloud_sync/impl/page_download.h"
#include <lib/fidl/cpp/optional.h>
#include <lib/fit/function.h>
#include <optional>
#include "src/ledger/bin/cloud_sync/impl/constants.h"
#include "src/ledger/bin/cloud_sync/impl/entry_payload_encoding.h"
#include "src/ledger/bin/cloud_sync/impl/status.h"
#include "src/ledger/bin/storage/public/constants.h"
#include "src/ledger/bin/storage/public/data_source.h"
#include "src/ledger/bin/storage/public/read_data_source.h"
#include "src/ledger/lib/callback/waiter.h"
#include "src/ledger/lib/convert/convert.h"
#include "src/ledger/lib/encoding/encoding.h"
#include "src/ledger/lib/logging/logging.h"
#include "src/ledger/lib/memory/ref_ptr.h"
#include "third_party/abseil-cpp/absl/strings/str_cat.h"
#include "third_party/abseil-cpp/absl/strings/string_view.h"
namespace cloud_sync {
namespace {
DownloadSyncState GetMergedState(DownloadSyncState commit_state, int current_get_object_calls) {
if (commit_state != DOWNLOAD_IDLE) {
return commit_state;
}
return current_get_object_calls == 0 ? DOWNLOAD_IDLE : DOWNLOAD_IN_PROGRESS;
}
// Normalizes a diff so that the cloud cannot learn anything from knowing whether it applies
// successfully or not.
//
// We fail for diffs that the cloud would know are invalid. We can learn here that some other diffs
// are invalid, eg. those that insert a key twice, but it would be to risky to reject them here: we
// do not want the cloud to be able to distinguish between a failure due to having duplicate keys
// and a failure due to not getting the expected tree after applying the diff, and it's easier to do
// this if we follow the same code/error handling path in those two cases.
//
// We rely on diffs being applied strictly: deletions are only valid if they matchh precisely the
// entry present in the tree, and the diff is rejected otherwise. Similarly, insertions are rejected
// if the key exists instead of being turned into updates.
//
// Some parts of the diff might have been shuffled before being sent. For simplicity, we completely
// ignore the order in which changes have been sent. Once we've matched and simplified insertions
// and deletions based on the entry id (which is non-secret), the diff can only apply successfully
// if all deletions delete things that are in the base version, and all insertions insert things
// that are in the target version, and there is no entry with this key in the base version or it has
// been deleted. If that's the case, it will apply successfully if we apply deletions at a given key
// before insertions at this key. We sort by key because this is expected by
// |storage::btree::ApplyChangesFromCloud|.
bool NormalizeDiff(std::vector<storage::EntryChange>* changes) {
auto compare_by_entryid = [](const storage::Entry& lhs, const storage::Entry& rhs) {
return lhs.entry_id < rhs.entry_id;
};
// To each entry id, we associate the first entry we found with this id, and the count of entries
// with this id. Inserted entries are counted as +1, deleted entries as -1.
std::map<storage::Entry, int64_t, decltype(compare_by_entryid)> entries(compare_by_entryid);
for (auto& change : *changes) {
entries[std::move(change.entry)] += change.deleted ? -1 : +1;
}
// Changes have been invalidated by the move.
changes->clear();
// Serialize the map back to a vector. We expect all counts to be 0, +1 or -1: other diffs will
// not apply successfully and can be rejected now because the count is only based on the
// non-secret entry ids.
while (!entries.empty()) {
auto node = entries.extract(entries.begin());
if (node.mapped() == 0) {
// Insertions and deletions cancel.
} else if (node.mapped() == -1) {
// Only one deletion remains.
changes->push_back({std::move(node.key()), true});
} else if (node.mapped() == 1) {
// Only one insertion remains.
changes->push_back({std::move(node.key()), false});
} else {
// Multiple insertions or deletions remain, the diff is invalid. Failing here is OK, because
// we only depend on information known to the cloud.
return false;
}
}
// Sort the vector by entry name, putting deletions before insertions.
auto compare_by_key_deletions_first = [](const storage::EntryChange& lhs,
const storage::EntryChange& rhs) {
// We want deleted = true before deleted = false, but false < true.
return std::forward_as_tuple(lhs.entry.key, !lhs.deleted) <
std::forward_as_tuple(rhs.entry.key, !rhs.deleted);
};
std::sort(changes->begin(), changes->end(), compare_by_key_deletions_first);
return true;
}
} // namespace
PageDownload::PageDownload(ledger::ScopedTaskRunner* task_runner, storage::PageStorage* storage,
encryption::EncryptionService* encryption_service,
cloud_provider::PageCloudPtr* page_cloud, Delegate* delegate,
std::unique_ptr<ledger::Backoff> backoff)
: task_runner_(task_runner),
storage_(storage),
encryption_service_(encryption_service),
page_cloud_(page_cloud),
delegate_(delegate),
backoff_(std::move(backoff)),
log_prefix_(absl::StrCat("Page ", convert::ToHex(storage->GetId()), " download sync: ")),
watcher_binding_(this),
weak_factory_(this) {}
PageDownload::~PageDownload() = default;
void PageDownload::StartDownload() {
SetCommitState(DOWNLOAD_BACKLOG);
// Retrieve the server-side timestamp of the last commit we received.
storage_->GetSyncMetadata(
kTimestampKey,
task_runner_->MakeScoped([this](ledger::Status status, std::string last_commit_token_id) {
// NOT_FOUND means that we haven't persisted the state yet, e.g. because
// we haven't received any remote commits yet. In this case an empty
// timestamp is the right value.
if (status != ledger::Status::OK && status != ledger::Status::INTERNAL_NOT_FOUND) {
HandleDownloadCommitError("Failed to retrieve the sync metadata.");
return;
}
if (last_commit_token_id.empty()) {
LEDGER_VLOG(1) << log_prefix_ << "starting sync for the first time, "
<< "retrieving all remote commits";
} else {
// TODO(ppi): print the timestamp out as human-readable wall time.
LEDGER_VLOG(1) << log_prefix_ << "starting sync again, "
<< "retrieving commits uploaded after: " << last_commit_token_id;
}
std::unique_ptr<cloud_provider::PositionToken> position_token;
if (!last_commit_token_id.empty()) {
position_token = std::make_unique<cloud_provider::PositionToken>();
position_token->opaque_id = convert::ToArray(last_commit_token_id);
}
(*page_cloud_)
->GetCommits(
std::move(position_token),
[this](cloud_provider::Status cloud_status,
std::unique_ptr<cloud_provider::CommitPack> commit_pack,
std::unique_ptr<cloud_provider::PositionToken> position_token) {
if (cloud_status != cloud_provider::Status::OK) {
// Fetching the remote commits failed, schedule a retry.
LEDGER_LOG(WARNING)
<< log_prefix_ << "fetching the remote commits failed due to a "
<< "connection error, status: " << fidl::ToUnderlying(cloud_status)
<< ", retrying.";
SetCommitState(DOWNLOAD_TEMPORARY_ERROR);
RetryWithBackoff([this] { StartDownload(); });
return;
}
if (!commit_pack) {
LEDGER_LOG(ERROR) << "Null commits despite status OK.";
SetCommitState(DOWNLOAD_PERMANENT_ERROR);
return;
}
backoff_->Reset();
cloud_provider::Commits commits_container;
if (!ledger::DecodeFromBuffer(commit_pack->buffer, &commits_container)) {
LEDGER_LOG(ERROR) << "Failed to decode the commits.";
SetCommitState(DOWNLOAD_PERMANENT_ERROR);
return;
}
std::vector<cloud_provider::Commit> commits =
std::move(commits_container.commits);
if (commits.empty()) {
// If there is no remote commits to add, announce that
// we're done.
LEDGER_VLOG(1) << log_prefix_ << "initial sync finished, no new remote commits";
BacklogDownloaded();
} else {
LEDGER_VLOG(1) << log_prefix_ << "retrieved " << commits.size()
<< " (possibly) new remote commits, "
<< "adding them to storage.";
// If not, fire the backlog download callback when the
// remote commits are downloaded.
const auto commit_count = commits.size();
DownloadBatch(std::move(commits), std::move(position_token),
[this, commit_count] {
LEDGER_VLOG(1) << log_prefix_ << "initial sync finished, added "
<< commit_count << " remote commits.";
BacklogDownloaded();
});
}
});
}));
}
bool PageDownload::IsPaused() {
return IsIdle() || GetMergedState(commit_state_, current_get_calls_) == DOWNLOAD_TEMPORARY_ERROR;
}
bool PageDownload::IsIdle() {
switch (GetMergedState(commit_state_, current_get_calls_)) {
case DOWNLOAD_NOT_STARTED:
case DOWNLOAD_IDLE:
case DOWNLOAD_PERMANENT_ERROR:
return true;
break;
case DOWNLOAD_BACKLOG:
case DOWNLOAD_TEMPORARY_ERROR:
case DOWNLOAD_SETTING_REMOTE_WATCHER:
case DOWNLOAD_IN_PROGRESS:
return false;
break;
}
}
void PageDownload::BacklogDownloaded() { SetRemoteWatcher(false); }
void PageDownload::SetRemoteWatcher(bool is_retry) {
LEDGER_DCHECK(commit_state_ == DOWNLOAD_BACKLOG || commit_state_ == DOWNLOAD_TEMPORARY_ERROR)
<< "Current state: " << commit_state_;
SetCommitState(DOWNLOAD_SETTING_REMOTE_WATCHER);
// Retrieve the server-side token of the last commit we received.
std::string last_commit_token_id;
storage_->GetSyncMetadata(
kTimestampKey, task_runner_->MakeScoped([this, is_retry](ledger::Status status,
std::string last_commit_token_id) {
if (status != ledger::Status::OK && status != ledger::Status::INTERNAL_NOT_FOUND) {
HandleDownloadCommitError("Failed to retrieve the sync metadata.");
return;
}
std::unique_ptr<cloud_provider::PositionToken> position_token;
if (!last_commit_token_id.empty()) {
position_token = std::make_unique<cloud_provider::PositionToken>();
position_token->opaque_id = convert::ToArray(last_commit_token_id);
}
cloud_provider::PageCloudWatcherPtr watcher;
watcher_binding_.Bind(watcher.NewRequest());
(*page_cloud_)
->SetWatcher(std::move(position_token), std::move(watcher), [this](auto status) {
// This should always succeed - any errors are reported
// through OnError().
if (status != cloud_provider::Status::OK) {
HandleDownloadCommitError("Unexpected error when setting the PageCloudWatcher.");
}
});
SetCommitState(DOWNLOAD_IDLE);
if (is_retry) {
LEDGER_LOG(INFO) << log_prefix_ << "Cloud watcher re-established";
}
}));
}
void PageDownload::OnNewCommits(cloud_provider::CommitPack commit_pack,
cloud_provider::PositionToken position_token,
OnNewCommitsCallback callback) {
cloud_provider::Commits commits_container;
if (!ledger::DecodeFromBuffer(commit_pack.buffer, &commits_container)) {
HandleDownloadCommitError("Failed to decode the commits");
return;
}
std::vector<cloud_provider::Commit> commits = std::move(commits_container.commits);
if (batch_download_) {
// If there is already a commit batch being downloaded, save the new commits
// to be downloaded when it is done.
std::move(commits.begin(), commits.end(), std::back_inserter(commits_to_download_));
position_token_ = fidl::MakeOptional(std::move(position_token));
callback();
return;
}
SetCommitState(DOWNLOAD_IN_PROGRESS);
DownloadBatch(std::move(commits), fidl::MakeOptional(std::move(position_token)),
std::move(callback));
}
void PageDownload::OnNewObject(std::vector<uint8_t> /*id*/, fuchsia::mem::Buffer /*data*/,
OnNewObjectCallback /*callback*/) {
// No known cloud provider implementations use this method.
// TODO(ppi): implement this method when we have such cloud provider
// implementations.
LEDGER_NOTIMPLEMENTED();
}
void PageDownload::OnError(cloud_provider::Status status) {
LEDGER_DCHECK(commit_state_ == DOWNLOAD_IDLE || commit_state_ == DOWNLOAD_IN_PROGRESS);
if (!IsPermanentError(status)) {
// Reset the watcher and schedule a retry.
if (watcher_binding_.is_bound()) {
watcher_binding_.Unbind();
}
SetCommitState(DOWNLOAD_TEMPORARY_ERROR);
LEDGER_LOG(WARNING) << log_prefix_
<< "Connection error in the remote commit watcher, retrying.";
RetryWithBackoff([this] { SetRemoteWatcher(true); });
return;
}
if (status == cloud_provider::Status::PARSE_ERROR) {
HandleDownloadCommitError("Received a malformed remote commit notification.");
return;
}
LEDGER_LOG(WARNING) << "Received unexpected error from PageCloudWatcher: "
<< fidl::ToUnderlying(status);
HandleDownloadCommitError("Received unexpected error from PageCloudWatcher.");
}
void PageDownload::DownloadBatch(std::vector<cloud_provider::Commit> commits,
std::unique_ptr<cloud_provider::PositionToken> position_token,
fit::closure on_done) {
LEDGER_DCHECK(!batch_download_);
batch_download_ = std::make_unique<BatchDownload>(
storage_, encryption_service_, std::move(commits), std::move(position_token),
[this, on_done = std::move(on_done)] {
if (on_done) {
on_done();
}
batch_download_.reset();
if (commits_to_download_.empty()) {
// Don't set to idle if we're in process of setting the remote
// watcher.
if (commit_state_ == DOWNLOAD_IN_PROGRESS) {
SetCommitState(DOWNLOAD_IDLE);
}
return;
}
auto commits = std::move(commits_to_download_);
commits_to_download_.clear();
DownloadBatch(std::move(commits), std::move(position_token_), nullptr);
},
[this] { HandleDownloadCommitError("Failed to persist a remote commit in storage"); });
batch_download_->Start();
}
void PageDownload::GetObject(
storage::ObjectIdentifier object_identifier,
storage::RetrievedObjectType /*retrieved_object_type*/,
fit::function<void(ledger::Status, storage::ChangeSource, storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>)>
callback) {
GetObject(std::move(object_identifier), std::move(callback));
}
void PageDownload::GetObject(
storage::ObjectIdentifier object_identifier,
fit::function<void(ledger::Status, storage::ChangeSource, storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>)>
callback) {
current_get_calls_++;
UpdateDownloadState();
encryption_service_->GetObjectName(
object_identifier,
task_runner_->MakeScoped([this, object_identifier, callback = std::move(callback)](
encryption::Status status, std::string object_name) mutable {
if (status != encryption::Status::OK) {
HandleGetObjectError(std::move(object_identifier), encryption::IsPermanentError(status),
"encryption", std::move(callback));
return;
}
(*page_cloud_)
->GetObject(convert::ToArray(object_name),
[this, object_identifier = std::move(object_identifier),
callback = std::move(callback)](cloud_provider::Status status,
::fuchsia::mem::BufferPtr data) mutable {
if (status != cloud_provider::Status::OK) {
HandleGetObjectError(std::move(object_identifier),
IsPermanentError(status), "cloud provider",
std::move(callback));
return;
}
ledger::SizedVmo sized_vmo;
if (!ledger::SizedVmo::FromTransport(std::move(*data), &sized_vmo)) {
HandleGetObjectError(std::move(object_identifier), true,
"converting to SizedVmo", std::move(callback));
return;
}
DecryptObject(std::move(object_identifier),
storage::DataSource::Create(std::move(sized_vmo)),
std::move(callback));
});
}));
}
void PageDownload::DecryptObject(
storage::ObjectIdentifier object_identifier, std::unique_ptr<storage::DataSource> content,
fit::function<void(ledger::Status, storage::ChangeSource, storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>)>
callback) {
storage::ReadDataSource(
&managed_container_, std::move(content),
[this, object_identifier = std::move(object_identifier), callback = std::move(callback)](
ledger::Status status, std::unique_ptr<storage::DataSource::DataChunk> content) mutable {
if (status != ledger::Status::OK) {
HandleGetObjectError(std::move(object_identifier), true, "io", std::move(callback));
return;
}
encryption_service_->DecryptObject(
object_identifier, convert::ToString(content->Get()),
[this, object_identifier, callback = std::move(callback)](encryption::Status status,
std::string content) mutable {
if (status != encryption::Status::OK) {
HandleGetObjectError(object_identifier, encryption::IsPermanentError(status),
"encryption", std::move(callback));
return;
}
backoff_->Reset();
callback(ledger::Status::OK, storage::ChangeSource::CLOUD,
storage::IsObjectSynced::YES,
storage::DataSource::DataChunk::Create(std::move(content)));
current_get_calls_--;
UpdateDownloadState();
});
});
}
void PageDownload::ReadDiffEntry(
const cloud_provider::DiffEntry& change,
fit::function<void(ledger::Status, storage::EntryChange)> callback) {
storage::EntryChange result;
if (!change.has_entry_id() || change.entry_id().empty() || !change.has_operation() ||
!change.has_data()) {
callback(ledger::Status::INVALID_ARGUMENT, {});
return;
}
result.deleted = change.operation() == cloud_provider::Operation::DELETION;
encryption_service_->DecryptEntryPayload(
convert::ToString(change.data()),
ledger::MakeScoped(
weak_factory_.GetWeakPtr(),
[this, entry_id = change.entry_id(), result = std::move(result),
callback = std::move(callback)](encryption::Status status,
std::string decrypted_entry_payload) mutable {
if (status != encryption::Status::OK) {
callback(ledger::Status::INVALID_ARGUMENT, std::move(result));
return;
}
if (!DecodeEntryPayload(std::move(entry_id), std::move(decrypted_entry_payload),
storage_->GetObjectIdentifierFactory(), &result.entry)) {
callback(ledger::Status::INVALID_ARGUMENT, std::move(result));
return;
}
callback(ledger::Status::OK, std::move(result));
}));
}
void PageDownload::DecodeAndParseDiff(
const cloud_provider::DiffPack& diff_pack,
fit::function<void(ledger::Status, storage::CommitId, std::vector<storage::EntryChange>)>
callback) {
cloud_provider::Diff diff;
std::optional<std::string> base_remote_commit_id;
std::vector<storage::EntryChange> changes;
if (!ledger::DecodeFromBuffer(diff_pack.buffer, &diff)) {
callback(ledger::Status::INVALID_ARGUMENT, {}, {});
return;
}
if (!diff.has_base_state()) {
callback(ledger::Status::INVALID_ARGUMENT, {}, {});
return;
}
if (diff.base_state().is_empty_page()) {
base_remote_commit_id = std::nullopt;
} else if (diff.base_state().is_at_commit()) {
base_remote_commit_id = convert::ToString(diff.base_state().at_commit());
} else {
callback(ledger::Status::INVALID_ARGUMENT, {}, {});
return;
}
if (!diff.has_changes()) {
callback(ledger::Status::INVALID_ARGUMENT, {}, {});
return;
}
auto waiter = ledger::MakeRefCounted<ledger::Waiter<ledger::Status, storage::EntryChange>>(
ledger::Status::OK);
for (const cloud_provider::DiffEntry& cloud_change : diff.changes()) {
ReadDiffEntry(cloud_change, waiter->NewCallback());
}
waiter->Finalize(task_runner_->MakeScoped(
[this, base_remote_commit_id = std::move(base_remote_commit_id),
callback = std::move(callback)](ledger::Status status,
std::vector<storage::EntryChange> changes) mutable {
if (!base_remote_commit_id) {
callback(status, convert::ToString(storage::kFirstPageCommitId), std::move(changes));
} else {
storage_->GetCommitIdFromRemoteId(
*base_remote_commit_id,
[callback = std::move(callback), changes = std::move(changes)](
ledger::Status status, storage::CommitId base_commit_id) mutable {
callback(status, std::move(base_commit_id), std::move(changes));
});
}
}));
}
void PageDownload::GetDiff(
storage::CommitId commit_id, std::vector<storage::CommitId> possible_bases,
fit::function<void(ledger::Status, storage::CommitId, std::vector<storage::EntryChange>)>
callback) {
current_get_calls_++;
UpdateDownloadState();
std::string remote_commit_id = encryption_service_->EncodeCommitId(commit_id);
std::vector<std::vector<uint8_t>> bases_as_bytes;
bases_as_bytes.reserve(possible_bases.size());
for (auto& base : possible_bases) {
bases_as_bytes.push_back(convert::ToArray(encryption_service_->EncodeCommitId(base)));
}
(*page_cloud_)
->GetDiff(
convert::ToArray(remote_commit_id), std::move(bases_as_bytes),
[this, callback = std::move(callback), commit_id = std::move(commit_id),
possible_bases = std::move(possible_bases)](
cloud_provider::Status status,
std::unique_ptr<cloud_provider::DiffPack> diff_pack) mutable {
if (status == cloud_provider::Status::NOT_SUPPORTED) {
// The cloud provider does not support diff. Ask the storage to apply
// an empty diff to the root of the same commit.
// TODO(12356): remove compatibility.
callback(ledger::Status::OK, std::move(commit_id), {});
current_get_calls_--;
UpdateDownloadState();
return;
}
if (status != cloud_provider::Status::OK) {
HandleGetDiffError(std::move(commit_id), std::move(possible_bases),
IsPermanentError(status), "cloud provider", std::move(callback));
return;
}
if (!diff_pack) {
HandleGetDiffError(std::move(commit_id), std::move(possible_bases),
/*is_permanent*/ true, "missing diff", std::move(callback));
return;
}
DecodeAndParseDiff(
*diff_pack,
ledger::MakeScoped(
weak_factory_.GetWeakPtr(),
[this, commit_id, possible_bases = std::move(possible_bases),
callback = std::move(callback)](
ledger::Status status, storage::CommitId base_commit,
std::vector<storage::EntryChange> changes) mutable {
if (status != ledger::Status::OK) {
HandleGetDiffError(std::move(commit_id), std::move(possible_bases),
/*is_permanent*/ true, "invalid diff during decoding",
std::move(callback));
return;
}
if (!NormalizeDiff(&changes)) {
HandleGetDiffError(std::move(commit_id), std::move(possible_bases),
/*is_permanent*/ true,
"invalid diff during normalization",
std::move(callback));
return;
}
callback(ledger::Status::OK, std::move(base_commit), std::move(changes));
current_get_calls_--;
UpdateDownloadState();
}));
});
}
void PageDownload::HandleGetObjectError(
storage::ObjectIdentifier object_identifier, bool is_permanent, absl::string_view error_name,
fit::function<void(ledger::Status, storage::ChangeSource, storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>)>
callback) {
if (is_permanent) {
backoff_->Reset();
LEDGER_LOG(WARNING) << log_prefix_ << "GetObject() failed due to a permanent " << error_name
<< " error.";
callback(ledger::Status::IO_ERROR, storage::ChangeSource::CLOUD, storage::IsObjectSynced::YES,
nullptr);
current_get_calls_--;
UpdateDownloadState();
return;
}
LEDGER_LOG(WARNING) << log_prefix_ << "GetObject() failed due to a " << error_name
<< " error, retrying.";
current_get_calls_--;
UpdateDownloadState();
RetryWithBackoff([this, object_identifier = std::move(object_identifier),
callback = std::move(callback)]() mutable {
GetObject(object_identifier, std::move(callback));
});
}
void PageDownload::HandleGetDiffError(
storage::CommitId commit_id, std::vector<storage::CommitId> possible_bases, bool is_permanent,
absl::string_view error_name,
fit::function<void(ledger::Status, storage::CommitId, std::vector<storage::EntryChange>)>
callback) {
if (is_permanent) {
backoff_->Reset();
LEDGER_LOG(WARNING) << log_prefix_ << "GetDiff() failed due to a permanent " << error_name
<< " error.";
callback(ledger::Status::IO_ERROR, "", {});
current_get_calls_--;
UpdateDownloadState();
return;
}
LEDGER_LOG(WARNING) << log_prefix_ << "GetDiff() failed due to a " << error_name
<< " error, retrying.";
current_get_calls_--;
UpdateDownloadState();
RetryWithBackoff([this, commit_id = std::move(commit_id),
possible_bases = std::move(possible_bases),
callback = std::move(callback)]() mutable {
GetDiff(std::move(commit_id), std::move(possible_bases), std::move(callback));
});
}
void PageDownload::HandleDownloadCommitError(absl::string_view error_description) {
LEDGER_LOG(ERROR) << log_prefix_ << error_description << " Stopping sync.";
if (watcher_binding_.is_bound()) {
watcher_binding_.Unbind();
}
SetCommitState(DOWNLOAD_PERMANENT_ERROR);
}
void PageDownload::SetCommitState(DownloadSyncState new_state) {
if (new_state == commit_state_) {
return;
}
commit_state_ = new_state;
UpdateDownloadState();
}
void PageDownload::UpdateDownloadState() {
DownloadSyncState new_state = GetMergedState(commit_state_, current_get_calls_);
// Notify only if the externally visible state changed.
if (new_state != merged_state_) {
merged_state_ = new_state;
delegate_->SetDownloadState(GetMergedState(commit_state_, current_get_calls_));
}
}
void PageDownload::RetryWithBackoff(fit::closure callable) {
task_runner_->PostDelayedTask(
[this, callable = std::move(callable)]() {
if (this->commit_state_ != DOWNLOAD_PERMANENT_ERROR) {
callable();
}
},
backoff_->GetNext());
}
} // namespace cloud_sync