blob: 189961763f1e435d4739851ac6c601621dc1feef [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/cloud_provider_firestore/firestore/listen_call.h"
namespace cloud_provider_firestore {
namespace {
class ListenCallHandlerImpl : public ListenCallHandler {
public:
explicit ListenCallHandlerImpl(fxl::WeakPtr<ListenCall> call) : call_(call) {}
~ListenCallHandlerImpl() override {
if (!call_) {
return;
}
call_->OnHandlerGone();
}
void Write(google::firestore::v1beta1::ListenRequest request) override {
// It's an arror to call Write() after OnFinished() is delivered to the
// client (which happens before |call_| is deleted).
FXL_DCHECK(call_);
call_->Write(std::move(request));
}
private:
fxl::WeakPtr<ListenCall> const call_;
FXL_DISALLOW_COPY_AND_ASSIGN(ListenCallHandlerImpl);
};
} // namespace
ListenCall::ListenCall(ListenCallClient* client,
std::unique_ptr<grpc::ClientContext> context,
std::unique_ptr<ListenStream> stream)
: client_(client),
context_(std::move(context)),
stream_(std::move(stream)),
stream_controller_(stream_.get()),
stream_reader_(stream_.get()),
stream_writer_(stream_.get()),
weak_ptr_factory_(this) {
// Configure reading from the stream.
stream_reader_.SetOnError([this] { FinishIfNeeded(); });
stream_reader_.SetOnMessage(
[this](google::firestore::v1beta1::ListenResponse response) {
if (CheckEmpty()) {
return;
}
client_->OnResponse(std::move(response));
if (finish_requested_) {
return;
}
stream_reader_.Read();
});
// Configure writing to the stream.
stream_writer_.SetOnError([this] { FinishIfNeeded(); });
stream_writer_.SetOnSuccess([this] { CheckEmpty(); });
// Finally, start the stream.
stream_controller_.StartCall([this](bool ok) {
if (!ok) {
FXL_LOG(ERROR) << "Failed to establish the stream.";
HandleFinished(grpc::Status(grpc::StatusCode::UNKNOWN, "unknown"));
return;
}
if (CheckEmpty()) {
return;
}
// Notify the client that the connection is now established and start
// reading the server stream.
connected_ = true;
client_->OnConnected();
stream_reader_.Read();
});
}
ListenCall::~ListenCall() { FXL_DCHECK(IsEmpty()); }
void ListenCall::Write(google::firestore::v1beta1::ListenRequest request) {
// It's only valid to perform a write after the connection was established,
// and before the Finish() call was made.
FXL_DCHECK(connected_);
FXL_DCHECK(!finish_requested_);
stream_writer_.Write(std::move(request));
}
void ListenCall::OnHandlerGone() {
// Unset the |client_| pointer, so that no client notifications are made
// after the handler is deleted.
client_ = nullptr;
context_->TryCancel();
CheckEmpty();
}
std::unique_ptr<ListenCallHandler> ListenCall::MakeHandler() {
return std::make_unique<ListenCallHandlerImpl>(
weak_ptr_factory_.GetWeakPtr());
}
void ListenCall::FinishIfNeeded() {
if (!finish_requested_ && client_) {
Finish();
return;
}
CheckEmpty();
}
void ListenCall::Finish() {
FXL_DCHECK(!finish_requested_);
finish_requested_ = true;
stream_controller_.Finish([this](bool ok, grpc::Status status) {
if (!client_) {
CheckEmpty();
return;
}
if (!ok) {
FXL_LOG(ERROR) << "Failed to retrieve the final status of the stream";
HandleFinished(grpc::Status(grpc::StatusCode::UNKNOWN, "unknown"));
return;
}
HandleFinished(status);
});
}
void ListenCall::HandleFinished(grpc::Status status) {
if (client_) {
client_->OnFinished(status);
// No client notifications can be delivered after |OnFinished|.
client_ = nullptr;
}
CheckEmpty();
}
bool ListenCall::IsEmpty() {
return client_ == nullptr && stream_controller_.IsEmpty() &&
stream_reader_.IsEmpty() && stream_writer_.IsEmpty();
}
bool ListenCall::CheckEmpty() {
if (!IsEmpty()) {
return false;
}
if (on_empty_) {
on_empty_();
}
return true;
}
} // namespace cloud_provider_firestore