blob: 070fb17d8812c0c678e7c934c58b2a179dd94ceb [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/sessionmgr/storage/story_storage.h"
#include <fuchsia/modular/internal/cpp/fidl.h>
#include <lib/fidl/cpp/clone.h>
#include <lib/fsl/vmo/strings.h>
#include <lib/fxl/functional/make_copyable.h>
#include <lib/fxl/strings/string_view.h>
#include "peridot/bin/sessionmgr/storage/constants_and_utils.h"
#include "peridot/bin/sessionmgr/storage/story_storage_xdr.h"
#include "peridot/lib/fidl/clone.h"
#include "peridot/lib/ledger_client/operations.h"
namespace modular {
namespace {
// TODO(rosswang): replace with |std::string::starts_with| after C++20
bool StartsWith(const std::string& string, const std::string& prefix) {
return string.compare(0, prefix.size(), prefix) == 0;
}
std::string EntityKeyForCookie(const std::string& cookie) {
return kEntityKeyPrefix + cookie;
}
std::string EntityTypeKeyForCookie(const std::string& cookie) {
return kEntityKeyPrefix + cookie + "/type";
}
std::string CookieFromEntityKey(const std::string& key) {
return key.substr(sizeof(kEntityKeyPrefix) - 1);
}
} // namespace
StoryStorage::StoryStorage(LedgerClient* ledger_client,
fuchsia::ledger::PageId page_id)
: PageClient("StoryStorage", ledger_client, page_id, "" /* key_prefix */),
ledger_client_(ledger_client),
page_id_(page_id),
weak_ptr_factory_(this) {
FXL_DCHECK(ledger_client_ != nullptr);
}
FuturePtr<> StoryStorage::WriteModuleData(ModuleData module_data) {
auto module_path = fidl::Clone(module_data.module_path);
return UpdateModuleData(
module_path, fxl::MakeCopyable([module_data = std::move(module_data)](
ModuleDataPtr* module_data_ptr) {
*module_data_ptr = ModuleData::New();
module_data.Clone(module_data_ptr->get());
}));
}
namespace {
struct UpdateModuleDataState {
std::vector<std::string> module_path;
std::function<void(ModuleDataPtr*)> mutate_fn;
OperationQueue sub_operations;
};
} // namespace
FuturePtr<> StoryStorage::UpdateModuleData(
const std::vector<std::string>& module_path,
std::function<void(ModuleDataPtr*)> mutate_fn) {
auto op_state = std::make_shared<UpdateModuleDataState>();
op_state->module_path = module_path;
op_state->mutate_fn = std::move(mutate_fn);
auto key = MakeModuleKey(module_path);
auto op_body = [this, op_state, key](OperationBase* op) {
auto did_read =
Future<ModuleDataPtr>::Create("StoryStorage.UpdateModuleData.did_read");
op_state->sub_operations.Add(
new ReadDataCall<ModuleData>(page(), key, true /* not_found_is_ok */,
XdrModuleData, did_read->Completer()));
auto did_mutate = did_read->AsyncMap(
[this, op_state, key](ModuleDataPtr current_module_data) {
auto new_module_data = CloneOptional(current_module_data);
op_state->mutate_fn(&new_module_data);
if (!new_module_data && !current_module_data) {
return Future<>::CreateCompleted(
"StoryStorage.UpdateModuleData.did_mutate");
}
auto module_data_copy = CloneOptional(new_module_data);
std::string expected_value;
XdrWrite(&expected_value, &module_data_copy, XdrModuleData);
if (current_module_data) {
FXL_DCHECK(new_module_data)
<< "StoryStorage::UpdateModuleData(): mutate_fn() must not "
"set to null an existing ModuleData record.";
// We complete this Future chain when the Ledger gives us the
// notification that |module_data| has been written. The Ledger
// won't do that if the current value for |key| won't change, so
// we have to short-circuit here.
// ModuleData contains VMOs, so doing a comparison
// *new_module_data == *current_module_data won't be true even if
// the data in the VMOs under the hood is the same. When this
// happens the ledger won't notify us of any change, since the data
// was actually the same. To overcome this we compare the raw
// strings.
auto current_data_copy = CloneOptional(current_module_data);
std::string current_value;
XdrWrite(&current_value, &current_data_copy, XdrModuleData);
if (current_value == expected_value) {
return Future<>::CreateCompleted(
"StoryStorage.UpdateModuleData.did_mutate");
}
}
FXL_DCHECK(new_module_data->module_path == op_state->module_path)
<< "StorageStorage::UpdateModuleData(path, ...): mutate_fn() "
"must set "
"ModuleData.module_path to |path|.";
op_state->sub_operations.Add(new WriteDataCall<ModuleData>(
page(), key, XdrModuleData, std::move(module_data_copy), [] {}));
return WaitForWrite(key, expected_value);
});
return did_mutate;
};
auto ret = Future<>::Create("StoryStorage.UpdateModuleData.ret");
operation_queue_.Add(NewCallbackOperation(
"StoryStorage::UpdateModuleData", std::move(op_body), ret->Completer()));
return ret;
}
FuturePtr<ModuleDataPtr> StoryStorage::ReadModuleData(
const std::vector<std::string>& module_path) {
auto key = MakeModuleKey(module_path);
auto ret = Future<ModuleDataPtr>::Create("StoryStorage.ReadModuleData.ret");
operation_queue_.Add(
new ReadDataCall<ModuleData>(page(), key, true /* not_found_is_ok */,
XdrModuleData, ret->Completer()));
return ret;
}
FuturePtr<std::vector<ModuleData>> StoryStorage::ReadAllModuleData() {
auto ret = Future<std::vector<ModuleData>>::Create(
"StoryStorage.ReadAllModuleData.ret");
operation_queue_.Add(new ReadAllDataCall<ModuleData>(
page(), kModuleKeyPrefix, XdrModuleData, ret->Completer()));
return ret;
}
StoryStorage::LinkWatcherAutoCancel StoryStorage::WatchLink(
const LinkPath& link_path, LinkUpdatedCallback callback) {
auto it = link_watchers_.emplace(MakeLinkKey(link_path), std::move(callback));
auto auto_remove = [weak_this = GetWeakPtr(), it] {
if (!weak_this)
return;
weak_this->link_watchers_.erase(it);
};
return LinkWatcherAutoCancel(std::move(auto_remove));
}
namespace {
constexpr char kJsonNull[] = "null";
class ReadVmoCall
: public Operation<fuchsia::ledger::Status, fuchsia::mem::BufferPtr> {
public:
ReadVmoCall(PageClient* page_client, fidl::StringPtr key,
ResultCall result_call)
: Operation("StoryStorage::ReadVmoCall", std::move(result_call)),
page_client_(page_client),
key_(std::move(key)) {}
private:
void Run() override {
FlowToken flow{this, &status_, &value_};
page_snapshot_ = page_client_->NewSnapshot([this, weak_ptr = GetWeakPtr()] {
if (!weak_ptr) {
return;
}
// An error occurred getting the snapshot. Resetting page_snapshot_
// will ensure that the FlowToken it has captured below while waiting for
// a connected channel will be destroyed, and the operation will be
// complete.
status_ = fuchsia::ledger::Status::UNKNOWN_ERROR;
page_snapshot_ = fuchsia::ledger::PageSnapshotPtr();
});
page_snapshot_->Get(to_array(key_),
[this, flow](fuchsia::ledger::Status status,
fuchsia::mem::BufferPtr value) {
status_ = status;
value_ = std::move(value);
});
}
// Input parameters.
PageClient* const page_client_;
const fidl::StringPtr key_;
// Intermediate state.
fuchsia::ledger::PageSnapshotPtr page_snapshot_;
// Return values.
fuchsia::ledger::Status status_;
fuchsia::mem::BufferPtr value_;
FXL_DISALLOW_COPY_AND_ASSIGN(ReadVmoCall);
};
// TODO(rosswang): this is a temporary migration helper
std::tuple<StoryStorage::Status, fidl::StringPtr> ToLinkValue(
fuchsia::ledger::Status ledger_status,
fuchsia::mem::BufferPtr ledger_value) {
StoryStorage::Status link_value_status = StoryStorage::Status::OK;
fidl::StringPtr link_value;
switch (ledger_status) {
case fuchsia::ledger::Status::KEY_NOT_FOUND:
// Leave link_value as a null-initialized StringPtr.
break;
case fuchsia::ledger::Status::OK:
if (!ledger_value) {
link_value = kJsonNull;
} else {
std::string link_value_string;
if (fsl::StringFromVmo(*ledger_value, &link_value_string)) {
link_value = std::move(link_value_string);
} else {
FXL_LOG(ERROR) << "VMO could not be copied.";
link_value_status = StoryStorage::Status::VMO_COPY_ERROR;
}
}
break;
default:
FXL_LOG(ERROR) << "PageSnapshot.Get() "
<< fidl::ToUnderlying(ledger_status);
link_value_status = StoryStorage::Status::LEDGER_ERROR;
break;
}
return {link_value_status, link_value};
}
} // namespace
FuturePtr<StoryStorage::Status, std::string> StoryStorage::GetLinkValue(
const LinkPath& link_path) {
auto key = MakeLinkKey(link_path);
auto ret = Future<fuchsia::ledger::Status, fuchsia::mem::BufferPtr>::Create(
"StoryStorage::GetLinkValue " + key);
operation_queue_.Add(new ReadVmoCall(this, key, ret->Completer()));
return ret->Map(ToLinkValue)->Map([](Status status, fidl::StringPtr value) {
return std::make_tuple(status, value ? *value : kJsonNull);
});
}
namespace {
class WriteVmoCall : public Operation<StoryStorage::Status> {
public:
WriteVmoCall(PageClient* page_client, const std::string& key,
fuchsia::mem::Buffer value, ResultCall result_call)
: Operation("StoryStorage::WriteVmoCall", std::move(result_call)),
page_client_(page_client),
key_(key),
value_(std::move(value)) {}
private:
void Run() override {
FlowToken flow{this, &status_};
status_ = StoryStorage::Status::OK;
page_client_->page()->CreateReferenceFromBuffer(
std::move(value_), [this, flow, weak_ptr = GetWeakPtr()](
fuchsia::ledger::Status status,
fuchsia::ledger::ReferencePtr reference) {
if (weak_ptr) {
if (status == fuchsia::ledger::Status::OK) {
FXL_DCHECK(reference);
PutReference(std::move(reference), flow);
} else {
FXL_LOG(ERROR) << "StoryStorage.WriteVmoCall " << key_ << " "
<< " Page.CreateReferenceFromBuffer() "
<< fidl::ToUnderlying(status);
status_ = StoryStorage::Status::LEDGER_ERROR;
}
}
});
}
void PutReference(fuchsia::ledger::ReferencePtr reference, FlowToken flow) {
page_client_->page()->PutReference(
to_array(key_), std::move(*reference), fuchsia::ledger::Priority::EAGER,
[this, flow, weak_ptr = GetWeakPtr()](fuchsia::ledger::Status status) {
if (weak_ptr && status != fuchsia::ledger::Status::OK) {
FXL_LOG(ERROR) << "StoryStorage.WriteVmoCall " << key_ << " "
<< " Page.PutReference() "
<< fidl::ToUnderlying(status);
status_ = StoryStorage::Status::LEDGER_ERROR;
}
});
}
PageClient* const page_client_;
std::string key_;
fuchsia::mem::Buffer value_;
StoryStorage::Status status_;
FXL_DISALLOW_COPY_AND_ASSIGN(WriteVmoCall);
};
// Returns the type of the entity with the associated |cookie|.
//
// If no type is found for the given |cookie|, the returned status will be
// |INVALID_ENTITY_COOKIE|.
class GetEntityTypeCall
: public PageOperation<StoryStorage::Status, std::string> {
public:
GetEntityTypeCall(PageClient* page_client, const std::string& cookie,
ResultCall result_call)
: PageOperation("StoryStorage::GetEntityTypeCall", page_client->page(),
std::move(result_call)),
page_client_(page_client),
cookie_(cookie) {}
private:
void Run() override {
FlowToken flow{this, &status_, &type_};
status_ = StoryStorage::Status::OK;
operation_queue_.Add(new ReadVmoCall(
page_client_, EntityTypeKeyForCookie(cookie_),
[this, flow](fuchsia::ledger::Status status,
fuchsia::mem::BufferPtr buffer) {
if (status == fuchsia::ledger::Status::KEY_NOT_FOUND) {
status_ = StoryStorage::Status::INVALID_ENTITY_COOKIE;
return;
}
if (status != fuchsia::ledger::Status::OK) {
status_ = StoryStorage::Status::LEDGER_ERROR;
return;
}
if (buffer) {
FXL_CHECK(fsl::StringFromVmo(*buffer, &type_));
}
}));
}
PageClient* const page_client_;
const std::string cookie_;
std::string type_;
StoryStorage::Status status_;
OperationQueue operation_queue_;
FXL_DISALLOW_COPY_AND_ASSIGN(GetEntityTypeCall);
};
class GetEntityDataCall
: public PageOperation<StoryStorage::Status, fuchsia::mem::BufferPtr> {
public:
GetEntityDataCall(PageClient* page_client, const std::string& cookie,
const std::string& type, ResultCall result_call)
: PageOperation("StoryStorage::GetEntityDataCall", page_client->page(),
std::move(result_call)),
page_client_(page_client),
cookie_(cookie),
type_(type) {}
private:
void Run() override {
FlowToken flow{this, &status_, &result_};
status_ = StoryStorage::Status::OK;
operation_queue_.Add(new GetEntityTypeCall(
page_client_, cookie_,
[this, flow](StoryStorage::Status status, const std::string& type) {
if (status != StoryStorage::Status::OK) {
status_ = status;
return;
}
if (type_ != type) {
status_ = StoryStorage::Status::INVALID_ENTITY_TYPE;
return;
}
operation_queue_.Add(
new ReadVmoCall(page_client_, EntityKeyForCookie(cookie_),
[this, flow](fuchsia::ledger::Status status,
fuchsia::mem::BufferPtr buffer) {
StoryStorage::Status story_status =
status == fuchsia::ledger::Status::OK
? StoryStorage::Status::OK
: StoryStorage::Status::LEDGER_ERROR;
status_ = story_status;
result_ = std::move(buffer);
}));
}));
}
PageClient* const page_client_;
const std::string cookie_;
std::string type_;
fuchsia::mem::BufferPtr result_ = nullptr;
StoryStorage::Status status_;
OperationQueue operation_queue_;
FXL_DISALLOW_COPY_AND_ASSIGN(GetEntityDataCall);
};
// Sets the type and value of the Entity stored under |cookie|.
//
// The type and value are written to separate keys in a single transaction.
//
// The |result_call| is wrapped in a function which checks whether or not all
// the writes were successfull prior to commiting the transaction.
class SetEntityDataCall : public PageOperation<StoryStorage::Status> {
public:
SetEntityDataCall(PageClient* page_client, const std::string& cookie,
const std::string& type, fuchsia::mem::Buffer value,
ResultCall result_call)
: PageOperation(
"StoryStorage::SetEntityDataCall", page_client->page(),
[result_call = std::move(result_call),
page = page_client->page()](StoryStorage::Status status) {
// The result callback is wrapped in order to commit/rollback the
// transaction. It's important to note that the operation instance
// will be deleted by the time this callback is executed.
if (status == StoryStorage::Status::OK) {
page->Commit([result_call = std::move(result_call)](
fuchsia::ledger::Status status) {
if (status == fuchsia::ledger::Status::OK) {
result_call(StoryStorage::Status::OK);
} else {
result_call(StoryStorage::Status::LEDGER_ERROR);
}
});
} else {
page->Rollback([result_call = std::move(result_call)](
fuchsia::ledger::Status status) {});
result_call(status);
}
}),
page_client_(page_client),
cookie_(cookie),
type_(type),
value_(std::move(value)) {}
private:
void Run() override {
FlowToken flow{this, &status_};
status_ = StoryStorage::Status::OK;
// This transaction will be either committed or rolled back in the wrapped
// result call.
page()->StartTransaction([](fuchsia::ledger::Status status) {});
// First verify the type of the data to write matches the existing entity
// type.
operation_queue_.Add(new GetEntityTypeCall(
page_client_, cookie_,
[this, flow](StoryStorage::Status status, const std::string& type) {
if (status == StoryStorage::Status::LEDGER_ERROR) {
status_ = StoryStorage::Status::LEDGER_ERROR;
return;
}
if (status == StoryStorage::Status::INVALID_ENTITY_COOKIE ||
type == type_) {
// If the type is empty it has never been written before (empty
// types are not permitted so will not be written in the first
// place).
PerformWrites(flow);
} else {
status_ = StoryStorage::Status::INVALID_ENTITY_TYPE;
return;
}
}));
}
void PerformWrites(FlowToken flow) {
// Write the entity data.
WriteVmo(std::move(value_), EntityKeyForCookie(cookie_), flow);
// Write the entity type.
fuchsia::mem::Buffer type_vmo;
FXL_CHECK(fsl::VmoFromString(type_, &type_vmo));
WriteVmo(std::move(type_vmo), EntityTypeKeyForCookie(cookie_), flow);
}
void WriteVmo(fuchsia::mem::Buffer data, const std::string& key,
FlowToken flow) {
page()->CreateReferenceFromBuffer(
std::move(data),
[this, key, flow](fuchsia::ledger::Status status,
fuchsia::ledger::ReferencePtr reference) {
if (status == fuchsia::ledger::Status::OK) {
FXL_DCHECK(reference);
PutReference(std::move(reference), key, flow);
} else {
FXL_LOG(ERROR) << trace_name() << " "
<< "SetEntityDataCall.CreateReferenceFromBuffer "
<< key << " " << fidl::ToUnderlying(status);
status_ = StoryStorage::Status::LEDGER_ERROR;
}
});
}
void PutReference(fuchsia::ledger::ReferencePtr reference,
const std::string& key, FlowToken flow) {
page()->PutReference(
to_array(key), std::move(*reference), fuchsia::ledger::Priority::EAGER,
[this, key, flow](fuchsia::ledger::Status status) {
if (status != fuchsia::ledger::Status::OK) {
FXL_LOG(ERROR) << trace_name() << " "
<< "SetEntityDataCall.PutReference " << key << " "
<< fidl::ToUnderlying(status);
status_ = StoryStorage::Status::LEDGER_ERROR;
}
});
}
PageClient* const page_client_;
std::string cookie_;
std::string type_;
fuchsia::mem::Buffer value_;
StoryStorage::Status status_;
OperationQueue operation_queue_;
FXL_DISALLOW_COPY_AND_ASSIGN(SetEntityDataCall);
};
// Returns: 1) if a mutation happened, 2) the status and 3) the new value.
class UpdateLinkCall
: public Operation<bool, StoryStorage::Status, fidl::StringPtr> {
public:
UpdateLinkCall(
PageClient* page_client, std::string key,
std::function<void(fidl::StringPtr*)> mutate_fn,
std::function<FuturePtr<>(const std::string&, const std::string&)>
wait_for_write_fn,
ResultCall done)
: Operation("StoryStorage::UpdateLinkCall", std::move(done)),
page_client_(page_client),
key_(std::move(key)),
mutate_fn_(std::move(mutate_fn)),
wait_for_write_fn_(std::move(wait_for_write_fn)) {}
private:
void Run() override {
FlowToken flow{this, &did_update_, &status_, &new_value_};
operation_queue_.Add(
new ReadVmoCall(page_client_, key_,
[this, flow](fuchsia::ledger::Status status,
fuchsia::mem::BufferPtr current_value) {
fidl::StringPtr json_current_value;
std::tie(status_, json_current_value) =
ToLinkValue(status, std::move(current_value));
if (status_ == StoryStorage::Status::OK) {
Mutate(flow, std::move(json_current_value));
}
}));
}
void Mutate(FlowToken flow, fidl::StringPtr current_value) {
new_value_ = current_value;
mutate_fn_(&new_value_);
did_update_ = true;
if (new_value_ == current_value) {
did_update_ = false;
return;
}
fuchsia::mem::Buffer vmo;
if (!new_value_ || fsl::VmoFromString(*new_value_, &vmo)) {
operation_queue_.Add(new WriteVmoCall(
page_client_, key_, std::move(vmo),
[this, flow](StoryStorage::Status status) {
status_ = status;
// If we succeeded AND we set a new value, we need to wait for
// confirmation from the ledger.
if (status == StoryStorage::Status::OK && new_value_) {
wait_for_write_fn_(key_, new_value_)->Then([this, flow] {
Done(true, std::move(status_), std::move(new_value_));
});
}
}));
} else {
FXL_LOG(ERROR) << "VMO could not be copied.";
status_ = StoryStorage::Status::VMO_COPY_ERROR;
}
}
// Input parameters.
PageClient* const page_client_;
const std::string key_;
std::function<void(fidl::StringPtr*)> mutate_fn_;
std::function<FuturePtr<>(const std::string&, const std::string&)>
wait_for_write_fn_;
// Operation runtime state.
OperationQueue operation_queue_;
// Return values.
bool did_update_;
StoryStorage::Status status_;
fidl::StringPtr new_value_;
FXL_DISALLOW_COPY_AND_ASSIGN(UpdateLinkCall);
};
} // namespace
FuturePtr<StoryStorage::Status> StoryStorage::UpdateLinkValue(
const LinkPath& link_path, std::function<void(fidl::StringPtr*)> mutate_fn,
const void* context) {
// nullptr is reserved for updates that came from other instances of
// StoryStorage.
FXL_DCHECK(context != nullptr)
<< "StoryStorage::UpdateLinkValue(..., context) of nullptr is reserved.";
auto key = MakeLinkKey(link_path);
auto did_update = Future<bool, Status, fidl::StringPtr>::Create(
"StoryStorage.UpdateLinkValue.did_update");
operation_queue_.Add(new UpdateLinkCall(
this, key, std::move(mutate_fn),
std::bind(&StoryStorage::WaitForWrite, this, std::placeholders::_1,
std::placeholders::_2),
did_update->Completer()));
// We can't chain this call to the parent future chain because we do
// not want it to happen at all in the case of errors.
return did_update->WeakMap(
GetWeakPtr(),
[this, key, context](bool did_update, StoryStorage::Status status,
fidl::StringPtr new_value) {
// if |new_value| is null, it means we didn't write any new data, even
// if |status| == OK.
if (status == StoryStorage::Status::OK && did_update) {
NotifyLinkWatchers(key, new_value, context);
}
return status;
});
}
FuturePtr<StoryStorage::Status> StoryStorage::SetEntityData(
const std::string& cookie, const std::string& type,
fuchsia::mem::Buffer data) {
auto did_update = Future<StoryStorage::Status>::Create(
"StoryStorage.CreateEntity.did_update");
if (type.empty()) {
did_update->Complete(StoryStorage::Status::INVALID_ENTITY_TYPE);
} else if (cookie.empty()) {
did_update->Complete(StoryStorage::Status::INVALID_ENTITY_COOKIE);
} else {
operation_queue_.Add(new SetEntityDataCall(
this, cookie, type, std::move(data), did_update->Completer()));
}
return did_update;
}
FuturePtr<StoryStorage::Status, std::string> StoryStorage::GetEntityType(
const std::string& cookie) {
auto did_update = Future<StoryStorage::Status, std::string>::Create(
"StoryStorage.GetEntityType.did_update");
operation_queue_.Add(
new GetEntityTypeCall(this, cookie, did_update->Completer()));
return did_update;
}
FuturePtr<StoryStorage::Status, fuchsia::mem::BufferPtr>
StoryStorage::GetEntityData(const std::string& cookie,
const std::string& type) {
auto did_update =
Future<StoryStorage::Status, fuchsia::mem::BufferPtr>::Create(
"StoryStorage.GetEntityData.did_update");
operation_queue_.Add(
new GetEntityDataCall(this, cookie, type, did_update->Completer()));
return did_update;
}
void StoryStorage::WatchEntity(
const std::string& cookie, const std::string& type,
fuchsia::modular::EntityWatcherPtr entity_watcher) {
operation_queue_.Add(new GetEntityDataCall(
this, cookie, type,
fxl::MakeCopyable(
[this, cookie, entity_watcher = std::move(entity_watcher)](
StoryStorage::Status status,
std::unique_ptr<fuchsia::mem::Buffer> value) mutable {
// Send the current value as the initial update.
entity_watcher->OnUpdated(std::move(value));
auto existing_watchers_it = entity_watchers_.try_emplace(cookie);
existing_watchers_it.first->second.AddInterfacePtr(
std::move(entity_watcher));
})));
}
FuturePtr<StoryStorage::Status> StoryStorage::SetEntityName(
const std::string& cookie, const std::string& entity_name) {
auto did_set = Future<StoryStorage::Status>::Create(
"StoryStorage.SetEntityName.did_set");
fuchsia::mem::Buffer data;
if (fsl::VmoFromString(cookie, &data)) {
operation_queue_.Add(new WriteVmoCall(this, kEntityNamePrefix + entity_name,
std::move(data),
did_set->Completer()));
} else {
did_set->Complete(StoryStorage::Status::VMO_COPY_ERROR);
}
return did_set;
}
FuturePtr<StoryStorage::Status, std::string>
StoryStorage::GetEntityCookieForName(const std::string& entity_name) {
auto did_get = Future<StoryStorage::Status, std::string>::Create(
"StoryStorage.GetEntityName.did_get");
operation_queue_.Add(new ReadVmoCall(
this, kEntityNamePrefix + entity_name,
[did_get](fuchsia::ledger::Status status, fuchsia::mem::BufferPtr data) {
if (status != fuchsia::ledger::Status::OK || !data) {
did_get->Complete(StoryStorage::Status::LEDGER_ERROR, "");
return;
}
std::string cookie;
if (!fsl::StringFromVmo(*data, &cookie)) {
did_get->Complete(StoryStorage::Status::VMO_COPY_ERROR, "");
return;
}
did_get->Complete(StoryStorage::Status::OK, std::move(cookie));
}));
return did_get;
}
FuturePtr<> StoryStorage::Sync() {
auto ret = Future<>::Create("StoryStorage::Sync.ret");
operation_queue_.Add(NewCallbackOperation("StoryStorage::Sync",
[](OperationBase* op) {
return Future<>::CreateCompleted(
"StoryStorage::Sync");
},
ret->Completer()));
return ret;
}
void StoryStorage::OnPageChange(const std::string& key,
fuchsia::mem::BufferPtr value) {
if (StartsWith(key, kEntityKeyPrefix)) {
NotifyEntityWatchers(CookieFromEntityKey(key), std::move(*value));
return;
}
std::string value_string;
if (!fsl::StringFromVmo(*value, &value_string)) {
return;
}
// If there are any operations waiting on this particular write
// having happened, tell them to continue.
auto it = pending_writes_.find({key, value_string});
bool notify_link_listeners = true;
if (it != pending_writes_.end()) {
auto local_futures = std::move(it->second);
for (auto fut : local_futures) {
fut->Complete();
}
// Since the above write originated from this StoryStorage instance,
// we do not notify any link listeners.
notify_link_listeners = false;
}
if (StartsWith(key, kLinkKeyPrefix)) {
if (notify_link_listeners) {
NotifyLinkWatchers(key, value_string, nullptr /* context */);
}
} else if (StartsWith(key, kModuleKeyPrefix)) {
if (on_module_data_updated_) {
auto module_data = ModuleData::New();
if (!XdrRead(value_string, &module_data, XdrModuleData)) {
FXL_LOG(ERROR) << "Unable to parse ModuleData " << key << " " << value;
return;
}
on_module_data_updated_(std::move(*module_data));
}
} else if (!StartsWith(key, kEntityNamePrefix)) {
// TODO(thatguy): We store some Link data on the root page (where
// StoryData is stored) for the session shell to make use of. This means we
// get notified in that instance of changes we don't care about.
//
// Consider putting all story-scoped data under a shared prefix, and use
// that when initializing the PageClient.
FXL_LOG(ERROR) << "Unexpected StoryStorage Ledger key prefix: " << key;
}
}
void StoryStorage::OnPageDelete(const std::string& key) {
// ModuleData and Link values are never deleted, although it is
// theoretically possible that conflict resolution results in a key
// disappearing. We do not currently do this.
}
void StoryStorage::OnPageConflict(Conflict* conflict) {
// TODO(thatguy): Add basic conflict resolution. We can force a conflict for
// link data in tests by using Page.StartTransaction() in UpdateLinkValue().
FXL_LOG(WARNING) << "StoryStorage::OnPageConflict() for link key "
<< to_string(conflict->key);
}
void StoryStorage::NotifyLinkWatchers(const std::string& link_key,
fidl::StringPtr value,
const void* context) {
auto range = link_watchers_.equal_range(link_key);
for (auto it = range.first; it != range.second; ++it) {
it->second(value, context);
}
}
void StoryStorage::NotifyEntityWatchers(const std::string& cookie,
fuchsia::mem::Buffer value) {
auto range = entity_watchers_.equal_range(cookie);
for (auto it = range.first; it != range.second; ++it) {
for (const auto& watcher : it->second.ptrs()) {
fuchsia::mem::Buffer clone;
fuchsia::mem::Clone(value, &clone);
(*watcher)->OnUpdated(
std::make_unique<fuchsia::mem::Buffer>(std::move(clone)));
}
}
}
FuturePtr<> StoryStorage::WaitForWrite(const std::string& key,
const std::string& value) {
// TODO(thatguy): It is possible that through conflict resolution, the write
// we expect to get will never arrive. We must have the conflict resolver
// update |pending_writes_| with the result of conflict resolution.
auto did_see_write =
Future<>::Create("StoryStorage.WaitForWrite.did_see_write");
pending_writes_[std::make_pair(key, value)].push_back(did_see_write);
return did_see_write;
}
fxl::WeakPtr<StoryStorage> StoryStorage::GetWeakPtr() {
return weak_ptr_factory_.GetWeakPtr();
}
} // namespace modular