[EventAggregator] Stop crashing when window size is invalid

Instead of crashing when a report has an invalid
window_size field, log an error and skip generating
observations or garbage collecting aggregated for
that report.

Change-Id: I312d0365c7a970f275a04ab8016ea8416f94ee8b
diff --git a/logger/event_aggregator.cc b/logger/event_aggregator.cc
index 879352e..ac3e43f 100644
--- a/logger/event_aggregator.cc
+++ b/logger/event_aggregator.cc
@@ -167,6 +167,7 @@
       VLOG(4) << "No file found for local_aggregate_proto_store. Proceeding "
                  "with empty LocalAggregateStore. File will be created on "
                  "first snapshot of the LocalAggregateStore.";
+      locked->local_aggregate_store.Clear();
       break;
     }
     default: {
@@ -176,7 +177,7 @@
           << "\nError message: " << restore_aggregates_status.error_message()
           << "\nError details: " << restore_aggregates_status.error_details()
           << "\nProceeding with empty LocalAggregateStore.";
-      locked->local_aggregate_store = LocalAggregateStore();
+      locked->local_aggregate_store.Clear();
     }
   }
   auto restore_history_status = obs_history_proto_store_->Read(&obs_history_);
@@ -641,11 +642,22 @@
         break;
       }
       default:
-        LOG(ERROR) << "The TimeZonePolicy of this MetricDefinition is invalid.";
-        return kInvalidConfig;
+        LOG_FIRST_N(ERROR, 10)
+            << "The TimeZonePolicy of this MetricDefinition is invalid.";
+        continue;
+    }
+    if (pair.second.aggregation_config().window_size_size() == 0) {
+      LOG_FIRST_N(ERROR, 10)
+          << "This ReportDefinition does not have a window size.";
+      continue;
     }
     uint32_t max_window_size = pair.second.aggregation_config().window_size(
         pair.second.aggregation_config().window_size_size() - 1);
+    if (max_window_size == 0u || max_window_size > day_index) {
+      LOG_FIRST_N(ERROR, 10) << "The maximum window size " << max_window_size
+                             << " of this ReportDefinition is out of range.";
+      continue;
+    }
     // For each ReportAggregates, descend to and iterate over the sub-map of
     // local aggregates keyed by day index. Keep buckets with day indices
     // greater than |day_index| - |backfill_days_| - |max_window_size|, and
@@ -689,7 +701,7 @@
   // Lock, copy the LocalAggregateStore, and release the lock. Use the copy to
   // generate observations.
   auto local_aggregate_store = CopyLocalAggregateStore();
-  for (auto pair : local_aggregate_store.by_report_key()) {
+  for (auto& pair : local_aggregate_store.by_report_key()) {
     const auto& config = pair.second.aggregation_config();
 
     const auto& metric = config.metric();
@@ -705,8 +717,9 @@
         break;
       }
       default:
-        LOG(ERROR) << "The TimeZonePolicy of this MetricDefinition is invalid.";
-        return kInvalidConfig;
+        LOG_FIRST_N(ERROR, 10)
+            << "The TimeZonePolicy of this MetricDefinition is invalid.";
+        continue;
     }
 
     const auto& report = config.report();
@@ -714,11 +727,17 @@
     // size and that all window sizes are positive and <=
     // kMaxAllowedAggregationWindowSize, and has sorted the elements of
     // config.window_size() in increasing order.
-    CHECK_GT(config.window_size_size(), 0);
-    auto max_window_size = config.window_size(config.window_size_size() - 1);
-    CHECK_GT(max_window_size, 0);
-    CHECK_GE(final_day_index, max_window_size);
-
+    if (config.window_size_size() == 0u) {
+      LOG_FIRST_N(ERROR, 10) << "This ReportDefinition has no window_size.";
+      continue;
+    }
+    uint32_t max_window_size =
+        config.window_size(config.window_size_size() - 1);
+    if (max_window_size == 0u || max_window_size > final_day_index) {
+      LOG_FIRST_N(ERROR, 10) << "The maximum window size " << max_window_size
+                             << " of this ReportDefinition is out of range.";
+      continue;
+    }
     switch (metric.metric_type()) {
       case MetricDefinition::EVENT_OCCURRED: {
         auto num_event_codes =
diff --git a/logger/event_aggregator_test.cc b/logger/event_aggregator_test.cc
index 1821a8f..31fa43c 100644
--- a/logger/event_aggregator_test.cc
+++ b/logger/event_aggregator_test.cc
@@ -104,6 +104,10 @@
         new MockConsistentProtoStore(kAggregateStoreFilename));
     obs_history_proto_store_.reset(
         new MockConsistentProtoStore(kObsHistoryFilename));
+    ResetEventAggregator();
+  }
+
+  void ResetEventAggregator() {
     event_aggregator_.reset(new EventAggregator(
         encoder_.get(), observation_writer_.get(),
         local_aggregate_proto_store_.get(), obs_history_proto_store_.get()));
@@ -1101,6 +1105,50 @@
   EXPECT_EQ(0u, observation_store_->messages_received.size());
 }
 
