| // Copyright 2017 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "analyzer/report_master/report_scheduler.h" |
| |
| #include <condition_variable> |
| #include <map> |
| #include <memory> |
| #include <mutex> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "analyzer/report_master/report_master_service.h" |
| #include "analyzer/store/memory_store.h" |
| #include "config/config_text_parser.h" |
| #include "gflags/gflags.h" |
| #include "glog/logging.h" |
| #include "third_party/googletest/googletest/include/gtest/gtest.h" |
| #include "util/datetime_util.h" |
| |
| namespace cobalt { |
| namespace analyzer { |
| |
| // This flag controls the number of days in the past that the ReportScheduler |
| // will look for reports that were supposed to be run but were not. |
| DECLARE_uint32(daily_report_makeup_days); |
| |
| using config::EncodingRegistry; |
| using config::MetricRegistry; |
| using config::ReportRegistry; |
| using store::DataStore; |
| using store::MemoryStore; |
| using store::ReportStore; |
| using util::IncrementingClock; |
| |
| namespace { |
| const uint32_t kFirstDayIndex = 12345; |
| const uint32_t kStartingTimeSeconds = |
| kFirstDayIndex * util::kNumUnixSecondsPerDay; |
| const uint32_t kTenMinutes = 600; |
| |
| const uint32_t kCustomerId = 1; |
| const uint32_t kProjectId = 1; |
| const uint32_t kReportConfigId = 42; |
| const uint32_t kReportConfigId2 = 43; |
| const uint32_t kReportConfigId3 = 44; |
| const uint32_t kReportConfigId4 = 45; |
| const uint32_t kReportFinalizationDays = 3; |
| const uint32_t kReportFinalizationDays2 = 2; |
| const uint32_t kReportFinalizationDays3 = 1; |
| const uint32_t kReportFinalizationDays4 = 0; |
| const char* kReportConfigText = R"( |
| element { |
| customer_id: 1 |
| project_id: 1 |
| id: 42 |
| metric_id: 1 |
| report_type: HISTOGRAM |
| scheduling { |
| report_finalization_days: 3 |
| aggregation_epoch_type: DAY |
| } |
| } |
| |
| element { |
| customer_id: 1 |
| project_id: 1 |
| id: 43 |
| metric_id: 1 |
| report_type: HISTOGRAM |
| scheduling { |
| report_finalization_days: 2 |
| aggregation_epoch_type: DAY |
| } |
| } |
| |
| element { |
| customer_id: 1 |
| project_id: 1 |
| id: 44 |
| metric_id: 1 |
| report_type: HISTOGRAM |
| scheduling { |
| report_finalization_days: 1 |
| aggregation_epoch_type: DAY |
| } |
| } |
| |
| element { |
| customer_id: 1 |
| project_id: 1 |
| id: 45 |
| metric_id: 1 |
| report_type: HISTOGRAM |
| scheduling { |
| report_finalization_days: 0 |
| aggregation_epoch_type: DAY |
| } |
| } |
| |
| )"; |
| } // namespace |
| |
| // An implementation of ReportStarterInterface that registers reports in |
| // the ReportStore as started (and optionally as completed) but does not |
| // actually run any reports. It also records the values of all of the |
| // parameters it was invoked with for checking by a test. |
| class FakeReportStarter : public ReportStarterInterface { |
| public: |
| explicit FakeReportStarter(std::shared_ptr<ReportStore> report_store) |
| : report_store_(report_store) {} |
| virtual ~FakeReportStarter() = default; |
| |
| grpc::Status StartReport(const ReportConfig& report_config, |
| uint32_t first_day_index, uint32_t last_day_index, |
| const std::string& export_name, bool in_store, |
| ReportId* report_id_out) override { |
| report_id_out->Clear(); |
| report_id_out->set_customer_id(kCustomerId); |
| report_id_out->set_project_id(kProjectId); |
| report_id_out->set_report_config_id(report_config.id()); |
| EXPECT_EQ(store::kOK, |
| report_store_->StartNewReport( |
| first_day_index, last_day_index, false, export_name, in_store, |
| report_config.report_type(), {0}, report_id_out)); |
| if (should_complete_reports_) { |
| EXPECT_EQ(store::kOK, report_store_->EndReport(*report_id_out, true, "")); |
| } |
| started_report_ids_.push_back(*report_id_out); |
| first_day_indices_.push_back(first_day_index); |
| last_day_indices_.push_back(last_day_index); |
| export_names_.push_back(export_name); |
| if (notifier_f_) { |
| notifier_f_(started_report_ids_.size()); |
| } |
| return grpc::Status::OK; |
| } |
| |
| std::vector<ReportId> TakeStartedReportIds() { |
| auto temp = std::move(started_report_ids_); |
| started_report_ids_.clear(); |
| return temp; |
| } |
| |
| std::vector<uint32_t> TakeFirstDayIndices() { |
| auto temp = std::move(first_day_indices_); |
| first_day_indices_.clear(); |
| return temp; |
| } |
| |
| std::vector<uint32_t> TakeLastDayIndices() { |
| auto temp = std::move(last_day_indices_); |
| last_day_indices_.clear(); |
| return temp; |
| } |
| |
| std::vector<std::string> TakeExportNames() { |
| auto temp = std::move(export_names_); |
| export_names_.clear(); |
| return temp; |
| } |
| |
| void set_notifier_func(std::function<void(size_t)> f) { notifier_f_ = f; } |
| void set_should_complete_reports(bool b) { should_complete_reports_ = b; } |
| |
| private: |
| std::vector<ReportId> started_report_ids_; |
| std::vector<uint32_t> first_day_indices_; |
| std::vector<uint32_t> last_day_indices_; |
| std::vector<std::string> export_names_; |
| std::shared_ptr<ReportStore> report_store_; |
| std::function<void(size_t)> notifier_f_; |
| bool should_complete_reports_ = false; |
| }; |
| |
| class ReportSchedulerTest : public ::testing::Test { |
| public: |
| void SetUp() { |
| auto report_parse_result = |
| config::FromString<RegisteredReports>(kReportConfigText, nullptr); |
| EXPECT_EQ(config::kOK, report_parse_result.second); |
| report_registry_.reset((report_parse_result.first.release())); |
| |
| auto encoding_parse_result = |
| config::FromString<RegisteredEncodings>("", nullptr); |
| EXPECT_EQ(config::kOK, encoding_parse_result.second); |
| std::shared_ptr<config::EncodingRegistry> encoding_registry( |
| encoding_parse_result.first.release()); |
| |
| auto metric_parse_result = |
| config::FromString<RegisteredMetrics>("", nullptr); |
| EXPECT_EQ(config::kOK, metric_parse_result.second); |
| std::shared_ptr<config::MetricRegistry> metric_registry( |
| metric_parse_result.first.release()); |
| |
| // Make an AnalyzerConfig |
| std::shared_ptr<config::AnalyzerConfig> analyzer_config( |
| new config::AnalyzerConfig(encoding_registry, metric_registry, |
| report_registry_)); |
| |
| // Make an AnalyzerConfigManager |
| std::shared_ptr<config::AnalyzerConfigManager> analyzer_config_manager( |
| new config::AnalyzerConfigManager(analyzer_config)); |
| |
| data_store_.reset(new MemoryStore()); |
| data_store_->DeleteAllRows(DataStore::kReportMetadata); |
| report_store_.reset(new ReportStore(data_store_)); |
| report_starter_.reset(new FakeReportStarter(report_store_)); |
| scheduler_.reset(new ReportScheduler(analyzer_config_manager, report_store_, |
| report_starter_, |
| std::chrono::milliseconds(1))); |
| clock_.reset(new IncrementingClock()); |
| clock_->set_time(util::FromUnixSeconds(kStartingTimeSeconds)); |
| clock_->set_increment(std::chrono::seconds(1)); |
| scheduler_->clock_ = clock_; |
| report_store_->set_clock(clock_); |
| } |
| |
| const ReportConfig* GetReportConfig() { |
| return report_registry_->Get(kCustomerId, kProjectId, kReportConfigId); |
| } |
| |
| // Performs the main logic for the ProcessOneReport test below. |
| std::vector<ReportId> DoProcessOneReportTest( |
| uint32_t current_day_index, std::vector<uint32_t> expected_day_indices) { |
| ProcessOneReport(*GetReportConfig(), current_day_index); |
| auto started_report_ids = report_starter_->TakeStartedReportIds(); |
| auto first_day_indices = report_starter_->TakeFirstDayIndices(); |
| auto last_day_indices = report_starter_->TakeLastDayIndices(); |
| auto export_names = report_starter_->TakeExportNames(); |
| size_t expected_num = expected_day_indices.size(); |
| EXPECT_EQ(expected_num, started_report_ids.size()); |
| EXPECT_EQ(expected_num, first_day_indices.size()); |
| EXPECT_EQ(expected_num, last_day_indices.size()); |
| EXPECT_EQ(expected_num, export_names.size()); |
| for (size_t i = 0; i < expected_num; i++) { |
| uint32_t expected_day_index = expected_day_indices[i]; |
| std::ostringstream stream; |
| stream << "report_1_1_42_"; |
| util::CalendarDate cd = util::DayIndexToCalendarDate(expected_day_index); |
| stream << cd.year << (cd.month / 10) << (cd.month % 10) |
| << (cd.day_of_month / 10) << (cd.day_of_month % 10); |
| std::string expected_export_name = stream.str(); |
| EXPECT_EQ(expected_day_index, first_day_indices[i]); |
| EXPECT_EQ(expected_day_index, last_day_indices[i]); |
| EXPECT_EQ(expected_export_name, export_names[i]); |
| } |
| return started_report_ids; |
| } |
| |
| void SetSchedulerClock(std::shared_ptr<util::ClockInterface> clock) { |
| scheduler_->clock_ = clock; |
| } |
| |
| // Performs the logic for checking the results at the end of DoRunTest() |
| // below. |
| void CheckRunResults(uint32_t report_config_id, uint32_t finalization_days) { |
| // Query for all instances of the given report config. |
| auto response = report_store_->QueryReports( |
| kCustomerId, kProjectId, report_config_id, 0, UINT64_MAX, 10000, ""); |
| EXPECT_EQ(store::kOK, response.status); |
| |
| // Accumulate the counts of the number of instances of the report config for |
| // each day. |
| std::map<uint32_t, size_t> day_counts; |
| for (const auto& r : response.results) { |
| day_counts[r.report_metadata.first_day_index()]++; |
| } |
| |
| // During the makeup period, prior to the finalization cutoff for the |
| // first day, there should be exactly one report per day. This is because |
| // for days prior to the finalization cutoff we only run the report once. |
| for (uint32_t day_index = kFirstDayIndex - FLAGS_daily_report_makeup_days; |
| day_index <= kFirstDayIndex - finalization_days; day_index++) { |
| EXPECT_EQ(1u, day_counts[day_index]) << "day_index=" << day_index; |
| } |
| |
| // After the first day there should be exactly finalization_days * 6 |
| // reports per day. This is because for each day we run 6 reports for every |
| // day that has not yet been finalized. An edge case is if |
| // finalization_days == 0 in which case there should be one report per day. |
| size_t expected_count = |
| (finalization_days == 0 ? 1 : finalization_days * 6); |
| for (uint32_t day_index = kFirstDayIndex + 1; |
| day_index <= kFirstDayIndex + 10; day_index++) { |
| EXPECT_EQ(expected_count, day_counts[day_index]) |
| << "report_config_id=" << report_config_id |
| << " day_index=" << day_index; |
| } |
| |
| // The number of reports run on the first day for the days that have not |
| // yet been finalized is messy so during the pre-finalization period for |
| // the first day we are only doing a sanity check: There should be more |
| // than one and at most finalization_days*6 reports. Note that if |
| // finalization_days=0 this is vacuous. |
| for (uint32_t day_index = kFirstDayIndex - finalization_days + 1; |
| day_index <= kFirstDayIndex; day_index++) { |
| EXPECT_GT(day_counts[day_index], 1u); |
| EXPECT_LE(day_counts[day_index], finalization_days * 6); |
| } |
| } |
| |
| // Tests the full operation of the scheduler thread. We invoke start() in |
| // order to start the sdheduler thread. We arrange for the scheduler thread |
| // to stop after 1000 iterations of the run loop. We then check the results |
| // by inspecting the contents of the ReportStore. |
| void DoRunTest() { |
| // We give the ReportScheduler its own IncrementingClock with an increment |
| // of 4 hours. This means that every 6 iterations through the Run() |
| // loop will increment the current day index, so that each report |
| // may be executed up to 6 times per day. |
| std::shared_ptr<IncrementingClock> clock(new IncrementingClock()); |
| clock->set_time(util::FromUnixSeconds(kStartingTimeSeconds)); |
| clock->set_increment(std::chrono::seconds(60 * 60 * 4)); |
| SetSchedulerClock(clock); |
| |
| // We arrange for the ReportStarter to not only start reports but also |
| // complete them successfully. If we did not do this then the |
| // ReportScheduler would refuse to reschedule a report again on the same day |
| // becuase it would think that the previous execution had not completed. |
| report_starter_->set_should_complete_reports(true); |
| |
| // We arrange for the scheduler thread to notify this thread after |
| // 1000 reports have been generated. |
| std::mutex mu; |
| std::condition_variable cv; |
| bool done = false; |
| report_starter_->set_notifier_func( |
| [&cv, &mu, &done](size_t num_reports_started) { |
| if (num_reports_started > 1000) { |
| std::lock_guard<std::mutex> lock(mu); |
| done = true; |
| cv.notify_all(); |
| } |
| }); |
| |
| // We start the scheduler thread. |
| scheduler_->Start(); |
| |
| // We wait for the scheduler thread to notify this thread that 1000 reports |
| // have been generated. |
| { |
| std::unique_lock<std::mutex> lock(mu); |
| cv.wait(lock, [&done] { return done; }); |
| } |
| |
| // We delete the ReportScheduler, which stops the scheduler thread. |
| scheduler_.reset(); |
| |
| // We check the restuls for our five report configs. |
| CheckRunResults(kReportConfigId, kReportFinalizationDays); |
| CheckRunResults(kReportConfigId2, kReportFinalizationDays2); |
| CheckRunResults(kReportConfigId3, kReportFinalizationDays3); |
| CheckRunResults(kReportConfigId4, kReportFinalizationDays4); |
| } |
| |
| protected: |
| std::shared_ptr<DataStore> data_store_; |
| std::shared_ptr<ReportStore> report_store_; |
| std::shared_ptr<ReportRegistry> report_registry_; |
| std::shared_ptr<FakeReportStarter> report_starter_; |
| std::unique_ptr<ReportScheduler> scheduler_; |
| std::shared_ptr<IncrementingClock> clock_; |
| |
| void ProcessOneReport(const ReportConfig& report_config, |
| uint32_t current_day_index) { |
| return scheduler_->ProcessOneReport(report_config, current_day_index); |
| } |
| }; |
| |
| // Test the function ProcessOneReport. In this test we are not using the |
| // scheduler thread of the ReportScheduler--we never start it. Instead we |
| // directly invoke the private function ProcessOneReport() and check |
| // its results by interogating the FakeReportStarter. |
| TEST_F(ReportSchedulerTest, ProcessOneReport) { |
| // The first time we run ProcessOneReport(), the ReportStore and the |
| // ReportHistoryCache are empty. We should start one report for the current |
| // day and one for each of the makeup days. |
| uint32_t current_day_index = kFirstDayIndex; |
| std::vector<uint32_t> expected_day_indices; |
| for (uint32_t day_index = current_day_index - FLAGS_daily_report_makeup_days; |
| day_index <= current_day_index; day_index++) { |
| expected_day_indices.push_back(day_index); |
| } |
| std::vector<ReportId> started_report_ids; |
| { |
| SCOPED_TRACE(""); |
| started_report_ids = |
| DoProcessOneReportTest(current_day_index, expected_day_indices); |
| } |
| |
| // Now advance time by 10 minutes. |
| int64_t current_time = kStartingTimeSeconds + kTenMinutes; |
| clock_->set_time(util::FromUnixSeconds(current_time)); |
| |
| // Its still the same day and none of the previously started reports have |
| // completed, so this time ProcessOneReport() should not start any reports. |
| expected_day_indices.clear(); |
| { |
| SCOPED_TRACE(""); |
| DoProcessOneReportTest(current_day_index, expected_day_indices); |
| } |
| |
| // Now complete all of the previously started reports. Suppose the first |
| // one failed but all other ones succeeded. |
| bool success = false; |
| for (const auto& report_id : started_report_ids) { |
| EXPECT_EQ(store::kOK, report_store_->EndReport(report_id, success, "")); |
| success = true; |
| } |
| |
| // Advance time by 10 minutes again. |
| current_time += kTenMinutes; |
| clock_->set_time(util::FromUnixSeconds(current_time)); |
| |
| // This time ProcessOneReport() should only start a new report for the |
| // days that have not yet been finalized, and one for the report that failed. |
| expected_day_indices.clear(); |
| // This is for the day that failed. |
| expected_day_indices.push_back(current_day_index - |
| FLAGS_daily_report_makeup_days); |
| // This is for the days that have not yet been finalized. |
| for (uint32_t day_index = current_day_index - kReportFinalizationDays + 1; |
| day_index <= current_day_index; day_index++) { |
| expected_day_indices.push_back(day_index); |
| } |
| { |
| SCOPED_TRACE(""); |
| started_report_ids = |
| DoProcessOneReportTest(current_day_index, expected_day_indices); |
| } |
| |
| // Now successfully complete all of the previously started reports. |
| for (const auto& report_id : started_report_ids) { |
| EXPECT_EQ(store::kOK, report_store_->EndReport(report_id, true, "")); |
| } |
| |
| // Advance time by 10 minutes again. |
| current_time += kTenMinutes; |
| clock_->set_time(util::FromUnixSeconds(current_time)); |
| |
| // This time ProcessOneReport() should only start a new report for the |
| // days that have not yet been finalized. |
| expected_day_indices.clear(); |
| for (uint32_t day_index = current_day_index - kReportFinalizationDays + 1; |
| day_index <= current_day_index; day_index++) { |
| expected_day_indices.push_back(day_index); |
| } |
| { |
| SCOPED_TRACE(""); |
| started_report_ids = |
| DoProcessOneReportTest(current_day_index, expected_day_indices); |
| } |
| |
| // Now advance time by 24 hours. |
| current_time += util::kNumUnixSecondsPerDay; |
| clock_->set_time(util::FromUnixSeconds(current_time)); |
| current_day_index++; |
| |
| // None of the previously started reports from yesterday have completed. |
| // This time ProcessOneReport() should only start a new report for the new |
| // day. |
| expected_day_indices.clear(); |
| expected_day_indices.push_back(current_day_index); |
| { |
| SCOPED_TRACE(""); |
| DoProcessOneReportTest(current_day_index, expected_day_indices); |
| } |
| |
| // Now successfully complete all of the reports started yesterday. |
| for (const auto& report_id : started_report_ids) { |
| EXPECT_EQ(store::kOK, report_store_->EndReport(report_id, true, "")); |
| } |
| |
| // Advance time by 10 minutes again. |
| current_time += kTenMinutes; |
| clock_->set_time(util::FromUnixSeconds(current_time)); |
| |
| // This time ProcessOneReport() should only start a new report for the |
| // days that have not yet been finalized, excluding the current day since |
| // the report we started 10 minutes ago never finished. |
| expected_day_indices.clear(); |
| for (uint32_t day_index = current_day_index - kReportFinalizationDays + 1; |
| day_index < current_day_index; day_index++) { |
| expected_day_indices.push_back(day_index); |
| } |
| { |
| SCOPED_TRACE(""); |
| started_report_ids = |
| DoProcessOneReportTest(current_day_index, expected_day_indices); |
| } |
| } |
| |
| // Tests the Run method using the default value of |
| // FLAGS_daily_report_makeup_days |
| TEST_F(ReportSchedulerTest, Run) { DoRunTest(); } |
| |
| // Tests the Run method using |
| // FLAGS_daily_report_makeup_days = 2 |
| TEST_F(ReportSchedulerTest, Run2) { |
| gflags::FlagSaver s1; |
| FLAGS_daily_report_makeup_days = 2; |
| SetUp(); |
| DoRunTest(); |
| } |
| |
| // Tests the Run method using |
| // FLAGS_daily_report_makeup_days = 1 |
| TEST_F(ReportSchedulerTest, Run1) { |
| gflags::FlagSaver s1; |
| FLAGS_daily_report_makeup_days = 1; |
| SetUp(); |
| DoRunTest(); |
| } |
| |
| // Tests the Run method using |
| // FLAGS_daily_report_makeup_days = 0 |
| TEST_F(ReportSchedulerTest, Run0) { |
| gflags::FlagSaver s1; |
| FLAGS_daily_report_makeup_days = 0; |
| SetUp(); |
| DoRunTest(); |
| } |
| |
| } // namespace analyzer |
| } // namespace cobalt |