blob: 3087cbfc04da974140897bb92339470b90bddf95 [file] [log] [blame]
* Copyright 2018 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
#include <grpc/support/port_platform.h>
#include <grpc/support/log.h>
#include <grpcpp/alarm.h>
#include <grpcpp/grpcpp.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/cpp/server/load_reporter/load_reporter.h"
namespace grpc {
namespace load_reporter {
// Async load reporting service. It's mainly responsible for controlling the
// procedure of incoming requests. The real business logic is handed off to the
// LoadReporter. There should be at most one instance of this service on a
// server to avoid spreading the load data into multiple places.
class LoadReporterAsyncServiceImpl
: public grpc::lb::v1::LoadReporter::AsyncService {
explicit LoadReporterAsyncServiceImpl(
std::unique_ptr<ServerCompletionQueue> cq);
// Starts the working thread.
void StartThread();
// Not copyable nor movable.
LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete;
LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) =
class ReportLoadHandler;
// A tag that can be called with a bool argument. It's tailored for
// ReportLoadHandler's use. Before being used, it should be constructed with a
// method of ReportLoadHandler and a shared pointer to the handler. The
// shared pointer will be moved to the invoked function and the function can
// only be invoked once. That makes ref counting of the handler easier,
// because the shared pointer is not bound to the function and can be gone
// once the invoked function returns (if not used any more).
class CallableTag {
using HandlerFunction =
std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>;
CallableTag() {}
CallableTag(HandlerFunction func,
std::shared_ptr<ReportLoadHandler> handler)
: handler_function_(std::move(func)), handler_(std::move(handler)) {
GPR_ASSERT(handler_function_ != nullptr);
GPR_ASSERT(handler_ != nullptr);
// Runs the tag. This should be called only once. The handler is no longer
// owned by this tag after this method is invoked.
void Run(bool ok);
// Releases and returns the shared pointer to the handler.
std::shared_ptr<ReportLoadHandler> ReleaseHandler() {
return std::move(handler_);
HandlerFunction handler_function_ = nullptr;
std::shared_ptr<ReportLoadHandler> handler_;
// Each handler takes care of one load reporting stream. It contains
// per-stream data and it will access the members of the parent class (i.e.,
// LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data).
class ReportLoadHandler {
// Instantiates a ReportLoadHandler and requests the next load reporting
// call. The handler object will manage its own lifetime, so no action is
// needed from the caller any more regarding that object.
static void CreateAndStart(ServerCompletionQueue* cq,
LoadReporterAsyncServiceImpl* service,
LoadReporter* load_reporter);
// This ctor is public because we want to use std::make_shared<> in
// CreateAndStart(). This ctor shouldn't be used elsewhere.
ReportLoadHandler(ServerCompletionQueue* cq,
LoadReporterAsyncServiceImpl* service,
LoadReporter* load_reporter);
// After the handler has a call request delivered, it starts reading the
// initial request. Also, a new handler is spawned so that we can keep
// servicing future calls.
void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok);
// The first Read() is expected to succeed, after which the handler starts
// sending load reports back to the balancer. The second Read() is
// expected to fail, which happens when the balancer half-closes the
// stream to signal that it's no longer interested in the load reports. For
// the latter case, the handler will then close the stream.
void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
// The report sending operations are sequential as: send report -> send
// done, schedule the next send -> waiting for the alarm to fire -> alarm
// fires, send report -> ...
void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
// Called when Finish() is done.
void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
// Called when AsyncNotifyWhenDone() notifies us.
void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok);
void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason);
// The key fields of the stream.
grpc::string lb_id_;
grpc::string load_balanced_hostname_;
grpc::string load_key_;
uint64_t load_report_interval_ms_;
// The data for RPC communication with the load reportee.
ServerContext ctx_;
::grpc::lb::v1::LoadReportRequest request_;
// The members passed down from LoadReporterAsyncServiceImpl.
ServerCompletionQueue* cq_;
LoadReporterAsyncServiceImpl* service_;
LoadReporter* load_reporter_;
// The status of the RPC progress.
enum CallStatus {
} call_status_;
bool shutdown_{false};
bool done_notified_{false};
bool is_cancelled_{false};
CallableTag on_done_notified_;
CallableTag on_finish_done_;
CallableTag next_inbound_;
CallableTag next_outbound_;
std::unique_ptr<Alarm> next_report_alarm_;
// Handles the incoming requests and drives the completion queue in a loop.
static void Work(void* arg);
// Schedules the next data fetching from Census and LB feedback sampling.
void ScheduleNextFetchAndSample();
// Fetches data from Census and samples LB feedback.
void FetchAndSample(bool ok);
std::unique_ptr<ServerCompletionQueue> cq_;
// To synchronize the operations related to shutdown state of cq_, so that we
// don't enqueue new tags into cq_ after it is already shut down.
grpc_core::Mutex cq_shutdown_mu_;
std::atomic_bool shutdown_{false};
std::unique_ptr<::grpc_core::Thread> thread_;
std::unique_ptr<LoadReporter> load_reporter_;
std::unique_ptr<Alarm> next_fetch_and_sample_alarm_;
} // namespace load_reporter
} // namespace grpc