blob: e523afffbc77eafaafed0e579a665b0c3f829392 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/app/page_snapshot_impl.h"
#include <lib/callback/trace_callback.h>
#include <lib/callback/waiter.h>
#include <lib/fidl/cpp/optional.h>
#include <lib/fit/function.h>
#include <lib/fsl/vmo/strings.h>
#include <algorithm>
#include <functional>
#include <limits>
#include <queue>
#include <vector>
#include "peridot/lib/convert/convert.h"
#include "src/ledger/bin/app/constants.h"
#include "src/ledger/bin/app/fidl/serialization_size.h"
#include "src/ledger/bin/app/page_utils.h"
#include "src/ledger/bin/fidl/include/types.h"
#include "src/lib/fxl/logging.h"
#include "src/lib/fxl/memory/ref_counted.h"
#include "src/lib/fxl/memory/ref_ptr.h"
namespace ledger {
namespace {
template <typename EntryType>
EntryType CreateEntry(const storage::Entry& entry) {
EntryType result;
result.key = convert::ToArray(entry.key);
result.priority = entry.priority == storage::KeyPriority::EAGER
? Priority::EAGER
: Priority::LAZY;
return result;
}
// Returns the number of handles used by an entry of the given type. Specialized
// for each entry type.
template <class EntryType>
size_t HandleUsed();
template <>
size_t HandleUsed<Entry>() {
return 1;
}
template <>
size_t HandleUsed<InlinedEntry>() {
return 0;
}
// Computes the size of an Entry.
size_t ComputeEntrySize(const Entry& entry) {
return fidl_serialization::GetEntrySize(entry.key.size());
}
// Computes the size of an InlinedEntry.
size_t ComputeEntrySize(const InlinedEntry& entry) {
return fidl_serialization::GetInlinedEntrySize(entry);
}
// Fills an Entry from the content of object.
storage::Status FillSingleEntry(const storage::Object& object, Entry* entry) {
fsl::SizedVmo vmo;
storage::Status status = object.GetVmo(&vmo);
if (status != storage::Status::OK) {
return status;
}
entry->value = fidl::MakeOptional(std::move(vmo).ToTransport());
return storage::Status::OK;
}
// Fills an InlinedEntry from the content of object.
storage::Status FillSingleEntry(const storage::Object& object,
InlinedEntry* entry) {
fxl::StringView data;
storage::Status status = object.GetData(&data);
if (status != storage::Status::OK) {
return status;
}
entry->inlined_value = std::make_unique<InlinedValue>();
entry->inlined_value->value = convert::ToArray(data);
return storage::Status::OK;
}
// Calls |callback| with filled entries of the provided type per
// GetEntries/GetEntriesInline semantics.
// |fill_value| is a callback that fills the entry pointer with the content of
// the provided object.
template <typename EntryType>
void FillEntries(
storage::PageStorage* page_storage, const std::string& key_prefix,
const storage::Commit* commit, std::vector<uint8_t> key_start,
std::unique_ptr<Token> token,
fit::function<void(Status, IterationStatus, std::vector<EntryType>,
std::unique_ptr<Token>)>
callback) {
// |token| represents the first key to be returned in the list of entries.
// Initially, all entries starting from |token| are requested from storage.
// Iteration stops if either all entries were found, or if the estimated
// serialization size of entries exceeds the maximum size of a FIDL message
// (fidl_serialization::kMaxInlineDataSize), or if the number of entries
// exceeds fidl_serialization::kMaxMessageHandles. If inline entries are
// requested, then the actual size of the message is computed as the values
// are added to the entries. This may result in less entries sent than
// initially planned. In the case when not all entries have been sent,
// callback will run with a PARTIAL_RESULT status and a token appropriate for
// resuming the iteration at the right place.
// Represents information shared between on_next and on_done callbacks.
struct Context {
std::vector<EntryType> entries;
// The serialization size of all entries.
size_t size = fidl_serialization::kVectorHeaderSize;
// The number of handles used.
size_t handle_count = 0u;
// If |entries| array size exceeds kMaxInlineDataSize, |next_token| will
// have the value of the following entry's key.
std::unique_ptr<Token> next_token;
};
auto timed_callback =
TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_get_entries");
auto waiter = fxl::MakeRefCounted<callback::Waiter<
storage::Status, std::unique_ptr<const storage::Object>>>(
storage::Status::OK);
auto context = std::make_unique<Context>();
// Use |token| for the first key if present.
std::string start = token
? convert::ToString(token->opaque_id)
: std::max(key_prefix, convert::ToString(key_start));
auto on_next = [page_storage, &key_prefix, context = context.get(),
waiter](storage::Entry entry) {
if (!PageUtils::MatchesPrefix(entry.key, key_prefix)) {
return false;
}
context->size += fidl_serialization::GetEntrySize(entry.key.size());
context->handle_count += HandleUsed<EntryType>();
if ((context->size > fidl_serialization::kMaxInlineDataSize ||
context->handle_count > fidl_serialization::kMaxMessageHandles) &&
!context->entries.empty()) {
context->next_token = std::make_unique<Token>();
context->next_token->opaque_id = convert::ToArray(entry.key);
return false;
}
context->entries.push_back(CreateEntry<EntryType>(entry));
page_storage->GetObject(
entry.object_identifier, storage::PageStorage::Location::LOCAL,
[priority = entry.priority, waiter_callback = waiter->NewCallback()](
storage::Status status,
std::unique_ptr<const storage::Object> object) {
if (status == storage::Status::INTERNAL_NOT_FOUND &&
priority == storage::KeyPriority::LAZY) {
waiter_callback(storage::Status::OK, nullptr);
} else {
waiter_callback(status, std::move(object));
}
});
return true;
};
auto on_done =
[waiter, context = std::move(context),
callback = std::move(timed_callback)](storage::Status status) mutable {
if (status != storage::Status::OK) {
FXL_LOG(ERROR) << "Error while reading: " << status;
callback(Status::IO_ERROR, IterationStatus::OK,
std::vector<EntryType>(), nullptr);
return;
}
fit::function<void(storage::Status,
std::vector<std::unique_ptr<const storage::Object>>)>
result_callback =
[callback = std::move(callback), context = std::move(context)](
storage::Status status,
std::vector<std::unique_ptr<const storage::Object>>
results) mutable {
if (status != storage::Status::OK) {
FXL_LOG(ERROR) << "Error while reading: " << status;
callback(Status::IO_ERROR, IterationStatus::OK,
std::vector<EntryType>(), nullptr);
return;
}
FXL_DCHECK(context->entries.size() == results.size());
size_t real_size = 0;
size_t i = 0;
for (; i < results.size(); i++) {
EntryType& entry = context->entries.at(i);
size_t next_token_size =
i + 1 >= results.size()
? 0
: fidl_serialization::GetByteVectorSize(
context->entries.at(i + 1).key.size());
if (!results[i]) {
size_t entry_size = ComputeEntrySize(entry);
if (real_size + entry_size + next_token_size >
fidl_serialization::kMaxInlineDataSize) {
break;
}
real_size += entry_size;
// We don't have the object locally, but we decided not to
// abort. This means this object is a value of a lazy key
// and the client should ask to retrieve it over the
// network if they need it. Here, we just leave the value
// part of the entry null.
continue;
}
storage::Status read_status =
FillSingleEntry(*results[i], &entry);
if (read_status != storage::Status::OK) {
callback(PageUtils::ConvertStatus(read_status),
IterationStatus::OK, std::vector<EntryType>(),
nullptr);
return;
}
size_t entry_size = ComputeEntrySize(entry);
if (real_size + entry_size + next_token_size >
fidl_serialization::kMaxInlineDataSize) {
break;
}
real_size += entry_size;
}
if (i != results.size()) {
if (i == 0) {
callback(Status::VALUE_TOO_LARGE, IterationStatus::OK,
std::vector<EntryType>(), nullptr);
return;
}
// We had to bail out early because the result would be too
// big otherwise.
context->next_token = std::make_unique<Token>();
context->next_token->opaque_id =
std::move(context->entries.at(i).key);
context->entries.resize(i);
}
if (context->next_token) {
callback(Status::OK, IterationStatus::PARTIAL_RESULT,
std::move(context->entries),
std::move(context->next_token));
return;
}
callback(Status::OK, IterationStatus::OK,
std::move(context->entries), nullptr);
};
waiter->Finalize(std::move(result_callback));
};
page_storage->GetCommitContents(*commit, std::move(start), std::move(on_next),
std::move(on_done));
}
// Adapt callback for the error notifier API.
template <typename... A>
fit::function<void(Status, IterationStatus, A...)> AdaptCallback(
fit::function<void(Status, Status, A...)> callback) {
return [callback = std::move(callback)](
Status status, IterationStatus iteration_status, A... args) {
callback(status,
iteration_status == IterationStatus::OK ? Status::OK
: Status::PARTIAL_RESULT,
std::forward<A>(args)...);
};
}
template <typename Result>
Result ToErrorResult(fuchsia::ledger::Error error) {
Result result;
result.set_err(error);
return result;
}
} // namespace
PageSnapshotImpl::PageSnapshotImpl(
storage::PageStorage* page_storage,
std::unique_ptr<const storage::Commit> commit, std::string key_prefix)
: page_storage_(page_storage),
commit_(std::move(commit)),
key_prefix_(std::move(key_prefix)) {}
PageSnapshotImpl::~PageSnapshotImpl() {}
void PageSnapshotImpl::GetEntries(
std::vector<uint8_t> key_start, std::unique_ptr<Token> token,
fit::function<void(Status, IterationStatus, std::vector<Entry>,
std::unique_ptr<Token>)>
callback) {
FillEntries<Entry>(page_storage_, key_prefix_, commit_.get(),
std::move(key_start), std::move(token),
std::move(callback));
}
void PageSnapshotImpl::GetEntriesInline(
std::vector<uint8_t> key_start, std::unique_ptr<Token> token,
fit::function<void(Status, IterationStatus, std::vector<InlinedEntry>,
std::unique_ptr<Token>)>
callback) {
FillEntries<InlinedEntry>(page_storage_, key_prefix_, commit_.get(),
std::move(key_start), std::move(token),
std::move(callback));
}
void PageSnapshotImpl::GetKeys(
std::vector<uint8_t> key_start, std::unique_ptr<Token> token,
fit::function<void(Status, IterationStatus,
std::vector<std::vector<uint8_t>>,
std::unique_ptr<Token>)>
callback) {
// Represents the information that needs to be shared between on_next and
// on_done callbacks.
struct Context {
// The result of GetKeys. New keys from on_next are appended to this array.
std::vector<std::vector<uint8_t>> keys;
// The total size in number of bytes of the |keys| array.
size_t size = fidl_serialization::kVectorHeaderSize;
// If the |keys| array size exceeds the maximum allowed inlined data size,
// |next_token| will have the value of the next key (not included in array)
// which can be used as the next token.
std::unique_ptr<Token> next_token;
};
auto timed_callback =
TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_get_keys");
auto context = std::make_unique<Context>();
auto on_next = [this, context = context.get()](storage::Entry entry) {
if (!PageUtils::MatchesPrefix(entry.key, key_prefix_)) {
return false;
}
context->size += fidl_serialization::GetByteVectorSize(entry.key.size());
if (context->size > fidl_serialization::kMaxInlineDataSize) {
context->next_token = std::make_unique<Token>();
context->next_token->opaque_id = convert::ToArray(entry.key);
return false;
}
context->keys.push_back(convert::ToArray(entry.key));
return true;
};
auto on_done = [context = std::move(context),
callback =
std::move(timed_callback)](storage::Status status) {
if (status != storage::Status::OK) {
FXL_LOG(ERROR) << "Error while reading: " << status;
callback(Status::IO_ERROR, IterationStatus::OK,
std::vector<std::vector<uint8_t>>(), nullptr);
return;
}
if (context->next_token) {
callback(Status::OK, IterationStatus::PARTIAL_RESULT,
std::move(context->keys), std::move(context->next_token));
} else {
callback(Status::OK, IterationStatus::OK, std::move(context->keys),
nullptr);
}
};
if (token) {
page_storage_->GetCommitContents(*commit_,
convert::ToString(token->opaque_id),
std::move(on_next), std::move(on_done));
} else {
page_storage_->GetCommitContents(
*commit_, std::max(convert::ToString(key_start), key_prefix_),
std::move(on_next), std::move(on_done));
}
}
void PageSnapshotImpl::Get(
std::vector<uint8_t> key,
fit::function<void(Status, fuchsia::ledger::PageSnapshot_Get_Result)>
callback) {
auto timed_callback =
TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_get");
page_storage_->GetEntryFromCommit(
*commit_, convert::ToString(key),
[this, callback = std::move(timed_callback)](
storage::Status status, storage::Entry entry) mutable {
if (status == storage::Status::KEY_NOT_FOUND) {
callback(Status::OK,
ToErrorResult<fuchsia::ledger::PageSnapshot_Get_Result>(
fuchsia::ledger::Error::KEY_NOT_FOUND));
return;
}
if (status != storage::Status::OK) {
callback(PageUtils::ConvertStatus(status),
fuchsia::ledger::PageSnapshot_Get_Result());
return;
}
PageUtils::ResolveObjectIdentifierAsBuffer(
page_storage_, entry.object_identifier, 0u,
std::numeric_limits<int64_t>::max(),
storage::PageStorage::Location::LOCAL,
[callback = std::move(callback)](storage::Status status,
fsl::SizedVmo data) {
if (status == storage::Status::INTERNAL_NOT_FOUND) {
callback(
Status::OK,
ToErrorResult<fuchsia::ledger::PageSnapshot_Get_Result>(
fuchsia::ledger::Error::NEEDS_FETCH));
return;
}
if (status != storage::Status::OK) {
callback(PageUtils::ConvertStatus(status),
fuchsia::ledger::PageSnapshot_Get_Result());
return;
}
fuchsia::ledger::PageSnapshot_Get_Result result;
result.response().buffer = std::move(data).ToTransport();
callback(Status::OK, std::move(result));
});
});
}
void PageSnapshotImpl::GetInline(
std::vector<uint8_t> key,
fit::function<void(Status, fuchsia::ledger::PageSnapshot_GetInline_Result)>
callback) {
auto timed_callback =
TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_get_inline");
page_storage_->GetEntryFromCommit(
*commit_, convert::ToString(key),
[this, callback = std::move(timed_callback)](
storage::Status status, storage::Entry entry) mutable {
if (status == storage::Status::KEY_NOT_FOUND) {
callback(
Status::OK,
ToErrorResult<fuchsia::ledger::PageSnapshot_GetInline_Result>(
fuchsia::ledger::Error::KEY_NOT_FOUND));
return;
}
if (status != storage::Status::OK) {
callback(PageUtils::ConvertStatus(status),
fuchsia::ledger::PageSnapshot_GetInline_Result());
return;
}
PageUtils::ResolveObjectIdentifierAsStringView(
page_storage_, entry.object_identifier,
storage::PageStorage::Location::LOCAL,
[callback = std::move(callback)](storage::Status status,
fxl::StringView data_view) {
if (status == storage::Status::INTERNAL_NOT_FOUND) {
callback(Status::OK,
ToErrorResult<
fuchsia::ledger::PageSnapshot_GetInline_Result>(
fuchsia::ledger::Error::NEEDS_FETCH));
return;
}
if (status != storage::Status::OK) {
callback(PageUtils::ConvertStatus(status),
fuchsia::ledger::PageSnapshot_GetInline_Result());
return;
}
if (fidl_serialization::GetByteVectorSize(data_view.size()) +
fidl_serialization::kStatusEnumSize >
fidl_serialization::kMaxInlineDataSize) {
callback(Status::VALUE_TOO_LARGE,
fuchsia::ledger::PageSnapshot_GetInline_Result());
return;
}
fuchsia::ledger::PageSnapshot_GetInline_Result result;
result.response().value.value = convert::ToArray(data_view);
callback(Status::OK, std::move(result));
});
});
}
void PageSnapshotImpl::Fetch(
std::vector<uint8_t> key,
fit::function<void(Status, fuchsia::ledger::PageSnapshot_Fetch_Result)>
callback) {
FetchPartial(
std::move(key), 0, -1,
[callback = std::move(callback)](
Status status,
fuchsia::ledger::PageSnapshot_FetchPartial_Result result) {
if (status != Status::OK) {
callback(status, fuchsia::ledger::PageSnapshot_Fetch_Result());
return;
}
fuchsia::ledger::PageSnapshot_Fetch_Result new_result;
if (result.is_err()) {
new_result.set_err(result.err());
} else {
new_result.response().buffer = std::move(result.response().buffer);
}
callback(Status::OK, std::move(new_result));
});
}
void PageSnapshotImpl::FetchPartial(
std::vector<uint8_t> key, int64_t offset, int64_t max_size,
fit::function<void(Status,
fuchsia::ledger::PageSnapshot_FetchPartial_Result)>
callback) {
auto timed_callback =
TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_fetch_partial");
page_storage_->GetEntryFromCommit(
*commit_, convert::ToString(key),
[this, offset, max_size, callback = std::move(timed_callback)](
storage::Status status, storage::Entry entry) mutable {
if (status == storage::Status::KEY_NOT_FOUND) {
callback(
Status::OK,
ToErrorResult<fuchsia::ledger::PageSnapshot_FetchPartial_Result>(
fuchsia::ledger::Error::KEY_NOT_FOUND));
return;
}
if (status != storage::Status::OK) {
callback(PageUtils::ConvertStatus(status),
fuchsia::ledger::PageSnapshot_FetchPartial_Result());
return;
}
PageUtils::ResolveObjectIdentifierAsBuffer(
page_storage_, entry.object_identifier, offset, max_size,
storage::PageStorage::Location::NETWORK,
[callback = std::move(callback)](storage::Status status,
fsl::SizedVmo data) {
if (status == storage::Status::NETWORK_ERROR) {
callback(Status::OK,
ToErrorResult<
fuchsia::ledger::PageSnapshot_FetchPartial_Result>(
fuchsia::ledger::Error::NETWORK_ERROR));
return;
}
if (status != storage::Status::OK) {
callback(PageUtils::ConvertStatus(status),
fuchsia::ledger::PageSnapshot_FetchPartial_Result());
return;
}
fuchsia::ledger::PageSnapshot_FetchPartial_Result result;
result.response().buffer = std::move(data).ToTransport();
callback(Status::OK, std::move(result));
});
});
}
} // namespace ledger