blob: fca23fc3b603912613b99543fd90e176f2ff2016 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "analyzer/report_master/report_scheduler.h"
#include <iomanip>
#include <string>
#include <utility>
#include "config/encodings.pb.h"
#include "config/metrics.pb.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "util/clock.h"
#include "util/datetime_util.h"
#include "util/log_based_metrics.h"
namespace cobalt {
namespace analyzer {
using config::ReportRegistry;
using util::SystemClock;
using util::TimeToDayIndex;
// Command-line flag giving the size, in days, of the look-back window used
// when searching for scheduled daily reports that still need to be run.
// It is read by the ReportScheduler constructor (to seed the report history
// cache) and by ProcessDailyReport(). The constructor CHECK-fails unless the
// value is less than 100.
DEFINE_uint32(daily_report_makeup_days, 30,
"The number of days in the past that the ReportMaster should "
"look to find missed scheduled reports to make up. Must be less "
"than 100 or we will CHECK fail.");
// Stackdriver metric constants
namespace {

// Metric name reported when StartReportNow() fails to start a report.
const char kStartReportNowFailure[] = "report-scheduler-start-report-failure";

// Common prefix of the metric names reported by the Process*Report() methods.
// The suffix identifies which kind of processing failed.
const char kProcessReportFailure[] =
    "report-scheduler-process-report-failure-for-";

// Returns the full metric name for a report-processing failure: the common
// prefix kProcessReportFailure followed by |suffix|.
const std::string ProcessReportFailMetricName(const std::string& suffix) {
  return std::string(kProcessReportFailure) + suffix;
}

}  // namespace
namespace {
// Formats the identity of |report_config| as the human-readable triple
// "(customer_id,project_id,id)". Used when building log and error messages.
// TODO(rudominer) This function has been copied multiple times throughout the
// code. We should centralize it in a utility.
std::string IdString(const ReportConfig& report_config) {
  std::ostringstream id;
  id << "(" << report_config.customer_id();
  id << "," << report_config.project_id();
  id << "," << report_config.id() << ")";
  return id.str();
}
// Builds a string of the form YYYYMMDD to represent the date corresponding to
// the given day_index. This is the standard format used at Google for the
// suffix of a file name for files containing data for dated tables.
std::string DateSuffix(uint32_t day_index) {
util::CalendarDate cd = util::DayIndexToCalendarDate(day_index);
std::ostringstream stream;
stream << std::setfill('0') << std::setw(4) << cd.year << std::setw(2)
<< cd.month << std::setw(2) << cd.day_of_month;
return stream.str();
}
} // namespace
// Constructs a ReportScheduler. This does not launch the scheduler thread;
// that happens in Start().
//
// |config_manager| supplies the report registry that ProcessReports() walks.
// |report_store| is only forwarded to the ReportHistoryCache built here.
// |report_starter| is used by StartReportNow() to actually start reports.
// |sleep_interval| is how long the scheduler thread sleeps between passes.
//
// CHECK-fails if the daily_report_makeup_days flag is not less than 100.
ReportScheduler::ReportScheduler(
std::shared_ptr<config::AnalyzerConfigManager> config_manager,
std::shared_ptr<store::ReportStore> report_store,
std::shared_ptr<ReportStarterInterface> report_starter,
std::chrono::milliseconds sleep_interval)
: clock_(new SystemClock()),
config_manager_(config_manager),
report_starter_(report_starter),
// The history cache is seeded with the first day index of the makeup window.
// NOTE(review): CurrentDayIndex() dereferences clock_, so this relies on
// clock_ being declared before report_history_ in the class -- confirm
// against the header. The subtraction is unsigned and is not clamped at zero
// the way ProcessDailyReport() clamps its period start.
report_history_(new ReportHistoryCache(
CurrentDayIndex() - FLAGS_daily_report_makeup_days, report_store)),
sleep_interval_(sleep_interval),
shut_down_(false) {
// Note that this validation runs after the flag value has already been used
// in the initializer list above.
CHECK_LT(FLAGS_daily_report_makeup_days, 100);
}
// Shuts the scheduler down. Requests shutdown, wakes the scheduler thread if
// it is currently sleeping, and then waits for it to exit. If the scheduler
// thread was never started there is nothing to wait for.
ReportScheduler::~ReportScheduler() {
  {
    // shut_down_ is set while holding mutex_, the same mutex Sleep() holds
    // while evaluating its wait predicate, so the notification below cannot
    // be missed by a thread that is about to start waiting.
    std::lock_guard<std::mutex> lock(mutex_);
    shut_down_ = true;
    if (!scheduler_thread_.joinable()) {
      // No scheduler thread is running, so there is nothing to wake or join.
      return;
    }
  }
  // Wake the scheduler thread if it is blocked in Sleep(). Without this
  // notification the thread would only observe shut_down_ once its wakeup
  // time arrived, stalling destruction for up to a full sleep_interval_.
  shut_down_notifier_.notify_all();
  scheduler_thread_.join();
}
void ReportScheduler::Start() {
std::lock_guard<std::mutex> lock(mutex_);
std::thread t([this] { this->Run(); });
scheduler_thread_ = std::move(t);
}
// Body of the scheduler thread: alternates between sleeping and processing
// all reports until shutdown has been requested. The shutdown flag is checked
// both before sleeping and again after waking, so no processing pass is
// started once shutdown has been requested.
void ReportScheduler::Run() {
  for (;;) {
    if (shut_down_) {
      break;
    }
    Sleep();
    if (shut_down_) {
      break;
    }
    ProcessReports();
  }
}
void ReportScheduler::Sleep() {
// Note: We invoke the real system clock here, not clock_->now().
// This is because even in a test we want to use the real system clock to
// compute wakeup_time because std::condition_varaible::wait_until() always
// uses the real system clock. A test is able to control the sleep time by
// setting the value of sleep_interval_.
auto wakeup_time = std::chrono::system_clock::now() + sleep_interval_;
VLOG(3) << "ReportScheduler sleeping for " << sleep_interval_.count() << "ms";
std::unique_lock<std::mutex> lock(mutex_);
// Sleep until wakeup_time or shut_down_ = true.
shut_down_notifier_.wait_until(lock, wakeup_time,
[this] { return this->shut_down_.load(); });
}
void ReportScheduler::ProcessReports() {
static const int kTimeoutSeconds = 60;
config_manager_->Update(kTimeoutSeconds);
uint32_t current_day_index = CurrentDayIndex();
std::shared_ptr<config::ReportRegistry> report_registry =
config_manager_->GetCurrent()->report_registry();
for (const ReportConfig& report_config : *report_registry) {
if (shut_down_) {
return;
}
ProcessOneReport(report_config, current_day_index);
}
}
// Dispatches |report_config| to the handler for its aggregation epoch type
// (daily, weekly or monthly). A ReportConfig that has no scheduling section
// is skipped. An unrecognized aggregation epoch type is reported as an error
// and otherwise ignored.
//
// |current_day_index| is the day index that this scheduling pass treats as
// "today"; it is forwarded unchanged to the handler.
void ReportScheduler::ProcessOneReport(const ReportConfig& report_config,
                                       uint32_t current_day_index) {
  LOG(INFO) << "ReportScheduler processing report_config "
            << IdString(report_config);
  if (!report_config.has_scheduling()) {
    LOG(INFO) << "Skipping report_config " << IdString(report_config)
              << " because it has no SchedulingConfig.";
    return;
  }
  switch (report_config.scheduling().aggregation_epoch_type()) {
    case DAY:
      ProcessDailyReport(report_config, current_day_index);
      return;
    case WEEK:
      ProcessWeeklyReport(report_config, current_day_index);
      return;
    case MONTH:
      ProcessMonthlyReport(report_config, current_day_index);
      return;
    default: {
      // The leading space before "in ReportConfig" keeps the streamed enum
      // value from running into the following text in the log line.
      LOG_STACKDRIVER_COUNT_METRIC(ERROR, ProcessReportFailMetricName("one"))
          << "Unrecognized aggregation_epoch_type: "
          << report_config.scheduling().aggregation_epoch_type()
          << " in ReportConfig " << IdString(report_config);
      return;
    }
  }
}
// Starts every daily report for |report_config| that is currently due.
//
// Each day index in the look-back window ending at |current_day_index| is
// examined, and a single-day report is started for each day for which
// ShouldStartDailyReportNow() returns true. The window length is the larger
// of the daily_report_makeup_days flag and the config's
// report_finalization_days, and the window start is clamped at day index 0.
//
// A config whose report_finalization_days exceeds 20 is rejected with an
// error and nothing is started. The scan stops early if shutdown is
// requested.
void ReportScheduler::ProcessDailyReport(const ReportConfig& report_config,
                                         uint32_t current_day_index) {
  // Bind by reference: copying the scheduling message on every call is
  // unnecessary work since it is only read here.
  const auto& scheduling = report_config.scheduling();
  if (scheduling.report_finalization_days() > 20) {
    LOG_STACKDRIVER_COUNT_METRIC(ERROR, ProcessReportFailMetricName("daily"))
        << "Invalid ReportConfig: " << IdString(report_config)
        << " report_finalization_days too large: "
        << scheduling.report_finalization_days();
    return;
  }

  // Look back a number of days equal to the maximum of
  // daily_report_makeup_days and report_finalization_days.
  const uint32_t finalization_days = scheduling.report_finalization_days();
  const uint32_t lookback_days =
      FLAGS_daily_report_makeup_days >= finalization_days
          ? FLAGS_daily_report_makeup_days
          : finalization_days;
  // Clamp at zero so the unsigned subtraction cannot wrap around when the
  // current day index is smaller than the look-back window.
  const uint32_t period_start =
      (current_day_index >= lookback_days ? (current_day_index - lookback_days)
                                          : 0u);
  VLOG(4) << "ReportScheduler considering days in the interval ["
          << period_start << ", " << current_day_index << "]";

  for (uint32_t day_index = period_start; day_index <= current_day_index;
       day_index++) {
    if (shut_down_) {
      return;
    }
    if (ShouldStartDailyReportNow(report_config, day_index,
                                  current_day_index)) {
      // A daily report covers exactly one day, so first == last.
      StartReportNow(report_config, day_index, day_index);
    } else {
      VLOG(4) << "ShouldStartDailyReportNow() returned false for report_config "
              << IdString(report_config) << " day_index=" << day_index
              << " current_day_index=" << current_day_index;
    }
  }
}
void ReportScheduler::ProcessWeeklyReport(const ReportConfig& report_config,
uint32_t current_day_index) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, ProcessReportFailMetricName("weekly"))
<< "Scheduling of weekly reports is not yet implemented. ReportConfig: "
<< IdString(report_config);
return;
}
void ReportScheduler::ProcessMonthlyReport(const ReportConfig& report_config,
uint32_t current_day_index) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, ProcessReportFailMetricName("monthly"))
<< "Scheduling of monthly reports is not yet implemented. ReportConfig: "
<< IdString(report_config);
return;
}
// Decides whether the daily report of |report_config| for day |day_index|
// should be started now, given that today is |current_day_index|.
//
// Returns false (and reports an error) if |day_index| is in the future.
// While |day_index| is still inside the report finalization period, i.e.
// fewer than report_finalization_days days have passed since it, the report
// is started whenever a run for that day is not already in progress. After
// the finalization period the report is started only if it has never
// completed successfully and is not in progress.
bool ReportScheduler::ShouldStartDailyReportNow(
    const ReportConfig& report_config, uint32_t day_index,
    uint32_t current_day_index) {
  if (day_index > current_day_index) {
    LOG_STACKDRIVER_COUNT_METRIC(ERROR, ProcessReportFailMetricName("daily"))
        << "Unexpected condition: " << day_index
        << " = day_index > current_day_index = " << current_day_index
        << " for ReportConfig " << IdString(report_config);
    return false;
  }

  const uint32_t finalization_days =
      report_config.scheduling().report_finalization_days();
  // day_index <= current_day_index is guaranteed by the check above, so this
  // subtraction cannot wrap. Comparing the age of the day against the
  // finalization period (instead of computing
  // current_day_index - finalization_days) stays correct even when the
  // finalization period is longer than current_day_index.
  const uint32_t days_since_report_day = current_day_index - day_index;
  if (days_since_report_day < finalization_days) {
    // We want to generate the report repeatedly during the report
    // finalization period, but we don't want to start it again now if we
    // previously started it and that hasn't completed.
    return !report_history_->InProgress(report_config, day_index, day_index);
  }

  // After the report finalization period we only want to run the report once.
  // If it was ever successfully completed don't run it again. Also if we
  // previously started the report and that attempt hasn't finished yet,
  // don't start it again.
  return !(report_history_->CompletedSuccessfullyOrInProgress(
      report_config, day_index, day_index));
}
// Asks report_starter_ to start a report for |report_config| covering the
// day range [|first_day_index|, |last_day_index|]. On success the start is
// recorded in report_history_ under the returned report ID. On failure an
// error is reported and nothing is recorded.
void ReportScheduler::StartReportNow(const ReportConfig& report_config,
                                     uint32_t first_day_index,
                                     uint32_t last_day_index) {
  const std::string export_name =
      ReportExportName(report_config, first_day_index, last_day_index);
  // The rows of the report are stored in the report store in all cases
  // except when it is a RAW_DUMP report.
  const bool in_store = report_config.report_type() != RAW_DUMP;

  LOG(INFO) << "ReportScheduler starting report " << IdString(report_config)
            << " [" << first_day_index << ", " << last_day_index << "]";

  ReportId report_id;
  const auto status = report_starter_->StartReport(
      report_config, first_day_index, last_day_index, export_name, in_store,
      &report_id);
  if (status.ok()) {
    report_history_->RecordStart(report_config, first_day_index,
                                 last_day_index, report_id);
    return;
  }

  LOG_STACKDRIVER_COUNT_METRIC(ERROR, kStartReportNowFailure)
      << "ReportScheduler was unable to start a report for ReportConfig "
      << IdString(report_config) << " first_day_index=" << first_day_index
      << " last_day_index=" << last_day_index
      << " error code=" << status.error_code()
      << " error message=" << status.error_message();
}
// Builds the export name for a report of |report_config| covering the day
// range [|first_day_index|, |last_day_index|]. The name has the form
//   report_<customer_id>_<project_id>_<id>_<first date>[_<last date>]
// where the dates are rendered by DateSuffix() and the last date is only
// appended when it differs from the first.
std::string ReportScheduler::ReportExportName(const ReportConfig& report_config,
                                              uint32_t first_day_index,
                                              uint32_t last_day_index) {
  std::ostringstream name;
  name << "report_" << report_config.customer_id();
  name << "_" << report_config.project_id();
  name << "_" << report_config.id();
  name << "_" << DateSuffix(first_day_index);
  const bool spans_multiple_days = (first_day_index != last_day_index);
  if (spans_multiple_days) {
    name << "_" << DateSuffix(last_day_index);
  }
  return name.str();
}
// Returns the day index, computed in UTC, of the current time as reported by
// clock_. CHECK-fails if clock_ is not set.
uint32_t ReportScheduler::CurrentDayIndex() {
  CHECK(clock_);
  const auto now = clock_->now();
  const std::time_t now_as_time_t = std::chrono::system_clock::to_time_t(now);
  return TimeToDayIndex(now_as_time_t, Metric::UTC);
}
ReportStarter::ReportStarter(ReportMasterService* report_master_service)
: report_master_service_(report_master_service) {}
// Starts a scheduled (not one-off) report through the ReportMasterService.
//
// A StartReportRequest is built from the IDs in |report_config| and the day
// range [|first_day_index|, |last_day_index|], and passed together with
// |export_name|, |in_store| and |report_id_out| to StartReportNoAuth(). The
// status returned by that call is returned unchanged; the
// StartReportResponse it fills in is discarded.
grpc::Status ReportStarter::StartReport(const ReportConfig& report_config,
                                        uint32_t first_day_index,
                                        uint32_t last_day_index,
                                        const std::string& export_name,
                                        bool in_store,
                                        ReportId* report_id_out) {
  StartReportRequest request;
  // Which report to run...
  request.set_customer_id(report_config.customer_id());
  request.set_project_id(report_config.project_id());
  request.set_report_config_id(report_config.id());
  // ...and over which days.
  request.set_first_day_index(first_day_index);
  request.set_last_day_index(last_day_index);

  // This is not a one-off report generation. Rather it is scheduled.
  bool is_one_off = false;
  StartReportResponse ignored_response;
  return report_master_service_->StartReportNoAuth(
      &request, is_one_off, export_name, in_store, report_id_out,
      &ignored_response);
}
} // namespace analyzer
} // namespace cobalt