blob: 9b0a3d2209d542891d181334d8181716e33e4aed [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/storage/fake/fake_page_storage.h"
#include <lib/async/cpp/task.h>
#include <lib/async/default.h>
#include <lib/fit/function.h>
#include <lib/fsl/socket/strings.h>
#include <lib/fsl/vmo/strings.h>
#include <lib/zx/time.h>
#include <string>
#include <utility>
#include <vector>
#include "src/ledger/bin/encryption/primitives/hash.h"
#include "src/ledger/bin/storage/fake/fake_commit.h"
#include "src/ledger/bin/storage/fake/fake_journal.h"
#include "src/ledger/bin/storage/fake/fake_object.h"
#include "src/ledger/bin/storage/public/constants.h"
#include "src/ledger/bin/storage/public/types.h"
#include "src/ledger/bin/storage/testing/commit_empty_impl.h"
#include "src/lib/fxl/logging.h"
#include "src/lib/fxl/strings/concatenate.h"
namespace storage {
namespace fake {
namespace {
// Copies a slice of |value| into a VMO created in |buffer|.
//
// With N = |value.size()|, an |offset| in [0, N) is counted from the start of
// |value| and an |offset| in [-N, 0) is counted from its end. Any other offset
// makes the slice start at the end of |value|, i.e. the slice is empty.
// A negative |max_size| requests everything from the start position onward;
// otherwise at most |max_size| bytes are taken.
//
// Returns Status::OK if the VMO could be created from the slice, and
// Status::INTERNAL_ERROR otherwise.
Status ToBuffer(convert::ExtendedStringView value, int64_t offset,
                int64_t max_size, fsl::SizedVmo* buffer) {
  const int64_t signed_size = static_cast<int64_t>(value.size());

  // Default for out-of-range offsets: start at the end (empty slice).
  size_t begin = value.size();
  if (offset >= 0 && offset < signed_size) {
    begin = offset;
  } else if (offset < 0 && offset >= -signed_size) {
    begin = value.size() + offset;
  }

  size_t count = value.size();
  if (max_size >= 0) {
    count = max_size;
  }

  if (!fsl::VmoFromString(value.substr(begin, count), buffer)) {
    return Status::INTERNAL_ERROR;
  }
  return Status::OK;
}
class FakeRootCommit : public CommitEmptyImpl {
public:
FakeRootCommit() : id_(storage::kFirstPageCommitId.ToString()) {}
std::unique_ptr<const Commit> Clone() const override {
return std::make_unique<const FakeRootCommit>();
}
const CommitId& GetId() const override { return id_; }
std::vector<CommitIdView> GetParentIds() const override {
return std::vector<CommitIdView>();
}
zx::time_utc GetTimestamp() const override { return zx::time_utc(); }
uint64_t GetGeneration() const override { return 0; }
private:
const CommitId id_;
};
} // namespace
// Creates a fake storage for the page |page_id|. |environment| is stored as a
// raw pointer and used later (dispatcher, random source), and its dispatcher
// is handed to the fake encryption service.
FakePageStorage::FakePageStorage(ledger::Environment* environment,
                                 PageId page_id)
    : page_id_(std::move(page_id)),
      environment_(environment),
      // Read the dispatcher from the constructor argument so this initializer
      // does not rely on |environment_| having been initialized first.
      encryption_service_(environment->dispatcher()) {}
// Nothing to release explicitly: members clean up after themselves.
FakePageStorage::~FakePageStorage() = default;
// Returns a copy of the page id this storage was constructed with.
PageId FakePageStorage::GetId() {
  return page_id_;
}
// Replaces the contents of |head_commits| with the current heads.
//
// Heads are ordered by timestamp, with ties broken by commit id. When no head
// has been recorded, a single FakeRootCommit is returned instead.
// Always returns Status::OK.
Status FakePageStorage::GetHeadCommits(
    std::vector<std::unique_ptr<const Commit>>* head_commits) {
  using HeadEntry = std::pair<CommitId, zx::time_utc>;

  // Work on a sortable copy of |heads_|.
  std::vector<HeadEntry> ordered_heads(heads_.begin(), heads_.end());
  std::sort(ordered_heads.begin(), ordered_heads.end(),
            [](const HeadEntry& lhs, const HeadEntry& rhs) {
              // Lexicographic comparison on (timestamp, id).
              if (lhs.second < rhs.second) {
                return true;
              }
              if (rhs.second < lhs.second) {
                return false;
              }
              return lhs.first < rhs.first;
            });

  std::vector<std::unique_ptr<const Commit>> result;
  if (ordered_heads.empty()) {
    result.push_back(std::make_unique<const FakeRootCommit>());
  } else {
    result.reserve(ordered_heads.size());
    for (const HeadEntry& head : ordered_heads) {
      result.push_back(
          std::make_unique<FakeCommit>(journals_[head.first].get()));
    }
  }

  head_commits->swap(result);
  return Status::OK;
}
// Reports the ids of the merge commits recorded for the given pair of parents.
//
// Merges are keyed by the (smaller id, larger id) pair, so the order in which
// the two parents are passed does not matter. |callback| is invoked
// synchronously with Status::OK and the recorded ids, or an empty vector when
// nothing is recorded for this pair.
void FakePageStorage::GetMergeCommitIds(
    CommitIdView parent1_id, CommitIdView parent2_id,
    fit::function<void(Status, std::vector<CommitId>)> callback) {
  // |ordered| holds references to the two parameters, smaller one first.
  const auto ordered = std::minmax(parent1_id, parent2_id);
  const auto found = merges_.find(
      std::make_pair(ordered.first.ToString(), ordered.second.ToString()));

  std::vector<CommitId> merge_ids;
  if (found != merges_.end()) {
    merge_ids = found->second;
  }
  callback(Status::OK, std::move(merge_ids));
}
// Looks up the commit identified by |commit_id|.
//
// - The first-page commit id yields a FakeRootCommit, synchronously.
// - An id without a journal yields Status::INTERNAL_NOT_FOUND, synchronously.
// - Otherwise a FakeCommit backed by the journal is delivered from a task
//   posted on the environment's dispatcher.
void FakePageStorage::GetCommit(
    CommitIdView commit_id,
    fit::function<void(Status, std::unique_ptr<const Commit>)> callback) {
  if (commit_id == storage::kFirstPageCommitId) {
    callback(Status::OK, std::make_unique<const FakeRootCommit>());
    return;
  }

  // The view may not outlive this call; keep an owned copy for the task.
  std::string id = commit_id.ToString();
  if (journals_.find(id) == journals_.end()) {
    callback(Status::INTERNAL_NOT_FOUND, nullptr);
    return;
  }

  async::PostTask(
      environment_->dispatcher(),
      [this, id = std::move(id), callback = std::move(callback)] {
        FakeJournalDelegate* delegate = journals_[id].get();
        callback(Status::OK, std::make_unique<FakeCommit>(delegate));
      });
}
// Starts a new journal whose parent is |commit|.
//
// If a journal delegate is recorded for |commit|, the new journal starts from
// a copy of its data and its generation is the parent's generation plus one.
// Otherwise (e.g. |commit| is the root commit) it starts empty at generation
// 0. The new delegate is stored in |journals_| under its own id; the returned
// FakeJournal only holds a raw pointer to it.
std::unique_ptr<Journal> FakePageStorage::StartCommit(
    std::unique_ptr<const Commit> commit) {
  CommitId commit_id = commit->GetId();
  uint64_t next_generation = 0;
  FakeJournalDelegate::Data data;

  // A single lookup. The mapped pointer is also checked because lookups done
  // through operator[] elsewhere can leave a null entry in |journals_| for
  // commits that have no journal; such an entry must be treated as "absent"
  // rather than dereferenced.
  auto parent = journals_.find(commit_id);
  if (parent != journals_.end() && parent->second) {
    next_generation = parent->second->GetGeneration() + 1;
    data = parent->second->GetData();
  }

  auto delegate = std::make_unique<FakeJournalDelegate>(
      environment_->random(), std::move(data), commit_id, autocommit_,
      next_generation);
  auto journal = std::make_unique<FakeJournal>(delegate.get());

  // Read the id before ownership of |delegate| is transferred to the map.
  CommitId new_id = delegate->GetId();
  journals_[new_id] = std::move(delegate);
  return journal;
}
// Starts a new journal for a merge of |left| and |right|.
//
// The journal starts from a copy of the data recorded for |left|, and its
// generation is one more than the larger of the two parents' generations.
// A parent without a recorded journal delegate (e.g. the root commit)
// contributes empty data and generation 0, mirroring StartCommit; looking it
// up does not add an entry to |journals_|. The new delegate is stored in
// |journals_| under its own id; the returned FakeJournal only holds a raw
// pointer to it.
std::unique_ptr<Journal> FakePageStorage::StartMergeCommit(
    std::unique_ptr<const Commit> left, std::unique_ptr<const Commit> right) {
  const CommitId& left_id = left->GetId();
  const CommitId& right_id = right->GetId();

  // Non-inserting lookup; returns nullptr when no delegate is recorded.
  auto find_delegate = [this](const CommitId& id) -> FakeJournalDelegate* {
    auto it = journals_.find(id);
    return it == journals_.end() ? nullptr : it->second.get();
  };
  FakeJournalDelegate* left_delegate = find_delegate(left_id);
  FakeJournalDelegate* right_delegate = find_delegate(right_id);

  FakeJournalDelegate::Data data;
  uint64_t left_generation = 0;
  if (left_delegate) {
    data = left_delegate->GetData();
    left_generation = left_delegate->GetGeneration();
  }
  uint64_t right_generation = 0;
  if (right_delegate) {
    right_generation = right_delegate->GetGeneration();
  }

  auto delegate = std::make_unique<FakeJournalDelegate>(
      environment_->random(), std::move(data), left_id, right_id, autocommit_,
      1 + std::max(left_generation, right_generation));
  auto journal = std::make_unique<FakeJournal>(delegate.get());

  // Read the id before ownership of |delegate| is transferred to the map.
  CommitId new_id = delegate->GetId();
  journals_[new_id] = std::move(delegate);
  return journal;
}
// Commits |journal| and reports the outcome through |callback|.
//
// |journal| is downcast to FakeJournal without a check, so it must have been
// created by this fake (StartCommit / StartMergeCommit).
//
// When the journal reports a commit:
//  - a commit with exactly two parents is recorded in |merges_| under the
//    (smaller id, larger id) pair of its parents;
//  - its parents are removed from |heads_| and the commit becomes a head;
//  - unless notifications are dropped, one task per registered watcher is
//    posted on the dispatcher to deliver the commit with ChangeSource::LOCAL.
// When the journal reports a failure, or hands back no commit, nothing is
// recorded and the result is forwarded to |callback| unchanged.
void FakePageStorage::CommitJournal(
    std::unique_ptr<Journal> journal,
    fit::function<void(Status, std::unique_ptr<const storage::Commit>)>
        callback) {
  static_cast<FakeJournal*>(journal.get())
      ->Commit([this, callback = std::move(callback)](
                   Status status,
                   std::unique_ptr<const storage::Commit> commit) {
        // Everything below dereferences |commit| and updates the bookkeeping
        // as if the commit succeeded; bail out early when that is not so.
        if (status != Status::OK || !commit) {
          callback(status, std::move(commit));
          return;
        }

        std::vector<storage::CommitIdView> parent_ids = commit->GetParentIds();
        if (parent_ids.size() == 2) {
          merges_[std::minmax(parent_ids[0].ToString(),
                              parent_ids[1].ToString())]
              .push_back(commit->GetId());
        }

        // The parents are superseded by the new commit.
        for (const storage::CommitIdView& parent_id : parent_ids) {
          auto it = heads_.find(parent_id.ToString());
          if (it != heads_.end()) {
            heads_.erase(it);
          }
        }
        heads_.emplace(commit->GetId(), commit->GetTimestamp());

        if (!drop_commit_notifications_) {
          for (CommitWatcher* watcher : watchers_) {
            // Each watcher gets its own clone, delivered asynchronously.
            async::PostTask(
                environment_->dispatcher(),
                [this, watcher, commit = commit->Clone()]() mutable {
                  // Check that watcher was not unregistered.
                  if (watchers_.find(watcher) == watchers_.end()) {
                    return;
                  }
                  std::vector<std::unique_ptr<const Commit>> commits;
                  commits.push_back(std::move(commit));
                  watcher->OnNewCommits(commits, ChangeSource::LOCAL);
                });
          }
        }
        callback(status, std::move(commit));
      });
}
// Registers |watcher| so that CommitJournal notifies it of new commits.
// The pointer is stored as-is; no ownership is taken.
void FakePageStorage::AddCommitWatcher(CommitWatcher* watcher) {
  watchers_.insert(watcher);
}
// Unregisters |watcher|. Removing a watcher that is not registered is a no-op.
void FakePageStorage::RemoveCommitWatcher(CommitWatcher* watcher) {
  const auto position = watchers_.find(watcher);
  if (position == watchers_.end()) {
    return;
  }
  watchers_.erase(position);
}
// Synchronously reports the current value of |is_synced_| with Status::OK.
void FakePageStorage::IsSynced(fit::function<void(Status, bool)> callback) {
  const bool synced = is_synced_;
  callback(Status::OK, synced);
}
// Reads all of |data_source| and stores the result as a new object.
//
// Chunks are appended to an accumulation buffer as they arrive. On
// DataSource::Status::ERROR, |callback| receives Status::IO_ERROR and a
// default identifier. On DataSource::Status::DONE, the accumulated bytes are
// stored in |objects_| under an identifier derived from FakeDigest,
// |tree_references| are stored in |references_| under the object digest, and
// |callback| receives Status::OK with the identifier. The object type is
// ignored.
void FakePageStorage::AddObjectFromLocal(
    ObjectType /*object_type*/, std::unique_ptr<DataSource> data_source,
    ObjectReferencesAndPriority tree_references,
    fit::function<void(Status, ObjectIdentifier)> callback) {
  // Grab the raw pointer first: |data_source| is moved into the handler below
  // so that the source stays alive for as long as the handler does.
  DataSource* const source = data_source.get();
  source->Get([this, data_source = std::move(data_source),
               content = std::make_unique<std::string>(),
               tree_references = std::move(tree_references),
               callback = std::move(callback)](
                  std::unique_ptr<DataSource::DataChunk> chunk,
                  DataSource::Status status) mutable {
    if (status == DataSource::Status::ERROR) {
      callback(Status::IO_ERROR, {});
      return;
    }

    const auto bytes = chunk->Get();
    content->append(bytes.data(), bytes.size());

    // More chunks may follow; only finalize once the source is done.
    if (status != DataSource::Status::DONE) {
      return;
    }

    ObjectIdentifier identifier =
        encryption_service_.MakeObjectIdentifier(FakeDigest(*content));
    objects_[identifier] = std::move(*content);
    references_[identifier.object_digest()] = std::move(tree_references);
    callback(Status::OK, std::move(identifier));
  });
}
// Fetches the object identified by |object_identifier| via GetPiece and wraps
// the resulting piece in a FakeObject. The status from GetPiece is forwarded
// unchanged; when no piece is returned, |callback| receives a null object.
// The location argument is ignored.
void FakePageStorage::GetObject(
    ObjectIdentifier object_identifier, Location /*location*/,
    fit::function<void(Status, std::unique_ptr<const Object>)> callback) {
  auto on_piece = [callback = std::move(callback)](
                      Status status, std::unique_ptr<const Piece> piece) {
    std::unique_ptr<const Object> object;
    if (piece != nullptr) {
      object = std::make_unique<FakeObject>(std::move(piece));
    }
    callback(status, std::move(object));
  };
  GetPiece(object_identifier, std::move(on_piece));
}
// Fetches the object identified by |object_identifier| via GetPiece and
// returns the slice selected by |offset| and |max_size| (see ToBuffer) as a
// VMO. A failure from GetPiece or from the VMO conversion is reported to
// |callback| with that status and a null VMO. |location| is not used.
void FakePageStorage::GetObjectPart(
    ObjectIdentifier object_identifier, int64_t offset, int64_t max_size,
    Location location, fit::function<void(Status, fsl::SizedVmo)> callback) {
  auto on_piece = [offset, max_size, callback = std::move(callback)](
                      Status status, std::unique_ptr<const Piece> piece) {
    if (status != Status::OK) {
      callback(status, nullptr);
      return;
    }

    const fxl::StringView bytes = piece->GetData();
    fsl::SizedVmo vmo;
    const Status conversion_status = ToBuffer(bytes, offset, max_size, &vmo);
    if (conversion_status == Status::OK) {
      callback(Status::OK, std::move(vmo));
      return;
    }
    callback(conversion_status, nullptr);
  };
  GetPiece(object_identifier, std::move(on_piece));
}
// Queues a request for the piece identified by |object_identifier|.
//
// The lookup itself is deferred: it is appended to |object_requests_| and a
// delayed task is posted that makes SendNextObject run one queued request.
// When the request runs, |callback| receives Status::OK and a FakePiece if
// the object is in |objects_|, or Status::INTERNAL_NOT_FOUND and nullptr.
void FakePageStorage::GetPiece(
    ObjectIdentifier object_identifier,
    fit::function<void(Status, std::unique_ptr<const Piece>)> callback) {
  auto request = [this, object_identifier = std::move(object_identifier),
                  callback = std::move(callback)] {
    const auto entry = objects_.find(object_identifier);
    if (entry != objects_.end()) {
      callback(Status::OK,
               std::make_unique<FakePiece>(object_identifier, entry->second));
      return;
    }
    callback(Status::INTERNAL_NOT_FOUND, nullptr);
  };
  object_requests_.emplace_back(std::move(request));

  async::PostDelayedTask(
      environment_->dispatcher(), [this] { SendNextObject(); },
      kFakePageStorageDelay);
}
void FakePageStorage::GetCommitContents(const Commit& commit,
std::string min_key,
fit::function<bool(Entry)> on_next,
fit::function<void(Status)> on_done) {
FakeJournalDelegate* journal = journals_[commit.GetId()].get();
if (!journal) {
on_done(Status::INTERNAL_NOT_FOUND);
return;
}
for (auto it = journal->GetData().lower_bound(min_key);
it != journal->GetData().end(); ++it) {
if (!on_next(it->second)) {
break;
}
}
on_done(Status::OK);
}
// Looks up the entry stored under |key| in |commit|.
//
// |callback| is invoked synchronously with Status::OK and the entry, or with
// Status::INTERNAL_NOT_FOUND and a default-constructed Entry when either no
// journal delegate is recorded for |commit| or it has no entry for |key|.
void FakePageStorage::GetEntryFromCommit(
    const Commit& commit, std::string key,
    fit::function<void(Status, Entry)> callback) {
  // Use find() rather than operator[]: a lookup must not add a null entry to
  // |journals_| for an unknown commit, since other code treats the presence
  // of an entry as proof that a delegate exists.
  auto found = journals_.find(commit.GetId());
  if (found == journals_.end() || !found->second) {
    callback(Status::INTERNAL_NOT_FOUND, Entry());
    return;
  }

  const fake::FakeJournalDelegate::Data& data = found->second->GetData();
  auto it = data.find(key);
  if (it == data.end()) {
    callback(Status::INTERNAL_NOT_FOUND, Entry());
    return;
  }
  callback(Status::OK, it->second);
}
const std::map<std::string, std::unique_ptr<FakeJournalDelegate>>&
FakePageStorage::GetJournals() const {
return journals_;
}
const std::map<ObjectIdentifier, std::string>& FakePageStorage::GetObjects()
const {
return objects_;
}
const std::map<ObjectDigest, ObjectReferencesAndPriority>&
FakePageStorage::GetReferences() const {
return references_;
}
// Builds a fake ObjectDigest for |value|: the bytes 0xFA 0xCE 0xFE 0xED
// followed by SHA256WithLengthHash(|value|).
//
// The prefix intentionally makes the digest longer than real object digests,
// makes it start with a 1 bit, and makes it easy to spot in logs. The result
// is incompatible with real object digests, which is acceptable for a fake
// because its clients should treat object digests as opaque blobs.
ObjectDigest FakePageStorage::FakeDigest(fxl::StringView value) const {
  const std::string hash = encryption::SHA256WithLengthHash(value);
  std::string digest = fxl::Concatenate({"\xFA\xCE\xFE\xED", hash});
  return ObjectDigest(std::move(digest));
}
// Runs one pending object request, picked uniformly at random from
// |object_requests_| using the environment's random source. The request is
// removed from the queue before it is run. Does nothing if the queue is
// empty.
void FakePageStorage::SendNextObject() {
  // With an empty queue, |size() - 1| would wrap around, giving the
  // distribution an invalid range and producing an out-of-range iterator.
  if (object_requests_.empty()) {
    return;
  }

  auto rng = environment_->random()->NewBitGenerator<uint64_t>();
  std::uniform_int_distribution<size_t> distribution(
      0u, object_requests_.size() - 1);
  auto it = object_requests_.begin() + distribution(rng);

  // Take the request out of the queue before running it, so the queue is in
  // a consistent state while the request executes.
  auto closure = std::move(*it);
  object_requests_.erase(it);
  closure();
}
// Removes the object stored under |object_identifier| from |objects_|, if
// there is one. Entries in |references_| are left untouched.
void FakePageStorage::DeleteObjectFromLocal(
    const ObjectIdentifier& object_identifier) {
  objects_.erase(object_identifier);
}
// When |drop| is true, CommitJournal stops notifying registered watchers of
// new commits; when false, notifications are delivered again.
void FakePageStorage::SetDropCommitNotifications(bool drop) {
  drop_commit_notifications_ = drop;
}
} // namespace fake
} // namespace storage