+// When the LocalAggregateStore contains one ReportAggregates proto and that
+// proto is empty, GenerateObservations should return success but generate no
+// observations.
+TEST_F(EventAggregatorTest, GenerateObservationsFromBadStore) {
+  auto bad_store = std::make_unique<LocalAggregateStore>();
+  (*bad_store->mutable_by_report_key())["some_key"] = ReportAggregates();
+  local_aggregate_proto_store_->set_stored_proto(std::move(bad_store));
+  // Read the bad store in to the EventAggregator.
+  ResetEventAggregator();
+  EXPECT_EQ(kOK, GenerateObservations(CurrentDayIndex()));
+  EXPECT_EQ(0u, observation_store_->messages_received.size());
+}
+
+// When the LocalAggregateStore contains one empty ReportAggregates proto and
+// some valid ReportAggregates, GenerateObservations should produce observations
+// for the valid ReportAggregates.
+TEST_F(EventAggregatorTest, GenerateObservationsFromBadStoreMultiReport) {
+  auto bad_store = std::make_unique<LocalAggregateStore>();
+  (*bad_store->mutable_by_report_key())["some_key"] = ReportAggregates();
+  local_aggregate_proto_store_->set_stored_proto(std::move(bad_store));
+  // Read the bad store in to the EventAggregator.
+  ResetEventAggregator();
+  // Provide the all_report_types test registry to the EventAggregator.
+  auto project_context =
+      GetTestProject(testing::all_report_types::kCobaltRegistryBase64);
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+  EXPECT_EQ(kOK, GenerateObservations(CurrentDayIndex()));
+  std::vector<Observation2> observations(0);
+  EXPECT_TRUE(FetchAggregatedObservations(
+      &observations, testing::all_report_types::kExpectedAggregationParams,
+      observation_store_.get(), update_recipient_.get()));
+}
+
+// When the LocalAggregateStore contains one ReportAggregates proto and that
+// proto is empty, GarbageCollect should return success.
+TEST_F(EventAggregatorTest, GarbageCollectBadStore) {
+  auto bad_store = std::make_unique<LocalAggregateStore>();
+  (*bad_store->mutable_by_report_key())["some_key"] = ReportAggregates();
+  local_aggregate_proto_store_->set_stored_proto(std::move(bad_store));
+  // Read the bad store in to the EventAggregator.
+  ResetEventAggregator();
+  EXPECT_EQ(kOK, GarbageCollect(CurrentDayIndex()));
+}
+
 // Tests that the LocalAggregateStore is updated as expected when
 // EventAggregator::LogUniqueActivesEvent() is called with valid arguments;
 // i.e., with a report ID associated to an existing key of the
diff --git a/logger/logger_test_utils.h b/logger/logger_test_utils.h
index ecfd488..a013501 100644
--- a/logger/logger_test_utils.h
+++ b/logger/logger_test_utils.h
@@ -119,7 +119,7 @@
   int invocation_count = 0;
 };
 
-// A mock ConsistentProtoStore. Its Read() and Write() methods simply increment
+// A mock ConsistentProtoStore. Its Read() and Write() methods increment
 // counts of their invocations.
 class MockConsistentProtoStore : public ::cobalt::util::ConsistentProtoStore {
  public:
@@ -132,6 +132,7 @@
 
   ~MockConsistentProtoStore() {}
 
+  // To set the stored proto in a test, use |set_stored_proto| instead of Write.
   ::cobalt::util::Status Write(
       const google::protobuf::MessageLite& proto) override {
     write_count_++;
@@ -139,6 +140,10 @@
   }
 
   ::cobalt::util::Status Read(google::protobuf::MessageLite* proto) override {
+    if (stored_proto_) {
+      proto->Clear();
+      proto->CheckTypeAndMergeFrom(*stored_proto_);
+    }
     read_count_++;
     return ::cobalt::util::Status::OK;
   }
@@ -150,6 +155,13 @@
 
   int read_count_;
   int write_count_;
+
+  void set_stored_proto(std::unique_ptr<google::protobuf::MessageLite> proto) {
+    stored_proto_ = std::move(proto);
+  }
+
+ private:
+  std::unique_ptr<google::protobuf::MessageLite> stored_proto_;
 };
 
 // Creates and returns a ProjectContext from a serialized, base64-encoded Cobalt