blob: 222d54e03f0a818df643f08cff9947ab84ae7d70 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/cloud_sync/impl/page_upload.h"
#include <lib/callback/scoped_callback.h>
#include <lib/fit/function.h>
namespace cloud_sync {
PageUpload::PageUpload(callback::ScopedTaskRunner* task_runner,
storage::PageStorage* storage,
encryption::EncryptionService* encryption_service,
cloud_provider::PageCloudPtr* page_cloud,
Delegate* delegate,
std::unique_ptr<backoff::Backoff> backoff)
: task_runner_(task_runner),
storage_(storage),
encryption_service_(encryption_service),
page_cloud_(page_cloud),
delegate_(delegate),
log_prefix_("Page " + convert::ToHex(storage->GetId()) +
" upload sync: "),
backoff_(std::move(backoff)),
weak_ptr_factory_(this) {}
PageUpload::~PageUpload() {}
void PageUpload::StartOrRestartUpload() {
if (external_state_ == UPLOAD_NOT_STARTED) {
// When called for the first time, this method is responsible for handling
// the initial setup.
SetState(UPLOAD_SETUP);
// Starting to watch right away is not an issue, because new commit
// notifications are used as a tickle only, and we use a separate call to
// get unsynced commits.
storage_->AddCommitWatcher(this);
}
// Whether called for the first time or to restart upload, prime the upload
// process.
NextState();
}
void PageUpload::OnNewCommits(
const std::vector<std::unique_ptr<const storage::Commit>>& /*commits*/,
storage::ChangeSource source) {
// Only upload the locally created commits.
// TODO(ppi): revisit this when we have p2p sync, too.
if (source != storage::ChangeSource::LOCAL) {
return;
}
if (external_state_ == UPLOAD_TEMPORARY_ERROR) {
// Upload is already scheduled to retry uploading. No need to do anything
// here.
return;
}
NextState();
}
void PageUpload::UploadUnsyncedCommits() {
FXL_DCHECK(internal_state_ == PageUploadState::PROCESSING);
if (!delegate_->IsDownloadIdle()) {
// If a commit batch is currently being downloaded, don't try to start the
// upload.
SetState(UPLOAD_WAIT_REMOTE_DOWNLOAD);
PreviousState();
return;
}
SetState(UPLOAD_PENDING);
// Retrieve the of the existing unsynced commits and enqueue them for
// upload.
// TODO(ppi): either switch to a paginating API or (better?) ensure that long
// backlogs of local commits are squashed in storage, as otherwise the list of
// commits can be possibly very big.
storage_->GetUnsyncedCommits(callback::MakeScoped(
weak_ptr_factory_.GetWeakPtr(),
[this](storage::Status status,
std::vector<std::unique_ptr<const storage::Commit>> commits) {
if (status != storage::Status::OK) {
SetState(UPLOAD_PERMANENT_ERROR);
HandleError("Failed to retrieve the unsynced commits");
return;
}
VerifyUnsyncedCommits(std::move(commits));
}));
}
void PageUpload::VerifyUnsyncedCommits(
std::vector<std::unique_ptr<const storage::Commit>> commits) {
// If we have no commit to upload, skip.
if (commits.empty()) {
SetState(UPLOAD_IDLE);
PreviousState();
return;
}
storage_->GetHeadCommitIds(callback::MakeScoped(
weak_ptr_factory_.GetWeakPtr(),
[this, commits = std::move(commits)](
storage::Status status,
std::vector<storage::CommitId> heads) mutable {
if (status != storage::Status::OK) {
HandleError("Failed to retrieve the current heads");
return;
}
FXL_DCHECK(!heads.empty());
if (!delegate_->IsDownloadIdle()) {
// If a commit batch is currently being downloaded, don't try to start
// the upload.
SetState(UPLOAD_WAIT_REMOTE_DOWNLOAD);
PreviousState();
return;
}
if (heads.size() > 1u) {
// Too many local heads.
SetState(UPLOAD_WAIT_TOO_MANY_LOCAL_HEADS);
PreviousState();
return;
}
HandleUnsyncedCommits(std::move(commits));
}));
}
void PageUpload::HandleUnsyncedCommits(
std::vector<std::unique_ptr<const storage::Commit>> commits) {
FXL_DCHECK(!batch_upload_);
SetState(UPLOAD_IN_PROGRESS);
batch_upload_ = std::make_unique<BatchUpload>(
storage_, encryption_service_, page_cloud_, std::move(commits),
[this] {
// Upload succeeded, reset the backoff delay.
backoff_->Reset();
batch_upload_.reset();
PreviousState();
},
[this](BatchUpload::ErrorType error_type) {
switch (error_type) {
case BatchUpload::ErrorType::TEMPORARY: {
FXL_LOG(WARNING)
<< log_prefix_
<< "commit upload failed due to a connection error, retrying.";
SetState(UPLOAD_TEMPORARY_ERROR);
batch_upload_.reset();
PreviousState();
RetryWithBackoff([this] { NextState(); });
} break;
case BatchUpload::ErrorType::PERMANENT: {
FXL_LOG(WARNING) << log_prefix_
<< "commit upload failed with a permanent error.";
SetState(UPLOAD_PERMANENT_ERROR);
} break;
}
});
batch_upload_->Start();
}
void PageUpload::HandleError(const char error_description[]) {
FXL_LOG(ERROR) << log_prefix_ << error_description << " Stopping sync.";
if (external_state_ > UPLOAD_SETUP) {
storage_->RemoveCommitWatcher(this);
}
SetState(UPLOAD_PERMANENT_ERROR);
}
void PageUpload::RetryWithBackoff(fit::closure callable) {
task_runner_->PostDelayedTask(
[this, callable = std::move(callable)]() {
if (this->external_state_ != UPLOAD_PERMANENT_ERROR) {
callable();
}
},
backoff_->GetNext());
}
void PageUpload::SetState(UploadSyncState new_state) {
if (new_state == external_state_) {
return;
}
external_state_ = new_state;
// Posting to the run loop to handle the case where the delegate will delete
// this class in the SetUploadState method.
// TODO(qsr): Aggregate changed state, so that a change from A -> B -> A do
// not send any signal.
task_runner_->PostTask(
[this] { delegate_->SetUploadState(external_state_); });
}
bool PageUpload::IsIdle() {
switch (external_state_) {
case UPLOAD_NOT_STARTED:
case UPLOAD_IDLE:
// Note: this is considered idle because the reason for being blocked is
// external to the class - there's nothing to do on our side.
case UPLOAD_WAIT_TOO_MANY_LOCAL_HEADS:
case UPLOAD_PERMANENT_ERROR:
return true;
break;
case UPLOAD_SETUP:
case UPLOAD_PENDING:
case UPLOAD_WAIT_REMOTE_DOWNLOAD:
case UPLOAD_TEMPORARY_ERROR:
case UPLOAD_IN_PROGRESS:
return false;
break;
}
}
void PageUpload::NextState() {
switch (internal_state_) {
case PageUploadState::NO_COMMIT:
internal_state_ = PageUploadState::PROCESSING;
UploadUnsyncedCommits();
return;
case PageUploadState::PROCESSING:
case PageUploadState::PROCESSING_NEW_COMMIT:
internal_state_ = PageUploadState::PROCESSING_NEW_COMMIT;
return;
}
}
void PageUpload::PreviousState() {
switch (internal_state_) {
case PageUploadState::NO_COMMIT:
FXL_NOTREACHED() << "Bad state";
case PageUploadState::PROCESSING:
internal_state_ = PageUploadState::NO_COMMIT;
if (external_state_ == UPLOAD_IN_PROGRESS) {
SetState(UPLOAD_IDLE);
}
return;
case PageUploadState::PROCESSING_NEW_COMMIT:
internal_state_ = PageUploadState::PROCESSING;
UploadUnsyncedCommits();
return;
}
}
} // namespace cloud_sync