blob: a6409c38cea69f8689cb542a4cf19ccf1f8db686 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/storage/impl/page_db_batch_impl.h"
#include <map>
#include <memory>
#include "src/ledger/bin/storage/impl/clock_serialization.h"
#include "src/ledger/bin/storage/impl/data_serialization.h"
#include "src/ledger/bin/storage/impl/db_serialization.h"
#include "src/ledger/bin/storage/impl/object_digest.h"
#include "src/ledger/bin/storage/impl/page_db.h"
#include "src/ledger/bin/storage/public/types.h"
#include "src/ledger/lib/logging/logging.h"
#include "third_party/abseil-cpp/absl/strings/string_view.h"
namespace storage {
using coroutine::CoroutineHandler;
// Constructs a batch that stages mutations in |batch|. |page_db| is used for
// read-side queries (HasObject, GetObjectStatus, EnsureObjectDeletable) and
// |factory| tracks object deletions so they can be aborted by live references.
// All three pointers must outlive this object; |page_db| and |factory| are not owned.
PageDbBatchImpl::PageDbBatchImpl(std::unique_ptr<Db::Batch> batch, PageDb* page_db,
                                 ObjectIdentifierFactory* factory)
    : batch_(std::move(batch)), page_db_(page_db), factory_(factory) {}
PageDbBatchImpl::~PageDbBatchImpl() { UntrackPendingDeletions(); }
// Stages a head-commit row for |head|, with the serialized |timestamp| as value.
Status PageDbBatchImpl::AddHead(CoroutineHandler* handler, CommitIdView head,
                                zx::time_utc timestamp) {
  const auto key = HeadRow::GetKeyFor(head);
  return batch_->Put(handler, key, SerializeData(timestamp));
}
// Stages the removal of the head-commit row for |head|.
Status PageDbBatchImpl::RemoveHead(CoroutineHandler* handler, CommitIdView head) {
  const auto key = HeadRow::GetKeyFor(head);
  return batch_->Delete(handler, key);
}
// Stages a merge row recording that |merge_commit_id| is a merge of
// |parent1_id| and |parent2_id|. Only the key carries information; the value is empty.
Status PageDbBatchImpl::AddMerge(coroutine::CoroutineHandler* handler, CommitIdView parent1_id,
                                 CommitIdView parent2_id, CommitIdView merge_commit_id) {
  const auto key = MergeRow::GetKeyFor(parent1_id, parent2_id, merge_commit_id);
  return batch_->Put(handler, key, "");
}
// Stages the removal of the merge row for |commit_id| as a merge of
// |parent1_id| and |parent2_id|.
Status PageDbBatchImpl::DeleteMerge(coroutine::CoroutineHandler* handler, CommitIdView parent1_id,
                                    CommitIdView parent2_id, CommitIdView commit_id) {
  const auto key = MergeRow::GetKeyFor(parent1_id, parent2_id, commit_id);
  return batch_->Delete(handler, key);
}
// Stages all rows for a new commit: its reference to the root node, the
// remote-id -> local-id mapping, and the serialized commit bytes themselves.
Status PageDbBatchImpl::AddCommitStorageBytes(CoroutineHandler* handler, const CommitId& commit_id,
                                              absl::string_view remote_commit_id,
                                              const ObjectIdentifier& root_node,
                                              absl::string_view storage_bytes) {
  // Reference row from the commit to its root node (empty value; the key carries the data).
  const auto reference_key = ReferenceRow::GetKeyForCommit(commit_id, root_node.object_digest());
  RETURN_ON_ERROR(batch_->Put(handler, reference_key, ""));
  // Mapping from the remote commit id back to the local one.
  const auto remote_key = RemoteCommitIdToLocalRow::GetKeyFor(remote_commit_id);
  RETURN_ON_ERROR(batch_->Put(handler, remote_key, commit_id));
  // The serialized commit itself.
  return batch_->Put(handler, CommitRow::GetKeyFor(commit_id), storage_bytes);
}
// Stages the removal of every row written for |commit_id|: its root-node
// reference, its unsynced marker, the remote-id mapping, and the commit bytes.
Status PageDbBatchImpl::DeleteCommit(coroutine::CoroutineHandler* handler, CommitIdView commit_id,
                                     absl::string_view remote_commit_id,
                                     const ObjectIdentifier& root_node) {
  // Drop the commit -> root-node reference row.
  const auto reference_key = ReferenceRow::GetKeyForCommit(commit_id, root_node.object_digest());
  RETURN_ON_ERROR(batch_->Delete(handler, reference_key));
  // Drop the unsynced-commit marker.
  RETURN_ON_ERROR(batch_->Delete(handler, UnsyncedCommitRow::GetKeyFor(commit_id)));
  // Drop the remote-id -> local-id mapping.
  RETURN_ON_ERROR(batch_->Delete(handler, RemoteCommitIdToLocalRow::GetKeyFor(remote_commit_id)));
  // Drop the serialized commit itself.
  return batch_->Delete(handler, CommitRow::GetKeyFor(commit_id));
}
// Stages the write of |piece| with the given |object_status| and outgoing
// |references|. If the object already exists, only its status may be upgraded
// (a TRANSIENT write on an existing object is a no-op).
Status PageDbBatchImpl::WriteObject(CoroutineHandler* handler, const Piece& piece,
                                    PageDbObjectStatus object_status,
                                    const ObjectReferencesAndPriority& references) {
  LEDGER_DCHECK(object_status > PageDbObjectStatus::UNKNOWN);
  const ObjectIdentifier& object_identifier = piece.GetIdentifier();
  const Status status = page_db_->HasObject(handler, object_identifier);
  // Propagate anything that is neither "present" nor "absent".
  if (status != Status::OK && status != Status::INTERNAL_NOT_FOUND) {
    return status;
  }
  if (status == Status::OK) {
    // Already stored: nothing to do for TRANSIENT, otherwise upgrade the status.
    if (object_status == PageDbObjectStatus::TRANSIENT) {
      return Status::OK;
    }
    return SetObjectStatus(handler, object_identifier, object_status);
  }
  // Absent: stage the object data, its reference rows, and its status row.
  RETURN_ON_ERROR(batch_->Put(handler, ObjectRow::GetKeyFor(object_identifier.object_digest()),
                              piece.GetData()));
  for (const auto& [child, priority] : references) {
    // References to inlined objects are not expected here.
    LEDGER_DCHECK(!GetObjectDigestInfo(child).is_inlined());
    const auto reference_key =
        ReferenceRow::GetKeyForObject(object_identifier.object_digest(), child, priority);
    RETURN_ON_ERROR(batch_->Put(handler, reference_key, ""));
  }
  return batch_->Put(handler, ObjectStatusRow::GetKeyFor(object_status, object_identifier), "");
}
// Object deletion of |object_digest| proceeds in several steps:
// - register the object as pending deletion, and fail if the object already has any live reference.
// From this point on, if any other part of the code attempts to create an ObjectIdentifier for
// this object (in particular to read or write it), it will automatically mark the deletion as
// aborted.
// - collect all the synchronization statuses for the object. A given object may be known under
// different identifiers, with different sync statuses. Do not decode those object identifiers, as
// it would create a live reference to the object, that would abort the deletion (see below).
// - for each status, abort if the object is not garbage collectable (ie. has some on-disk
// references) and batch a delete of the associated keys.
// - batch a delete of the object itself, and all its |references|.
// - store the digest as pending deletion for this batch.
//
// When |Execute| eventually runs, it checks that none of the pending deletions have been aborted,
// ie. that no live references to the object have been introduced since the first step. No on-disk
// reference or change of status can have happened either, because all the entry points in PageDb
// that allow those changes require an ObjectIdentifier as input, the creation of which would have
// aborted the deletion. This is the reason this method is the only one in |PageDb| operating on
// ObjectDigest rather than ObjectIdentifier.
Status PageDbBatchImpl::DeleteObject(coroutine::CoroutineHandler* handler,
                                     const ObjectDigest& object_digest,
                                     const ObjectReferencesAndPriority& references) {
  // Step 1: register the deletion; fails if the object currently has a live
  // reference (see the comment block above this method).
  if (!factory_->TrackDeletion(object_digest)) {
    LEDGER_VLOG(1) << "Object is live, cannot be deleted: " << object_digest;
    return Status::CANCELED;
  }
  // Step 2: collect the status-row keys for every identifier this object is
  // known under; CANCELED means the object still has on-disk references.
  std::vector<std::string> object_status_keys;
  Status status = page_db_->EnsureObjectDeletable(handler, object_digest, &object_status_keys);
  if (status == Status::CANCELED) {
    // Roll back the tracking started above; the result is irrelevant since the
    // deletion is abandoned anyway.
    (void)factory_->UntrackDeletion(object_digest);
    LEDGER_VLOG(1) << "Object is not garbage collectable, cannot be deleted: " << object_digest;
    return Status::CANCELED;
  }
  // Propagate any other error (e.g. INTERRUPTED).
  RETURN_ON_ERROR(status);
  // Step 3: stage the deletion of all sync-status rows.
  for (const auto& object_status_key : object_status_keys) {
    RETURN_ON_ERROR(batch_->Delete(handler, object_status_key));
  }
  // Step 4: stage the deletion of the object data and its outgoing reference rows.
  RETURN_ON_ERROR(batch_->Delete(handler, ObjectRow::GetKeyFor(object_digest)));
  for (const auto& [child, priority] : references) {
    // References to inlined objects are not expected here.
    LEDGER_DCHECK(!GetObjectDigestInfo(child).is_inlined());
    RETURN_ON_ERROR(
        batch_->Delete(handler, ReferenceRow::GetKeyForObject(object_digest, child, priority)));
  }
  // Step 5: remember the digest so Execute() can verify the deletion was not
  // aborted before committing the batch.
  pending_deletion_.insert(object_digest);
  return Status::OK;
}
// Stages an upgrade of |object_identifier|'s status to |object_status|.
// Statuses only move forward: a lower-or-equal new status is a no-op.
Status PageDbBatchImpl::SetObjectStatus(CoroutineHandler* handler,
                                        const ObjectIdentifier& object_identifier,
                                        PageDbObjectStatus object_status) {
  LEDGER_DCHECK(object_status >= PageDbObjectStatus::LOCAL);
  RETURN_ON_ERROR(DCheckHasObject(handler, object_identifier));
  PageDbObjectStatus previous_status;
  RETURN_ON_ERROR(page_db_->GetObjectStatus(handler, object_identifier, &previous_status));
  if (previous_status >= object_status) {
    return Status::OK;
  }
  // The object might exist already under a different identifier (with the same
  // digest), in which case there is no status row to delete.
  if (previous_status != PageDbObjectStatus::UNKNOWN) {
    const auto old_key = ObjectStatusRow::GetKeyFor(previous_status, object_identifier);
    RETURN_ON_ERROR(batch_->Delete(handler, old_key));
  }
  const auto new_key = ObjectStatusRow::GetKeyFor(object_status, object_identifier);
  return batch_->Put(handler, new_key, "");
}
// Stages the removal of |commit_id|'s unsynced marker.
Status PageDbBatchImpl::MarkCommitIdSynced(CoroutineHandler* handler, const CommitId& commit_id) {
  const auto key = UnsyncedCommitRow::GetKeyFor(commit_id);
  return batch_->Delete(handler, key);
}
// Stages an unsynced marker for |commit_id|, storing its serialized |generation|.
Status PageDbBatchImpl::MarkCommitIdUnsynced(CoroutineHandler* handler, const CommitId& commit_id,
                                             uint64_t generation) {
  const auto key = UnsyncedCommitRow::GetKeyFor(commit_id);
  return batch_->Put(handler, key, SerializeData(generation));
}
// Stages |value| under the sync-metadata row derived from |key|.
Status PageDbBatchImpl::SetSyncMetadata(CoroutineHandler* handler, absl::string_view key,
                                        absl::string_view value) {
  const auto row_key = SyncMetadataRow::GetKeyFor(key);
  return batch_->Put(handler, row_key, value);
}
// Stages the page-is-online marker row; an empty value is stored under the key.
Status PageDbBatchImpl::MarkPageOnline(coroutine::CoroutineHandler* handler) {
  const char* const empty_value = "";
  return batch_->Put(handler, PageIsOnlineRow::kKey, empty_value);
}
// Stages the serialized |device_id| under the clock device-id key.
Status PageDbBatchImpl::SetDeviceId(coroutine::CoroutineHandler* handler,
                                    const clocks::DeviceId& device_id) {
  return batch_->Put(handler, ClockRow::kDeviceIdKey, SerializeDeviceId(device_id));
}
// Stages the serialized clock |entry| under the clock entries key.
Status PageDbBatchImpl::SetClock(coroutine::CoroutineHandler* handler, const Clock& entry) {
  return batch_->Put(handler, ClockRow::kEntriesKey, SerializeClock(entry));
}
// Commits the batch. Fails with CANCELED if any deletion staged through
// DeleteObject has since been aborted by a new live reference.
Status PageDbBatchImpl::Execute(CoroutineHandler* handler) {
  const bool deletions_still_valid = UntrackPendingDeletions();
  if (!deletions_still_valid) {
    return Status::CANCELED;
  }
  return batch_->Execute(handler);
}
// Untracks every pending deletion with the factory and clears the pending set.
// Returns true iff none of the deletions had been aborted. All entries are
// untracked even when an abort is found, so tracking state is always released.
bool PageDbBatchImpl::UntrackPendingDeletions() {
  bool all_valid = true;
  for (const ObjectDigest& object_digest : pending_deletion_) {
    const bool still_valid = factory_->UntrackDeletion(object_digest);
    if (!still_valid) {
      LEDGER_VLOG(1) << "Deletion has been aborted, object cannot be deleted: " << object_digest;
      all_valid = false;
    }
  }
  pending_deletion_.clear();
  return all_valid;
}
// Debug-only sanity check that |key| is present in the database. In release
// builds (NDEBUG) this is a no-op returning OK. In debug builds, INTERRUPTED
// from the lookup is propagated; any other non-OK result trips the DCHECK.
Status PageDbBatchImpl::DCheckHasObject(CoroutineHandler* handler, const ObjectIdentifier& key) {
#ifndef NDEBUG
  const Status status = page_db_->HasObject(handler, key);
  if (status == Status::INTERRUPTED) {
    return status;
  }
  LEDGER_DCHECK(status == Status::OK) << key;
#endif
  return Status::OK;
}
} // namespace storage