blob: 20a132ca65213fefc9aeb5cf53bbce2c049d25ae [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/storage/impl/page_db_batch_impl.h"
#include <memory>
#include <lib/fxl/strings/concatenate.h>
#include "peridot/bin/ledger/storage/impl/data_serialization.h"
#include "peridot/bin/ledger/storage/impl/db_serialization.h"
#include "peridot/bin/ledger/storage/impl/journal_impl.h"
#define RETURN_ON_ERROR(expr) \
do { \
Status status = (expr); \
if (status != Status::OK) { \
return status; \
} \
} while (0)
namespace storage {
using coroutine::CoroutineHandler;
PageDbBatchImpl::PageDbBatchImpl(rng::Random* random,
std::unique_ptr<Db::Batch> batch, PageDb* db)
: random_(random), batch_(std::move(batch)), db_(db) {}
PageDbBatchImpl::~PageDbBatchImpl() {}
Status PageDbBatchImpl::AddHead(CoroutineHandler* handler, CommitIdView head,
zx::time_utc timestamp) {
return batch_->Put(handler, HeadRow::GetKeyFor(head),
SerializeData(timestamp));
}
Status PageDbBatchImpl::RemoveHead(CoroutineHandler* handler,
CommitIdView head) {
return batch_->Delete(handler, HeadRow::GetKeyFor(head));
}
Status PageDbBatchImpl::AddCommitStorageBytes(CoroutineHandler* handler,
const CommitId& commit_id,
fxl::StringView storage_bytes) {
return batch_->Put(handler, CommitRow::GetKeyFor(commit_id), storage_bytes);
}
Status PageDbBatchImpl::RemoveCommit(CoroutineHandler* handler,
const CommitId& commit_id) {
return batch_->Delete(handler, CommitRow::GetKeyFor(commit_id));
}
Status PageDbBatchImpl::CreateJournalId(coroutine::CoroutineHandler* handler,
JournalType journal_type,
const CommitId& base,
JournalId* journal_id) {
JournalId id = JournalEntryRow::NewJournalId(random_, journal_type);
Status status = Status::OK;
if (journal_type == JournalType::IMPLICIT) {
status =
batch_->Put(handler, ImplicitJournalMetadataRow::GetKeyFor(id), base);
}
if (status == Status::OK) {
journal_id->swap(id);
}
return status;
}
Status PageDbBatchImpl::RemoveExplicitJournals(CoroutineHandler* handler) {
static std::string kExplicitJournalPrefix =
fxl::Concatenate({JournalEntryRow::kPrefix,
fxl::StringView(&JournalEntryRow::kExplicitPrefix, 1)});
return batch_->DeleteByPrefix(handler, kExplicitJournalPrefix);
}
Status PageDbBatchImpl::RemoveJournal(CoroutineHandler* handler,
const JournalId& journal_id) {
if (journal_id[0] == JournalEntryRow::kImplicitPrefix) {
RETURN_ON_ERROR(batch_->Delete(
handler, ImplicitJournalMetadataRow::GetKeyFor(journal_id)));
}
return batch_->DeleteByPrefix(handler,
JournalEntryRow::GetPrefixFor(journal_id));
}
Status PageDbBatchImpl::AddJournalEntry(
coroutine::CoroutineHandler* handler, const JournalId& journal_id,
fxl::StringView key, const ObjectIdentifier& object_identifier,
KeyPriority priority) {
return batch_->Put(handler, JournalEntryRow::GetKeyFor(journal_id, key),
JournalEntryRow::GetValueFor(object_identifier, priority));
}
Status PageDbBatchImpl::RemoveJournalEntry(coroutine::CoroutineHandler* handler,
const JournalId& journal_id,
convert::ExtendedStringView key) {
return batch_->Put(handler, JournalEntryRow::GetKeyFor(journal_id, key),
JournalEntryRow::kDeletePrefix);
}
Status PageDbBatchImpl::EmptyJournalAndMarkContainsClearOperation(
CoroutineHandler* handler, const JournalId& journal_id) {
Status status = batch_->DeleteByPrefix(
handler, JournalEntryRow::GetPrefixFor(journal_id));
if (status != Status::OK) {
return status;
}
return batch_->Put(handler, JournalEntryRow::GetClearMarkerKey(journal_id),
"");
}
Status PageDbBatchImpl::WriteObject(
CoroutineHandler* handler, ObjectIdentifier object_identifier,
std::unique_ptr<DataSource::DataChunk> content,
PageDbObjectStatus object_status) {
FXL_DCHECK(object_status > PageDbObjectStatus::UNKNOWN);
bool has_key;
RETURN_ON_ERROR(
db_->HasObject(handler, object_identifier.object_digest(), &has_key));
if (has_key) {
if (object_status == PageDbObjectStatus::TRANSIENT) {
return Status::OK;
}
return SetObjectStatus(handler, std::move(object_identifier),
object_status);
}
RETURN_ON_ERROR(batch_->Put(
handler, ObjectRow::GetKeyFor(object_identifier.object_digest()),
content->Get()));
return batch_->Put(
handler, ObjectStatusRow::GetKeyFor(object_status, object_identifier),
"");
}
Status PageDbBatchImpl::SetObjectStatus(CoroutineHandler* handler,
ObjectIdentifier object_identifier,
PageDbObjectStatus object_status) {
FXL_DCHECK(object_status >= PageDbObjectStatus::LOCAL);
RETURN_ON_ERROR(DCheckHasObject(handler, object_identifier.object_digest()));
PageDbObjectStatus previous_object_status;
RETURN_ON_ERROR(db_->GetObjectStatus(handler, object_identifier,
&previous_object_status));
if (previous_object_status >= object_status) {
return Status::OK;
}
RETURN_ON_ERROR(batch_->Delete(
handler,
ObjectStatusRow::GetKeyFor(previous_object_status, object_identifier)));
return batch_->Put(
handler, ObjectStatusRow::GetKeyFor(object_status, object_identifier),
"");
}
Status PageDbBatchImpl::MarkCommitIdSynced(CoroutineHandler* handler,
const CommitId& commit_id) {
return batch_->Delete(handler, UnsyncedCommitRow::GetKeyFor(commit_id));
}
Status PageDbBatchImpl::MarkCommitIdUnsynced(CoroutineHandler* handler,
const CommitId& commit_id,
uint64_t generation) {
return batch_->Put(handler, UnsyncedCommitRow::GetKeyFor(commit_id),
SerializeData(generation));
}
Status PageDbBatchImpl::SetSyncMetadata(CoroutineHandler* handler,
fxl::StringView key,
fxl::StringView value) {
return batch_->Put(handler, SyncMetadataRow::GetKeyFor(key), value);
}
Status PageDbBatchImpl::MarkPageOnline(coroutine::CoroutineHandler* handler) {
return batch_->Put(handler, PageIsOnlineRow::kKey, "");
}
Status PageDbBatchImpl::Execute(CoroutineHandler* handler) {
return batch_->Execute(handler);
}
Status PageDbBatchImpl::DCheckHasObject(CoroutineHandler* handler,
const ObjectDigest& key) {
#ifdef NDEBUG
return Status::OK;
#else
bool result;
Status status = db_->HasObject(handler, key, &result);
if (status == Status::INTERRUPTED) {
return status;
}
FXL_DCHECK(status == Status::OK && result);
return Status::OK;
#endif
}
} // namespace storage