// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/cloud_sync/impl/page_download.h"
#include <lib/fidl/cpp/optional.h>
#include <lib/fit/function.h>
#include "src/ledger/bin/cloud_sync/impl/constants.h"
#include "src/ledger/bin/storage/public/data_source.h"
#include "src/ledger/bin/storage/public/read_data_source.h"
#include "src/lib/fxl/strings/concatenate.h"
namespace cloud_sync {
namespace {
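
// Merges the commit download state with the number of in-flight GetObject()
// calls: when commit sync is idle, outstanding object downloads keep the
// merged state at DOWNLOAD_IN_PROGRESS.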
DownloadSyncState GetMergedState(DownloadSyncState commit_state,
int current_get_object_calls) {
if (commit_state != DOWNLOAD_IDLE) {
return commit_state;
}
return current_get_object_calls == 0 ? DOWNLOAD_IDLE : DOWNLOAD_IN_PROGRESS;
}
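
// Returns true if |status| denotes an error that will not go away when the
// request is retried. OK, AUTH_ERROR and NETWORK_ERROR are treated as
// transient.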
bool IsPermanentError(cloud_provider::Status status) {
switch (status) {
case cloud_provider::Status::OK:
case cloud_provider::Status::AUTH_ERROR:
case cloud_provider::Status::NETWORK_ERROR:
return false;
case cloud_provider::Status::ARGUMENT_ERROR:
case cloud_provider::Status::INTERNAL_ERROR:
case cloud_provider::Status::NOT_FOUND:
case cloud_provider::Status::PARSE_ERROR:
case cloud_provider::Status::SERVER_ERROR:
case cloud_provider::Status::UNKNOWN_ERROR:
return true;
}
}
} // namespace
PageDownload::PageDownload(callback::ScopedTaskRunner* task_runner,
storage::PageStorage* storage,
storage::PageSyncClient* sync_client,
encryption::EncryptionService* encryption_service,
cloud_provider::PageCloudPtr* page_cloud,
Delegate* delegate,
std::unique_ptr<backoff::Backoff> backoff)
: task_runner_(task_runner),
storage_(storage),
sync_client_(sync_client),
encryption_service_(encryption_service),
page_cloud_(page_cloud),
delegate_(delegate),
backoff_(std::move(backoff)),
log_prefix_(fxl::Concatenate(
{"Page ", convert::ToHex(storage->GetId()), " download sync: "})),
watcher_binding_(this) {}
PageDownload::~PageDownload() { sync_client_->SetSyncDelegate(nullptr); }
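
// Starts the download flow: registers this object as the storage sync
// delegate and fetches the backlog of remote commits added after the last
// position token persisted in storage.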
void PageDownload::StartDownload() {
SetCommitState(DOWNLOAD_BACKLOG);
sync_client_->SetSyncDelegate(this);
// Retrieve the server-side timestamp of the last commit we received.
storage_->GetSyncMetadata(
kTimestampKey,
task_runner_->MakeScoped([this](storage::Status status,
std::string last_commit_token_id) {
// NOT_FOUND means that we haven't persisted the state yet, e.g. because
// we haven't received any remote commits yet. In this case an empty
// timestamp is the right value.
if (status != storage::Status::OK &&
status != storage::Status::INTERNAL_NOT_FOUND) {
HandleDownloadCommitError("Failed to retrieve the sync metadata.");
return;
}
if (last_commit_token_id.empty()) {
FXL_VLOG(1) << log_prefix_ << "starting sync for the first time, "
<< "retrieving all remote commits";
} else {
// TODO(ppi): print the timestamp out as human-readable wall time.
FXL_VLOG(1) << log_prefix_ << "starting sync again, "
<< "retrieving commits uploaded after: "
<< last_commit_token_id;
}
std::unique_ptr<cloud_provider::Token> position_token;
if (!last_commit_token_id.empty()) {
position_token = std::make_unique<cloud_provider::Token>();
position_token->opaque_id = convert::ToArray(last_commit_token_id);
}
(*page_cloud_)
->GetCommits(
std::move(position_token),
[this](cloud_provider::Status cloud_status,
std::unique_ptr<cloud_provider::CommitPack> commit_pack,
std::unique_ptr<cloud_provider::Token> position_token) {
if (cloud_status != cloud_provider::Status::OK) {
// Fetching the remote commits failed, schedule a retry.
FXL_LOG(WARNING)
<< log_prefix_
<< "fetching the remote commits failed due to a "
<< "connection error, status: "
<< fidl::ToUnderlying(cloud_status) << ", retrying.";
SetCommitState(DOWNLOAD_TEMPORARY_ERROR);
RetryWithBackoff([this] { StartDownload(); });
return;
}
if (!commit_pack) {
FXL_LOG(ERROR) << "Null commits despite status OK.";
SetCommitState(DOWNLOAD_PERMANENT_ERROR);
return;
}
backoff_->Reset();
std::vector<cloud_provider::CommitPackEntry> entries;
if (!cloud_provider::DecodeCommitPack(*commit_pack,
&entries)) {
FXL_LOG(ERROR) << "Failed to decode the commits.";
SetCommitState(DOWNLOAD_PERMANENT_ERROR);
return;
}
if (entries.empty()) {
// If there are no remote commits to add, announce that
// we're done.
FXL_VLOG(1)
<< log_prefix_
<< "initial sync finished, no new remote commits";
BacklogDownloaded();
} else {
FXL_VLOG(1) << log_prefix_ << "retrieved " << entries.size()
<< " (possibly) new remote commits, "
<< "adding them to storage.";
// Otherwise, fire the backlog-downloaded callback once the
// remote commits have been added to storage.
const auto commit_count = entries.size();
DownloadBatch(std::move(entries), std::move(position_token),
[this, commit_count] {
FXL_VLOG(1)
<< log_prefix_
<< "initial sync finished, added "
<< commit_count << " remote commits.";
BacklogDownloaded();
});
}
});
}));
}
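
// Download is idle when neither commit sync nor any GetObject() call is in
// progress; the not-started and permanent-error states also count as idle.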
bool PageDownload::IsIdle() {
switch (GetMergedState(commit_state_, current_get_object_calls_)) {
case DOWNLOAD_NOT_STARTED:
case DOWNLOAD_IDLE:
case DOWNLOAD_PERMANENT_ERROR:
return true;
case DOWNLOAD_BACKLOG:
case DOWNLOAD_TEMPORARY_ERROR:
case DOWNLOAD_SETTING_REMOTE_WATCHER:
case DOWNLOAD_IN_PROGRESS:
return false;
}
}
void PageDownload::BacklogDownloaded() { SetRemoteWatcher(false); }
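
// Registers a PageCloudWatcher starting at the last persisted position token,
// so that new remote commits are delivered as they are uploaded to the cloud.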
void PageDownload::SetRemoteWatcher(bool is_retry) {
FXL_DCHECK(commit_state_ == DOWNLOAD_BACKLOG ||
commit_state_ == DOWNLOAD_TEMPORARY_ERROR)
<< "Current state: " << commit_state_;
SetCommitState(DOWNLOAD_SETTING_REMOTE_WATCHER);
// Retrieve the server-side token of the last commit we received.
storage_->GetSyncMetadata(
kTimestampKey,
task_runner_->MakeScoped([this, is_retry](
storage::Status status,
std::string last_commit_token_id) {
if (status != storage::Status::OK &&
status != storage::Status::INTERNAL_NOT_FOUND) {
HandleDownloadCommitError("Failed to retrieve the sync metadata.");
return;
}
std::unique_ptr<cloud_provider::Token> position_token;
if (!last_commit_token_id.empty()) {
position_token = std::make_unique<cloud_provider::Token>();
position_token->opaque_id = convert::ToArray(last_commit_token_id);
}
cloud_provider::PageCloudWatcherPtr watcher;
watcher_binding_.Bind(watcher.NewRequest());
(*page_cloud_)
->SetWatcher(
std::move(position_token), std::move(watcher),
[this](auto status) {
// This should always succeed - any errors are reported
// through OnError().
if (status != cloud_provider::Status::OK) {
HandleDownloadCommitError(
"Unexpected error when setting the PageCloudWatcher.");
}
});
SetCommitState(DOWNLOAD_IDLE);
if (is_retry) {
FXL_LOG(INFO) << log_prefix_ << "Cloud watcher re-established";
}
}));
}
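
// PageCloudWatcher notification: new commits are available in the cloud. If a
// batch download is already in progress, the new commits are queued to be
// downloaded once it finishes; otherwise a new batch download is started.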
void PageDownload::OnNewCommits(cloud_provider::CommitPack commits,
cloud_provider::Token position_token,
OnNewCommitsCallback callback) {
std::vector<cloud_provider::CommitPackEntry> entries;
if (!cloud_provider::DecodeCommitPack(commits, &entries)) {
HandleDownloadCommitError("Failed to decode the commits");
return;
}
if (batch_download_) {
// If there is already a commit batch being downloaded, save the new commits
// to be downloaded when it is done.
std::move(entries.begin(), entries.end(),
std::back_inserter(commits_to_download_));
position_token_ = fidl::MakeOptional(std::move(position_token));
callback();
return;
}
SetCommitState(DOWNLOAD_IN_PROGRESS);
DownloadBatch(std::move(entries),
fidl::MakeOptional(std::move(position_token)),
std::move(callback));
}
void PageDownload::OnNewObject(std::vector<uint8_t> /*id*/,
fuchsia::mem::Buffer /*data*/,
OnNewObjectCallback /*callback*/) {
// No known cloud provider implementations use this method.
// TODO(ppi): implement this method when we have such cloud provider
// implementations.
FXL_NOTIMPLEMENTED();
}
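
// PageCloudWatcher notification: the watcher was interrupted. Transient errors
// trigger a backed-off attempt to re-establish the watcher; permanent errors
// stop sync for this page.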
void PageDownload::OnError(cloud_provider::Status status) {
FXL_DCHECK(commit_state_ == DOWNLOAD_IDLE ||
commit_state_ == DOWNLOAD_IN_PROGRESS);
if (!IsPermanentError(status)) {
// Reset the watcher and schedule a retry.
if (watcher_binding_.is_bound()) {
watcher_binding_.Unbind();
}
SetCommitState(DOWNLOAD_TEMPORARY_ERROR);
FXL_LOG(WARNING)
<< log_prefix_
<< "Connection error in the remote commit watcher, retrying.";
RetryWithBackoff([this] { SetRemoteWatcher(true); });
return;
}
if (status == cloud_provider::Status::PARSE_ERROR) {
HandleDownloadCommitError(
"Received a malformed remote commit notification.");
return;
}
FXL_LOG(WARNING) << "Received unexpected error from PageCloudWatcher: "
<< fidl::ToUnderlying(status);
HandleDownloadCommitError("Received unexpected error from PageCloudWatcher.");
}
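
// Persists a batch of remote commits in storage. Once the batch completes,
// either downloads the commits queued in the meantime or returns to idle.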
void PageDownload::DownloadBatch(
std::vector<cloud_provider::CommitPackEntry> entries,
std::unique_ptr<cloud_provider::Token> position_token,
fit::closure on_done) {
FXL_DCHECK(!batch_download_);
batch_download_ = std::make_unique<BatchDownload>(
storage_, encryption_service_, std::move(entries),
std::move(position_token),
[this, on_done = std::move(on_done)] {
if (on_done) {
on_done();
}
batch_download_.reset();
if (commits_to_download_.empty()) {
// Don't set the state to idle if we're in the process of setting the
// remote watcher.
if (commit_state_ == DOWNLOAD_IN_PROGRESS) {
SetCommitState(DOWNLOAD_IDLE);
}
return;
}
auto entries = std::move(commits_to_download_);
commits_to_download_.clear();
DownloadBatch(std::move(entries), std::move(position_token_), nullptr);
},
[this] {
HandleDownloadCommitError(
"Failed to persist a remote commit in storage");
});
batch_download_->Start();
}
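
// Fetches a single object from the cloud: resolves the object name through the
// encryption service, downloads the object, and decrypts it before handing it
// back to the caller.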
void PageDownload::GetObject(
storage::ObjectIdentifier object_identifier,
fit::function<void(storage::Status, storage::ChangeSource,
storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>)>
callback) {
current_get_object_calls_++;
UpdateDownloadState();
encryption_service_->GetObjectName(
object_identifier,
task_runner_->MakeScoped(
[this, object_identifier, callback = std::move(callback)](
encryption::Status status, std::string object_name) mutable {
if (status != encryption::Status::OK) {
HandleGetObjectError(std::move(object_identifier),
encryption::IsPermanentError(status),
"encryption", std::move(callback));
return;
}
(*page_cloud_)
->GetObject(
convert::ToArray(object_name),
[this, object_identifier = std::move(object_identifier),
callback = std::move(callback)](
cloud_provider::Status status,
::fuchsia::mem::BufferPtr data) mutable {
if (status != cloud_provider::Status::OK) {
HandleGetObjectError(std::move(object_identifier),
IsPermanentError(status),
"cloud provider",
std::move(callback));
return;
}
fsl::SizedVmo sized_vmo;
if (!fsl::SizedVmo::FromTransport(std::move(*data),
&sized_vmo)) {
HandleGetObjectError(std::move(object_identifier), true,
"converting to SizedVmo",
std::move(callback));
return;
}
DecryptObject(
std::move(object_identifier),
storage::DataSource::Create(std::move(sized_vmo)),
std::move(callback));
});
}));
}
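
// Reads the downloaded object content into memory, decrypts it, and reports
// the result to the original GetObject() callback.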
void PageDownload::DecryptObject(
storage::ObjectIdentifier object_identifier,
std::unique_ptr<storage::DataSource> content,
fit::function<void(storage::Status, storage::ChangeSource,
storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>)>
callback) {
storage::ReadDataSource(
&managed_container_, std::move(content),
[this, object_identifier = std::move(object_identifier),
callback = std::move(callback)](
storage::Status status,
std::unique_ptr<storage::DataSource::DataChunk> content) mutable {
if (status != storage::Status::OK) {
HandleGetObjectError(std::move(object_identifier), true, "io",
std::move(callback));
return;
}
encryption_service_->DecryptObject(
object_identifier, content->Get().ToString(),
[this, object_identifier, callback = std::move(callback)](
encryption::Status status, std::string content) mutable {
if (status != encryption::Status::OK) {
HandleGetObjectError(object_identifier,
encryption::IsPermanentError(status),
"encryption", std::move(callback));
return;
}
backoff_->Reset();
callback(
storage::Status::OK, storage::ChangeSource::CLOUD,
storage::IsObjectSynced::YES,
storage::DataSource::DataChunk::Create(std::move(content)));
current_get_object_calls_--;
UpdateDownloadState();
});
});
}
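
// Shared error path for GetObject(): permanent errors are reported to the
// caller as IO_ERROR, transient ones are retried with backoff.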
void PageDownload::HandleGetObjectError(
storage::ObjectIdentifier object_identifier, bool is_permanent,
const char error_name[],
fit::function<void(storage::Status, storage::ChangeSource,
storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>)>
callback) {
if (is_permanent) {
backoff_->Reset();
FXL_LOG(WARNING) << log_prefix_ << "GetObject() failed due to a permanent "
<< error_name << " error.";
callback(storage::Status::IO_ERROR, storage::ChangeSource::CLOUD,
storage::IsObjectSynced::YES, nullptr);
current_get_object_calls_--;
UpdateDownloadState();
return;
}
FXL_LOG(WARNING) << log_prefix_ << "GetObject() failed due to a "
<< error_name << " error, retrying.";
current_get_object_calls_--;
UpdateDownloadState();
RetryWithBackoff([this, object_identifier = std::move(object_identifier),
callback = std::move(callback)]() mutable {
GetObject(object_identifier, std::move(callback));
});
}
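
// Handles an unrecoverable commit download error: detaches the cloud watcher
// and the sync delegate, and moves commit sync to DOWNLOAD_PERMANENT_ERROR.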
void PageDownload::HandleDownloadCommitError(const char error_description[]) {
FXL_LOG(ERROR) << log_prefix_ << error_description << " Stopping sync.";
if (watcher_binding_.is_bound()) {
watcher_binding_.Unbind();
}
sync_client_->SetSyncDelegate(nullptr);
SetCommitState(DOWNLOAD_PERMANENT_ERROR);
}
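
// Updates the commit download state and propagates any resulting change of the
// merged state to the delegate.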
void PageDownload::SetCommitState(DownloadSyncState new_state) {
if (new_state == commit_state_) {
return;
}
commit_state_ = new_state;
UpdateDownloadState();
}
void PageDownload::UpdateDownloadState() {
DownloadSyncState new_state =
GetMergedState(commit_state_, current_get_object_calls_);
// Notify only if the externally visible state changed.
if (new_state != merged_state_) {
merged_state_ = new_state;
delegate_->SetDownloadState(merged_state_);
}
}
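
// Schedules |callable| to run after the next backoff delay, unless sync has
// entered the permanent error state in the meantime.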
void PageDownload::RetryWithBackoff(fit::closure callable) {
task_runner_->PostDelayedTask(
[this, callable = std::move(callable)]() {
if (this->commit_state_ != DOWNLOAD_PERMANENT_ERROR) {
callable();
}
},
backoff_->GetNext());
}
} // namespace cloud_sync