blob: f425c042cf827b3cff59e38df94dc73da0be8257 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/storage/impl/page_db_impl.h"
#include <algorithm>
#include <string>
#include <lib/fxl/strings/concatenate.h>
#include "peridot/bin/ledger/storage/impl/data_serialization.h"
#include "peridot/bin/ledger/storage/impl/db_serialization.h"
#include "peridot/bin/ledger/storage/impl/journal_impl.h"
#include "peridot/bin/ledger/storage/impl/object_identifier_encoding.h"
#include "peridot/bin/ledger/storage/impl/object_impl.h"
#include "peridot/bin/ledger/storage/impl/page_db_batch_impl.h"
#include "peridot/lib/convert/convert.h"
#define RETURN_ON_ERROR(expr) \
do { \
Status status = (expr); \
if (status != Status::OK) { \
return status; \
} \
} while (0)
namespace storage {
using coroutine::CoroutineHandler;
namespace {
// Extracts a sorted list of commit its from |entries|. Entries must be a map
// from commit ids to serialized |A|.
template <typename A>
void ExtractSortedCommitsIds(
std::vector<std::pair<std::string, std::string>>* entries,
std::vector<CommitId>* commit_ids) {
std::sort(entries->begin(), entries->end(),
[&](const std::pair<std::string, std::string>& p1,
const std::pair<std::string, std::string>& p2) {
auto t1 = DeserializeData<A>(p1.second);
auto t2 = DeserializeData<A>(p2.second);
if (t1 != t2) {
return t1 < t2;
}
return p1.first < p2.first;
});
commit_ids->clear();
commit_ids->reserve(entries->size());
for (std::pair<std::string, std::string>& entry : *entries) {
commit_ids->push_back(std::move(entry.first));
}
}
class JournalEntryIterator final : public Iterator<const EntryChange> {
public:
explicit JournalEntryIterator(
std::unique_ptr<Iterator<const std::pair<convert::ExtendedStringView,
convert::ExtendedStringView>>>
it)
: it_(std::move(it)) {
PrepareEntry();
}
~JournalEntryIterator() override {}
Iterator<const EntryChange>& Next() override {
it_->Next();
PrepareEntry();
return *this;
}
bool Valid() const override { return it_->Valid(); }
Status GetStatus() const override { return it_->GetStatus(); }
const EntryChange& operator*() const override { return *(change_.get()); }
const EntryChange* operator->() const override { return change_.get(); }
private:
void PrepareEntry() {
if (!JournalEntryIterator::Valid()) {
change_.reset(nullptr);
return;
}
change_ = std::make_unique<EntryChange>();
const std::pair<convert::ExtendedStringView, convert::ExtendedStringView>&
key_value = **it_;
change_->entry.key =
key_value.first.substr(JournalEntryRow::kPrefixSize).ToString();
if (key_value.second[0] == JournalEntryRow::kAddPrefix) {
Status status = JournalEntryRow::ExtractObjectIdentifier(
key_value.second, &change_->entry.object_identifier);
FXL_DCHECK(status == Status::OK);
change_->deleted = false;
change_->entry.priority =
(key_value.second[1] == JournalEntryRow::kLazyPrefix)
? KeyPriority::LAZY
: KeyPriority::EAGER;
} else {
change_->deleted = true;
}
}
std::unique_ptr<Iterator<const std::pair<convert::ExtendedStringView,
convert::ExtendedStringView>>>
it_;
std::unique_ptr<EntryChange> change_;
};
} // namespace
PageDbImpl::PageDbImpl(ledger::Environment* environment, std::unique_ptr<Db> db)
: environment_(environment), db_(std::move(db)) {
FXL_DCHECK(environment_);
FXL_DCHECK(db_);
}
PageDbImpl::~PageDbImpl() {}
Status PageDbImpl::StartBatch(coroutine::CoroutineHandler* handler,
std::unique_ptr<Batch>* batch) {
std::unique_ptr<Db::Batch> db_batch;
RETURN_ON_ERROR(db_->StartBatch(handler, &db_batch));
*batch = std::make_unique<PageDbBatchImpl>(environment_->random(),
std::move(db_batch), this);
return Status::OK;
}
Status PageDbImpl::GetHeads(CoroutineHandler* handler,
std::vector<CommitId>* heads) {
std::vector<std::pair<std::string, std::string>> entries;
RETURN_ON_ERROR(db_->GetEntriesByPrefix(
handler, convert::ToSlice(HeadRow::kPrefix), &entries));
ExtractSortedCommitsIds<zx::time_utc>(&entries, heads);
return Status::OK;
}
Status PageDbImpl::GetCommitStorageBytes(CoroutineHandler* handler,
CommitIdView commit_id,
std::string* storage_bytes) {
return db_->Get(handler, CommitRow::GetKeyFor(commit_id), storage_bytes);
}
Status PageDbImpl::GetImplicitJournalIds(CoroutineHandler* handler,
std::vector<JournalId>* journal_ids) {
return db_->GetByPrefix(handler,
convert::ToSlice(ImplicitJournalMetadataRow::kPrefix),
journal_ids);
}
Status PageDbImpl::GetBaseCommitForJournal(CoroutineHandler* handler,
const JournalId& journal_id,
CommitId* base) {
FXL_DCHECK(journal_id.size() == JournalEntryRow::kJournalIdSize);
FXL_DCHECK(journal_id[0] == JournalEntryRow::kImplicitPrefix);
return db_->Get(handler, ImplicitJournalMetadataRow::GetKeyFor(journal_id),
base);
}
Status PageDbImpl::GetJournalEntries(
CoroutineHandler* handler, const JournalId& journal_id,
std::unique_ptr<Iterator<const EntryChange>>* entries,
JournalContainsClearOperation* contains_clear_operation) {
std::unique_ptr<Iterator<const std::pair<convert::ExtendedStringView,
convert::ExtendedStringView>>>
it;
RETURN_ON_ERROR(db_->GetIteratorAtPrefix(
handler, JournalEntryRow::GetEntriesPrefixFor(journal_id), &it));
bool contains_clear_operation_key;
RETURN_ON_ERROR(db_->HasKey(handler,
JournalEntryRow::GetClearMarkerKey(journal_id),
&contains_clear_operation_key));
*entries = std::make_unique<JournalEntryIterator>(std::move(it));
*contains_clear_operation = contains_clear_operation_key
? JournalContainsClearOperation::YES
: JournalContainsClearOperation::NO;
return Status::OK;
}
Status PageDbImpl::ReadObject(CoroutineHandler* handler,
ObjectIdentifier object_identifier,
std::unique_ptr<const Object>* object) {
return db_->GetObject(handler,
ObjectRow::GetKeyFor(object_identifier.object_digest()),
object_identifier, object);
}
Status PageDbImpl::HasObject(CoroutineHandler* handler,
const ObjectDigest& object_digest,
bool* has_object) {
return db_->HasKey(handler, ObjectRow::GetKeyFor(object_digest), has_object);
}
Status PageDbImpl::GetObjectStatus(CoroutineHandler* handler,
ObjectIdentifier object_identifier,
PageDbObjectStatus* object_status) {
// Check must be done in ascending order of status, so that a change of status
// between 2 reads does not create the case where no key is found.
// That said, the most common expected status is SYNCED, so for performance
// reasons, it is better to check it first.
// By checking it first and then checking all statuses in ascending order we
// both ensure correctness and performant lookup.
// The only case that would generate a spurious lookup is when the status is
// changed concurrently, which is a rare occurence.
for (PageDbObjectStatus possible_status :
{PageDbObjectStatus::SYNCED, PageDbObjectStatus::TRANSIENT,
PageDbObjectStatus::LOCAL, PageDbObjectStatus::SYNCED}) {
bool has_key;
RETURN_ON_ERROR(db_->HasKey(
handler, ObjectStatusRow::GetKeyFor(possible_status, object_identifier),
&has_key));
if (has_key) {
*object_status = possible_status;
return Status::OK;
}
}
*object_status = PageDbObjectStatus::UNKNOWN;
return Status::OK;
}
Status PageDbImpl::GetUnsyncedCommitIds(CoroutineHandler* handler,
std::vector<CommitId>* commit_ids) {
std::vector<std::pair<std::string, std::string>> entries;
RETURN_ON_ERROR(db_->GetEntriesByPrefix(
handler, convert::ToSlice(UnsyncedCommitRow::kPrefix), &entries));
// Unsynced commit row values are the commit's generation.
ExtractSortedCommitsIds<uint64_t>(&entries, commit_ids);
return Status::OK;
}
Status PageDbImpl::IsCommitSynced(CoroutineHandler* handler,
const CommitId& commit_id, bool* is_synced) {
bool has_key;
RETURN_ON_ERROR(
db_->HasKey(handler, UnsyncedCommitRow::GetKeyFor(commit_id), &has_key));
*is_synced = !has_key;
return Status::OK;
}
Status PageDbImpl::GetUnsyncedPieces(
CoroutineHandler* handler,
std::vector<ObjectIdentifier>* object_identifiers) {
std::vector<std::string> encoded_identifiers;
Status status =
db_->GetByPrefix(handler, convert::ToSlice(ObjectStatusRow::kLocalPrefix),
&encoded_identifiers);
if (status != Status::OK) {
return status;
}
object_identifiers->clear();
ObjectIdentifier object_identifier;
for (auto& encoded_identifier : encoded_identifiers) {
if (!DecodeObjectIdentifier(encoded_identifier, &object_identifier)) {
return Status::FORMAT_ERROR;
}
object_identifiers->emplace_back(std::move(object_identifier));
}
return Status::OK;
}
Status PageDbImpl::GetSyncMetadata(CoroutineHandler* handler,
fxl::StringView key, std::string* value) {
return db_->Get(handler, SyncMetadataRow::GetKeyFor(key), value);
}
Status PageDbImpl::IsPageOnline(coroutine::CoroutineHandler* handler,
bool* page_is_online) {
return db_->HasKey(handler, PageIsOnlineRow::kKey, page_is_online);
}
Status PageDbImpl::AddHead(CoroutineHandler* handler, CommitIdView head,
zx::time_utc timestamp) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(batch->AddHead(handler, head, timestamp));
return batch->Execute(handler);
}
Status PageDbImpl::RemoveHead(CoroutineHandler* handler, CommitIdView head) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(batch->RemoveHead(handler, head));
return batch->Execute(handler);
}
Status PageDbImpl::AddCommitStorageBytes(CoroutineHandler* handler,
const CommitId& commit_id,
fxl::StringView storage_bytes) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(
batch->AddCommitStorageBytes(handler, commit_id, storage_bytes));
return batch->Execute(handler);
}
Status PageDbImpl::RemoveCommit(CoroutineHandler* handler,
const CommitId& commit_id) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(batch->RemoveCommit(handler, commit_id));
return batch->Execute(handler);
}
Status PageDbImpl::CreateJournalId(CoroutineHandler* handler,
JournalType journal_type,
const CommitId& base,
JournalId* journal_id) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(
batch->CreateJournalId(handler, journal_type, base, journal_id));
return batch->Execute(handler);
}
Status PageDbImpl::RemoveExplicitJournals(CoroutineHandler* handler) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(batch->RemoveExplicitJournals(handler));
return batch->Execute(handler);
}
Status PageDbImpl::RemoveJournal(CoroutineHandler* handler,
const JournalId& journal_id) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(batch->RemoveJournal(handler, journal_id));
return batch->Execute(handler);
}
Status PageDbImpl::AddJournalEntry(CoroutineHandler* handler,
const JournalId& journal_id,
fxl::StringView key,
const ObjectIdentifier& object_identifier,
KeyPriority priority) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(batch->AddJournalEntry(handler, journal_id, key,
object_identifier, priority));
return batch->Execute(handler);
}
Status PageDbImpl::RemoveJournalEntry(CoroutineHandler* handler,
const JournalId& journal_id,
convert::ExtendedStringView key) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(batch->RemoveJournalEntry(handler, journal_id, key));
return batch->Execute(handler);
}
Status PageDbImpl::EmptyJournalAndMarkContainsClearOperation(
coroutine::CoroutineHandler* handler, const JournalId& journal_id) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(
batch->EmptyJournalAndMarkContainsClearOperation(handler, journal_id));
return batch->Execute(handler);
}
Status PageDbImpl::WriteObject(CoroutineHandler* handler,
ObjectIdentifier object_identifier,
std::unique_ptr<DataSource::DataChunk> content,
PageDbObjectStatus object_status) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(batch->WriteObject(handler, object_identifier,
std::move(content), object_status));
return batch->Execute(handler);
}
Status PageDbImpl::SetObjectStatus(CoroutineHandler* handler,
ObjectIdentifier object_identifier,
PageDbObjectStatus object_status) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(
batch->SetObjectStatus(handler, object_identifier, object_status));
return batch->Execute(handler);
}
Status PageDbImpl::MarkCommitIdSynced(CoroutineHandler* handler,
const CommitId& commit_id) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(batch->MarkCommitIdSynced(handler, commit_id));
return batch->Execute(handler);
}
Status PageDbImpl::MarkCommitIdUnsynced(CoroutineHandler* handler,
const CommitId& commit_id,
uint64_t generation) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(batch->MarkCommitIdUnsynced(handler, commit_id, generation));
return batch->Execute(handler);
}
Status PageDbImpl::SetSyncMetadata(CoroutineHandler* handler,
fxl::StringView key, fxl::StringView value) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(batch->SetSyncMetadata(handler, key, value));
return batch->Execute(handler);
}
Status PageDbImpl::MarkPageOnline(coroutine::CoroutineHandler* handler) {
std::unique_ptr<Batch> batch;
RETURN_ON_ERROR(StartBatch(handler, &batch));
RETURN_ON_ERROR(batch->MarkPageOnline(handler));
return batch->Execute(handler);
}
} // namespace storage