[test_app2] Generate locally aggregated observations in testapp2

Adds a command to generate locally aggregated observations for
a specified day index when using test_app2 in interactive mode,
and adds a day index argument to the LogEvent and LogEventCount
methods of TestApp so that events can be logged for a specified
day index. Also adds a command which deletes all state related
to local aggregation in an instance of the test app.

Change-Id: I1650c6829b4d0f57e51daca8bb50f51e5dfe02b2
diff --git a/logger/logger.h b/logger/logger.h
index 4d2d97f..19f58ea 100644
--- a/logger/logger.h
+++ b/logger/logger.h
@@ -21,6 +21,13 @@
 #include "util/clock.h"
 
 namespace cobalt {
+
+namespace internal {
+
+class RealLoggerFactory;
+
+}  // namespace internal
+
 namespace logger {
 
 // Concrete implementation of LoggerInterface.
@@ -97,6 +104,7 @@
  private:
   friend class EventLogger;
   friend class LoggerTest;
+  friend class cobalt::internal::RealLoggerFactory;
 
   void SetClock(util::ClockInterface* clock) { clock_.reset(clock); }
 
diff --git a/tools/test_app2/test_app.cc b/tools/test_app2/test_app.cc
index 356389f..92e25ba 100644
--- a/tools/test_app2/test_app.cc
+++ b/tools/test_app2/test_app.cc
@@ -29,9 +29,12 @@
 #include "logger/project_context.h"
 #include "logger/status.h"
 #include "util/clearcut/curl_http_client.h"
+#include "util/clock.h"
 #include "util/consistent_proto_store.h"
+#include "util/datetime_util.h"
 #include "util/pem_util.h"
 #include "util/posix_file_system.h"
+#include "util/status.h"
 
 namespace cobalt {
 
@@ -55,10 +58,15 @@
 using logger::ObservationWriter;
 using logger::ProjectContext;
 using logger::Status;
+using util::ClockInterface;
 using util::ConsistentProtoStore;
 using util::EncryptedMessageMaker;
+using util::IncrementingClock;
 using util::PemUtil;
 using util::PosixFileSystem;
+using util::StatusCode;
+using util::SystemClock;
+using util::TimeToDayIndex;
 
 // There are three modes of operation of the Cobalt TestClient program
 // determined by the value of this flag.
@@ -110,26 +118,51 @@
   *ostream << "----------------------------------" << std::endl;
   *ostream << "help                     \tPrint this help message."
            << std::endl;
-  *ostream << "log <num> event <index>  \tLog <num> independent copies "
-              "of an EVENT_OCCURRED event with event_code = <index>"
-           << std::endl;
-  *ostream << "log <num> event_count <index> <component> <duration> <count>"
-           << std::endl
-           << "                         \tLog <num> independent copies of an "
-              "EVENT_COUNT event."
+  *ostream << "log <num> event <index> <day> \tLog <num> independent copies "
+              "of an EVENT_OCCURRED event."
            << std::endl
            << "                         \t- The <index> is the event_code of "
-              "the EVENT_COUNT event."
+              "the EVENT_OCCURRED event."
            << std::endl
-           << "                         \t- The <component> is the component "
-              "name.  Pass in \"\" if your metric does not use this field."
+           << "                         \t- The optional argument <day> is the "
+              "day for which the event should be logged."
            << std::endl
-           << "                         \t- The <duration> specifies the "
-              "period of time over which <count> EVENT_COUNT events occurred.  "
-           << "Pass in 0 if your metric does not use this field." << std::endl
-           << "                         \t- The <count> specifies the number "
-              "of times an EVENT_COUNT event occurred."
+           << "                         \t  If provided, it "
+              "should be of the form \"day=<day index>\", \"day=today\", "
+              "\"day=today+<number of days>\", or \"day=today-<number of "
+              "days>\"."
+           << std::endl
+           << "                         \t  The default day is the current day."
            << std::endl;
+
+  *ostream
+      << "log <num> event_count <index> <component> <duration> <count> <day>"
+      << std::endl
+      << "                         \tLog <num> independent copies of an "
+         "EVENT_COUNT event."
+      << std::endl
+      << "                         \t- The <index> is the event_code of "
+         "the EVENT_COUNT event."
+      << std::endl
+      << "                         \t- The <component> is the component "
+         "name.  Pass in \"\" if your metric does not use this field."
+      << std::endl
+      << "                         \t- The <duration> specifies the "
+         "period of time over which <count> EVENT_COUNT events occurred.  "
+      << "Pass in 0 if your metric does not use this field." << std::endl
+      << "                         \t- The <count> specifies the number "
+         "of times an EVENT_COUNT event occurred."
+      << std::endl
+      << "                         \t- The optional argument <day> is the "
+         "day for which the event should be logged."
+      << std::endl
+      << "                         \t  If provided, it "
+         "should be of the form \"day=<day index>\", \"day=today\", "
+         "\"day=today+<number of days>\", or \"day=today-<number of "
+         "days>\"."
+      << std::endl
+      << "                         \t  The default day is the current day."
+      << std::endl;
   *ostream << "log <num> elapsed_time <index> <component> <elapsed_micros>"
            << std::endl
            << "                         \tLog <num> independent copies of an "
@@ -144,8 +177,7 @@
            << "                         \t- The <elapsed_micros> specifies how "
               "many microseconds have elapsed for the given ELAPSED_TIME event."
            << std::endl;
-  *ostream << "log <num> frame_rate <index> <component> <fps>"
-           << std::endl
+  *ostream << "log <num> frame_rate <index> <component> <fps>" << std::endl
            << "                         \tLog <num> independent copies of a "
               "FRAME_RATE event."
            << std::endl
@@ -157,8 +189,7 @@
            << std::endl
            << "                         \t- The <fps> specifies the frame rate."
            << std::endl;
-  *ostream << "log <num> memory_usage <index> <component> <bytes>"
-           << std::endl
+  *ostream << "log <num> memory_usage <index> <component> <bytes>" << std::endl
            << "                         \tLog <num> independent copies of a "
               "MEMORY_USAGE event."
            << std::endl
@@ -196,6 +227,14 @@
   *ostream << "                         \t- Each <val> is an int or string "
               "value or an index <n> if <val>='index=<n>'."
            << std::endl;
+  *ostream << "generate <day>                 \tGenerate and send observations "
+              "for <day> for all locally aggregated reports. <day> may be a "
+              "day index, \'today\', \'today+N\', or \'today-N\'."
+           << std::endl;
+  *ostream
+      << "reset-aggregation              \tDelete all state related to local "
+         "aggregation."
+      << std::endl;
   *ostream << "ls                       \tList current values of "
               "parameters."
            << std::endl;
@@ -313,6 +352,13 @@
   return tokens;
 }
 
+}  // namespace
+
+namespace internal {
+
+// The number of seconds in a day.
+static const int kDay = 86400;
+
 class RealLoggerFactory : public LoggerFactory {
  public:
   virtual ~RealLoggerFactory() = default;
@@ -327,7 +373,11 @@
       std::unique_ptr<ConsistentProtoStore> obs_history_proto_store,
       std::unique_ptr<SystemDataInterface> system_data);
 
-  std::unique_ptr<LoggerInterface> NewLogger() override;
+  std::unique_ptr<LoggerInterface> NewLogger(uint32_t day_index = 0u) override;
+  size_t ObservationCount() override;
+  void ResetObservationCount() override;
+  void ResetLocalAggregation() override;
+  bool GenerateAggregatedObservations(uint32_t day_index) override;
   bool SendAccumulatedObservations() override;
   const ProjectContext* project_context() override {
     return project_context_.get();
@@ -363,9 +413,7 @@
       shipping_manager_(std::move(shipping_manager)),
       local_aggregate_proto_store_(std::move(local_aggregate_proto_store)),
       obs_history_proto_store_(std::move(obs_history_proto_store)),
-      system_data_(std::move(system_data)) {}
-
-std::unique_ptr<LoggerInterface> RealLoggerFactory::NewLogger() {
+      system_data_(std::move(system_data)) {
   encoder_.reset(
       new Encoder(ClientSecret::GenerateNewSecret(), system_data_.get()));
   observation_writer_.reset(
@@ -374,9 +422,43 @@
   event_aggregator_.reset(new EventAggregator(
       encoder_.get(), observation_writer_.get(),
       local_aggregate_proto_store_.get(), obs_history_proto_store_.get()));
-  return std::unique_ptr<LoggerInterface>(
-      new Logger(encoder_.get(), event_aggregator_.get(),
-                 observation_writer_.get(), project_context_.get()));
+}
+
+std::unique_ptr<LoggerInterface> RealLoggerFactory::NewLogger(
+    uint32_t day_index) {
+  std::unique_ptr<Logger> logger = std::make_unique<Logger>(
+      encoder_.get(), event_aggregator_.get(), observation_writer_.get(),
+      project_context_.get());
+  if (day_index != 0u) {
+    auto mock_clock = new IncrementingClock();
+    mock_clock->set_time(std::chrono::system_clock::time_point(
+        std::chrono::seconds(kDay * day_index)));
+    logger->SetClock(mock_clock);
+  }
+  return std::move(logger);
+}
+
+size_t RealLoggerFactory::ObservationCount() {
+  return observation_store_->num_observations_added();
+}
+
+void RealLoggerFactory::ResetObservationCount() {
+  observation_store_->ResetObservationCounter();
+}
+
+// TODO(pesk): also clear the contents of the ConsistentProtoStores if we
+// implement a mode which uses them.
+void RealLoggerFactory::ResetLocalAggregation() {
+  event_aggregator_.reset(new EventAggregator(
+      encoder_.get(), observation_writer_.get(),
+      local_aggregate_proto_store_.get(), obs_history_proto_store_.get()));
+}
+
+bool RealLoggerFactory::GenerateAggregatedObservations(uint32_t day_index) {
+  if (kOK == event_aggregator_->GenerateObservationsNoWorker(day_index)) {
+    return true;
+  }
+  return false;
 }
 
 bool RealLoggerFactory::SendAccumulatedObservations() {
@@ -386,7 +468,7 @@
   return status.ok();
 }
 
-}  // namespace
+}  // namespace internal
 
 std::unique_ptr<TestApp> TestApp::CreateFromFlagsOrDie(int argc, char* argv[]) {
   std::string config_bin_proto_path = FLAGS_config_bin_proto_path;
@@ -451,7 +533,7 @@
           std::make_unique<util::clearcut::CurlHTTPClient>()));
   shipping_manager->Start();
 
-  std::unique_ptr<LoggerFactory> logger_factory(new RealLoggerFactory(
+  std::unique_ptr<LoggerFactory> logger_factory(new internal::RealLoggerFactory(
       std::move(observation_encrypter), std::move(envelope_encrypter),
       std::move(project_context), std::move(observation_store),
       std::move(shipping_manager), std::move(local_aggregate_proto_store),
@@ -472,6 +554,7 @@
   CHECK(logger_factory_->project_context());
   CHECK(ostream_);
   CHECK(SetMetric(initial_metric_name));
+  clock_ = new SystemClock();
 }
 
 bool TestApp::SetMetric(const std::string& metric_name) {
@@ -480,6 +563,8 @@
     (*ostream_) << "There is no metric named '" << metric_name
                 << "' in  project "
                 << logger_factory_->project_context()->DebugString() << "."
+                << std::endl
+                << "You may need to run `./cobaltb.py update_config`."
                 << std::endl;
     return false;
   }
@@ -527,6 +612,16 @@
     return true;
   }
 
+  if (command[0] == "generate") {
+    GenerateAggregatedObservations(command);
+    return true;
+  }
+
+  if (command[0] == "reset-aggregation") {
+    ResetLocalAggregation();
+    return true;
+  }
+
   if (command[0] == "ls") {
     ListParameters();
     return true;
@@ -620,14 +715,24 @@
   return;
 }
 
+uint32_t TestApp::CurrentDayIndex() {
+  return TimeToDayIndex(std::chrono::system_clock::to_time_t(clock_->now()),
+                        MetricDefinition::UTC);
+}
+
 // We know that command[0] = "log", command[1] = <num_clients>
 void TestApp::LogEvent(uint64_t num_clients,
                        const std::vector<std::string>& command) {
-  if (command.size() != 4) {
-    *ostream_ << "Malformed log event command. Expected exactly one more "
+  auto command_size = command.size();
+  if (command_size < 4) {
+    *ostream_ << "Malformed log event command. Expected one more "
                  "argument for <event_code>."
               << std::endl;
     return;
+  } else if (command_size > 5) {
+    *ostream_ << "Malformed log event command: too many arguments."
+              << std::endl;
+    return;
   }
 
   int64_t event_code;
@@ -635,23 +740,32 @@
     return;
   }
 
-  LogEvent(num_clients, event_code);
+  uint32_t day_index = 0;
+  if (command_size == 5 && !ParseDay(command[4], &day_index)) {
+    *ostream_ << "Unable to parse <day> from log command: " << command[4]
+              << std::endl;
+    return;
+  }
+  LogEvent(num_clients, event_code, day_index);
 }
 
-void TestApp::LogEvent(size_t num_clients, uint32_t event_code) {
+void TestApp::LogEvent(size_t num_clients, uint32_t event_code,
+                       uint32_t day_index) {
   if (!current_metric_) {
     *ostream_ << "Cannot LogEvent. There is no current metric set."
               << std::endl;
     return;
   }
-  VLOG(6) << "TestApp::LogEvents(" << num_clients << ", " << event_code << ").";
+  VLOG(6) << "TestApp::LogEvents(" << num_clients << ", " << event_code << ", "
+          << day_index << ").";
   for (size_t i = 0; i < num_clients; i++) {
-    auto logger = logger_factory_->NewLogger();
+    auto logger = logger_factory_->NewLogger(day_index);
     auto status = logger->LogEvent(current_metric_->id(), event_code);
     if (status != logger::kOK) {
       LOG(ERROR) << "LogEvent() failed with status " << status
                  << ". metric=" << current_metric_->metric_name()
-                 << ". event_code=" << event_code;
+                 << ". event_code=" << event_code
+                 << ". day_index=" << day_index;
       break;
     }
   }
@@ -659,14 +773,20 @@
 }
 
 // We know that command[0] = "log", command[1] = <num_clients>,
-// command[2] = "event_count"
+// command[2] = "event_count".
 void TestApp::LogEventCount(uint64_t num_clients,
                             const std::vector<std::string>& command) {
-  if (command.size() != 7) {
-    *ostream_ << "Malformed log event_count command. Expected 4 additional "
-              << "parameters." << std::endl;
+  auto command_size = command.size();
+  if (command_size < 7) {
+    *ostream_ << "Malformed log event_count command: missing at least one "
+                 "required argument."
+              << std::endl;
     return;
   }
+  if (command_size > 8) {
+    *ostream_ << "Malformed log event_count command: too many arguments."
+              << std::endl;
+  }
 
   int64_t event_code;
   if (!ParseNonNegativeInt(command[3], true, &event_code)) {
@@ -688,29 +808,38 @@
               << std::endl;
     return;
   }
+  uint32_t day_index = 0u;
+  if (command_size == 8 && !ParseDay(command[7], &day_index)) {
+    *ostream_ << "Unable to parse <day> from log command: " << command[7]
+              << std::endl;
+    return;
+  }
 
-  LogEventCount(num_clients, event_code, command[4], duration, count);
+  LogEventCount(num_clients, event_code, command[4], duration, count,
+                day_index);
 }
 
 void TestApp::LogEventCount(size_t num_clients, uint32_t event_code,
                             const std::string& component, int64_t duration,
-                            int64_t count) {
+                            int64_t count, uint32_t day_index) {
   if (!current_metric_) {
     *ostream_ << "Cannot LogEventCount. There is no current metric set."
               << std::endl;
     return;
   }
   VLOG(6) << "TestApp::LogEventCount(" << num_clients << ", " << event_code
-          << ", " << component << ", " << duration << ", " << count << ").";
+          << ", " << component << ", " << duration << ", " << count << ", "
+          << day_index << ").";
   for (size_t i = 0; i < num_clients; i++) {
-    auto logger = logger_factory_->NewLogger();
+    auto logger = logger_factory_->NewLogger(day_index);
     auto status = logger->LogEventCount(current_metric_->id(), event_code,
                                         component, duration, count);
     if (status != logger::kOK) {
       LOG(ERROR) << "LogEventCount() failed with status " << status
                  << ". metric=" << current_metric_->metric_name()
                  << ". event_code=" << event_code << ". component=" << component
-                 << ". duration=" << duration << ". count=" << count;
+                 << ". duration=" << duration << ". count=" << count
+                 << ". day_index=" << day_index;
       break;
     }
   }
@@ -789,8 +918,8 @@
 
   float fps;
   if (!ParseFloat(command[5], true, &fps)) {
-    *ostream_ << "Unable to parse <fps> from log command: "
-              << command[5] << std::endl;
+    *ostream_ << "Unable to parse <fps> from log command: " << command[5]
+              << std::endl;
     return;
   }
 
@@ -809,8 +938,8 @@
           << ", " << component << ", " << fps << ").";
   for (size_t i = 0; i < num_clients; i++) {
     auto logger = logger_factory_->NewLogger();
-    auto status = logger->LogFrameRate(current_metric_->id(), event_code,
-                                       component, fps);
+    auto status =
+        logger->LogFrameRate(current_metric_->id(), event_code, component, fps);
     if (status != logger::kOK) {
       LOG(ERROR) << "LogFrameRate() failed with status " << status
                  << ". metric=" << current_metric_->metric_name()
@@ -841,8 +970,8 @@
 
   int64_t bytes;
   if (!ParseNonNegativeInt(command[5], true, &bytes)) {
-    *ostream_ << "Unable to parse <bytes> from log command: "
-              << command[5] << std::endl;
+    *ostream_ << "Unable to parse <bytes> from log command: " << command[5]
+              << std::endl;
     return;
   }
 
@@ -862,7 +991,7 @@
   for (size_t i = 0; i < num_clients; i++) {
     auto logger = logger_factory_->NewLogger();
     auto status = logger->LogMemoryUsage(current_metric_->id(), event_code,
-                                       component, bytes);
+                                         component, bytes);
     if (status != logger::kOK) {
       LOG(ERROR) << "LogMemoryUsage() failed with status " << status
                  << ". metric=" << current_metric_->metric_name()
@@ -893,15 +1022,15 @@
 
   int64_t bucket;
   if (!ParseNonNegativeInt(command[5], true, &bucket)) {
-    *ostream_ << "Unable to parse <bucket> from log command: "
-              << command[5] << std::endl;
+    *ostream_ << "Unable to parse <bucket> from log command: " << command[5]
+              << std::endl;
     return;
   }
 
   int64_t count;
   if (!ParseNonNegativeInt(command[6], true, &count)) {
-    *ostream_ << "Unable to parse <count> from log command: "
-              << command[6] << std::endl;
+    *ostream_ << "Unable to parse <count> from log command: " << command[6]
+              << std::endl;
     return;
   }
 
@@ -909,7 +1038,8 @@
 }
 
 void TestApp::LogIntHistogram(uint64_t num_clients, uint32_t event_code,
-      const std::string& component, int64_t bucket, int64_t count) {
+                              const std::string& component, int64_t bucket,
+                              int64_t count) {
   if (!current_metric_) {
     *ostream_ << "Cannot LogIntHistogram. There is no current metric set."
               << std::endl;
@@ -928,7 +1058,7 @@
 
     auto logger = logger_factory_->NewLogger();
     auto status = logger->LogIntHistogram(current_metric_->id(), event_code,
-                                         component, std::move(histogram_ptr));
+                                          component, std::move(histogram_ptr));
     if (status != logger::kOK) {
       LOG(ERROR) << "LogIntHistogram() failed with status " << status
                  << ". metric=" << current_metric_->metric_name()
@@ -992,6 +1122,49 @@
   *ostream_ << "Done." << std::endl;
 }
 
+// We know that command[0] = "generate"
+void TestApp::GenerateAggregatedObservations(
+    const std::vector<std::string>& command) {
+  if (command.size() > 2) {
+    *ostream_ << "Malformed generate command: too many arguments." << std::endl;
+    return;
+  }
+  uint32_t day_index;
+  if (command.size() < 2) {
+    day_index = CurrentDayIndex();
+  } else if (!ParseDay(command[1], &day_index)) {
+    *ostream_ << "Could not parse argument " << command[1] << " to a day index"
+              << std::endl;
+    return;
+  }
+  GenerateAggregatedObservationsAndSend(day_index);
+  return;
+}
+
+void TestApp::GenerateAggregatedObservationsAndSend(uint32_t day_index) {
+  logger_factory_->NewLogger();
+  logger_factory_->ResetObservationCount();
+  if (logger_factory_->GenerateAggregatedObservations(day_index)) {
+    *ostream_ << "Generated " << logger_factory_->ObservationCount()
+              << " locally aggregated observations for day index " << day_index
+              << std::endl;
+  } else {
+    *ostream_
+        << "Failed to generate locally aggregated observations for day index "
+        << day_index << std::endl;
+    return;
+  }
+  if (!logger_factory_->SendAccumulatedObservations()) {
+    *ostream_ << "Failed to send locally aggregated observations" << std::endl;
+  }
+}
+
+void TestApp::ResetLocalAggregation() {
+  logger_factory_->NewLogger();
+  logger_factory_->ResetLocalAggregation();
+  *ostream_ << "Reset local aggregation." << std::endl;
+}
+
 void TestApp::ListParameters() {
   std::string metric_name = "No metric set";
   if (current_metric_) {
@@ -1159,6 +1332,66 @@
   return true;
 }
 
+bool TestApp::ParseDay(const std::string& str, uint32_t* day_index) {
+  CHECK(index);
+  if (str.size() < 5 || str.substr(0, 4) != "day=") {
+    *ostream_ << "Expected prefix 'day='." << std::endl;
+    return false;
+  }
+  auto day_string = str.substr(4);
+
+  // Handle the case where |day_string| is "today", "today+N", or "today-N".
+  if (day_string.size() >= 5 && day_string.substr(0, 5) == "today") {
+    auto current_day_index = CurrentDayIndex();
+    if (day_string.size() == 5) {
+      *day_index = current_day_index;
+      return true;
+    }
+
+    if (day_string.size() > 6) {
+      int64_t offset;
+      if (!ParseNonNegativeInt(day_string.substr(6), true, &offset)) {
+        return false;
+      }
+      auto modifier = day_string.substr(5, 1);
+      if (modifier == "+") {
+        *day_index = (current_day_index + offset);
+        return true;
+      } else if (modifier == "-") {
+        if (offset > current_day_index) {
+          *ostream_
+              << "Negative offset cannot be larger than the current day index."
+              << std::endl;
+          return false;
+        }
+        *day_index = (current_day_index - offset);
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  // Handle the case where |day_string| is an integer.
+  std::istringstream iss(day_string);
+  int64_t possible_day_index;
+  iss >> possible_day_index;
+  char c;
+  if (iss.fail() || iss.get(c) || possible_day_index < 0 ||
+      possible_day_index > UINT32_MAX) {
+    if (mode_ == kInteractive) {
+      *ostream_ << "Expected small non-negative integer instead of "
+                << day_string << "." << std::endl;
+    } else {
+      LOG(ERROR) << "Expected small non-negative integer instead of  "
+                 << day_string;
+    }
+    return false;
+  }
+  *day_index = possible_day_index;
+  return true;
+}
+
 // Parses a string of the form <part>:<value> and writes <part> into |part_name|
 // and <value> into |value|.
 // Returns true if and only if this succeeds.
diff --git a/tools/test_app2/test_app.h b/tools/test_app2/test_app.h
index 1c49e81..48f4710 100644
--- a/tools/test_app2/test_app.h
+++ b/tools/test_app2/test_app.h
@@ -17,6 +17,7 @@
 #include "logger/logger.h"
 #include "logger/project_context.h"
 #include "third_party/googletest/googletest/include/gtest/gtest.h"
+#include "util/clock.h"
 
 namespace cobalt {
 
@@ -24,10 +25,21 @@
  public:
   virtual ~LoggerFactory() = default;
 
-  virtual std::unique_ptr<logger::LoggerInterface> NewLogger() = 0;
+  // Creates a new Logger. If a nonzero day index is provided, then the Logger
+  // will log events with that day index.
+  virtual std::unique_ptr<logger::LoggerInterface> NewLogger(
+      uint32_t day_index = 0) = 0;
 
   virtual const logger::ProjectContext* project_context() = 0;
 
+  virtual size_t ObservationCount() = 0;
+
+  virtual void ResetObservationCount() = 0;
+
+  virtual void ResetLocalAggregation() = 0;
+
+  virtual bool GenerateAggregatedObservations(uint32_t day_index) = 0;
+
   virtual bool SendAccumulatedObservations() = 0;
 };
 
@@ -37,9 +49,9 @@
   static std::unique_ptr<TestApp> CreateFromFlagsOrDie(int argc, char* argv[]);
 
   // Modes of operation of the Cobalt test application. An instance of
-  // TestApp is in interactive mode unless set_mode() is invoked. set_mode()
-  // is invoked from CreateFromFlagsOrDie() in order to set the mode to the
-  // one specified by the -mode flag.
+  // TestApp is in interactive mode unless set_mode() is invoked.
+  // set_mode() is invoked from CreateFromFlagsOrDie() in order to set the
+  // mode to the one specified by the -mode flag.
   enum Mode {
     // In this mode the TestApp is controlled via an interactive command-line
     // loop.
@@ -75,6 +87,9 @@
   // Returns false if an only if the specified command is "quit".
   bool ProcessCommandLine(const std::string command_line);
 
+  // Returns the current day index in UTC according to the test app's clock.
+  uint32_t CurrentDayIndex();
+
  private:
   // Implements interactive mode.
   void CommandLoop();
@@ -89,12 +104,13 @@
 
   void Log(const std::vector<std::string>& command);
   void LogEvent(uint64_t num_clients, const std::vector<std::string>& command);
-  void LogEvent(size_t num_clients, uint32_t event_code);
+  void LogEvent(size_t num_clients, uint32_t event_code,
+                uint32_t day_index = 0u);
   void LogEventCount(uint64_t num_clients,
                      const std::vector<std::string>& command);
   void LogEventCount(size_t num_clients, uint32_t event_code,
                      const std::string& component, int64_t duration,
-                     int64_t count);
+                     int64_t count, uint32_t day_index = 0u);
   void LogElapsedTime(uint64_t num_clients,
                       const std::vector<std::string>& command);
   void LogElapsedTime(uint64_t num_clients, uint32_t event_code,
@@ -118,6 +134,13 @@
                       const std::vector<std::string>& metric_parts,
                       const std::vector<std::string>& values);
 
+  // Generates all aggregated observations for a day index specified by
+  // |command|.
+  void GenerateAggregatedObservations(const std::vector<std::string>& command);
+
+  // Deletes the local aggregates and the history of aggregated observations.
+  void ResetLocalAggregation();
+
   void ListParameters();
 
   void SetParameter(const std::vector<std::string>& command);
@@ -132,12 +155,23 @@
 
   bool ParseIndex(const std::string& str, uint32_t* index);
 
+  // Parses strings of the following forms:
+  // day=today
+  // day=today+N, where N is a nonnegative number
+  // day=today-N, where N is a nonnegative number <= the current day index
+  // day=K, where K is a day index
+  // Computes the day index of that day in UTC, using |clock_| to get the
+  // current day index if |str| begins with "day=today", and writes it to
+  // |day_index|.
+  bool ParseDay(const std::string& str, uint32_t* day_index);
+
   bool ParseNonNegativeInt(const std::string& str, bool complain, int64_t* x);
 
   FRIEND_TEST(TestAppTest, ParseInt);
   FRIEND_TEST(TestAppTest, ParseFloat);
   FRIEND_TEST(TestAppTest, ParseIndex);
   FRIEND_TEST(TestAppTest, ParseNonNegativeInt);
+  FRIEND_TEST(TestAppTest, ParseDay);
 
   // Parses a string of the form <part>:<value> and writes <part> into
   // |part_name| and <value> into |value|.
@@ -151,12 +185,20 @@
       std::vector<std::string> dimension_names,
       std::vector<std::string> values);
 
+  void GenerateAggregatedObservationsAndSend(uint32_t day_index);
+
+  void ResetLocalAggregateStore();
+
+  void ResetAggregatedObservationHistory();
+
   const MetricDefinition* current_metric_;
   // The TestApp is in interactive mode unless set_mode() is invoked.
   Mode mode_ = kInteractive;
   std::unique_ptr<LoggerFactory> logger_factory_;
   std::ostream* ostream_;
+  util::ClockInterface* clock_;
 };
 
 }  // namespace cobalt
+
 #endif  // COBALT_TOOLS_TEST_APP2_TEST_APP_H_
diff --git a/tools/test_app2/test_app_test.cc b/tools/test_app2/test_app_test.cc
index f890c00..2fece9a 100644
--- a/tools/test_app2/test_app_test.cc
+++ b/tools/test_app2/test_app_test.cc
@@ -13,6 +13,7 @@
 
 #include "gflags/gflags.h"
 #include "glog/logging.h"
+#include "logger/logger_test_utils.h"
 #include "logger/project_context.h"
 #include "third_party/googletest/googletest/include/gtest/gtest.h"
 
@@ -20,6 +21,7 @@
 
 using logger::LoggerInterface;
 using logger::ProjectContext;
+using logger::testing::FakeObservationStore;
 
 DECLARE_uint32(num_clients);
 DECLARE_string(values);
@@ -139,8 +141,31 @@
   }
 }
 
+metric {
+  metric_name: "FeaturesActive"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 7
+  max_event_code: 9
+  reports: {
+    report_name: "FeaturesActiveUniqueDevices"
+    id: 301
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: SMALL
+    window_size: 1
+    window_size: 7
+  }
+}
+
 )";
 
+// The number of locally aggregated Observations that should be generated for
+// each day. Since Observation generation is faked here, this number does not
+// need to correspond to a test Metric registry. It just needs to be a positive
+// number.
+static const int kNumAggregatedObservations = 20;
+
 bool PopulateMetricDefinitions(MetricDefinitions* metric_definitions) {
   google::protobuf::TextFormat::Parser parser;
   return parser.ParseFromString(kMetricDefinitions, metric_definitions);
@@ -150,28 +175,67 @@
  public:
   explicit TestLoggerFactory(const ProjectContext* project_context);
 
-  std::unique_ptr<LoggerInterface> NewLogger() override;
+  std::unique_ptr<LoggerInterface> NewLogger(uint32_t day_index) override;
 
   const ProjectContext* project_context() override;
+
+  size_t ObservationCount() override;
+
+  void ResetObservationCount() override;
+
+  void ResetLocalAggregation() override;
+
+  bool GenerateAggregatedObservations(uint32_t day_index) override;
+
   bool SendAccumulatedObservations() override;
 
  private:
   const ProjectContext* project_context_;
+  std::unique_ptr<FakeObservationStore> observation_store_;
+  uint32_t last_obs_generation_;
 };
 
 TestLoggerFactory::TestLoggerFactory(const ProjectContext* project_context)
-    : project_context_(project_context) {}
+    : project_context_(project_context),
+      observation_store_(new FakeObservationStore),
+      last_obs_generation_(0) {}
 
-std::unique_ptr<LoggerInterface> TestLoggerFactory::NewLogger() {
+std::unique_ptr<LoggerInterface> TestLoggerFactory::NewLogger(
+    uint32_t day_index) {
   return nullptr;
 }
 
+size_t TestLoggerFactory::ObservationCount() {
+  return observation_store_->num_observations_added();
+}
+
+void TestLoggerFactory::ResetObservationCount() {
+  return observation_store_->ResetObservationCounter();
+}
+
+void TestLoggerFactory::ResetLocalAggregation() { last_obs_generation_ = 0; }
+
+bool TestLoggerFactory::GenerateAggregatedObservations(uint32_t day_index) {
+  if (day_index > last_obs_generation_) {
+    for (int i = 0; i < kNumAggregatedObservations; i++) {
+      if (encoder::ObservationStore::kOk !=
+          observation_store_->AddEncryptedObservation(
+              std::make_unique<EncryptedMessage>(),
+              std::make_unique<ObservationMetadata>())) {
+        return false;
+      }
+    }
+    last_obs_generation_ = day_index;
+  }
+  return true;
+}
+
+bool TestLoggerFactory::SendAccumulatedObservations() { return true; }
+
 const ProjectContext* TestLoggerFactory::project_context() {
   return project_context_;
 }
 
-bool TestLoggerFactory::SendAccumulatedObservations() { return true; }
-
 }  // namespace
 
 // Tests of the TestApp class.
@@ -326,11 +390,11 @@
   uint32_t x;
   // Test basic valid inputs.
   EXPECT_TRUE(test_app_->ParseIndex("index=1", &x));
-  EXPECT_EQ(x, (uint32_t) 1);
+  EXPECT_EQ(x, (uint32_t)1);
   EXPECT_TRUE(test_app_->ParseIndex("index=503", &x));
-  EXPECT_EQ(x, (uint32_t) 503);
+  EXPECT_EQ(x, (uint32_t)503);
   EXPECT_TRUE(test_app_->ParseIndex("index=1534", &x));
-  EXPECT_EQ(x, (uint32_t) 1534);
+  EXPECT_EQ(x, (uint32_t)1534);
   ClearOutput();
 
   // Input should contain 'index='.
@@ -358,6 +422,49 @@
   ClearOutput();
 }
 
+// Tests ParseDay utility function.
+TEST_F(TestAppTest, ParseDay) {
+  uint32_t d;
+  uint32_t today = test_app_->CurrentDayIndex();
+  // Test basic valid inputs.
+  EXPECT_TRUE(test_app_->ParseDay("day=42", &d));
+  EXPECT_EQ(d, (uint32_t)42);
+  EXPECT_TRUE(test_app_->ParseDay("day=today", &d));
+  EXPECT_EQ(d, today);
+  EXPECT_TRUE(test_app_->ParseDay("day=today-1", &d));
+  EXPECT_EQ(d, today - 1);
+  EXPECT_TRUE(test_app_->ParseDay("day=today+1", &d));
+  EXPECT_EQ(d, today + 1);
+  ClearOutput();
+
+  // Input should start with 'day='.
+  EXPECT_FALSE(test_app_->ParseDay("1", &d));
+  EXPECT_TRUE(OutputContains("Expected prefix 'day='."));
+  ClearOutput();
+
+  // Input should contain only one element.
+  EXPECT_FALSE(test_app_->ParseDay("day=1 2", &d));
+  EXPECT_FALSE(test_app_->ParseDay("day=today 2", &d));
+  ClearOutput();
+
+  // Input shouldn't contain non-numerical characters other than prefixes
+  // "day=", "day=today", "day=today+", or "day=today-"
+  EXPECT_FALSE(test_app_->ParseDay("day=yesterday", &d));
+  EXPECT_TRUE(OutputContains(
+      "Expected small non-negative integer instead of yesterday."));
+  ClearOutput();
+  EXPECT_FALSE(test_app_->ParseDay("day=today+two", &d));
+  EXPECT_TRUE(OutputContains("Expected non-negative integer instead of two."));
+  ClearOutput();
+
+  // Disallow input of the form "day=today-N" with N greater than today's day
+  // index.
+  EXPECT_FALSE(test_app_->ParseDay("day=today-20000000000", &d));
+  EXPECT_TRUE(OutputContains(
+      "Negative offset cannot be larger than the current day index."));
+  ClearOutput();
+}
+
 // Tests processing a bad command line.
 TEST_F(TestAppTest, ProcessCommandLineBad) {
   EXPECT_TRUE(test_app_->ProcessCommandLine("this is not a command"));
@@ -549,14 +656,25 @@
   ClearOutput();
   EXPECT_TRUE(test_app_->ProcessCommandLine("log 100 event"));
   EXPECT_TRUE(
-      OutputContains("Malformed log event command. Expected exactly one more "
+      OutputContains("Malformed log event command. Expected one more "
                      "argument for <event_code>."));
 
   ClearOutput();
-  EXPECT_TRUE(test_app_->ProcessCommandLine("log 100 event foo bar"));
+  EXPECT_TRUE(test_app_->ProcessCommandLine("log 100 event foo bar baz"));
   EXPECT_TRUE(
-      OutputContains("Malformed log event command. Expected exactly one more "
-                     "argument for <event_code>."));
+      OutputContains("Malformed log event command: too many arguments."));
+
+  ClearOutput();
+  EXPECT_TRUE(test_app_->ProcessCommandLine("log 100 event_count"));
+  EXPECT_TRUE(
+      OutputContains("Malformed log event_count command: missing at least one "
+                     "required argument."));
+
+  ClearOutput();
+  EXPECT_TRUE(test_app_->ProcessCommandLine(
+      "log 100 event_count foo bar baz two three four"));
+  EXPECT_TRUE(
+      OutputContains("Malformed log event_count command: too many arguments."));
 
   ClearOutput();
   EXPECT_TRUE(test_app_->ProcessCommandLine("log 100 event foo"));
@@ -603,11 +721,89 @@
                      "additional parameters."));
 }
 
+// Tests processing of valid generate command lines.
+TEST_F(TestAppTest, ProcessCommandLineGenerate) {
+  EXPECT_TRUE(test_app_->ProcessCommandLine("generate"));
+  EXPECT_TRUE(OutputContains("Generated"));
+  EXPECT_TRUE(OutputContains("locally aggregated observations for day index"));
+  ClearOutput();
+
+  EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=today"));
+  EXPECT_TRUE(OutputContains("Generated"));
+  EXPECT_TRUE(OutputContains("locally aggregated observations for day index"));
+  ClearOutput();
+
+  EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=today-5"));
+  EXPECT_TRUE(OutputContains("Generated"));
+  EXPECT_TRUE(OutputContains("locally aggregated observations for day index"));
+  ClearOutput();
+
+  EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=today+5"));
+  EXPECT_TRUE(OutputContains("Generated"));
+  EXPECT_TRUE(OutputContains("locally aggregated observations for day index"));
+  ClearOutput();
+
+  EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=150000"));
+  EXPECT_TRUE(OutputContains("Generated"));
+  EXPECT_TRUE(
+      OutputContains("locally aggregated observations for day index 150000"));
+  ClearOutput();
+}
+
+// Tests processing a bad generate command line.
+TEST_F(TestAppTest, ProcessCommandLineGenerateBad) {
+  // generate takes at most 1 argument.
+  EXPECT_TRUE(test_app_->ProcessCommandLine("generate foo bar"));
+  EXPECT_TRUE(
+      OutputContains("Malformed generate command: too many arguments."));
+  ClearOutput();
+}
+
+// Tests processing of a valid reset-aggregation command line.
+TEST_F(TestAppTest, ProcessCommandLineResetAggregation) {
+  EXPECT_TRUE(test_app_->ProcessCommandLine("reset-aggregation"));
+  EXPECT_TRUE(OutputContains("Reset local aggregation."));
+  ClearOutput();
+}
+
 // Tests processing a bad send command line.
 TEST_F(TestAppTest, ProcessCommandLineSendBad) {
   EXPECT_TRUE(test_app_->ProcessCommandLine("send foo"));
   EXPECT_TRUE(OutputContains("The send command doesn't take any arguments."));
 }
+
+// Tests some minimal behavior related to local aggregation:
+// (1) The response to a valid generate command should include the number of
+// generated observations and the day index
+// (2) generate should not produce observations twice for a given day index
+// unless reset-aggregation is run before the second attempt
+TEST_F(TestAppTest, GenerateAndReset) {
+  EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=15000"));
+  std::ostringstream stream;
+  stream << "Generated " << kNumAggregatedObservations
+         << " locally aggregated observations for day index 15000";
+  EXPECT_TRUE(OutputContains(stream.str()));
+  ClearOutput();
+
+  EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=15000"));
+  EXPECT_TRUE(OutputContains(
+      "Generated 0 locally aggregated observations for day index 15000"));
+  ClearOutput();
+
+  EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=14999"));
+  EXPECT_TRUE(OutputContains(
+      "Generated 0 locally aggregated observations for day index 14999"));
+  ClearOutput();
+
+  EXPECT_TRUE(test_app_->ProcessCommandLine("reset-aggregation"));
+  EXPECT_TRUE(OutputContains("Reset local aggregation."));
+  ClearOutput();
+
+  EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=15000"));
+  EXPECT_TRUE(OutputContains(stream.str()));
+  ClearOutput();
+}
+
 }  // namespace cobalt
 
 int main(int argc, char** argv) {