blob: ba0af98b676635e3c0088386f314f3ffee1a4efc [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "analyzer/report_master/report_executor.h"
#include <atomic>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "glog/logging.h"
#include "util/log_based_metrics.h"
namespace cobalt {
namespace analyzer {
using store::ReportStore;
// Stackdriver metric constants
namespace {
const char kCheckReportIdChainFailure[] = "check-report-id-chain-failure";
const char kCheckQueueSizeFailure[] =
"report-executor-check-queue-size-failure";
const char kEnqueueFailure[] = "report-executor-enqueue-failure";
const char kProcessDependencyChainFailure[] =
"report-executor-process-dependency-chain-failure";
const char kProcessReportIdFailure[] =
"report-executor-process-report-id-failure";
const char kGetMetadataFailure[] = "report-executor-get-metadata-failure";
const char kStartDependentReportFailure[] =
"report-executor-start-dependent-report-failure";
const char kEndReportFailure[] = "report-executor-end-report-failure";
} // namespace
namespace {
// If the worker queue grows larger than this we will stop accepting new
// Enqueue requests.
const size_t kMaxQueueSize = 50000;
// Checks that report_id_chain is not empty and contains only complete
// ReportIds.
grpc::Status CheckReportIdChain(const std::vector<ReportId>& report_id_chain) {
if (report_id_chain.empty()) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kCheckReportIdChainFailure)
<< "report_id_chain is empty";
return grpc::Status(grpc::INVALID_ARGUMENT, "report_id_chain is empty");
}
for (const ReportId& report_id : report_id_chain) {
// When a client first creates a ReportId it is incomplete because
// instance_id and creation_time_seconds are not set. These values are only
// set by virtue of the client invoking ReportStore::StartNewReport(),
// thereby creating a complete ReportId.
if (report_id.instance_id() == 0 ||
report_id.creation_time_seconds() == 0) {
std::ostringstream stream;
stream << "Not a complete ReportId: " << ReportStore::ToString(report_id);
std::string message = stream.str();
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kCheckReportIdChainFailure)
<< message;
return grpc::Status(grpc::INVALID_ARGUMENT, message);
}
}
return grpc::Status::OK;
}
} // namespace
ReportExecutor::ReportExecutor(
std::shared_ptr<store::ReportStore> report_store,
std::unique_ptr<ReportGenerator> report_generator)
: report_store_(report_store),
report_generator_(std::move(report_generator)),
shut_down_(false) {}
void ReportExecutor::Start() {
{
std::lock_guard<std::mutex> lock(mutex_);
// We set idle_ to false since we are about to start the worker thread.
// The worker thread will set idle_ to true just before it becomes
// idle.
idle_ = false;
}
std::thread t([this] { this->Run(); });
worker_thread_ = std::move(t);
}
ReportExecutor::~ReportExecutor() {
if (!worker_thread_.joinable()) {
return;
}
{
std::lock_guard<std::mutex> lock(mutex_);
shut_down_ = true;
worker_notifier_.notify_all();
}
worker_thread_.join();
}
grpc::Status ReportExecutor::EnqueueReportGeneration(
std::vector<ReportId> report_id_chain) {
auto status = CheckReportIdChain(report_id_chain);
if (!status.ok()) {
return status;
}
status = CheckQueueSize();
if (!status.ok()) {
return status;
}
return Enqueue(std::move(report_id_chain));
}
grpc::Status ReportExecutor::CheckQueueSize() {
bool too_long = false;
{
std::lock_guard<std::mutex> lock(mutex_);
too_long = work_queue_.size() >= kMaxQueueSize;
}
if (too_long) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kCheckQueueSizeFailure)
<< "Work queue too long!";
return grpc::Status(grpc::ABORTED,
"Can't enqueue reports: queue too long!");
}
return grpc::Status::OK;
}
grpc::Status ReportExecutor::Enqueue(std::vector<ReportId> report_id_chain) {
{
std::lock_guard<std::mutex> lock(mutex_);
if (shut_down_) {
std::string message = "Shutting down. Not enqueuing.";
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kEnqueueFailure) << message;
return grpc::Status(grpc::ABORTED, message);
}
work_queue_.emplace_back(std::move(report_id_chain));
// Set idle_ false because any thread that invokes WaitUntilIdle() after
// this should wait until the |report_id_chain| just enqueued is
// processed.
idle_ = false;
}
worker_notifier_.notify_all();
return grpc::Status::OK;
}
void ReportExecutor::Run() {
while (!shut_down_) {
std::vector<ReportId> dependency_chain;
if (!WaitAndTakeFirst(&dependency_chain)) {
return;
}
ProcessDependencyChain(dependency_chain);
}
}
bool ReportExecutor::WaitAndTakeFirst(std::vector<ReportId>* chain_out) {
CHECK(chain_out);
std::unique_lock<std::mutex> lock(mutex_);
if (shut_down_) {
return false;
}
if (work_queue_.empty()) {
// Notify observers that the the worker thread is now idle;
idle_ = true;
idle_notifier_.notify_all();
// Wait until the condition variable is notified and either shut_down_
// is set or the work_queue_ is not empty.
worker_notifier_.wait(lock, [this] {
return (this->shut_down_ || !this->work_queue_.empty());
});
}
idle_ = false;
if (shut_down_) {
return false;
}
CHECK(!work_queue_.empty());
chain_out->swap(work_queue_[0]);
work_queue_.pop_front();
return true;
}
void ReportExecutor::WaitUntilIdle() {
std::unique_lock<std::mutex> lock(mutex_);
if (idle_) {
return;
}
// Wait until the condition variable is notified and idle_ is true.
idle_notifier_.wait(lock, [this] { return (this->idle_); });
}
void ReportExecutor::ProcessDependencyChain(
const std::vector<ReportId>& chain) {
DCHECK(!chain.empty());
bool chain_failed = false;
for (const ReportId& report_id : chain) {
if (shut_down_) {
LOG(INFO) << "Shutting down.";
return;
}
if (chain_failed) {
std::ostringstream stream;
stream << "Skipping report generation for report_id="
<< ReportStore::ToString(report_id)
<< " because an earlier report in its dependency chain failed.";
std::string message = stream.str();
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kProcessDependencyChainFailure)
<< message;
EndReport(report_id, false, message);
} else {
chain_failed = !ProcessReportId(report_id);
}
}
}
bool ReportExecutor::ProcessReportId(const ReportId& report_id) {
ReportMetadataLite metadata;
if (!GetMetadata(report_id, &metadata)) {
EndReport(report_id, false, "Unable to fetch metadata for report.");
return false;
}
switch (metadata.state()) {
case WAITING_TO_START: {
if (!StartDependentReport(report_id)) {
EndReport(report_id, false, "Unable to start dependent report.");
return false;
}
break;
}
case IN_PROGRESS:
break;
default: {
// Already in a terminal state.
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kProcessReportIdFailure)
<< "Unexpected state: " << metadata.state()
<< " for report_id=" << ReportStore::ToString(report_id);
return false;
}
}
auto status = report_generator_->GenerateReport(report_id);
std::string message = (status.ok() ? "" : status.error_message());
// End the report and then return true only if both GenerateReport and
// EndReport succeeded.
return EndReport(report_id, status.ok(), message) && status.ok();
}
bool ReportExecutor::GetMetadata(const ReportId& report_id,
ReportMetadataLite* metadata_out) {
auto status = report_store_->GetMetadata(report_id, metadata_out);
if (status != store::kOK) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kGetMetadataFailure)
<< "GetMetadata failed with status=" << status
<< " for report_id=" << ReportStore::ToString(report_id);
return false;
}
return true;
}
bool ReportExecutor::StartDependentReport(const ReportId& report_id) {
auto status = report_store_->StartDependentReport(report_id);
if (status != store::kOK) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kStartDependentReportFailure)
<< "StartDependentReport failed with status=" << status
<< " for report_id=" << ReportStore::ToString(report_id);
return false;
}
return true;
}
bool ReportExecutor::EndReport(const ReportId& report_id, bool success,
std::string message) {
auto status =
report_store_->EndReport(report_id, success, std::move(message));
if (status != store::kOK) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kEndReportFailure)
<< "EndReport failed with status=" << status
<< " for report_id=" << ReportStore::ToString(report_id);
return false;
}
return true;
}
} // namespace analyzer
} // namespace cobalt