| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "peridot/bin/sessionmgr/storage/story_storage.h" |
| |
| #include <fuchsia/modular/internal/cpp/fidl.h> |
| #include <lib/fidl/cpp/clone.h> |
| #include <lib/fsl/vmo/strings.h> |
| #include <lib/fxl/functional/make_copyable.h> |
| #include <lib/fxl/strings/string_view.h> |
| |
| #include "peridot/bin/sessionmgr/storage/constants_and_utils.h" |
| #include "peridot/bin/sessionmgr/storage/story_storage_xdr.h" |
| #include "peridot/lib/fidl/clone.h" |
| #include "peridot/lib/ledger_client/operations.h" |
| |
| namespace modular { |
| |
| namespace { |
| |
| // TODO(rosswang): replace with |std::string::starts_with| after C++20 |
| bool StartsWith(const std::string& string, const std::string& prefix) { |
| return string.compare(0, prefix.size(), prefix) == 0; |
| } |
| |
| std::string EntityKeyForCookie(const std::string& cookie) { |
| return kEntityKeyPrefix + cookie; |
| } |
| |
| std::string EntityTypeKeyForCookie(const std::string& cookie) { |
| return kEntityKeyPrefix + cookie + "/type"; |
| } |
| |
| std::string CookieFromEntityKey(const std::string& key) { |
| return key.substr(sizeof(kEntityKeyPrefix) - 1); |
| } |
| |
| } // namespace |
| |
| StoryStorage::StoryStorage(LedgerClient* ledger_client, |
| fuchsia::ledger::PageId page_id) |
| : PageClient("StoryStorage", ledger_client, page_id, "" /* key_prefix */), |
| ledger_client_(ledger_client), |
| page_id_(page_id), |
| weak_ptr_factory_(this) { |
| FXL_DCHECK(ledger_client_ != nullptr); |
| } |
| |
| FuturePtr<> StoryStorage::WriteModuleData(ModuleData module_data) { |
| auto module_path = fidl::Clone(module_data.module_path); |
| return UpdateModuleData( |
| module_path, fxl::MakeCopyable([module_data = std::move(module_data)]( |
| ModuleDataPtr* module_data_ptr) { |
| *module_data_ptr = ModuleData::New(); |
| module_data.Clone(module_data_ptr->get()); |
| })); |
| } |
| |
| namespace { |
| |
| struct UpdateModuleDataState { |
| std::vector<std::string> module_path; |
| std::function<void(ModuleDataPtr*)> mutate_fn; |
| OperationQueue sub_operations; |
| }; |
| |
| } // namespace |
| |
| FuturePtr<> StoryStorage::UpdateModuleData( |
| const std::vector<std::string>& module_path, |
| std::function<void(ModuleDataPtr*)> mutate_fn) { |
| auto op_state = std::make_shared<UpdateModuleDataState>(); |
| op_state->module_path = module_path; |
| op_state->mutate_fn = std::move(mutate_fn); |
| |
| auto key = MakeModuleKey(module_path); |
| auto op_body = [this, op_state, key](OperationBase* op) { |
| auto did_read = |
| Future<ModuleDataPtr>::Create("StoryStorage.UpdateModuleData.did_read"); |
| op_state->sub_operations.Add( |
| new ReadDataCall<ModuleData>(page(), key, true /* not_found_is_ok */, |
| XdrModuleData, did_read->Completer())); |
| |
| auto did_mutate = did_read->AsyncMap( |
| [this, op_state, key](ModuleDataPtr current_module_data) { |
| auto new_module_data = CloneOptional(current_module_data); |
| op_state->mutate_fn(&new_module_data); |
| |
| if (!new_module_data && !current_module_data) { |
| return Future<>::CreateCompleted( |
| "StoryStorage.UpdateModuleData.did_mutate"); |
| } |
| |
| auto module_data_copy = CloneOptional(new_module_data); |
| std::string expected_value; |
| XdrWrite(&expected_value, &module_data_copy, XdrModuleData); |
| |
| if (current_module_data) { |
| FXL_DCHECK(new_module_data) |
| << "StoryStorage::UpdateModuleData(): mutate_fn() must not " |
| "set to null an existing ModuleData record."; |
| |
| // We complete this Future chain when the Ledger gives us the |
| // notification that |module_data| has been written. The Ledger |
| // won't do that if the current value for |key| won't change, so |
| // we have to short-circuit here. |
| // ModuleData contains VMOs, so doing a comparison |
| // *new_module_data == *current_module_data won't be true even if |
| // the data in the VMOs under the hood is the same. When this |
| // happens the ledger won't notify us of any change, since the data |
| // was actually the same. To overcome this we compare the raw |
| // strings. |
| auto current_data_copy = CloneOptional(current_module_data); |
| std::string current_value; |
| XdrWrite(¤t_value, ¤t_data_copy, XdrModuleData); |
| if (current_value == expected_value) { |
| return Future<>::CreateCompleted( |
| "StoryStorage.UpdateModuleData.did_mutate"); |
| } |
| } |
| |
| FXL_DCHECK(new_module_data->module_path == op_state->module_path) |
| << "StorageStorage::UpdateModuleData(path, ...): mutate_fn() " |
| "must set " |
| "ModuleData.module_path to |path|."; |
| |
| op_state->sub_operations.Add(new WriteDataCall<ModuleData>( |
| page(), key, XdrModuleData, std::move(module_data_copy), [] {})); |
| |
| return WaitForWrite(key, expected_value); |
| }); |
| |
| return did_mutate; |
| }; |
| |
| auto ret = Future<>::Create("StoryStorage.UpdateModuleData.ret"); |
| operation_queue_.Add(NewCallbackOperation( |
| "StoryStorage::UpdateModuleData", std::move(op_body), ret->Completer())); |
| return ret; |
| } |
| |
| FuturePtr<ModuleDataPtr> StoryStorage::ReadModuleData( |
| const std::vector<std::string>& module_path) { |
| auto key = MakeModuleKey(module_path); |
| auto ret = Future<ModuleDataPtr>::Create("StoryStorage.ReadModuleData.ret"); |
| operation_queue_.Add( |
| new ReadDataCall<ModuleData>(page(), key, true /* not_found_is_ok */, |
| XdrModuleData, ret->Completer())); |
| return ret; |
| } |
| |
| FuturePtr<std::vector<ModuleData>> StoryStorage::ReadAllModuleData() { |
| auto ret = Future<std::vector<ModuleData>>::Create( |
| "StoryStorage.ReadAllModuleData.ret"); |
| operation_queue_.Add(new ReadAllDataCall<ModuleData>( |
| page(), kModuleKeyPrefix, XdrModuleData, ret->Completer())); |
| return ret; |
| } |
| |
| StoryStorage::LinkWatcherAutoCancel StoryStorage::WatchLink( |
| const LinkPath& link_path, LinkUpdatedCallback callback) { |
| auto it = link_watchers_.emplace(MakeLinkKey(link_path), std::move(callback)); |
| |
| auto auto_remove = [weak_this = GetWeakPtr(), it] { |
| if (!weak_this) |
| return; |
| |
| weak_this->link_watchers_.erase(it); |
| }; |
| |
| return LinkWatcherAutoCancel(std::move(auto_remove)); |
| } |
| |
| namespace { |
| |
| constexpr char kJsonNull[] = "null"; |
| |
| class ReadVmoCall |
| : public Operation<fuchsia::ledger::Status, fuchsia::mem::BufferPtr> { |
| public: |
| ReadVmoCall(PageClient* page_client, fidl::StringPtr key, |
| ResultCall result_call) |
| : Operation("StoryStorage::ReadVmoCall", std::move(result_call)), |
| page_client_(page_client), |
| key_(std::move(key)) {} |
| |
| private: |
| void Run() override { |
| FlowToken flow{this, &status_, &value_}; |
| |
| page_snapshot_ = page_client_->NewSnapshot([this, weak_ptr = GetWeakPtr()] { |
| if (!weak_ptr) { |
| return; |
| } |
| // An error occurred getting the snapshot. Resetting page_snapshot_ |
| // will ensure that the FlowToken it has captured below while waiting for |
| // a connected channel will be destroyed, and the operation will be |
| // complete. |
| status_ = fuchsia::ledger::Status::UNKNOWN_ERROR; |
| page_snapshot_ = fuchsia::ledger::PageSnapshotPtr(); |
| }); |
| |
| page_snapshot_->Get(to_array(key_), |
| [this, flow](fuchsia::ledger::Status status, |
| fuchsia::mem::BufferPtr value) { |
| status_ = status; |
| value_ = std::move(value); |
| }); |
| } |
| |
| // Input parameters. |
| PageClient* const page_client_; |
| const fidl::StringPtr key_; |
| |
| // Intermediate state. |
| fuchsia::ledger::PageSnapshotPtr page_snapshot_; |
| |
| // Return values. |
| fuchsia::ledger::Status status_; |
| fuchsia::mem::BufferPtr value_; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(ReadVmoCall); |
| }; |
| |
| // TODO(rosswang): this is a temporary migration helper |
| std::tuple<StoryStorage::Status, fidl::StringPtr> ToLinkValue( |
| fuchsia::ledger::Status ledger_status, |
| fuchsia::mem::BufferPtr ledger_value) { |
| StoryStorage::Status link_value_status = StoryStorage::Status::OK; |
| fidl::StringPtr link_value; |
| |
| switch (ledger_status) { |
| case fuchsia::ledger::Status::KEY_NOT_FOUND: |
| // Leave link_value as a null-initialized StringPtr. |
| break; |
| case fuchsia::ledger::Status::OK: |
| if (!ledger_value) { |
| link_value = kJsonNull; |
| } else { |
| std::string link_value_string; |
| if (fsl::StringFromVmo(*ledger_value, &link_value_string)) { |
| link_value = std::move(link_value_string); |
| } else { |
| FXL_LOG(ERROR) << "VMO could not be copied."; |
| link_value_status = StoryStorage::Status::VMO_COPY_ERROR; |
| } |
| } |
| break; |
| default: |
| FXL_LOG(ERROR) << "PageSnapshot.Get() " |
| << fidl::ToUnderlying(ledger_status); |
| link_value_status = StoryStorage::Status::LEDGER_ERROR; |
| break; |
| } |
| |
| return {link_value_status, link_value}; |
| } |
| |
| } // namespace |
| |
| FuturePtr<StoryStorage::Status, std::string> StoryStorage::GetLinkValue( |
| const LinkPath& link_path) { |
| auto key = MakeLinkKey(link_path); |
| auto ret = Future<fuchsia::ledger::Status, fuchsia::mem::BufferPtr>::Create( |
| "StoryStorage::GetLinkValue " + key); |
| operation_queue_.Add(new ReadVmoCall(this, key, ret->Completer())); |
| |
| return ret->Map(ToLinkValue)->Map([](Status status, fidl::StringPtr value) { |
| return std::make_tuple(status, value ? *value : kJsonNull); |
| }); |
| } |
| |
| namespace { |
| |
| class WriteVmoCall : public Operation<StoryStorage::Status> { |
| public: |
| WriteVmoCall(PageClient* page_client, const std::string& key, |
| fuchsia::mem::Buffer value, ResultCall result_call) |
| : Operation("StoryStorage::WriteVmoCall", std::move(result_call)), |
| page_client_(page_client), |
| key_(key), |
| value_(std::move(value)) {} |
| |
| private: |
| void Run() override { |
| FlowToken flow{this, &status_}; |
| status_ = StoryStorage::Status::OK; |
| |
| page_client_->page()->CreateReferenceFromBuffer( |
| std::move(value_), [this, flow, weak_ptr = GetWeakPtr()]( |
| fuchsia::ledger::Status status, |
| fuchsia::ledger::ReferencePtr reference) { |
| if (weak_ptr) { |
| if (status == fuchsia::ledger::Status::OK) { |
| FXL_DCHECK(reference); |
| PutReference(std::move(reference), flow); |
| } else { |
| FXL_LOG(ERROR) << "StoryStorage.WriteVmoCall " << key_ << " " |
| << " Page.CreateReferenceFromBuffer() " |
| << fidl::ToUnderlying(status); |
| status_ = StoryStorage::Status::LEDGER_ERROR; |
| } |
| } |
| }); |
| } |
| |
| void PutReference(fuchsia::ledger::ReferencePtr reference, FlowToken flow) { |
| page_client_->page()->PutReference( |
| to_array(key_), std::move(*reference), fuchsia::ledger::Priority::EAGER, |
| [this, flow, weak_ptr = GetWeakPtr()](fuchsia::ledger::Status status) { |
| if (weak_ptr && status != fuchsia::ledger::Status::OK) { |
| FXL_LOG(ERROR) << "StoryStorage.WriteVmoCall " << key_ << " " |
| << " Page.PutReference() " |
| << fidl::ToUnderlying(status); |
| status_ = StoryStorage::Status::LEDGER_ERROR; |
| } |
| }); |
| } |
| |
| PageClient* const page_client_; |
| std::string key_; |
| fuchsia::mem::Buffer value_; |
| |
| StoryStorage::Status status_; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(WriteVmoCall); |
| }; |
| |
| // Returns the type of the entity with the associated |cookie|. |
| // |
| // If no type is found for the given |cookie|, the returned status will be |
| // |INVALID_ENTITY_COOKIE|. |
| class GetEntityTypeCall |
| : public PageOperation<StoryStorage::Status, std::string> { |
| public: |
| GetEntityTypeCall(PageClient* page_client, const std::string& cookie, |
| ResultCall result_call) |
| : PageOperation("StoryStorage::GetEntityTypeCall", page_client->page(), |
| std::move(result_call)), |
| page_client_(page_client), |
| cookie_(cookie) {} |
| |
| private: |
| void Run() override { |
| FlowToken flow{this, &status_, &type_}; |
| status_ = StoryStorage::Status::OK; |
| |
| operation_queue_.Add(new ReadVmoCall( |
| page_client_, EntityTypeKeyForCookie(cookie_), |
| [this, flow](fuchsia::ledger::Status status, |
| fuchsia::mem::BufferPtr buffer) { |
| if (status == fuchsia::ledger::Status::KEY_NOT_FOUND) { |
| status_ = StoryStorage::Status::INVALID_ENTITY_COOKIE; |
| return; |
| } |
| |
| if (status != fuchsia::ledger::Status::OK) { |
| status_ = StoryStorage::Status::LEDGER_ERROR; |
| return; |
| } |
| if (buffer) { |
| FXL_CHECK(fsl::StringFromVmo(*buffer, &type_)); |
| } |
| })); |
| } |
| |
| PageClient* const page_client_; |
| const std::string cookie_; |
| std::string type_; |
| StoryStorage::Status status_; |
| OperationQueue operation_queue_; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(GetEntityTypeCall); |
| }; |
| |
| class GetEntityDataCall |
| : public PageOperation<StoryStorage::Status, fuchsia::mem::BufferPtr> { |
| public: |
| GetEntityDataCall(PageClient* page_client, const std::string& cookie, |
| const std::string& type, ResultCall result_call) |
| : PageOperation("StoryStorage::GetEntityDataCall", page_client->page(), |
| std::move(result_call)), |
| page_client_(page_client), |
| cookie_(cookie), |
| type_(type) {} |
| |
| private: |
| void Run() override { |
| FlowToken flow{this, &status_, &result_}; |
| status_ = StoryStorage::Status::OK; |
| |
| operation_queue_.Add(new GetEntityTypeCall( |
| page_client_, cookie_, |
| [this, flow](StoryStorage::Status status, const std::string& type) { |
| if (status != StoryStorage::Status::OK) { |
| status_ = status; |
| return; |
| } |
| |
| if (type_ != type) { |
| status_ = StoryStorage::Status::INVALID_ENTITY_TYPE; |
| return; |
| } |
| |
| operation_queue_.Add( |
| new ReadVmoCall(page_client_, EntityKeyForCookie(cookie_), |
| [this, flow](fuchsia::ledger::Status status, |
| fuchsia::mem::BufferPtr buffer) { |
| StoryStorage::Status story_status = |
| status == fuchsia::ledger::Status::OK |
| ? StoryStorage::Status::OK |
| : StoryStorage::Status::LEDGER_ERROR; |
| status_ = story_status; |
| result_ = std::move(buffer); |
| })); |
| })); |
| } |
| |
| PageClient* const page_client_; |
| const std::string cookie_; |
| std::string type_; |
| fuchsia::mem::BufferPtr result_ = nullptr; |
| StoryStorage::Status status_; |
| OperationQueue operation_queue_; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(GetEntityDataCall); |
| }; |
| |
| // Sets the type and value of the Entity stored under |cookie|. |
| // |
| // The type and value are written to separate keys in a single transaction. |
| // |
| // The |result_call| is wrapped in a function which checks whether or not all |
| // the writes were successfull prior to commiting the transaction. |
| class SetEntityDataCall : public PageOperation<StoryStorage::Status> { |
| public: |
| SetEntityDataCall(PageClient* page_client, const std::string& cookie, |
| const std::string& type, fuchsia::mem::Buffer value, |
| ResultCall result_call) |
| : PageOperation( |
| "StoryStorage::SetEntityDataCall", page_client->page(), |
| [result_call = std::move(result_call), |
| page = page_client->page()](StoryStorage::Status status) { |
| // The result callback is wrapped in order to commit/rollback the |
| // transaction. It's important to note that the operation instance |
| // will be deleted by the time this callback is executed. |
| if (status == StoryStorage::Status::OK) { |
| page->Commit([result_call = std::move(result_call)]( |
| fuchsia::ledger::Status status) { |
| if (status == fuchsia::ledger::Status::OK) { |
| result_call(StoryStorage::Status::OK); |
| } else { |
| result_call(StoryStorage::Status::LEDGER_ERROR); |
| } |
| }); |
| } else { |
| page->Rollback([result_call = std::move(result_call)]( |
| fuchsia::ledger::Status status) {}); |
| result_call(status); |
| } |
| }), |
| page_client_(page_client), |
| cookie_(cookie), |
| type_(type), |
| value_(std::move(value)) {} |
| |
| private: |
| void Run() override { |
| FlowToken flow{this, &status_}; |
| status_ = StoryStorage::Status::OK; |
| // This transaction will be either committed or rolled back in the wrapped |
| // result call. |
| page()->StartTransaction([](fuchsia::ledger::Status status) {}); |
| |
| // First verify the type of the data to write matches the existing entity |
| // type. |
| operation_queue_.Add(new GetEntityTypeCall( |
| page_client_, cookie_, |
| [this, flow](StoryStorage::Status status, const std::string& type) { |
| if (status == StoryStorage::Status::LEDGER_ERROR) { |
| status_ = StoryStorage::Status::LEDGER_ERROR; |
| return; |
| } |
| |
| if (status == StoryStorage::Status::INVALID_ENTITY_COOKIE || |
| type == type_) { |
| // If the type is empty it has never been written before (empty |
| // types are not permitted so will not be written in the first |
| // place). |
| PerformWrites(flow); |
| } else { |
| status_ = StoryStorage::Status::INVALID_ENTITY_TYPE; |
| return; |
| } |
| })); |
| } |
| |
| void PerformWrites(FlowToken flow) { |
| // Write the entity data. |
| WriteVmo(std::move(value_), EntityKeyForCookie(cookie_), flow); |
| |
| // Write the entity type. |
| fuchsia::mem::Buffer type_vmo; |
| FXL_CHECK(fsl::VmoFromString(type_, &type_vmo)); |
| WriteVmo(std::move(type_vmo), EntityTypeKeyForCookie(cookie_), flow); |
| } |
| |
| void WriteVmo(fuchsia::mem::Buffer data, const std::string& key, |
| FlowToken flow) { |
| page()->CreateReferenceFromBuffer( |
| std::move(data), |
| [this, key, flow](fuchsia::ledger::Status status, |
| fuchsia::ledger::ReferencePtr reference) { |
| if (status == fuchsia::ledger::Status::OK) { |
| FXL_DCHECK(reference); |
| PutReference(std::move(reference), key, flow); |
| } else { |
| FXL_LOG(ERROR) << trace_name() << " " |
| << "SetEntityDataCall.CreateReferenceFromBuffer " |
| << key << " " << fidl::ToUnderlying(status); |
| status_ = StoryStorage::Status::LEDGER_ERROR; |
| } |
| }); |
| } |
| |
| void PutReference(fuchsia::ledger::ReferencePtr reference, |
| const std::string& key, FlowToken flow) { |
| page()->PutReference( |
| to_array(key), std::move(*reference), fuchsia::ledger::Priority::EAGER, |
| [this, key, flow](fuchsia::ledger::Status status) { |
| if (status != fuchsia::ledger::Status::OK) { |
| FXL_LOG(ERROR) << trace_name() << " " |
| << "SetEntityDataCall.PutReference " << key << " " |
| << fidl::ToUnderlying(status); |
| status_ = StoryStorage::Status::LEDGER_ERROR; |
| } |
| }); |
| } |
| |
| PageClient* const page_client_; |
| std::string cookie_; |
| std::string type_; |
| fuchsia::mem::Buffer value_; |
| |
| StoryStorage::Status status_; |
| |
| OperationQueue operation_queue_; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(SetEntityDataCall); |
| }; |
| |
| // Returns: 1) if a mutation happened, 2) the status and 3) the new value. |
| class UpdateLinkCall |
| : public Operation<bool, StoryStorage::Status, fidl::StringPtr> { |
| public: |
| UpdateLinkCall( |
| PageClient* page_client, std::string key, |
| std::function<void(fidl::StringPtr*)> mutate_fn, |
| std::function<FuturePtr<>(const std::string&, const std::string&)> |
| wait_for_write_fn, |
| ResultCall done) |
| : Operation("StoryStorage::UpdateLinkCall", std::move(done)), |
| page_client_(page_client), |
| key_(std::move(key)), |
| mutate_fn_(std::move(mutate_fn)), |
| wait_for_write_fn_(std::move(wait_for_write_fn)) {} |
| |
| private: |
| void Run() override { |
| FlowToken flow{this, &did_update_, &status_, &new_value_}; |
| |
| operation_queue_.Add( |
| new ReadVmoCall(page_client_, key_, |
| [this, flow](fuchsia::ledger::Status status, |
| fuchsia::mem::BufferPtr current_value) { |
| fidl::StringPtr json_current_value; |
| std::tie(status_, json_current_value) = |
| ToLinkValue(status, std::move(current_value)); |
| |
| if (status_ == StoryStorage::Status::OK) { |
| Mutate(flow, std::move(json_current_value)); |
| } |
| })); |
| } |
| |
| void Mutate(FlowToken flow, fidl::StringPtr current_value) { |
| new_value_ = current_value; |
| mutate_fn_(&new_value_); |
| |
| did_update_ = true; |
| if (new_value_ == current_value) { |
| did_update_ = false; |
| return; |
| } |
| |
| fuchsia::mem::Buffer vmo; |
| if (!new_value_ || fsl::VmoFromString(*new_value_, &vmo)) { |
| operation_queue_.Add(new WriteVmoCall( |
| page_client_, key_, std::move(vmo), |
| [this, flow](StoryStorage::Status status) { |
| status_ = status; |
| |
| // If we succeeded AND we set a new value, we need to wait for |
| // confirmation from the ledger. |
| if (status == StoryStorage::Status::OK && new_value_) { |
| wait_for_write_fn_(key_, new_value_)->Then([this, flow] { |
| Done(true, std::move(status_), std::move(new_value_)); |
| }); |
| } |
| })); |
| } else { |
| FXL_LOG(ERROR) << "VMO could not be copied."; |
| status_ = StoryStorage::Status::VMO_COPY_ERROR; |
| } |
| } |
| |
| // Input parameters. |
| PageClient* const page_client_; |
| const std::string key_; |
| std::function<void(fidl::StringPtr*)> mutate_fn_; |
| std::function<FuturePtr<>(const std::string&, const std::string&)> |
| wait_for_write_fn_; |
| |
| // Operation runtime state. |
| OperationQueue operation_queue_; |
| |
| // Return values. |
| bool did_update_; |
| StoryStorage::Status status_; |
| fidl::StringPtr new_value_; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(UpdateLinkCall); |
| }; |
| |
| } // namespace |
| |
| FuturePtr<StoryStorage::Status> StoryStorage::UpdateLinkValue( |
| const LinkPath& link_path, std::function<void(fidl::StringPtr*)> mutate_fn, |
| const void* context) { |
| // nullptr is reserved for updates that came from other instances of |
| // StoryStorage. |
| FXL_DCHECK(context != nullptr) |
| << "StoryStorage::UpdateLinkValue(..., context) of nullptr is reserved."; |
| |
| auto key = MakeLinkKey(link_path); |
| auto did_update = Future<bool, Status, fidl::StringPtr>::Create( |
| "StoryStorage.UpdateLinkValue.did_update"); |
| operation_queue_.Add(new UpdateLinkCall( |
| this, key, std::move(mutate_fn), |
| std::bind(&StoryStorage::WaitForWrite, this, std::placeholders::_1, |
| std::placeholders::_2), |
| did_update->Completer())); |
| |
| // We can't chain this call to the parent future chain because we do |
| // not want it to happen at all in the case of errors. |
| return did_update->WeakMap( |
| GetWeakPtr(), |
| [this, key, context](bool did_update, StoryStorage::Status status, |
| fidl::StringPtr new_value) { |
| // if |new_value| is null, it means we didn't write any new data, even |
| // if |status| == OK. |
| if (status == StoryStorage::Status::OK && did_update) { |
| NotifyLinkWatchers(key, new_value, context); |
| } |
| |
| return status; |
| }); |
| } |
| |
| FuturePtr<StoryStorage::Status> StoryStorage::SetEntityData( |
| const std::string& cookie, const std::string& type, |
| fuchsia::mem::Buffer data) { |
| auto did_update = Future<StoryStorage::Status>::Create( |
| "StoryStorage.CreateEntity.did_update"); |
| if (type.empty()) { |
| did_update->Complete(StoryStorage::Status::INVALID_ENTITY_TYPE); |
| } else if (cookie.empty()) { |
| did_update->Complete(StoryStorage::Status::INVALID_ENTITY_COOKIE); |
| } else { |
| operation_queue_.Add(new SetEntityDataCall( |
| this, cookie, type, std::move(data), did_update->Completer())); |
| } |
| return did_update; |
| } |
| |
| FuturePtr<StoryStorage::Status, std::string> StoryStorage::GetEntityType( |
| const std::string& cookie) { |
| auto did_update = Future<StoryStorage::Status, std::string>::Create( |
| "StoryStorage.GetEntityType.did_update"); |
| operation_queue_.Add( |
| new GetEntityTypeCall(this, cookie, did_update->Completer())); |
| return did_update; |
| } |
| |
| FuturePtr<StoryStorage::Status, fuchsia::mem::BufferPtr> |
| StoryStorage::GetEntityData(const std::string& cookie, |
| const std::string& type) { |
| auto did_update = |
| Future<StoryStorage::Status, fuchsia::mem::BufferPtr>::Create( |
| "StoryStorage.GetEntityData.did_update"); |
| operation_queue_.Add( |
| new GetEntityDataCall(this, cookie, type, did_update->Completer())); |
| return did_update; |
| } |
| |
| void StoryStorage::WatchEntity( |
| const std::string& cookie, const std::string& type, |
| fuchsia::modular::EntityWatcherPtr entity_watcher) { |
| operation_queue_.Add(new GetEntityDataCall( |
| this, cookie, type, |
| fxl::MakeCopyable( |
| [this, cookie, entity_watcher = std::move(entity_watcher)]( |
| StoryStorage::Status status, |
| std::unique_ptr<fuchsia::mem::Buffer> value) mutable { |
| // Send the current value as the initial update. |
| entity_watcher->OnUpdated(std::move(value)); |
| |
| auto existing_watchers_it = entity_watchers_.try_emplace(cookie); |
| existing_watchers_it.first->second.AddInterfacePtr( |
| std::move(entity_watcher)); |
| }))); |
| } |
| |
| FuturePtr<StoryStorage::Status> StoryStorage::SetEntityName( |
| const std::string& cookie, const std::string& entity_name) { |
| auto did_set = Future<StoryStorage::Status>::Create( |
| "StoryStorage.SetEntityName.did_set"); |
| fuchsia::mem::Buffer data; |
| if (fsl::VmoFromString(cookie, &data)) { |
| operation_queue_.Add(new WriteVmoCall(this, kEntityNamePrefix + entity_name, |
| std::move(data), |
| did_set->Completer())); |
| } else { |
| did_set->Complete(StoryStorage::Status::VMO_COPY_ERROR); |
| } |
| return did_set; |
| } |
| |
| FuturePtr<StoryStorage::Status, std::string> |
| StoryStorage::GetEntityCookieForName(const std::string& entity_name) { |
| auto did_get = Future<StoryStorage::Status, std::string>::Create( |
| "StoryStorage.GetEntityName.did_get"); |
| operation_queue_.Add(new ReadVmoCall( |
| this, kEntityNamePrefix + entity_name, |
| [did_get](fuchsia::ledger::Status status, fuchsia::mem::BufferPtr data) { |
| if (status != fuchsia::ledger::Status::OK || !data) { |
| did_get->Complete(StoryStorage::Status::LEDGER_ERROR, ""); |
| return; |
| } |
| std::string cookie; |
| if (!fsl::StringFromVmo(*data, &cookie)) { |
| did_get->Complete(StoryStorage::Status::VMO_COPY_ERROR, ""); |
| return; |
| } |
| |
| did_get->Complete(StoryStorage::Status::OK, std::move(cookie)); |
| })); |
| return did_get; |
| } |
| |
| FuturePtr<> StoryStorage::Sync() { |
| auto ret = Future<>::Create("StoryStorage::Sync.ret"); |
| operation_queue_.Add(NewCallbackOperation("StoryStorage::Sync", |
| [](OperationBase* op) { |
| return Future<>::CreateCompleted( |
| "StoryStorage::Sync"); |
| }, |
| ret->Completer())); |
| return ret; |
| } |
| |
| void StoryStorage::OnPageChange(const std::string& key, |
| fuchsia::mem::BufferPtr value) { |
| if (StartsWith(key, kEntityKeyPrefix)) { |
| NotifyEntityWatchers(CookieFromEntityKey(key), std::move(*value)); |
| return; |
| } |
| |
| std::string value_string; |
| if (!fsl::StringFromVmo(*value, &value_string)) { |
| return; |
| } |
| |
| // If there are any operations waiting on this particular write |
| // having happened, tell them to continue. |
| auto it = pending_writes_.find({key, value_string}); |
| bool notify_link_listeners = true; |
| if (it != pending_writes_.end()) { |
| auto local_futures = std::move(it->second); |
| for (auto fut : local_futures) { |
| fut->Complete(); |
| } |
| |
| // Since the above write originated from this StoryStorage instance, |
| // we do not notify any link listeners. |
| notify_link_listeners = false; |
| } |
| |
| if (StartsWith(key, kLinkKeyPrefix)) { |
| if (notify_link_listeners) { |
| NotifyLinkWatchers(key, value_string, nullptr /* context */); |
| } |
| } else if (StartsWith(key, kModuleKeyPrefix)) { |
| if (on_module_data_updated_) { |
| auto module_data = ModuleData::New(); |
| if (!XdrRead(value_string, &module_data, XdrModuleData)) { |
| FXL_LOG(ERROR) << "Unable to parse ModuleData " << key << " " << value; |
| return; |
| } |
| on_module_data_updated_(std::move(*module_data)); |
| } |
| } else if (!StartsWith(key, kEntityNamePrefix)) { |
| // TODO(thatguy): We store some Link data on the root page (where |
| // StoryData is stored) for the session shell to make use of. This means we |
| // get notified in that instance of changes we don't care about. |
| // |
| // Consider putting all story-scoped data under a shared prefix, and use |
| // that when initializing the PageClient. |
| FXL_LOG(ERROR) << "Unexpected StoryStorage Ledger key prefix: " << key; |
| } |
| } |
| |
| void StoryStorage::OnPageDelete(const std::string& key) { |
| // ModuleData and Link values are never deleted, although it is |
| // theoretically possible that conflict resolution results in a key |
| // disappearing. We do not currently do this. |
| } |
| |
| void StoryStorage::OnPageConflict(Conflict* conflict) { |
| // TODO(thatguy): Add basic conflict resolution. We can force a conflict for |
| // link data in tests by using Page.StartTransaction() in UpdateLinkValue(). |
| FXL_LOG(WARNING) << "StoryStorage::OnPageConflict() for link key " |
| << to_string(conflict->key); |
| } |
| |
| void StoryStorage::NotifyLinkWatchers(const std::string& link_key, |
| fidl::StringPtr value, |
| const void* context) { |
| auto range = link_watchers_.equal_range(link_key); |
| for (auto it = range.first; it != range.second; ++it) { |
| it->second(value, context); |
| } |
| } |
| |
| void StoryStorage::NotifyEntityWatchers(const std::string& cookie, |
| fuchsia::mem::Buffer value) { |
| auto range = entity_watchers_.equal_range(cookie); |
| for (auto it = range.first; it != range.second; ++it) { |
| for (const auto& watcher : it->second.ptrs()) { |
| fuchsia::mem::Buffer clone; |
| fuchsia::mem::Clone(value, &clone); |
| (*watcher)->OnUpdated( |
| std::make_unique<fuchsia::mem::Buffer>(std::move(clone))); |
| } |
| } |
| } |
| |
| FuturePtr<> StoryStorage::WaitForWrite(const std::string& key, |
| const std::string& value) { |
| // TODO(thatguy): It is possible that through conflict resolution, the write |
| // we expect to get will never arrive. We must have the conflict resolver |
| // update |pending_writes_| with the result of conflict resolution. |
| auto did_see_write = |
| Future<>::Create("StoryStorage.WaitForWrite.did_see_write"); |
| pending_writes_[std::make_pair(key, value)].push_back(did_see_write); |
| return did_see_write; |
| } |
| |
| fxl::WeakPtr<StoryStorage> StoryStorage::GetWeakPtr() { |
| return weak_ptr_factory_.GetWeakPtr(); |
| } |
| |
| } // namespace modular |