blob: fe2e38ac04b8fb864fac76f7196fe8a9e0856be0 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/app/page_snapshot_impl.h"
#include <algorithm>
#include <functional>
#include <limits>
#include <queue>
#include <vector>
#include <lib/callback/trace_callback.h>
#include <lib/callback/waiter.h>
#include <lib/fidl/cpp/optional.h>
#include <lib/fit/function.h>
#include <lib/fsl/vmo/strings.h>
#include <lib/fxl/memory/ref_counted.h>
#include <lib/fxl/memory/ref_ptr.h>
#include "peridot/bin/ledger/app/constants.h"
#include "peridot/bin/ledger/app/fidl/serialization_size.h"
#include "peridot/bin/ledger/app/page_utils.h"
#include "peridot/lib/convert/convert.h"
namespace ledger {
namespace {
// Converts |vmo| into an optional transport Buffer. Yields nullptr whenever
// |status| is anything other than Status::OK; otherwise yields a non-null
// Buffer built from |vmo|.
fuchsia::mem::BufferPtr ToOptionalTransport(Status status, fsl::SizedVmo vmo) {
  if (status == Status::OK) {
    return fidl::MakeOptional(std::move(vmo).ToTransport());
  }
  return nullptr;
}
// Builds an |EntryType| carrying the key and priority of the storage |entry|.
// The value part of the returned entry is left untouched; it is filled in
// separately.
template <typename EntryType>
EntryType CreateEntry(const storage::Entry& entry) {
  const bool is_eager = entry.priority == storage::KeyPriority::EAGER;
  EntryType converted;
  converted.key = convert::ToArray(entry.key);
  converted.priority = is_eager ? Priority::EAGER : Priority::LAZY;
  return converted;
}
// Number of handles consumed by one entry of type |EntryType|. There is no
// generic definition: only the explicit specializations below exist.
template <typename EntryType>
size_t HandleUsed();

// Each Entry is counted as using one handle.
template <>
size_t HandleUsed<Entry>() {
  return size_t{1};
}

// An InlinedEntry is counted as using no handle.
template <>
size_t HandleUsed<InlinedEntry>() {
  return size_t{0};
}
// Serialization size of an Entry, derived from the length of its key only.
size_t ComputeEntrySize(const Entry& entry) {
  const size_t key_length = entry.key.size();
  return fidl_serialization::GetEntrySize(key_length);
}
// Serialization size of an InlinedEntry, as reported by
// fidl_serialization::GetInlinedEntrySize for the whole entry.
size_t ComputeEntrySize(const InlinedEntry& entry) {
  const size_t serialized_size = fidl_serialization::GetInlinedEntrySize(entry);
  return serialized_size;
}
// Sets the value of |entry| to a transport Buffer built from the VMO of
// |object|. Returns the status of the VMO retrieval; |entry| is only modified
// when that status is storage::Status::OK.
storage::Status FillSingleEntry(const storage::Object& object, Entry* entry) {
  fsl::SizedVmo value_vmo;
  const storage::Status vmo_status = object.GetVmo(&value_vmo);
  if (vmo_status == storage::Status::OK) {
    entry->value = fidl::MakeOptional(std::move(value_vmo).ToTransport());
  }
  return vmo_status;
}
// Sets the inlined value of |entry| to a copy of the data of |object|.
// Returns the status of the data retrieval; |entry| is only modified when that
// status is storage::Status::OK.
storage::Status FillSingleEntry(const storage::Object& object,
                                InlinedEntry* entry) {
  fxl::StringView content;
  const storage::Status data_status = object.GetData(&content);
  if (data_status == storage::Status::OK) {
    auto inlined = std::make_unique<InlinedValue>();
    inlined->value = convert::ToArray(content);
    entry->inlined_value = std::move(inlined);
  }
  return data_status;
}
// Calls |callback| with filled entries of the provided type per
// GetEntries/GetEntriesInline semantics.
//
// Entries of |commit| are read from |page_storage|, starting at the key held
// by |token| when one is provided and otherwise at the greater of |key_prefix|
// and |key_start|. Iteration stops at the first key that does not match
// |key_prefix|. |callback| receives:
//  - Status::OK and the entries when everything fits in one message;
//  - Status::PARTIAL_RESULT, the entries that fit and a token to resume from
//    otherwise;
//  - Status::VALUE_TOO_LARGE when not even the first entry fits;
//  - an error status and no entries when reading fails.
template <typename EntryType>
void FillEntries(storage::PageStorage* page_storage,
                 const std::string& key_prefix, const storage::Commit* commit,
                 std::vector<uint8_t> key_start,
                 std::unique_ptr<Token> token,
                 fit::function<void(Status, std::vector<EntryType>,
                                    std::unique_ptr<Token>)>
                     callback) {
  // |token| represents the first key to be returned in the list of entries.
  // Initially, all entries starting from |token| are requested from storage.
  // Iteration stops if either all entries were found, or if the estimated
  // serialization size of entries exceeds the maximum size of a FIDL message
  // (fidl_serialization::kMaxInlineDataSize), or if the number of entries
  // exceeds fidl_serialization::kMaxMessageHandles. If inline entries are
  // requested, then the actual size of the message is computed as the values
  // are added to the entries. This may result in fewer entries sent than
  // initially planned. In the case when not all entries have been sent,
  // callback will run with a PARTIAL_RESULT status and a token appropriate for
  // resuming the iteration at the right place.

  // Represents information shared between on_next and on_done callbacks.
  struct Context {
    std::vector<EntryType> entries;
    // The serialization size of all entries.
    size_t size = fidl_serialization::kVectorHeaderSize;
    // The number of handles used.
    size_t handle_count = 0u;
    // If |entries| array size exceeds kMaxInlineDataSize, |next_token| will
    // have the value of the following entry's key.
    std::unique_ptr<Token> next_token;
  };

  auto timed_callback =
      TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_get_entries");

  auto waiter = fxl::MakeRefCounted<callback::Waiter<
      storage::Status, std::unique_ptr<const storage::Object>>>(
      storage::Status::OK);

  // Owned by |on_done| below; |on_next| only keeps a raw pointer to it.
  auto context = std::make_unique<Context>();

  // Use |token| for the first key if present.
  std::string start = token
                          ? convert::ToString(token->opaque_id)
                          : std::max(key_prefix, convert::ToString(key_start));

  // The prefix is copied into the closure rather than captured by reference:
  // the closure is handed over to |page_storage| and must not depend on the
  // lifetime of the caller's string.
  auto on_next = [page_storage, prefix = key_prefix, context = context.get(),
                  waiter](storage::Entry entry) {
    if (!PageUtils::MatchesPrefix(entry.key, prefix)) {
      return false;
    }
    context->size += fidl_serialization::GetEntrySize(entry.key.size());
    context->handle_count += HandleUsed<EntryType>();
    // Always accept the first entry, even if it is over budget; whether it
    // really fits is decided once its value has been read.
    if ((context->size > fidl_serialization::kMaxInlineDataSize ||
         context->handle_count > fidl_serialization::kMaxMessageHandles) &&
        !context->entries.empty()) {
      context->next_token = std::make_unique<Token>();
      context->next_token->opaque_id = convert::ToArray(entry.key);
      return false;
    }
    context->entries.push_back(CreateEntry<EntryType>(entry));
    page_storage->GetObject(
        entry.object_identifier, storage::PageStorage::Location::LOCAL,
        [priority = entry.priority, waiter_callback = waiter->NewCallback()](
            storage::Status status,
            std::unique_ptr<const storage::Object> object) {
          // A missing value for a lazy key is not an error: report it as a
          // null object so that the entry is returned without a value.
          if (status == storage::Status::NOT_FOUND &&
              priority == storage::KeyPriority::LAZY) {
            waiter_callback(storage::Status::OK, nullptr);
          } else {
            waiter_callback(status, std::move(object));
          }
        });
    return true;
  };

  auto on_done = [waiter, context = std::move(context),
                  callback = std::move(timed_callback)](
                     storage::Status status) mutable {
    if (status != storage::Status::OK) {
      FXL_LOG(ERROR) << "Error while reading: " << status;
      callback(Status::IO_ERROR, std::vector<EntryType>(), nullptr);
      return;
    }
    fit::function<void(storage::Status,
                       std::vector<std::unique_ptr<const storage::Object>>)>
        result_callback =
            [callback = std::move(callback), context = std::move(context)](
                storage::Status status,
                std::vector<std::unique_ptr<const storage::Object>>
                    results) mutable {
              if (status != storage::Status::OK) {
                FXL_LOG(ERROR) << "Error while reading: " << status;
                callback(Status::IO_ERROR, std::vector<EntryType>(), nullptr);
                return;
              }
              FXL_DCHECK(context->entries.size() == results.size());
              size_t real_size = 0;
              size_t i = 0;
              for (; i < results.size(); i++) {
                EntryType& entry = context->entries.at(i);
                // Room to keep for the token that would point at the next
                // entry, should the iteration have to stop after this one.
                size_t next_token_size =
                    i + 1 >= results.size()
                        ? 0
                        : fidl_serialization::GetByteVectorSize(
                              context->entries.at(i + 1).key.size());
                if (results[i]) {
                  storage::Status read_status =
                      FillSingleEntry(*results[i], &entry);
                  if (read_status != storage::Status::OK) {
                    callback(PageUtils::ConvertStatus(read_status),
                             std::vector<EntryType>(), nullptr);
                    return;
                  }
                }
                // When |results[i]| is null, we don't have the object locally,
                // but we decided not to abort. This means this object is a
                // value of a lazy key and the client should ask to retrieve it
                // over the network if they need it. Here, we just leave the
                // value part of the entry null.
                size_t entry_size = ComputeEntrySize(entry);
                if (real_size + entry_size + next_token_size >
                    fidl_serialization::kMaxInlineDataSize) {
                  break;
                }
                real_size += entry_size;
              }
              if (i != results.size()) {
                if (i == 0) {
                  // Not even the first entry fits in a message.
                  callback(Status::VALUE_TOO_LARGE, std::vector<EntryType>(),
                           nullptr);
                  return;
                }
                // We had to bail out early because the result would be too big
                // otherwise.
                context->next_token = std::make_unique<Token>();
                context->next_token->opaque_id =
                    std::move(context->entries.at(i).key);
                context->entries.resize(i);
              }
              if (context->next_token) {
                callback(Status::PARTIAL_RESULT, std::move(context->entries),
                         std::move(context->next_token));
                return;
              }
              callback(Status::OK, std::move(context->entries), nullptr);
            };
    waiter->Finalize(std::move(result_callback));
  };

  page_storage->GetCommitContents(*commit, std::move(start), std::move(on_next),
                                  std::move(on_done));
}
} // namespace
PageSnapshotImpl::PageSnapshotImpl(
storage::PageStorage* page_storage,
std::unique_ptr<const storage::Commit> commit, std::string key_prefix)
: page_storage_(page_storage),
commit_(std::move(commit)),
key_prefix_(std::move(key_prefix)) {}
// Nothing to release explicitly: members clean up after themselves.
PageSnapshotImpl::~PageSnapshotImpl() = default;
void PageSnapshotImpl::GetEntries(std::vector<uint8_t> key_start,
std::unique_ptr<Token> token,
GetEntriesCallback callback) {
FillEntries<Entry>(page_storage_, key_prefix_, commit_.get(),
std::move(key_start), std::move(token),
std::move(callback));
}
void PageSnapshotImpl::GetEntriesInline(std::vector<uint8_t> key_start,
std::unique_ptr<Token> token,
GetEntriesInlineCallback callback) {
FillEntries<InlinedEntry>(page_storage_, key_prefix_, commit_.get(),
std::move(key_start), std::move(token),
std::move(callback));
}
void PageSnapshotImpl::GetKeys(std::vector<uint8_t> key_start,
std::unique_ptr<Token> token,
GetKeysCallback callback) {
// Represents the information that needs to be shared between on_next and
// on_done callbacks.
struct Context {
// The result of GetKeys. New keys from on_next are appended to this array.
std::vector<std::vector<uint8_t>> keys;
// The total size in number of bytes of the |keys| array.
size_t size = fidl_serialization::kVectorHeaderSize;
// If the |keys| array size exceeds the maximum allowed inlined data size,
// |next_token| will have the value of the next key (not included in array)
// which can be used as the next token.
std::unique_ptr<Token> next_token;
};
auto timed_callback =
TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_get_keys");
auto context = std::make_unique<Context>();
auto on_next = [this, context = context.get()](storage::Entry entry) {
if (!PageUtils::MatchesPrefix(entry.key, key_prefix_)) {
return false;
}
context->size += fidl_serialization::GetByteVectorSize(entry.key.size());
if (context->size > fidl_serialization::kMaxInlineDataSize) {
context->next_token = std::make_unique<Token>();
context->next_token->opaque_id = convert::ToArray(entry.key);
return false;
}
context->keys.push_back(convert::ToArray(entry.key));
return true;
};
auto on_done = [context = std::move(context),
callback =
std::move(timed_callback)](storage::Status status) {
if (status != storage::Status::OK) {
FXL_LOG(ERROR) << "Error while reading: " << status;
callback(Status::IO_ERROR,
std::vector<std::vector<uint8_t>>(), nullptr);
return;
}
if (context->next_token) {
callback(Status::PARTIAL_RESULT, std::move(context->keys),
std::move(context->next_token));
} else {
callback(Status::OK, std::move(context->keys), nullptr);
}
};
if (token) {
page_storage_->GetCommitContents(*commit_,
convert::ToString(token->opaque_id),
std::move(on_next), std::move(on_done));
} else {
page_storage_->GetCommitContents(
*commit_, std::max(convert::ToString(key_start), key_prefix_),
std::move(on_next), std::move(on_done));
}
}
void PageSnapshotImpl::Get(std::vector<uint8_t> key, GetCallback callback) {
auto timed_callback =
TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_get");
page_storage_->GetEntryFromCommit(
*commit_, convert::ToString(key),
[this, callback = std::move(timed_callback)](
storage::Status status, storage::Entry entry) mutable {
if (status != storage::Status::OK) {
callback(PageUtils::ConvertStatus(status, Status::KEY_NOT_FOUND),
nullptr);
return;
}
PageUtils::ResolveObjectIdentifierAsBuffer(
page_storage_, entry.object_identifier, 0u,
std::numeric_limits<int64_t>::max(),
storage::PageStorage::Location::LOCAL, Status::NEEDS_FETCH,
[callback = std::move(callback)](Status status,
fsl::SizedVmo data) {
callback(status, ToOptionalTransport(status, std::move(data)));
});
});
}
void PageSnapshotImpl::GetInline(std::vector<uint8_t> key,
GetInlineCallback callback) {
auto timed_callback =
TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_get_inline");
page_storage_->GetEntryFromCommit(
*commit_, convert::ToString(key),
[this, callback = std::move(timed_callback)](
storage::Status status, storage::Entry entry) mutable {
if (status != storage::Status::OK) {
callback(PageUtils::ConvertStatus(status, Status::KEY_NOT_FOUND),
nullptr);
return;
}
PageUtils::ResolveObjectIdentifierAsStringView(
page_storage_, entry.object_identifier,
storage::PageStorage::Location::LOCAL, Status::NEEDS_FETCH,
[callback = std::move(callback)](Status status,
fxl::StringView data_view) {
if (status != Status::OK) {
callback(status, nullptr);
return;
}
if (fidl_serialization::GetByteVectorSize(data_view.size()) +
fidl_serialization::kStatusEnumSize >
fidl_serialization::kMaxInlineDataSize) {
callback(Status::VALUE_TOO_LARGE, nullptr);
return;
}
auto inlined_value = std::make_unique<InlinedValue>();
inlined_value->value = convert::ToArray(data_view);
callback(status, std::move(inlined_value));
});
});
}
void PageSnapshotImpl::Fetch(std::vector<uint8_t> key,
FetchCallback callback) {
auto timed_callback =
TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_fetch");
page_storage_->GetEntryFromCommit(
*commit_, convert::ToString(key),
[this, callback = std::move(timed_callback)](
storage::Status status, storage::Entry entry) mutable {
if (status != storage::Status::OK) {
callback(PageUtils::ConvertStatus(status, Status::KEY_NOT_FOUND),
nullptr);
return;
}
PageUtils::ResolveObjectIdentifierAsBuffer(
page_storage_, entry.object_identifier, 0u,
std::numeric_limits<int64_t>::max(),
storage::PageStorage::Location::NETWORK, Status::INTERNAL_ERROR,
[callback = std::move(callback)](Status status,
fsl::SizedVmo data) {
callback(status, ToOptionalTransport(status, std::move(data)));
});
});
}
void PageSnapshotImpl::FetchPartial(std::vector<uint8_t> key,
int64_t offset, int64_t max_size,
FetchPartialCallback callback) {
auto timed_callback =
TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_fetch_partial");
page_storage_->GetEntryFromCommit(
*commit_, convert::ToString(key),
[this, offset, max_size, callback = std::move(timed_callback)](
storage::Status status, storage::Entry entry) mutable {
if (status != storage::Status::OK) {
callback(PageUtils::ConvertStatus(status, Status::KEY_NOT_FOUND),
nullptr);
return;
}
PageUtils::ResolveObjectIdentifierAsBuffer(
page_storage_, entry.object_identifier, offset, max_size,
storage::PageStorage::Location::NETWORK, Status::INTERNAL_ERROR,
[callback = std::move(callback)](Status status,
fsl::SizedVmo data) {
callback(status, ToOptionalTransport(status, std::move(data)));
});
});
}
} // namespace ledger