blob: 72c400c418af1df4ed162aa56f14776898678f94 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/storage/impl/page_db_batch_impl.h"

#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>

#include "src/ledger/bin/storage/impl/clock_serialization.h"
#include "src/ledger/bin/storage/impl/data_serialization.h"
#include "src/ledger/bin/storage/impl/db_serialization.h"
#include "src/ledger/bin/storage/impl/object_digest.h"
#include "src/ledger/bin/storage/impl/page_db.h"
#include "src/ledger/bin/storage/public/types.h"
#include "src/lib/fxl/logging.h"
#include "src/lib/fxl/strings/concatenate.h"
namespace storage {
using coroutine::CoroutineHandler;
// Builds the batch wrapper: |batch| is moved into |batch_|, while the |page_db|, |db| and
// |factory| pointers are stored as given for use by the operations below.
PageDbBatchImpl::PageDbBatchImpl(std::unique_ptr<Db::Batch> batch, PageDb* page_db, Db* db,
                                 ObjectIdentifierFactory* factory)
    : batch_(std::move(batch)),
      page_db_(page_db),
      db_(db),
      factory_(factory) {}
// Releases every deletion still registered in |pending_deletion_|. Whether any of them had been
// aborted is irrelevant at destruction time, so the result is ignored.
PageDbBatchImpl::~PageDbBatchImpl() {
  UntrackPendingDeletions();
}
// Batches a write of the head row for |head|, with the serialized |timestamp| as its value.
// Returns the status reported by the underlying batch.
Status PageDbBatchImpl::AddHead(CoroutineHandler* handler, CommitIdView head,
                                zx::time_utc timestamp) {
  const auto head_key = HeadRow::GetKeyFor(head);
  return batch_->Put(handler, head_key, SerializeData(timestamp));
}
// Batches a delete of the head row for |head|. Returns the status reported by the underlying
// batch.
Status PageDbBatchImpl::RemoveHead(CoroutineHandler* handler, CommitIdView head) {
  const auto head_key = HeadRow::GetKeyFor(head);
  return batch_->Delete(handler, head_key);
}
// Batches a write of the merge row keyed by (|parent1_id|, |parent2_id|, |merge_commit_id|). The
// row carries no value: all the information is in the key.
Status PageDbBatchImpl::AddMerge(coroutine::CoroutineHandler* handler, CommitIdView parent1_id,
                                 CommitIdView parent2_id, CommitIdView merge_commit_id) {
  const auto merge_key = MergeRow::GetKeyFor(parent1_id, parent2_id, merge_commit_id);
  return batch_->Put(handler, merge_key, "");
}
// Batches a delete of the merge row keyed by (|parent1_id|, |parent2_id|, |commit_id|).
Status PageDbBatchImpl::DeleteMerge(coroutine::CoroutineHandler* handler, CommitIdView parent1_id,
                                    CommitIdView parent2_id, CommitIdView commit_id) {
  const auto merge_key = MergeRow::GetKeyFor(parent1_id, parent2_id, commit_id);
  return batch_->Delete(handler, merge_key);
}
// Batches the three rows describing a commit, stopping at the first failing write:
// - the reference row from |commit_id| to the digest of |root_node| (empty value);
// - the row mapping |remote_commit_id| to |commit_id|;
// - the commit row itself, holding |storage_bytes|.
Status PageDbBatchImpl::AddCommitStorageBytes(CoroutineHandler* handler, const CommitId& commit_id,
                                              fxl::StringView remote_commit_id,
                                              const ObjectIdentifier& root_node,
                                              fxl::StringView storage_bytes) {
  const auto root_reference_key =
      ReferenceRow::GetKeyForCommit(commit_id, root_node.object_digest());
  RETURN_ON_ERROR(batch_->Put(handler, root_reference_key, ""));

  const auto remote_id_key = RemoteCommitIdToLocalRow::GetKeyFor(remote_commit_id);
  RETURN_ON_ERROR(batch_->Put(handler, remote_id_key, commit_id));

  const auto commit_key = CommitRow::GetKeyFor(commit_id);
  return batch_->Put(handler, commit_key, storage_bytes);
}
// Batches the deletion of every row attached to a commit, stopping at the first failing delete:
// - the reference row from |commit_id| to the digest of |root_node|;
// - the unsynced-commit row for |commit_id|;
// - the row mapping |remote_commit_id| to the local commit;
// - the commit row itself.
Status PageDbBatchImpl::DeleteCommit(coroutine::CoroutineHandler* handler, CommitIdView commit_id,
                                     fxl::StringView remote_commit_id,
                                     const ObjectIdentifier& root_node) {
  const auto root_reference_key =
      ReferenceRow::GetKeyForCommit(commit_id, root_node.object_digest());
  RETURN_ON_ERROR(batch_->Delete(handler, root_reference_key));

  const auto unsynced_key = UnsyncedCommitRow::GetKeyFor(commit_id);
  RETURN_ON_ERROR(batch_->Delete(handler, unsynced_key));

  const auto remote_id_key = RemoteCommitIdToLocalRow::GetKeyFor(remote_commit_id);
  RETURN_ON_ERROR(batch_->Delete(handler, remote_id_key));

  const auto commit_key = CommitRow::GetKeyFor(commit_id);
  return batch_->Delete(handler, commit_key);
}
// Batches the write of |piece| with status |object_status| (which must not be UNKNOWN).
//
// - If the object is already present, its data and references are left untouched and only its
//   status is possibly raised through |SetObjectStatus|. A TRANSIENT write never changes the
//   status of an existing object.
// - If the presence check fails with anything other than INTERNAL_NOT_FOUND, that status is
//   returned.
// - Otherwise the object row, one reference row per entry of |references|, and the status row are
//   batched, stopping at the first failing write.
Status PageDbBatchImpl::WriteObject(CoroutineHandler* handler, const Piece& piece,
                                    PageDbObjectStatus object_status,
                                    const ObjectReferencesAndPriority& references) {
  FXL_DCHECK(object_status > PageDbObjectStatus::UNKNOWN);

  const ObjectIdentifier& object_identifier = piece.GetIdentifier();
  const Status has_object_status = page_db_->HasObject(handler, object_identifier);
  if (has_object_status == Status::OK) {
    if (object_status == PageDbObjectStatus::TRANSIENT) {
      // |SetObjectStatus| only accepts LOCAL and above, and there is nothing to upgrade anyway.
      return Status::OK;
    }
    // Reuse the identifier obtained above rather than asking |piece| for a second one.
    return SetObjectStatus(handler, object_identifier, object_status);
  }
  if (has_object_status != Status::INTERNAL_NOT_FOUND) {
    return has_object_status;
  }

  // The object is not stored yet: batch its data, its outgoing references and its status.
  RETURN_ON_ERROR(batch_->Put(handler, ObjectRow::GetKeyFor(object_identifier.object_digest()),
                              piece.GetData()));
  for (const auto& [child, priority] : references) {
    FXL_DCHECK(!GetObjectDigestInfo(child).is_inlined());
    RETURN_ON_ERROR(batch_->Put(
        handler, ReferenceRow::GetKeyForObject(object_identifier.object_digest(), child, priority),
        ""));
  }
  return batch_->Put(handler, ObjectStatusRow::GetKeyFor(object_status, object_identifier), "");
}
// Object deletion of |object_digest| proceeds in several steps:
// - register the object as pending deletion, and fail if the object already has any live reference.
//   From this point on, if any other part of the code attempts to create an ObjectIdentifier for
//   this object (in particular to read or write it), it will automatically mark the deletion as
//   aborted.
// - collect all the synchronization statuses for the object. A given object may be known under
//   different identifiers, with different sync statuses. Do not decode those object identifiers, as
//   it would create a live reference to the object, that would abort the deletion (see below).
// - for each status, abort if the object is not garbage collectable (i.e. has some on-disk
//   references) and batch a delete of the associated keys.
// - batch a delete of the object itself, and all its |references|.
// - store the digest as pending deletion for this batch.
//
// If any step after the registration fails or aborts, the registration is released before
// returning: only digests stored in |pending_deletion_| are released later by |Execute| or the
// destructor, so an early exit would otherwise leave the object registered with the factory.
//
// When |Execute| eventually runs, it checks that none of the pending deletions have been aborted,
// i.e. that no live reference to the object has been introduced since the first step. No on-disk
// reference or change of status can have happened either, because all the entry points in PageDb
// that allow those changes require an ObjectIdentifier as input, the creation of which would have
// aborted the deletion. This is the reason this method is the only one in |PageDb| operating on
// ObjectDigest rather than ObjectIdentifier.
//
// Returns CANCELED if the object is live or not garbage collectable, the first error reported by
// the database or the batch otherwise, and OK once all deletes are batched.
Status PageDbBatchImpl::DeleteObject(coroutine::CoroutineHandler* handler,
                                     const ObjectDigest& object_digest,
                                     const ObjectReferencesAndPriority& references) {
  if (!factory_->TrackDeletion(object_digest)) {
    FXL_VLOG(1) << "Object is live, cannot be deleted: " << object_digest;
    return Status::CANCELED;
  }

  // Everything that can fail while the deletion is tracked runs inside this lambda, so that the
  // single exit point below can release the tracking on any non-OK outcome.
  const auto batch_deletes = [&]() -> Status {
    std::map<std::string, PageDbObjectStatus> keys;
    RETURN_ON_ERROR(page_db_->GetObjectStatusKeys(handler, object_digest, &keys));
    // |seen_status| is used to ensure that we call |IsGarbageCollectable| only once per status.
    std::set<PageDbObjectStatus> seen_status;
    for (const auto& [object_status_key, object_status] : keys) {
      if (seen_status.insert(object_status).second) {
        bool is_garbage_collectable = false;
        RETURN_ON_ERROR(
            IsGarbageCollectable(handler, object_digest, object_status, &is_garbage_collectable));
        if (!is_garbage_collectable) {
          FXL_VLOG(1) << "Object is not garbage collectable, cannot be deleted: " << object_digest;
          return Status::CANCELED;
        }
      }
      RETURN_ON_ERROR(batch_->Delete(handler, object_status_key));
    }
    RETURN_ON_ERROR(batch_->Delete(handler, ObjectRow::GetKeyFor(object_digest)));
    for (const auto& [child, priority] : references) {
      FXL_DCHECK(!GetObjectDigestInfo(child).is_inlined());
      RETURN_ON_ERROR(
          batch_->Delete(handler, ReferenceRow::GetKeyForObject(object_digest, child, priority)));
    }
    return Status::OK;
  };

  const Status deletion_status = batch_deletes();
  if (deletion_status != Status::OK) {
    // Abort the deletion. Whether it had already been aborted by someone else does not matter
    // here. Like the destructor, this relies on |factory_| still being valid.
    (void)factory_->UntrackDeletion(object_digest);
    return deletion_status;
  }
  pending_deletion_.insert(object_digest);
  return Status::OK;
}
// Sets |*result| to true when no reference row that matters for |object_status| exists for
// |digest|, and to false otherwise (including on error).
//
// Which references matter depends on the status:
// - SYNCED: only object-object links are looked up.
// - TRANSIENT and LOCAL: both object-object and commit-object links are looked up.
// - UNKNOWN is not a valid input and yields INTERNAL_ERROR.
//
// Returns OK whenever the lookup could be answered, and the database status otherwise.
Status PageDbBatchImpl::IsGarbageCollectable(coroutine::CoroutineHandler* handler,
                                             const ObjectDigest& digest,
                                             PageDbObjectStatus object_status, bool* result) {
  *result = false;

  std::string reference_prefix;
  switch (object_status) {
    case PageDbObjectStatus::UNKNOWN:
      FXL_NOTREACHED();
      return Status::INTERNAL_ERROR;
    case PageDbObjectStatus::SYNCED:
      // object-object links only for synced objects: even if referenced by on-disk commits,
      // it is safe to discard them (and recover them later from the cloud).
      reference_prefix = ReferenceRow::GetObjectKeyPrefixFor(digest);
      break;
    case PageDbObjectStatus::TRANSIENT:
    case PageDbObjectStatus::LOCAL:
      // object-object and commit-object links must both be zero for all other types of objects.
      reference_prefix = ReferenceRow::GetKeyPrefixFor(digest);
      break;
  }

  const Status lookup_status = db_->HasPrefix(handler, reference_prefix);
  if (lookup_status != Status::INTERNAL_NOT_FOUND) {
    // Either a reference exists (OK) or the lookup failed: |*result| stays false in both cases.
    return lookup_status;
  }
  // No row starts with the prefix: nothing references the object.
  *result = true;
  return Status::OK;
}
// Raises the status of |object_identifier| to |object_status| (LOCAL or above). The status is
// never lowered: if the stored status is already at least |object_status|, nothing is batched.
// Otherwise the previous status row, if any, is deleted and the new one is written.
Status PageDbBatchImpl::SetObjectStatus(CoroutineHandler* handler,
                                        const ObjectIdentifier& object_identifier,
                                        PageDbObjectStatus object_status) {
  FXL_DCHECK(object_status >= PageDbObjectStatus::LOCAL);
  RETURN_ON_ERROR(DCheckHasObject(handler, object_identifier));

  PageDbObjectStatus current_status;
  RETURN_ON_ERROR(page_db_->GetObjectStatus(handler, object_identifier, &current_status));

  const bool is_upgrade = current_status < object_status;
  if (!is_upgrade) {
    return Status::OK;
  }

  // The object might exist already under a different identifier (with the same digest), in which
  // case there is no status row to delete.
  const bool has_status_row = current_status != PageDbObjectStatus::UNKNOWN;
  if (has_status_row) {
    const auto stale_key = ObjectStatusRow::GetKeyFor(current_status, object_identifier);
    RETURN_ON_ERROR(batch_->Delete(handler, stale_key));
  }

  const auto new_key = ObjectStatusRow::GetKeyFor(object_status, object_identifier);
  return batch_->Put(handler, new_key, "");
}
// Marks |commit_id| as synced by batching a delete of its unsynced-commit row.
Status PageDbBatchImpl::MarkCommitIdSynced(CoroutineHandler* handler, const CommitId& commit_id) {
  const auto unsynced_key = UnsyncedCommitRow::GetKeyFor(commit_id);
  return batch_->Delete(handler, unsynced_key);
}
// Marks |commit_id| as unsynced by batching a write of its unsynced-commit row, whose value is the
// serialized |generation|.
Status PageDbBatchImpl::MarkCommitIdUnsynced(CoroutineHandler* handler, const CommitId& commit_id,
                                             uint64_t generation) {
  const auto unsynced_key = UnsyncedCommitRow::GetKeyFor(commit_id);
  return batch_->Put(handler, unsynced_key, SerializeData(generation));
}
// Batches a write of |value| in the sync-metadata row derived from |key|.
Status PageDbBatchImpl::SetSyncMetadata(CoroutineHandler* handler, fxl::StringView key,
                                        fxl::StringView value) {
  const auto metadata_key = SyncMetadataRow::GetKeyFor(key);
  return batch_->Put(handler, metadata_key, value);
}
// Batches a write of the page-is-online row. The row has an empty value: its presence is the
// information.
Status PageDbBatchImpl::MarkPageOnline(coroutine::CoroutineHandler* handler) {
  const Status put_status = batch_->Put(handler, PageIsOnlineRow::kKey, "");
  return put_status;
}
// Batches a write of |device_id| under the clock's device-id key.
Status PageDbBatchImpl::SetDeviceId(coroutine::CoroutineHandler* handler, DeviceIdView device_id) {
  const Status put_status = batch_->Put(handler, ClockRow::kDeviceIdKey, device_id);
  return put_status;
}
// Serializes |entry| and batches a write of the result in the clock row of |device_id|.
Status PageDbBatchImpl::SetClockEntry(coroutine::CoroutineHandler* handler, DeviceIdView device_id,
                                      const ClockEntry& entry) {
  std::string serialized_entry;
  SerializeClockEntry(entry, &serialized_entry);

  const auto entry_key = ClockRow::GetClockEntryForKey(device_id);
  return batch_->Put(handler, entry_key, serialized_entry);
}
// Executes the underlying batch, unless one of the deletions registered by this batch has been
// aborted in the meantime, in which case nothing is executed and CANCELED is returned. Pending
// deletions are released in both cases.
Status PageDbBatchImpl::Execute(CoroutineHandler* handler) {
  const bool deletions_still_valid = UntrackPendingDeletions();
  return deletions_still_valid ? batch_->Execute(handler) : Status::CANCELED;
}
// Releases every digest of |pending_deletion_| from the factory and empties the set. All digests
// are released even after a failure. Returns true only if the factory reported success for each
// of them.
bool PageDbBatchImpl::UntrackPendingDeletions() {
  bool all_valid = true;
  for (const ObjectDigest& digest : pending_deletion_) {
    if (factory_->UntrackDeletion(digest)) {
      continue;
    }
    FXL_VLOG(1) << "Deletion has been aborted, object cannot be deleted: " << digest;
    all_valid = false;
  }
  pending_deletion_.clear();
  return all_valid;
}
// Debug-only sanity check that the object identified by |key| is present in the database.
// When NDEBUG is defined this does nothing and returns OK. Otherwise it looks the object up,
// forwards INTERRUPTED to the caller, and DCHECKs that any other outcome is OK.
Status PageDbBatchImpl::DCheckHasObject(CoroutineHandler* handler, const ObjectIdentifier& key) {
#ifndef NDEBUG
  const Status lookup_status = page_db_->HasObject(handler, key);
  if (lookup_status == Status::INTERRUPTED) {
    return lookup_status;
  }
  FXL_DCHECK(lookup_status == Status::OK) << key;
#endif
  return Status::OK;
}
} // namespace storage