blob: 4265eefb0344a6c3a7c3e1f1683460dfbb9803d5 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/cloud_provider_firestore/app/page_cloud_impl.h"
#include <lib/callback/scoped_callback.h>
#include <lib/fidl/cpp/optional.h>
#include <lib/fit/function.h>
#include <lib/fsl/socket/strings.h>
#include <lib/fsl/vmo/sized_vmo.h>
#include <lib/fsl/vmo/strings.h>
#include <lib/fxl/functional/make_copyable.h>
#include <lib/fxl/strings/concatenate.h>
#include "peridot/bin/cloud_provider_firestore/app/grpc_status.h"
#include "peridot/bin/cloud_provider_firestore/firestore/encoding.h"
#include "peridot/lib/convert/convert.h"
namespace cloud_provider_firestore {
namespace {
// Separator inserted between the segments of a Firestore path.
constexpr char kSeparator[] = "/";
// Name of the collection, under the page path, that holds object documents.
constexpr char kObjectCollection[] = "objects";
// Name of the collection, under the page path, that holds commit batch
// documents.
constexpr char kCommitLogCollection[] = "commit-log";
// Name of the document field that carries the bytes of an object.
constexpr char kDataKey[] = "data";
// Name of the commit batch document field that is set to the server-side
// request time and used to order and filter commit queries.
constexpr char kTimestampField[] = "timestamp";
// Document size limit from which kMaxObjectSize is derived.
constexpr size_t kFirestoreMaxDocumentSize = 1'000'000;
// Ledger stores objects chunked to ~64k, so even 500kB is more than should ever
// be needed.
constexpr size_t kMaxObjectSize = kFirestoreMaxDocumentSize / 2;
// Builds the path of the document that stores the object |object_id| of the
// page located at |page_path|:
//   <page_path>/objects/<EncodeKey(object_id)>
std::string GetObjectPath(fxl::StringView page_path,
                          fxl::StringView object_id) {
  const std::string encoded_id = EncodeKey(object_id);
  return fxl::Concatenate({
      page_path,
      kSeparator,
      kObjectCollection,
      kSeparator,
      encoded_id,
  });
}
// Builds the path of the document that stores the commit batch |batch_id| of
// the page located at |page_path|:
//   <page_path>/commit-log/<EncodeKey(batch_id)>
std::string GetCommitBatchPath(fxl::StringView page_path,
                               fxl::StringView batch_id) {
  const std::string encoded_id = EncodeKey(batch_id);
  return fxl::Concatenate({
      page_path,
      kSeparator,
      kCommitLogCollection,
      kSeparator,
      encoded_id,
  });
}
// Builds the structured query used to read the commit log.
//
// The query selects the commit log collection (direct children only) and
// orders the results by the timestamp field. When |timestamp_or_null| is set,
// the results are additionally restricted to documents whose timestamp is
// greater than or equal to it; the content of |timestamp_or_null| is swapped
// into the query.
google::firestore::v1beta1::StructuredQuery MakeCommitQuery(
    std::unique_ptr<google::protobuf::Timestamp> timestamp_or_null) {
  using google::firestore::v1beta1::StructuredQuery;

  StructuredQuery query;

  // Source collection.
  StructuredQuery::CollectionSelector* from = query.add_from();
  from->set_collection_id(kCommitLogCollection);
  from->set_all_descendants(false);

  // Result order.
  StructuredQuery::Order* order = query.add_order_by();
  order->mutable_field()->set_field_path(kTimestampField);

  // Without a lower bound there is nothing to filter on.
  if (!timestamp_or_null) {
    return query;
  }

  // Lower bound on the timestamp.
  StructuredQuery::FieldFilter* lower_bound =
      query.mutable_where()->mutable_field_filter();
  lower_bound->mutable_field()->set_field_path(kTimestampField);
  lower_bound->set_op(
      google::firestore::v1beta1::
          StructuredQuery_FieldFilter_Operator_GREATER_THAN_OR_EQUAL);
  lower_bound->mutable_value()->mutable_timestamp_value()->Swap(
      timestamp_or_null.get());
  return query;
}
} // namespace
// Binds this instance to |request| and stores the collaborators used to serve
// it. The raw pointers are stored as-is and are not owned by this class.
PageCloudImpl::PageCloudImpl(
    std::string page_path, rng::Random* random,
    CredentialsProvider* credentials_provider,
    FirestoreService* firestore_service,
    fidl::InterfaceRequest<cloud_provider::PageCloud> request)
    : page_path_(std::move(page_path)),
      random_(random),
      credentials_provider_(credentials_provider),
      firestore_service_(firestore_service),
      binding_(this, std::move(request)),
      weak_ptr_factory_(this) {
  // When the client connection goes away, report it through |on_empty_| (if
  // one was registered) so that this instance can be shut down.
  binding_.set_error_handler([this](zx_status_t /*status*/) {
    if (!on_empty_) {
      return;
    }
    on_empty_();
  });
}
// Nothing to release explicitly: members clean up after themselves.
PageCloudImpl::~PageCloudImpl() = default;
// Requests call credentials from the credentials provider. |callback| is
// wrapped with callback::MakeScoped() using a weak pointer to this instance
// before being handed to the provider.
void PageCloudImpl::ScopedGetCredentials(
    fit::function<void(std::shared_ptr<grpc::CallCredentials>)> callback) {
  auto scoped_callback = callback::MakeScoped(weak_ptr_factory_.GetWeakPtr(),
                                              std::move(callback));
  credentials_provider_->GetCredentials(std::move(scoped_callback));
}
// Uploads |commits| as a single commit batch document.
//
// |callback| receives ARGUMENT_ERROR if |commits| cannot be decoded, the
// converted gRPC status if the Commit() request fails, and OK otherwise.
void PageCloudImpl::AddCommits(cloud_provider::CommitPack commits,
                               AddCommitsCallback callback) {
  // Decoding is only used to validate the incoming pack.
  std::vector<cloud_provider::CommitPackEntry> decoded_entries;
  if (!cloud_provider::DecodeCommitPack(commits, &decoded_entries)) {
    callback(cloud_provider::Status::ARGUMENT_ERROR);
    return;
  }

  google::firestore::v1beta1::CommitRequest request;
  request.set_database(firestore_service_->GetDatabasePath());

  // Firestore Commit() API doesn't allow to request the ID to be assigned by
  // the server, so the document is named after freshly generated random bytes.
  const std::string batch_document_name = GetCommitBatchPath(
      page_path_, convert::ToHex(random_->RandomUniqueBytes()));

  // The batch is added through one commit made of two writes.
  //
  // Write #1: create the document holding the encoded commit batch. The
  // precondition makes the write fail rather than overwrite an existing
  // document.
  google::firestore::v1beta1::Write* batch_write = request.add_writes();
  EncodeCommitBatch(commits, batch_write->mutable_update());
  batch_write->mutable_update()->set_name(batch_document_name);
  batch_write->mutable_current_document()->set_exists(false);

  // Write #2: set the timestamp field of that document to the server-side
  // request time.
  google::firestore::v1beta1::Write* timestamp_write = request.add_writes();
  timestamp_write->mutable_transform()->set_document(batch_document_name);
  google::firestore::v1beta1::DocumentTransform_FieldTransform*
      field_transform =
          timestamp_write->mutable_transform()->add_field_transforms();
  field_transform->set_field_path(kTimestampField);
  field_transform->set_set_to_server_value(
      google::firestore::v1beta1::
          DocumentTransform_FieldTransform_ServerValue_REQUEST_TIME);

  ScopedGetCredentials(
      [this, request = std::move(request),
       callback = std::move(callback)](auto call_credentials) mutable {
        firestore_service_->Commit(
            std::move(request), std::move(call_credentials),
            [callback = std::move(callback)](auto status, auto result) {
              if (!LogGrpcRequestError(status)) {
                callback(cloud_provider::Status::OK);
                return;
              }
              callback(ConvertGrpcStatus(status.error_code()));
            });
      });
}
// Queries the commit log of the page.
//
// When |min_position_token| is set, its opaque id is parsed as a serialized
// protobuf Timestamp and used as the lower bound of the query.
//
// |callback| receives:
//  - ARGUMENT_ERROR if the token cannot be parsed;
//  - the converted gRPC status if the query fails;
//  - PARSE_ERROR if a returned document cannot be decoded;
//  - INTERNAL_ERROR if the resulting commit pack cannot be encoded;
//  - otherwise OK, the commit pack, and - unless no commits were decoded - a
//    token wrapping the timestamp string last written by DecodeCommitBatch().
void PageCloudImpl::GetCommits(
    std::unique_ptr<cloud_provider::Token> min_position_token,
    GetCommitsCallback callback) {
  std::unique_ptr<google::protobuf::Timestamp> min_timestamp;
  if (min_position_token) {
    min_timestamp = std::make_unique<google::protobuf::Timestamp>();
    const bool parsed = min_timestamp->ParseFromString(
        convert::ToString(min_position_token->opaque_id));
    if (!parsed) {
      callback(cloud_provider::Status::ARGUMENT_ERROR, nullptr, nullptr);
      return;
    }
  }

  google::firestore::v1beta1::RunQueryRequest request;
  request.set_parent(page_path_);
  auto commit_query = MakeCommitQuery(std::move(min_timestamp));
  request.mutable_structured_query()->Swap(&commit_query);

  ScopedGetCredentials([this, request = std::move(request),
                        callback = std::move(callback)](
                           auto call_credentials) mutable {
    // Turns the query responses into a single commit pack.
    auto on_query_done = [callback = std::move(callback)](auto status,
                                                          auto result) {
      if (LogGrpcRequestError(status)) {
        callback(ConvertGrpcStatus(status.error_code()), nullptr, nullptr);
        return;
      }

      std::vector<cloud_provider::CommitPackEntry> all_entries;
      std::string last_timestamp;
      for (const auto& response : result) {
        // Responses that carry no document are skipped.
        if (!response.has_document()) {
          continue;
        }
        std::vector<cloud_provider::CommitPackEntry> decoded_batch;
        if (!DecodeCommitBatch(response.document(), &decoded_batch,
                               &last_timestamp)) {
          callback(cloud_provider::Status::PARSE_ERROR, nullptr, nullptr);
          return;
        }
        for (auto& entry : decoded_batch) {
          all_entries.push_back(std::move(entry));
        }
      }

      cloud_provider::CommitPack pack;
      if (!cloud_provider::EncodeCommitPack(all_entries, &pack)) {
        callback(cloud_provider::Status::INTERNAL_ERROR, nullptr, nullptr);
        return;
      }

      // A position token is only returned when there is at least one commit.
      std::unique_ptr<cloud_provider::Token> position_token;
      if (!all_entries.empty()) {
        position_token = std::make_unique<cloud_provider::Token>();
        position_token->opaque_id = convert::ToArray(last_timestamp);
      }
      callback(cloud_provider::Status::OK,
               fidl::MakeOptional(std::move(pack)),
               std::move(position_token));
    };

    firestore_service_->RunQuery(std::move(request),
                                 std::move(call_credentials),
                                 std::move(on_query_done));
  });
}
// Uploads the object |id| with the content of |data|.
//
// The object is stored as a new document of the object collection of the page:
// the document id is the encoded |id| and the content is stored as bytes under
// the data field.
//
// |callback| receives:
//  - ARGUMENT_ERROR if |data| cannot be read or is larger than kMaxObjectSize;
//  - OK if the document was created, or if the server reports that it already
//    exists;
//  - the converted gRPC status for any other request failure.
void PageCloudImpl::AddObject(std::vector<uint8_t> id,
                              fuchsia::mem::Buffer data,
                              AddObjectCallback callback) {
  std::string data_str;
  if (!fsl::StringFromVmo(data, &data_str) ||
      data_str.size() > kMaxObjectSize) {
    callback(cloud_provider::Status::ARGUMENT_ERROR);
    return;
  }

  auto request = google::firestore::v1beta1::CreateDocumentRequest();
  request.set_parent(page_path_);
  request.set_collection_id(kObjectCollection);
  request.set_document_id(EncodeKey(convert::ToString(id)));
  google::firestore::v1beta1::Document* document = request.mutable_document();
  *((*document->mutable_fields())[kDataKey].mutable_bytes_value()) =
      std::move(data_str);

  ScopedGetCredentials(
      [this, request = std::move(request),
       callback = std::move(callback)](auto call_credentials) mutable {
        firestore_service_->CreateDocument(
            std::move(request), std::move(call_credentials),
            [callback = std::move(callback)](auto status, auto result) {
              // ALREADY_EXISTS is checked before the generic error path so
              // that it is reported as success rather than as a request error.
              if (status.error_code() == grpc::ALREADY_EXISTS) {
                callback(cloud_provider::Status::OK);
                return;
              }
              if (LogGrpcRequestError(status)) {
                callback(ConvertGrpcStatus(status.error_code()));
                return;
              }
              callback(cloud_provider::Status::OK);
            });
      });
}
void PageCloudImpl::GetObject(std::vector<uint8_t> id,
GetObjectCallback callback) {
auto request = google::firestore::v1beta1::GetDocumentRequest();
request.set_name(GetObjectPath(page_path_, convert::ToString(id)));
ScopedGetCredentials(
[this, request = std::move(request),
callback = std::move(callback)](auto call_credentials) mutable {
firestore_service_->GetDocument(
std::move(request), std::move(call_credentials),
[callback = std::move(callback)](auto status, auto result) {
if (LogGrpcRequestError(status)) {
callback(ConvertGrpcStatus(status.error_code()), nullptr);
return;
}
if (result.fields().count(kDataKey) != 1) {
FXL_LOG(ERROR)
<< "Incorrect format of the retrieved object document";
callback(cloud_provider::Status::PARSE_ERROR, nullptr);
return;
}
const std::string& bytes =
result.fields().at(kDataKey).bytes_value();
::fuchsia::mem::Buffer buffer;
if (!fsl::VmoFromString(bytes, &buffer)) {
callback(cloud_provider::Status::INTERNAL_ERROR, nullptr);
return;
}
callback(cloud_provider::Status::OK,
fidl::MakeOptional(std::move(buffer)));
});
});
}
// Registers |watcher| and starts the listen RPC that feeds it.
//
// When |min_position_token| is set, its opaque id is parsed as a serialized
// protobuf Timestamp; if parsing fails, |callback| receives ARGUMENT_ERROR and
// nothing else happens. Otherwise |callback| is stored in
// |set_watcher_callback_| and is not invoked by this method.
void PageCloudImpl::SetWatcher(
    std::unique_ptr<cloud_provider::Token> min_position_token,
    fidl::InterfaceHandle<cloud_provider::PageCloudWatcher> watcher,
    SetWatcherCallback callback) {
  std::unique_ptr<google::protobuf::Timestamp> min_timestamp;
  if (min_position_token) {
    min_timestamp = std::make_unique<google::protobuf::Timestamp>();
    const bool parsed = min_timestamp->ParseFromString(
        convert::ToString(min_position_token->opaque_id));
    if (!parsed) {
      callback(cloud_provider::Status::ARGUMENT_ERROR);
      return;
    }
  }

  // Take over the watcher connection; losing it shuts the watcher down.
  watcher_ = watcher.Bind();
  watcher_.set_error_handler(
      [this](zx_status_t /*status*/) { ShutDownWatcher(); });

  // State consumed later by OnConnected() and OnResponse().
  watcher_timestamp_or_null_ = std::move(min_timestamp);
  set_watcher_callback_ = std::move(callback);

  // Initiate the listen RPC. We will receive a call on OnConnected() when the
  // listen stream is ready.
  ScopedGetCredentials([this](auto call_credentials) {
    listen_call_handler_ =
        firestore_service_->Listen(std::move(call_credentials), this);
  });
}
void PageCloudImpl::OnConnected() {
auto request = google::firestore::v1beta1::ListenRequest();
request.set_database(firestore_service_->GetDatabasePath());
google::firestore::v1beta1::Target::QueryTarget& query_target =
*request.mutable_add_target()->mutable_query();
query_target.set_parent(page_path_);
auto query = MakeCommitQuery(std::move(watcher_timestamp_or_null_));
query_target.mutable_structured_query()->Swap(&query);
listen_call_handler_->Write(std::move(request));
}
// Handles one response received on the listen call.
//
// Target change: when its type is CURRENT, the pending |set_watcher_callback_|
// (if any) is invoked with OK and then cleared, so it runs at most once. Other
// target change types are ignored.
//
// Document change: the document is decoded as a commit batch and passed to
// HandleCommits() together with a token wrapping its timestamp. If decoding
// fails, the watcher is notified of a PARSE_ERROR and shut down, and the
// response is dropped.
void PageCloudImpl::OnResponse(
    google::firestore::v1beta1::ListenResponse response) {
  if (response.has_target_change()) {
    if (response.target_change().target_change_type() ==
        google::firestore::v1beta1::TargetChange_TargetChangeType_CURRENT) {
      if (set_watcher_callback_) {
        set_watcher_callback_(cloud_provider::Status::OK);
        set_watcher_callback_ = nullptr;
      }
    }
    return;
  }

  if (response.has_document_change()) {
    std::string timestamp;
    std::vector<cloud_provider::CommitPackEntry> commit_entries;
    if (!DecodeCommitBatch(response.document_change().document(),
                           &commit_entries, &timestamp)) {
      watcher_->OnError(cloud_provider::Status::PARSE_ERROR);
      ShutDownWatcher();
      // Stop here: the watcher has just been unbound, so handing the
      // undecodable batch to HandleCommits() would reach SendWaitingCommits()
      // without a bound watcher.
      return;
    }

    cloud_provider::Token token;
    token.opaque_id = convert::ToArray(timestamp);
    HandleCommits(std::move(commit_entries), std::move(token));
  }
}
// Handles the end of the listen call.
//
// For UNAVAILABLE and UNAUTHENTICATED the watcher, if still bound, is told
// about a NETWORK_ERROR and stays bound. For any other status the status is
// passed to LogGrpcConnectionError() and the watcher is unbound.
void PageCloudImpl::OnFinished(grpc::Status status) {
  const auto code = status.error_code();
  const bool notify_only =
      code == grpc::UNAVAILABLE || code == grpc::UNAUTHENTICATED;

  if (!notify_only) {
    LogGrpcConnectionError(status);
    watcher_.Unbind();
    return;
  }

  if (watcher_) {
    watcher_->OnError(cloud_provider::Status::NETWORK_ERROR);
  }
}
// Queues |commit_entries| for delivery to the watcher and records |token| as
// the token to send along with the queued commits. The queue is sent right
// away unless a previous notification is still waiting to be acknowledged.
void PageCloudImpl::HandleCommits(
    std::vector<cloud_provider::CommitPackEntry> commit_entries,
    cloud_provider::Token token) {
  for (auto& entry : commit_entries) {
    commits_waiting_for_ack_.push_back(std::move(entry));
  }
  token_for_waiting_commits_ = std::move(token);

  if (waiting_for_watcher_to_ack_commits_) {
    return;
  }
  SendWaitingCommits();
}
void PageCloudImpl::SendWaitingCommits() {
FXL_DCHECK(watcher_);
FXL_DCHECK(!commits_waiting_for_ack_.empty());
cloud_provider::Token token = std::move(token_for_waiting_commits_);
cloud_provider::CommitPack commit_pack;
if (!cloud_provider::EncodeCommitPack(commits_waiting_for_ack_,
&commit_pack)) {
watcher_->OnError(cloud_provider::Status::INTERNAL_ERROR);
ShutDownWatcher();
return;
}
watcher_->OnNewCommits(std::move(commit_pack), std::move(token), [this] {
waiting_for_watcher_to_ack_commits_ = false;
if (!commits_waiting_for_ack_.empty()) {
SendWaitingCommits();
}
});
waiting_for_watcher_to_ack_commits_ = true;
commits_waiting_for_ack_.clear();
}
// Tears down the watcher state: unbinds the watcher connection if it is bound,
// then releases the listen call handler if there is one. Safe to call when
// either is already gone.
void PageCloudImpl::ShutDownWatcher() {
// Drop the connection to the client-provided watcher first.
if (watcher_) {
watcher_.Unbind();
}
// Then release the handler of the listen call.
if (listen_call_handler_) {
listen_call_handler_.reset();
}
}
} // namespace cloud_provider_firestore