[EventAggregator] Stop crashing when window size is invalid
Instead of crashing when a report has an invalid
window_size field, log an error and skip generating
observations or garbage collecting aggregated for
that report.
Change-Id: I312d0365c7a970f275a04ab8016ea8416f94ee8b
diff --git a/logger/event_aggregator.cc b/logger/event_aggregator.cc
index 879352e..ac3e43f 100644
--- a/logger/event_aggregator.cc
+++ b/logger/event_aggregator.cc
@@ -167,6 +167,7 @@
VLOG(4) << "No file found for local_aggregate_proto_store. Proceeding "
"with empty LocalAggregateStore. File will be created on "
"first snapshot of the LocalAggregateStore.";
+ locked->local_aggregate_store.Clear();
break;
}
default: {
@@ -176,7 +177,7 @@
<< "\nError message: " << restore_aggregates_status.error_message()
<< "\nError details: " << restore_aggregates_status.error_details()
<< "\nProceeding with empty LocalAggregateStore.";
- locked->local_aggregate_store = LocalAggregateStore();
+ locked->local_aggregate_store.Clear();
}
}
auto restore_history_status = obs_history_proto_store_->Read(&obs_history_);
@@ -641,11 +642,22 @@
break;
}
default:
- LOG(ERROR) << "The TimeZonePolicy of this MetricDefinition is invalid.";
- return kInvalidConfig;
+ LOG_FIRST_N(ERROR, 10)
+ << "The TimeZonePolicy of this MetricDefinition is invalid.";
+ continue;
+ }
+ if (pair.second.aggregation_config().window_size_size() == 0) {
+ LOG_FIRST_N(ERROR, 10)
+ << "This ReportDefinition does not have a window size.";
+ continue;
}
uint32_t max_window_size = pair.second.aggregation_config().window_size(
pair.second.aggregation_config().window_size_size() - 1);
+ if (max_window_size == 0u || max_window_size > day_index) {
+ LOG_FIRST_N(ERROR, 10) << "The maximum window size " << max_window_size
+ << " of this ReportDefinition is out of range.";
+ continue;
+ }
// For each ReportAggregates, descend to and iterate over the sub-map of
// local aggregates keyed by day index. Keep buckets with day indices
// greater than |day_index| - |backfill_days_| - |max_window_size|, and
@@ -689,7 +701,7 @@
// Lock, copy the LocalAggregateStore, and release the lock. Use the copy to
// generate observations.
auto local_aggregate_store = CopyLocalAggregateStore();
- for (auto pair : local_aggregate_store.by_report_key()) {
+ for (auto& pair : local_aggregate_store.by_report_key()) {
const auto& config = pair.second.aggregation_config();
const auto& metric = config.metric();
@@ -705,8 +717,9 @@
break;
}
default:
- LOG(ERROR) << "The TimeZonePolicy of this MetricDefinition is invalid.";
- return kInvalidConfig;
+ LOG_FIRST_N(ERROR, 10)
+ << "The TimeZonePolicy of this MetricDefinition is invalid.";
+ continue;
}
const auto& report = config.report();
@@ -714,11 +727,17 @@
// size and that all window sizes are positive and <=
// kMaxAllowedAggregationWindowSize, and has sorted the elements of
// config.window_size() in increasing order.
- CHECK_GT(config.window_size_size(), 0);
- auto max_window_size = config.window_size(config.window_size_size() - 1);
- CHECK_GT(max_window_size, 0);
- CHECK_GE(final_day_index, max_window_size);
-
+ if (config.window_size_size() == 0u) {
+ LOG_FIRST_N(ERROR, 10) << "This ReportDefinition has no window_size.";
+ continue;
+ }
+ uint32_t max_window_size =
+ config.window_size(config.window_size_size() - 1);
+ if (max_window_size == 0u || max_window_size > final_day_index) {
+ LOG_FIRST_N(ERROR, 10) << "The maximum window size " << max_window_size
+ << " of this ReportDefinition is out of range.";
+ continue;
+ }
switch (metric.metric_type()) {
case MetricDefinition::EVENT_OCCURRED: {
auto num_event_codes =
diff --git a/logger/event_aggregator_test.cc b/logger/event_aggregator_test.cc
index 1821a8f..31fa43c 100644
--- a/logger/event_aggregator_test.cc
+++ b/logger/event_aggregator_test.cc
@@ -104,6 +104,10 @@
new MockConsistentProtoStore(kAggregateStoreFilename));
obs_history_proto_store_.reset(
new MockConsistentProtoStore(kObsHistoryFilename));
+ ResetEventAggregator();
+ }
+
+ void ResetEventAggregator() {
event_aggregator_.reset(new EventAggregator(
encoder_.get(), observation_writer_.get(),
local_aggregate_proto_store_.get(), obs_history_proto_store_.get()));
@@ -1101,6 +1105,50 @@
EXPECT_EQ(0u, observation_store_->messages_received.size());
}
+// When the LocalAggregateStore contains one ReportAggregates proto and that
+// proto is empty, GenerateObservations should return success but generate no
+// observations.
+TEST_F(EventAggregatorTest, GenerateObservationsFromBadStore) {
+ auto bad_store = std::make_unique<LocalAggregateStore>();
+ (*bad_store->mutable_by_report_key())["some_key"] = ReportAggregates();
+ local_aggregate_proto_store_->set_stored_proto(std::move(bad_store));
+ // Read the bad store in to the EventAggregator.
+ ResetEventAggregator();
+ EXPECT_EQ(kOK, GenerateObservations(CurrentDayIndex()));
+ EXPECT_EQ(0u, observation_store_->messages_received.size());
+}
+
+// When the LocalAggregateStore contains one empty ReportAggregates proto and
+// some valid ReportAggregates, GenerateObservations should produce observations
+// for the valid ReportAggregates.
+TEST_F(EventAggregatorTest, GenerateObservationsFromBadStoreMultiReport) {
+ auto bad_store = std::make_unique<LocalAggregateStore>();
+ (*bad_store->mutable_by_report_key())["some_key"] = ReportAggregates();
+ local_aggregate_proto_store_->set_stored_proto(std::move(bad_store));
+ // Read the bad store in to the EventAggregator.
+ ResetEventAggregator();
+ // Provide the all_report_types test registry to the EventAggregator.
+ auto project_context =
+ GetTestProject(testing::all_report_types::kCobaltRegistryBase64);
+ EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+ EXPECT_EQ(kOK, GenerateObservations(CurrentDayIndex()));
+ std::vector<Observation2> observations(0);
+ EXPECT_TRUE(FetchAggregatedObservations(
+ &observations, testing::all_report_types::kExpectedAggregationParams,
+ observation_store_.get(), update_recipient_.get()));
+}
+
+// When the LocalAggregateStore contains one ReportAggregates proto and that
+// proto is empty, GarbageCollect should return success.
+TEST_F(EventAggregatorTest, GarbageCollectBadStore) {
+ auto bad_store = std::make_unique<LocalAggregateStore>();
+ (*bad_store->mutable_by_report_key())["some_key"] = ReportAggregates();
+ local_aggregate_proto_store_->set_stored_proto(std::move(bad_store));
+ // Read the bad store in to the EventAggregator.
+ ResetEventAggregator();
+ EXPECT_EQ(kOK, GarbageCollect(CurrentDayIndex()));
+}
+
// Tests that the LocalAggregateStore is updated as expected when
// EventAggregator::LogUniqueActivesEvent() is called with valid arguments;
// i.e., with a report ID associated to an existing key of the
diff --git a/logger/logger_test_utils.h b/logger/logger_test_utils.h
index ecfd488..a013501 100644
--- a/logger/logger_test_utils.h
+++ b/logger/logger_test_utils.h
@@ -119,7 +119,7 @@
int invocation_count = 0;
};
-// A mock ConsistentProtoStore. Its Read() and Write() methods simply increment
+// A mock ConsistentProtoStore. Its Read() and Write() methods increment
// counts of their invocations.
class MockConsistentProtoStore : public ::cobalt::util::ConsistentProtoStore {
public:
@@ -132,6 +132,7 @@
~MockConsistentProtoStore() {}
+ // To set the stored proto in a test, use |set_stored_proto| instead of Write.
::cobalt::util::Status Write(
const google::protobuf::MessageLite& proto) override {
write_count_++;
@@ -139,6 +140,10 @@
}
::cobalt::util::Status Read(google::protobuf::MessageLite* proto) override {
+ if (stored_proto_) {
+ proto->Clear();
+ proto->CheckTypeAndMergeFrom(*stored_proto_);
+ }
read_count_++;
return ::cobalt::util::Status::OK;
}
@@ -150,6 +155,13 @@
int read_count_;
int write_count_;
+
+ void set_stored_proto(std::unique_ptr<google::protobuf::MessageLite> proto) {
+ stored_proto_ = std::move(proto);
+ }
+
+ private:
+ std::unique_ptr<google::protobuf::MessageLite> stored_proto_;
};
// Creates and returns a ProjectContext from a serialized, base64-encoded Cobalt