| // Copyright 2017 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "peridot/bin/cloud_provider_firestore/app/page_cloud_impl.h" |
| |
| #include <lib/callback/scoped_callback.h> |
| #include <lib/fidl/cpp/optional.h> |
| #include <lib/fit/function.h> |
| #include <lib/fsl/socket/strings.h> |
| #include <lib/fsl/vmo/sized_vmo.h> |
| #include <lib/fsl/vmo/strings.h> |
| #include <lib/fxl/functional/make_copyable.h> |
| #include <lib/fxl/strings/concatenate.h> |
| |
| #include "peridot/bin/cloud_provider_firestore/app/grpc_status.h" |
| #include "peridot/bin/cloud_provider_firestore/firestore/encoding.h" |
| #include "peridot/lib/convert/convert.h" |
| |
| namespace cloud_provider_firestore { |
| namespace { |
| |
| constexpr char kSeparator[] = "/"; |
| constexpr char kObjectCollection[] = "objects"; |
| constexpr char kCommitLogCollection[] = "commit-log"; |
| constexpr char kDataKey[] = "data"; |
| constexpr char kTimestampField[] = "timestamp"; |
| constexpr size_t kFirestoreMaxDocumentSize = 1'000'000; |
| // Ledger stores objects chunked to ~64k, so even 500kB is more than should ever |
| // be needed. |
| constexpr size_t kMaxObjectSize = kFirestoreMaxDocumentSize / 2; |
| |
| std::string GetObjectPath(fxl::StringView page_path, |
| fxl::StringView object_id) { |
| std::string encoded_object_id = EncodeKey(object_id); |
| return fxl::Concatenate({page_path, kSeparator, kObjectCollection, kSeparator, |
| encoded_object_id}); |
| } |
| |
| std::string GetCommitBatchPath(fxl::StringView page_path, |
| fxl::StringView batch_id) { |
| std::string encoded_batch_id = EncodeKey(batch_id); |
| return fxl::Concatenate({page_path, kSeparator, kCommitLogCollection, |
| kSeparator, encoded_batch_id}); |
| } |
| |
| google::firestore::v1beta1::StructuredQuery MakeCommitQuery( |
| std::unique_ptr<google::protobuf::Timestamp> timestamp_or_null) { |
| google::firestore::v1beta1::StructuredQuery query; |
| |
| // Sub-collections to be queried. |
| google::firestore::v1beta1::StructuredQuery::CollectionSelector& selector = |
| *query.add_from(); |
| selector.set_collection_id(kCommitLogCollection); |
| selector.set_all_descendants(false); |
| |
| // Ordering. |
| google::firestore::v1beta1::StructuredQuery::Order& order_by = |
| *query.add_order_by(); |
| order_by.mutable_field()->set_field_path(kTimestampField); |
| |
| // Filtering. |
| if (timestamp_or_null) { |
| google::firestore::v1beta1::StructuredQuery::Filter& filter = |
| *query.mutable_where(); |
| google::firestore::v1beta1::StructuredQuery::FieldFilter& field_filter = |
| *filter.mutable_field_filter(); |
| |
| field_filter.mutable_field()->set_field_path(kTimestampField); |
| field_filter.set_op( |
| google::firestore::v1beta1:: |
| StructuredQuery_FieldFilter_Operator_GREATER_THAN_OR_EQUAL); |
| field_filter.mutable_value()->mutable_timestamp_value()->Swap( |
| timestamp_or_null.get()); |
| } |
| return query; |
| } |
| |
| } // namespace |
| |
| PageCloudImpl::PageCloudImpl( |
| std::string page_path, rng::Random* random, |
| CredentialsProvider* credentials_provider, |
| FirestoreService* firestore_service, |
| fidl::InterfaceRequest<cloud_provider::PageCloud> request) |
| : page_path_(std::move(page_path)), |
| random_(random), |
| credentials_provider_(credentials_provider), |
| firestore_service_(firestore_service), |
| binding_(this, std::move(request)), |
| weak_ptr_factory_(this) { |
| // The class shuts down when the client connection is disconnected. |
| binding_.set_error_handler([this](zx_status_t status) { |
| if (on_empty_) { |
| on_empty_(); |
| } |
| }); |
| } |
| |
| PageCloudImpl::~PageCloudImpl() {} |
| |
| void PageCloudImpl::ScopedGetCredentials( |
| fit::function<void(std::shared_ptr<grpc::CallCredentials>)> callback) { |
| credentials_provider_->GetCredentials(callback::MakeScoped( |
| weak_ptr_factory_.GetWeakPtr(), std::move(callback))); |
| } |
| |
| void PageCloudImpl::AddCommits(cloud_provider::CommitPack commits, |
| AddCommitsCallback callback) { |
| std::vector<cloud_provider::CommitPackEntry> commit_pack_entries; |
| if (!cloud_provider::DecodeCommitPack(commits, &commit_pack_entries)) { |
| callback(cloud_provider::Status::ARGUMENT_ERROR); |
| return; |
| } |
| |
| auto request = google::firestore::v1beta1::CommitRequest(); |
| request.set_database(firestore_service_->GetDatabasePath()); |
| |
| // Set the document name to a new UUID. Firestore Commit() API doesn't allow |
| // to request the ID to be assigned by the server. |
| const std::string document_name = GetCommitBatchPath( |
| page_path_, convert::ToHex(random_->RandomUniqueBytes())); |
| |
| // The commit batch is added in a single commit containing multiple writes. |
| // |
| // First write adds the document containing the encoded commit batch. |
| google::firestore::v1beta1::Write& add_batch_write = *(request.add_writes()); |
| EncodeCommitBatch(commits, add_batch_write.mutable_update()); |
| (*add_batch_write.mutable_update()->mutable_name()) = document_name; |
| // Ensure that the write doesn't overwrite an existing document. |
| add_batch_write.mutable_current_document()->set_exists(false); |
| |
| // The second write sets the timestamp field to the server-side request |
| // timestamp. |
| google::firestore::v1beta1::Write& set_timestamp_write = |
| *(request.add_writes()); |
| (*set_timestamp_write.mutable_transform()->mutable_document()) = |
| document_name; |
| |
| google::firestore::v1beta1::DocumentTransform_FieldTransform& transform = |
| *(set_timestamp_write.mutable_transform()->add_field_transforms()); |
| *(transform.mutable_field_path()) = kTimestampField; |
| transform.set_set_to_server_value( |
| google::firestore::v1beta1:: |
| DocumentTransform_FieldTransform_ServerValue_REQUEST_TIME); |
| |
| ScopedGetCredentials( |
| [this, request = std::move(request), |
| callback = std::move(callback)](auto call_credentials) mutable { |
| firestore_service_->Commit( |
| std::move(request), std::move(call_credentials), |
| [callback = std::move(callback)](auto status, auto result) { |
| if (LogGrpcRequestError(status)) { |
| callback(ConvertGrpcStatus(status.error_code())); |
| return; |
| } |
| callback(cloud_provider::Status::OK); |
| }); |
| }); |
| } |
| |
| void PageCloudImpl::GetCommits( |
| std::unique_ptr<cloud_provider::Token> min_position_token, |
| GetCommitsCallback callback) { |
| std::unique_ptr<google::protobuf::Timestamp> timestamp_or_null; |
| if (min_position_token) { |
| timestamp_or_null = std::make_unique<google::protobuf::Timestamp>(); |
| if (!timestamp_or_null->ParseFromString( |
| convert::ToString(min_position_token->opaque_id))) { |
| callback(cloud_provider::Status::ARGUMENT_ERROR, nullptr, nullptr); |
| return; |
| } |
| } |
| |
| auto request = google::firestore::v1beta1::RunQueryRequest(); |
| request.set_parent(page_path_); |
| auto query = MakeCommitQuery(std::move(timestamp_or_null)); |
| request.mutable_structured_query()->Swap(&query); |
| |
| ScopedGetCredentials([this, request = std::move(request), |
| callback = std::move(callback)]( |
| auto call_credentials) mutable { |
| firestore_service_->RunQuery( |
| std::move(request), std::move(call_credentials), |
| [callback = std::move(callback)](auto status, auto result) { |
| if (LogGrpcRequestError(status)) { |
| callback(ConvertGrpcStatus(status.error_code()), nullptr, nullptr); |
| return; |
| } |
| |
| std::vector<cloud_provider::CommitPackEntry> commit_entries; |
| std::string timestamp; |
| |
| for (const auto& response : result) { |
| if (!response.has_document()) { |
| continue; |
| } |
| |
| std::vector<cloud_provider::CommitPackEntry> batch_entries; |
| if (!DecodeCommitBatch(response.document(), &batch_entries, |
| ×tamp)) { |
| callback(cloud_provider::Status::PARSE_ERROR, nullptr, nullptr); |
| return; |
| } |
| |
| std::move(batch_entries.begin(), batch_entries.end(), |
| std::back_inserter(commit_entries)); |
| } |
| |
| cloud_provider::CommitPack commit_pack; |
| if (!cloud_provider::EncodeCommitPack(commit_entries, &commit_pack)) { |
| callback(cloud_provider::Status::INTERNAL_ERROR, nullptr, nullptr); |
| return; |
| } |
| |
| std::unique_ptr<cloud_provider::Token> token; |
| if (!commit_entries.empty()) { |
| token = std::make_unique<cloud_provider::Token>(); |
| token->opaque_id = convert::ToArray(timestamp); |
| } |
| callback(cloud_provider::Status::OK, |
| fidl::MakeOptional(std::move(commit_pack)), |
| std::move(token)); |
| }); |
| }); |
| } |
| |
| void PageCloudImpl::AddObject(std::vector<uint8_t> id, |
| fuchsia::mem::Buffer data, |
| AddObjectCallback callback) { |
| std::string data_str; |
| fsl::SizedVmo vmo; |
| if (!fsl::StringFromVmo(data, &data_str) || |
| data_str.size() > kMaxObjectSize) { |
| callback(cloud_provider::Status::ARGUMENT_ERROR); |
| return; |
| } |
| |
| auto request = google::firestore::v1beta1::CreateDocumentRequest(); |
| request.set_parent(page_path_); |
| request.set_collection_id(kObjectCollection); |
| google::firestore::v1beta1::Document* document = request.mutable_document(); |
| request.set_document_id(EncodeKey(convert::ToString(id))); |
| *((*document->mutable_fields())[kDataKey].mutable_bytes_value()) = |
| std::move(data_str); |
| |
| ScopedGetCredentials( |
| [this, request = std::move(request), |
| callback = std::move(callback)](auto call_credentials) mutable { |
| firestore_service_->CreateDocument( |
| std::move(request), std::move(call_credentials), |
| [callback = std::move(callback)](auto status, auto result) { |
| if (status.error_code() == grpc::ALREADY_EXISTS) { |
| callback(cloud_provider::Status::OK); |
| return; |
| } |
| if (LogGrpcRequestError(status)) { |
| callback(ConvertGrpcStatus(status.error_code())); |
| return; |
| } |
| callback(cloud_provider::Status::OK); |
| }); |
| }); |
| } |
| |
| void PageCloudImpl::GetObject(std::vector<uint8_t> id, |
| GetObjectCallback callback) { |
| auto request = google::firestore::v1beta1::GetDocumentRequest(); |
| request.set_name(GetObjectPath(page_path_, convert::ToString(id))); |
| |
| ScopedGetCredentials( |
| [this, request = std::move(request), |
| callback = std::move(callback)](auto call_credentials) mutable { |
| firestore_service_->GetDocument( |
| std::move(request), std::move(call_credentials), |
| [callback = std::move(callback)](auto status, auto result) { |
| if (LogGrpcRequestError(status)) { |
| callback(ConvertGrpcStatus(status.error_code()), nullptr); |
| return; |
| } |
| |
| if (result.fields().count(kDataKey) != 1) { |
| FXL_LOG(ERROR) |
| << "Incorrect format of the retrieved object document"; |
| callback(cloud_provider::Status::PARSE_ERROR, nullptr); |
| return; |
| } |
| |
| const std::string& bytes = |
| result.fields().at(kDataKey).bytes_value(); |
| ::fuchsia::mem::Buffer buffer; |
| if (!fsl::VmoFromString(bytes, &buffer)) { |
| callback(cloud_provider::Status::INTERNAL_ERROR, nullptr); |
| return; |
| } |
| callback(cloud_provider::Status::OK, |
| fidl::MakeOptional(std::move(buffer))); |
| }); |
| }); |
| } |
| |
| void PageCloudImpl::SetWatcher( |
| std::unique_ptr<cloud_provider::Token> min_position_token, |
| fidl::InterfaceHandle<cloud_provider::PageCloudWatcher> watcher, |
| SetWatcherCallback callback) { |
| std::unique_ptr<google::protobuf::Timestamp> timestamp_or_null; |
| if (min_position_token) { |
| timestamp_or_null = std::make_unique<google::protobuf::Timestamp>(); |
| if (!timestamp_or_null->ParseFromString( |
| convert::ToString(min_position_token->opaque_id))) { |
| callback(cloud_provider::Status::ARGUMENT_ERROR); |
| return; |
| } |
| } |
| |
| watcher_ = watcher.Bind(); |
| watcher_.set_error_handler([this](zx_status_t status) { ShutDownWatcher(); }); |
| watcher_timestamp_or_null_ = std::move(timestamp_or_null); |
| set_watcher_callback_ = std::move(callback); |
| |
| ScopedGetCredentials([this](auto call_credentials) mutable { |
| // Initiate the listen RPC. We will receive a call on OnConnected() when the |
| // listen stream is ready. |
| listen_call_handler_ = |
| firestore_service_->Listen(std::move(call_credentials), this); |
| }); |
| } |
| |
| void PageCloudImpl::OnConnected() { |
| auto request = google::firestore::v1beta1::ListenRequest(); |
| request.set_database(firestore_service_->GetDatabasePath()); |
| google::firestore::v1beta1::Target::QueryTarget& query_target = |
| *request.mutable_add_target()->mutable_query(); |
| query_target.set_parent(page_path_); |
| auto query = MakeCommitQuery(std::move(watcher_timestamp_or_null_)); |
| query_target.mutable_structured_query()->Swap(&query); |
| listen_call_handler_->Write(std::move(request)); |
| } |
| |
| void PageCloudImpl::OnResponse( |
| google::firestore::v1beta1::ListenResponse response) { |
| if (response.has_target_change()) { |
| if (response.target_change().target_change_type() == |
| google::firestore::v1beta1::TargetChange_TargetChangeType_CURRENT) { |
| if (set_watcher_callback_) { |
| set_watcher_callback_(cloud_provider::Status::OK); |
| set_watcher_callback_ = nullptr; |
| } |
| } |
| return; |
| } |
| |
| if (response.has_document_change()) { |
| std::string timestamp; |
| |
| std::vector<cloud_provider::CommitPackEntry> commit_entries; |
| if (!DecodeCommitBatch(response.document_change().document(), |
| &commit_entries, ×tamp)) { |
| watcher_->OnError(cloud_provider::Status::PARSE_ERROR); |
| ShutDownWatcher(); |
| } |
| |
| cloud_provider::Token token; |
| token.opaque_id = convert::ToArray(timestamp); |
| HandleCommits(std::move(commit_entries), std::move(token)); |
| } |
| } |
| |
| void PageCloudImpl::OnFinished(grpc::Status status) { |
| if (status.error_code() == grpc::UNAVAILABLE || |
| status.error_code() == grpc::UNAUTHENTICATED) { |
| if (watcher_) { |
| watcher_->OnError(cloud_provider::Status::NETWORK_ERROR); |
| } |
| return; |
| } |
| LogGrpcConnectionError(status); |
| watcher_.Unbind(); |
| } |
| |
| void PageCloudImpl::HandleCommits( |
| std::vector<cloud_provider::CommitPackEntry> commit_entries, |
| cloud_provider::Token token) { |
| std::move(commit_entries.begin(), commit_entries.end(), |
| std::back_inserter(commits_waiting_for_ack_)); |
| token_for_waiting_commits_ = std::move(token); |
| |
| if (!waiting_for_watcher_to_ack_commits_) { |
| SendWaitingCommits(); |
| } |
| } |
| |
| void PageCloudImpl::SendWaitingCommits() { |
| FXL_DCHECK(watcher_); |
| FXL_DCHECK(!commits_waiting_for_ack_.empty()); |
| cloud_provider::Token token = std::move(token_for_waiting_commits_); |
| cloud_provider::CommitPack commit_pack; |
| if (!cloud_provider::EncodeCommitPack(commits_waiting_for_ack_, |
| &commit_pack)) { |
| watcher_->OnError(cloud_provider::Status::INTERNAL_ERROR); |
| ShutDownWatcher(); |
| return; |
| } |
| watcher_->OnNewCommits(std::move(commit_pack), std::move(token), [this] { |
| waiting_for_watcher_to_ack_commits_ = false; |
| if (!commits_waiting_for_ack_.empty()) { |
| SendWaitingCommits(); |
| } |
| }); |
| waiting_for_watcher_to_ack_commits_ = true; |
| commits_waiting_for_ack_.clear(); |
| } |
| |
| void PageCloudImpl::ShutDownWatcher() { |
| if (watcher_) { |
| watcher_.Unbind(); |
| } |
| if (listen_call_handler_) { |
| listen_call_handler_.reset(); |
| } |
| } |
| |
| } // namespace cloud_provider_firestore |