blob: 7fba43e7bc4c4c0f3f905a44b3c49e29b38f7e86 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/storage/fake/fake_page_storage.h"
#include <string>
#include <utility>
#include <vector>
#include <lib/async/cpp/task.h>
#include <lib/async/default.h>
#include <lib/fit/function.h>
#include <lib/fsl/socket/strings.h>
#include <lib/fsl/vmo/strings.h>
#include <lib/fxl/logging.h>
#include <lib/fxl/strings/concatenate.h>
#include <lib/zx/time.h>
#include "peridot/bin/ledger/encryption/primitives/hash.h"
#include "peridot/bin/ledger/storage/fake/fake_commit.h"
#include "peridot/bin/ledger/storage/fake/fake_journal.h"
#include "peridot/bin/ledger/storage/fake/fake_object.h"
#include "peridot/bin/ledger/storage/public/constants.h"
namespace storage {
namespace fake {
namespace {
Status ToBuffer(convert::ExtendedStringView value, int64_t offset,
int64_t max_size, fsl::SizedVmo* buffer) {
size_t start = value.size();
// Valid indices are between -N and N-1.
if (offset >= -static_cast<int64_t>(value.size()) &&
offset < static_cast<int64_t>(value.size())) {
start = offset < 0 ? value.size() + offset : offset;
}
size_t length = max_size < 0 ? value.size() : max_size;
bool result = fsl::VmoFromString(value.substr(start, length), buffer);
return result ? Status::OK : Status::INTERNAL_IO_ERROR;
}
} // namespace
FakePageStorage::FakePageStorage(ledger::Environment* environment,
PageId page_id)
: page_id_(std::move(page_id)),
environment_(environment),
encryption_service_(environment_->dispatcher()) {}
FakePageStorage::~FakePageStorage() {}
PageId FakePageStorage::GetId() { return page_id_; }
void FakePageStorage::GetHeadCommitIds(
fit::function<void(Status, std::vector<CommitId>)> callback) {
std::vector<CommitId> commit_ids(heads_.begin(), heads_.end());
if (commit_ids.empty()) {
commit_ids.emplace_back();
}
callback(Status::OK, std::move(commit_ids));
}
void FakePageStorage::GetCommit(
CommitIdView commit_id,
fit::function<void(Status, std::unique_ptr<const Commit>)> callback) {
auto it = journals_.find(commit_id.ToString());
if (it == journals_.end()) {
callback(Status::NOT_FOUND, nullptr);
return;
}
async::PostDelayedTask(
environment_->dispatcher(),
[this, commit_id = commit_id.ToString(), callback = std::move(callback)] {
callback(Status::OK,
std::make_unique<FakeCommit>(journals_[commit_id].get()));
},
kFakePageStorageDelay);
}
void FakePageStorage::StartCommit(
const CommitId& commit_id, JournalType /*journal_type*/,
fit::function<void(Status, std::unique_ptr<Journal>)> callback) {
uint64_t next_generation = 0;
FakeJournalDelegate::Data data;
if (journals_.find(commit_id) != journals_.end()) {
next_generation = journals_[commit_id].get()->GetGeneration() + 1;
data = journals_[commit_id].get()->GetData();
}
auto delegate = std::make_unique<FakeJournalDelegate>(
environment_->random(), std::move(data), commit_id, autocommit_,
next_generation);
auto journal = std::make_unique<FakeJournal>(delegate.get());
journals_[delegate->GetId()] = std::move(delegate);
callback(Status::OK, std::move(journal));
}
void FakePageStorage::StartMergeCommit(
const CommitId& left, const CommitId& right,
fit::function<void(Status, std::unique_ptr<Journal>)> callback) {
auto delegate = std::make_unique<FakeJournalDelegate>(
environment_->random(), journals_[left].get()->GetData(), left, right,
autocommit_,
1 + std::max(journals_[left].get()->GetGeneration(),
journals_[right].get()->GetGeneration()));
auto journal = std::make_unique<FakeJournal>(delegate.get());
journals_[delegate->GetId()] = std::move(delegate);
callback(Status::OK, std::move(journal));
}
void FakePageStorage::CommitJournal(
std::unique_ptr<Journal> journal,
fit::function<void(Status, std::unique_ptr<const storage::Commit>)>
callback) {
static_cast<FakeJournal*>(journal.get())
->Commit([this, callback = std::move(callback)](
Status status,
std::unique_ptr<const storage::Commit> commit) {
for (const storage::CommitIdView& parent_id : commit->GetParentIds()) {
auto it = heads_.find(parent_id.ToString());
if (it != heads_.end()) {
heads_.erase(it);
}
}
heads_.emplace(commit->GetId());
if (!drop_commit_notifications_) {
for (CommitWatcher* watcher : watchers_) {
async::PostTask(
environment_->dispatcher(),
[watcher, commit = commit->Clone()]() mutable {
std::vector<std::unique_ptr<const Commit>> commits;
commits.push_back(std::move(commit));
watcher->OnNewCommits(commits, ChangeSource::LOCAL);
});
}
}
callback(status, std::move(commit));
});
}
void FakePageStorage::RollbackJournal(std::unique_ptr<Journal> journal,
fit::function<void(Status)> callback) {
callback(static_cast<FakeJournal*>(journal.get())->Rollback());
}
Status FakePageStorage::AddCommitWatcher(CommitWatcher* watcher) {
watchers_.emplace(watcher);
return Status::OK;
}
Status FakePageStorage::RemoveCommitWatcher(CommitWatcher* watcher) {
auto it = watchers_.find(watcher);
if (it != watchers_.end()) {
watchers_.erase(it);
}
return Status::OK;
}
void FakePageStorage::IsSynced(fit::function<void(Status, bool)> callback) {
callback(Status::OK, is_synced_);
}
void FakePageStorage::AddObjectFromLocal(
ObjectType /*object_type*/, std::unique_ptr<DataSource> data_source,
fit::function<void(Status, ObjectIdentifier)> callback) {
auto value = std::make_unique<std::string>();
auto data_source_ptr = data_source.get();
data_source_ptr->Get([this, data_source = std::move(data_source),
value = std::move(value),
callback = std::move(callback)](
std::unique_ptr<DataSource::DataChunk> chunk,
DataSource::Status status) mutable {
if (status == DataSource::Status::ERROR) {
callback(Status::IO_ERROR, {});
return;
}
auto view = chunk->Get();
value->append(view.data(), view.size());
if (status == DataSource::Status::DONE) {
ObjectIdentifier object_identifier =
encryption_service_.MakeObjectIdentifier(FakeDigest(*value));
objects_[object_identifier] = std::move(*value);
callback(Status::OK, std::move(object_identifier));
}
});
}
void FakePageStorage::GetObject(
ObjectIdentifier object_identifier, Location /*location*/,
fit::function<void(Status, std::unique_ptr<const Object>)> callback) {
GetPiece(object_identifier, std::move(callback));
}
void FakePageStorage::GetObjectPart(
ObjectIdentifier object_identifier, int64_t offset, int64_t max_size,
Location location, fit::function<void(Status, fsl::SizedVmo)> callback) {
GetPiece(object_identifier,
[offset, max_size, callback = std::move(callback)](
Status status, std::unique_ptr<const Object> piece) {
if (status != Status::OK) {
callback(status, nullptr);
return;
}
fxl::StringView data;
Status data_status = piece->GetData(&data);
if (data_status != Status::OK) {
callback(data_status, nullptr);
return;
}
fsl::SizedVmo buffer;
Status buffer_status = ToBuffer(data, offset, max_size, &buffer);
if (buffer_status != Status::OK) {
callback(buffer_status, nullptr);
return;
}
callback(Status::OK, std::move(buffer));
});
}
void FakePageStorage::GetPiece(
ObjectIdentifier object_identifier,
fit::function<void(Status, std::unique_ptr<const Object>)> callback) {
object_requests_.emplace_back(
[this, object_identifier = std::move(object_identifier),
callback = std::move(callback)] {
auto it = objects_.find(object_identifier);
if (it == objects_.end()) {
callback(Status::NOT_FOUND, nullptr);
return;
}
callback(Status::OK,
std::make_unique<FakeObject>(object_identifier, it->second));
});
async::PostDelayedTask(
environment_->dispatcher(), [this] { SendNextObject(); },
kFakePageStorageDelay);
}
void FakePageStorage::GetCommitContents(const Commit& commit,
std::string min_key,
fit::function<bool(Entry)> on_next,
fit::function<void(Status)> on_done) {
FakeJournalDelegate* journal = journals_[commit.GetId()].get();
if (!journal) {
on_done(Status::NOT_FOUND);
return;
}
for (auto it = journal->GetData().lower_bound(min_key);
it != journal->GetData().end(); ++it) {
if (!on_next(it->second)) {
break;
}
}
on_done(Status::OK);
}
void FakePageStorage::GetEntryFromCommit(
const Commit& commit, std::string key,
fit::function<void(Status, Entry)> callback) {
FakeJournalDelegate* journal = journals_[commit.GetId()].get();
if (!journal) {
callback(Status::NOT_FOUND, Entry());
return;
}
const fake::FakeJournalDelegate::Data& data = journal->GetData();
auto it = data.find(key);
if (it == data.end()) {
callback(Status::NOT_FOUND, Entry());
return;
}
callback(Status::OK, it->second);
}
const std::map<std::string, std::unique_ptr<FakeJournalDelegate>>&
FakePageStorage::GetJournals() const {
return journals_;
}
const std::map<ObjectIdentifier, std::string>& FakePageStorage::GetObjects()
const {
return objects_;
}
ObjectDigest FakePageStorage::FakeDigest(fxl::StringView value) const {
// Builds a fake ObjectDigest by computing the hash of |value|, and prefixes
// it with 0xFACEFEED to intentionally make it longer than real object
// digests, start with a 1 bit, and easy to spot in logs. This is incompatible
// with real object digests, but is enough for a fake because all clients of
// the fake should treat object digests as opaque blobs.
return ObjectDigest(fxl::Concatenate(
{"\xFA\xCE\xFE\xED", encryption::SHA256WithLengthHash(value)}));
}
void FakePageStorage::SendNextObject() {
auto rng = environment_->random()->NewBitGenerator<uint64_t>();
std::uniform_int_distribution<size_t> distribution(
0u, object_requests_.size() - 1);
auto it = object_requests_.begin() + distribution(rng);
auto closure = std::move(*it);
object_requests_.erase(it);
closure();
}
void FakePageStorage::DeleteObjectFromLocal(
const ObjectIdentifier& object_identifier) {
objects_.erase(object_identifier);
}
void FakePageStorage::SetDropCommitNotifications(bool drop) {
drop_commit_notifications_ = drop;
}
} // namespace fake
} // namespace storage