| // Copyright 2017 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/ledger/bin/app/diff_utils.h" |
| |
| #include <lib/fidl/cpp/optional.h> |
| #include <lib/fit/function.h> |
| |
| #include <limits> |
| #include <memory> |
| #include <vector> |
| |
| #include "src/ledger/bin/app/fidl/serialization_size.h" |
| #include "src/ledger/bin/app/page_utils.h" |
| #include "src/ledger/bin/storage/public/object.h" |
| #include "src/ledger/lib/callback/waiter.h" |
| #include "src/ledger/lib/logging/logging.h" |
| #include "src/ledger/lib/memory/ref_ptr.h" |
| #include "src/ledger/lib/util/ptr.h" |
| #include "src/ledger/lib/vmo/sized_vmo.h" |
| #include "src/ledger/lib/vmo/strings.h" |
| |
| namespace ledger { |
| namespace diff_utils { |
| namespace { |
| // Returns the key of a storage::ThreeWayChange object. This key is guaranteed |
| // to be unique. |
| const std::string& GetKey(const storage::ThreeWayChange& change) { |
| if (change.base) { |
| return change.base->key; |
| } |
| if (change.left) { |
| return change.left->key; |
| } |
| return change.right->key; |
| } |
| |
| // Constructs a ValuePtr object from an entry. The contents of the ValuePtr will |
| // be provided through the |waiter|. |
| ValuePtr GetValueFromEntry(storage::PageStorage* const storage, |
| const std::unique_ptr<storage::Entry>& entry, |
| fit::function<void(Status, SizedVmo)> callback) { |
| if (!entry) { |
| callback(Status::OK, SizedVmo()); |
| return nullptr; |
| } |
| ValuePtr value = Value::New(); |
| switch (entry->priority) { |
| case storage::KeyPriority::EAGER: |
| value->priority = Priority::EAGER; |
| break; |
| case storage::KeyPriority::LAZY: |
| value->priority = Priority::LAZY; |
| break; |
| } |
| storage->GetObjectPart(entry->object_identifier, 0u, std::numeric_limits<int64_t>::max(), |
| storage::PageStorage::Location::Local(), std::move(callback)); |
| return value; |
| } |
| |
| // Retrieves a value of an entry from storage. Returns null instead of the value |
| // if the entry has LAZY priority and the value is not on the device. |
| void GetOptionalValueFromReference( |
| storage::PageStorage* storage, const storage::ObjectIdentifier& object_identifier, |
| Priority priority, |
| fit::function<void(Status, std::unique_ptr<fuchsia::mem::Buffer>)> callback) { |
| storage->GetObjectPart( |
| object_identifier, 0u, std::numeric_limits<int64_t>::max(), |
| storage::PageStorage::Location::Local(), |
| [priority, callback = std::move(callback)](Status status, SizedVmo vmo) { |
| if ((status == Status::INTERNAL_NOT_FOUND) && (priority == Priority::LAZY)) { |
| callback(Status::OK, nullptr); |
| return; |
| } |
| callback(status, fidl::MakeOptional(std::move(vmo).ToTransport())); |
| }); |
| } |
| |
| // Returns true if the change is automatically mergeable, ie. is not |
| // conflicting. |
| bool IsMergeable(const storage::ThreeWayChange& change) { |
| return EqualPtr(change.base, change.left) || EqualPtr(change.base, change.right) || |
| EqualPtr(change.left, change.right); |
| } |
| } // namespace |
| |
| void ComputePageChange( |
| storage::PageStorage* storage, const storage::Commit& base, const storage::Commit& other, |
| std::string prefix_key, std::string min_key, PaginationBehavior pagination_behavior, |
| fit::function<void(Status, std::pair<PageChangePtr, std::string>)> callback) { |
| struct Context { |
| // The PageChangePtr to be returned through the callback. |
| PageChangePtr page_change = PageChange::New(); |
| // The serialization size of all entries. |
| size_t fidl_size = fidl_serialization::kPageChangeHeaderSize; |
| // The number of handles. |
| size_t handles_count = 0u; |
| // The next token to be returned through the callback. |
| std::string next_token = ""; |
| }; |
| |
| auto waiter = MakeRefCounted<Waiter<Status, std::unique_ptr<fuchsia::mem::Buffer>>>(Status::OK); |
| |
| auto context = std::make_unique<Context>(); |
| context->page_change->timestamp = other.GetTimestamp().get(); |
| context->page_change->changed_entries.resize(0); |
| context->page_change->deleted_keys.resize(0); |
| |
| if (min_key < prefix_key) { |
| min_key = prefix_key; |
| } |
| |
| // |on_next| is called for each change on the diff |
| auto on_next = [storage, waiter, prefix_key = std::move(prefix_key), context = context.get(), |
| pagination_behavior](const storage::EntryChange& change) { |
| if (!PageUtils::MatchesPrefix(change.entry.key, prefix_key)) { |
| return false; |
| } |
| size_t entry_size = change.deleted |
| ? fidl_serialization::GetByteVectorSize(change.entry.key.size()) |
| : fidl_serialization::GetEntrySize(change.entry.key.size()); |
| size_t entry_handle_count = change.deleted ? 0 : 1; |
| if (pagination_behavior == PaginationBehavior::BY_SIZE && |
| (context->fidl_size + entry_size > fidl_serialization::kMaxInlineDataSize || |
| context->handles_count + entry_handle_count > fidl_serialization::kMaxMessageHandles |
| |
| )) { |
| context->next_token = change.entry.key; |
| return false; |
| } |
| context->fidl_size += entry_size; |
| context->handles_count += entry_handle_count; |
| if (change.deleted) { |
| context->page_change->deleted_keys.push_back(convert::ToArray(change.entry.key)); |
| return true; |
| } |
| |
| Entry entry; |
| entry.key = convert::ToArray(change.entry.key); |
| Priority priority{change.entry.priority == storage::KeyPriority::EAGER ? Priority::EAGER |
| : Priority::LAZY}; |
| entry.priority = priority; |
| context->page_change->changed_entries.push_back(std::move(entry)); |
| GetOptionalValueFromReference(storage, change.entry.object_identifier, priority, |
| waiter->NewCallback()); |
| return true; |
| }; |
| |
| // |on_done| is called when the full diff is computed. |
| auto on_done = [waiter = std::move(waiter), context = std::move(context), |
| callback = std::move(callback)](Status status) mutable { |
| if (status != Status::OK) { |
| LEDGER_LOG(ERROR) << "Unable to compute diff for PageChange: " << fidl::ToUnderlying(status); |
| callback(status, std::make_pair(nullptr, "")); |
| return; |
| } |
| if (context->page_change->changed_entries.empty()) { |
| if (context->page_change->deleted_keys.empty()) { |
| callback(Status::OK, std::make_pair(nullptr, "")); |
| } else { |
| callback(Status::OK, std::make_pair(std::move(context->page_change), "")); |
| } |
| return; |
| } |
| |
| // We need to retrieve the values for each changed key/value pair in order |
| // to send it inside the PageChange object. |waiter| collates these |
| // asynchronous calls and |result_callback| processes them. |
| auto result_callback = [context = std::move(context), callback = std::move(callback)]( |
| Status status, |
| std::vector<std::unique_ptr<fuchsia::mem::Buffer>> results) mutable { |
| if (status != Status::OK) { |
| LEDGER_LOG(ERROR) << "Error while reading changed values when computing PageChange: " |
| << fidl::ToUnderlying(status); |
| callback(status, std::make_pair(nullptr, "")); |
| return; |
| } |
| LEDGER_DCHECK(results.size() == context->page_change->changed_entries.size()); |
| for (size_t i = 0; i < results.size(); i++) { |
| if (results[i]) { |
| LEDGER_DCHECK(results[i]->vmo); |
| context->page_change->changed_entries.at(i).value = std::move(results[i]); |
| } |
| } |
| callback(Status::OK, |
| std::make_pair(std::move(context->page_change), std::move(context->next_token))); |
| }; |
| waiter->Finalize(std::move(result_callback)); |
| }; |
| storage->GetCommitContentsDiff(base, other, std::move(min_key), std::move(on_next), |
| std::move(on_done)); |
| } |
| |
| void ComputeThreeWayDiff( |
| storage::PageStorage* storage, const storage::Commit& base, const storage::Commit& left, |
| const storage::Commit& right, std::string prefix_key, std::string min_key, DiffType diff_type, |
| fit::function<void(Status, std::pair<std::vector<DiffEntry>, std::string>)> callback) { |
| struct Context { |
| // The array to be returned through the callback. |
| std::vector<DiffEntry> changes; |
| // The serialization size of all entries. |
| size_t fidl_size = fidl_serialization::kVectorHeaderSize; |
| // The number of handles. |
| size_t handles_count = 0u; |
| // The next token to be returned through the callback. |
| std::string next_token = ""; |
| }; |
| |
| // This waiter collects the values (as VMOs) for all changes that will be |
| // returned. As each |DiffEntry| struct has three values, we ensure that |
| // values are always returned in a specific order (base, left, right). Some |
| // values may be empty, to denote a lack of diff. |
| auto waiter = MakeRefCounted<Waiter<Status, SizedVmo>>(Status::OK); |
| |
| auto context = std::make_unique<Context>(); |
| |
| if (min_key < prefix_key) { |
| min_key = prefix_key; |
| } |
| |
| // |on_next| is called for each change on the diff |
| auto on_next = [storage, waiter, prefix_key = std::move(prefix_key), context = context.get(), |
| diff_type](storage::ThreeWayChange change) mutable { |
| const std::string& key = GetKey(change); |
| if (!PageUtils::MatchesPrefix(key, prefix_key)) { |
| return false; |
| } |
| int number_of_values = bool(change.base) + bool(change.left) + bool(change.right); |
| size_t diffentry_size = fidl_serialization::GetDiffEntrySize(key.size(), number_of_values); |
| if (context->fidl_size + diffentry_size > fidl_serialization::kMaxInlineDataSize || |
| context->handles_count + number_of_values > fidl_serialization::kMaxMessageHandles) { |
| context->next_token = key; |
| // Stop the iteration as we are over the message capacity. |
| return false; |
| } |
| |
| if (diff_type == DiffType::CONFLICTING && IsMergeable(change)) { |
| // We are not interested in this change, continue to the next one. |
| return true; |
| } |
| |
| context->fidl_size += diffentry_size; |
| context->handles_count += number_of_values; |
| |
| DiffEntry diff_entry; |
| diff_entry.key = convert::ToArray(key); |
| diff_entry.base = GetValueFromEntry(storage, change.base, waiter->NewCallback()); |
| diff_entry.left = GetValueFromEntry(storage, change.left, waiter->NewCallback()); |
| diff_entry.right = GetValueFromEntry(storage, change.right, waiter->NewCallback()); |
| context->changes.push_back(std::move(diff_entry)); |
| return true; |
| }; |
| |
| // |on_done| is called when the full diff is computed. |
| auto on_done = [waiter = std::move(waiter), context = std::move(context), |
| callback = std::move(callback)](Status status) mutable { |
| if (status != Status::OK) { |
| LEDGER_LOG(ERROR) << "Unable to compute diff for PageChange: " << status; |
| callback(status, std::make_pair(std::vector<DiffEntry>(), "")); |
| return; |
| } |
| if (context->changes.empty()) { |
| callback(Status::OK, std::make_pair(std::vector<DiffEntry>(), "")); |
| return; |
| } |
| |
| // We need to retrieve the values for each changed key/value pair in order |
| // to send it inside the PageChange object. |waiter| collates these |
| // asynchronous calls and |result_callback| processes them. |
| auto result_callback = [context = std::move(context), callback = std::move(callback)]( |
| Status status, std::vector<SizedVmo> results) mutable { |
| if (status != Status::OK) { |
| LEDGER_LOG(ERROR) << "Error while reading changed values when computing PageChange: " |
| << fidl::ToUnderlying(status); |
| callback(status, std::make_pair(std::vector<DiffEntry>(), "")); |
| return; |
| } |
| LEDGER_DCHECK(results.size() == 3 * context->changes.size()); |
| for (size_t i = 0; i < context->changes.size(); i++) { |
| if (results[3 * i]) { |
| context->changes.at(i).base->value = |
| fidl::MakeOptional(std::move(results[3 * i]).ToTransport()); |
| } |
| if (results[3 * i + 1]) { |
| context->changes.at(i).left->value = |
| fidl::MakeOptional(std::move(results[3 * i + 1]).ToTransport()); |
| } |
| if (results[3 * i + 2]) { |
| context->changes.at(i).right->value = |
| fidl::MakeOptional(std::move(results[3 * i + 2]).ToTransport()); |
| } |
| } |
| callback(Status::OK, |
| std::make_pair(std::move(context->changes), std::move(context->next_token))); |
| }; |
| waiter->Finalize(std::move(result_callback)); |
| }; |
| storage->GetThreeWayContentsDiff(base, left, right, std::move(min_key), std::move(on_next), |
| std::move(on_done)); |
| } |
| |
| } // namespace diff_utils |
| } // namespace ledger |