[test_app2] Generate locally aggregated observations in testapp2
Adds a command to generate locally aggregated observations for
a specified day index when using test_app2 in interactive mode,
and adds a day index argument to the LogEvent and LogEventCount
methods of TestApp so that events can be logged for a specified
day index. Also adds a command which deletes all state related
to local aggregation in an instance of the test app.
Change-Id: I1650c6829b4d0f57e51daca8bb50f51e5dfe02b2
diff --git a/logger/logger.h b/logger/logger.h
index 4d2d97f..19f58ea 100644
--- a/logger/logger.h
+++ b/logger/logger.h
@@ -21,6 +21,13 @@
#include "util/clock.h"
namespace cobalt {
+
+namespace internal {
+
+class RealLoggerFactory;
+
+} // namespace internal
+
namespace logger {
// Concrete implementation of LoggerInterface.
@@ -97,6 +104,7 @@
private:
friend class EventLogger;
friend class LoggerTest;
+ friend class cobalt::internal::RealLoggerFactory;
void SetClock(util::ClockInterface* clock) { clock_.reset(clock); }
diff --git a/tools/test_app2/test_app.cc b/tools/test_app2/test_app.cc
index 356389f..92e25ba 100644
--- a/tools/test_app2/test_app.cc
+++ b/tools/test_app2/test_app.cc
@@ -29,9 +29,12 @@
#include "logger/project_context.h"
#include "logger/status.h"
#include "util/clearcut/curl_http_client.h"
+#include "util/clock.h"
#include "util/consistent_proto_store.h"
+#include "util/datetime_util.h"
#include "util/pem_util.h"
#include "util/posix_file_system.h"
+#include "util/status.h"
namespace cobalt {
@@ -55,10 +58,15 @@
using logger::ObservationWriter;
using logger::ProjectContext;
using logger::Status;
+using util::ClockInterface;
using util::ConsistentProtoStore;
using util::EncryptedMessageMaker;
+using util::IncrementingClock;
using util::PemUtil;
using util::PosixFileSystem;
+using util::StatusCode;
+using util::SystemClock;
+using util::TimeToDayIndex;
// There are three modes of operation of the Cobalt TestClient program
// determined by the value of this flag.
@@ -110,26 +118,51 @@
*ostream << "----------------------------------" << std::endl;
*ostream << "help \tPrint this help message."
<< std::endl;
- *ostream << "log <num> event <index> \tLog <num> independent copies "
- "of an EVENT_OCCURRED event with event_code = <index>"
- << std::endl;
- *ostream << "log <num> event_count <index> <component> <duration> <count>"
- << std::endl
- << " \tLog <num> independent copies of an "
- "EVENT_COUNT event."
+ *ostream << "log <num> event <index> <day> \tLog <num> independent copies "
+ "of an EVENT_OCCURRED event."
<< std::endl
<< " \t- The <index> is the event_code of "
- "the EVENT_COUNT event."
+ "the EVENT_OCCURRED event."
<< std::endl
- << " \t- The <component> is the component "
- "name. Pass in \"\" if your metric does not use this field."
+ << " \t- The optional argument <day> is the "
+ "day for which the event should be logged."
<< std::endl
- << " \t- The <duration> specifies the "
- "period of time over which <count> EVENT_COUNT events occurred. "
- << "Pass in 0 if your metric does not use this field." << std::endl
- << " \t- The <count> specifies the number "
- "of times an EVENT_COUNT event occurred."
+ << " \t If provided, it "
+ "should be of the form \"day=<day index>\", \"day=today\", "
+ "\"day=today+<number of days>\", or \"day=today-<number of "
+ "days>\"."
+ << std::endl
+ << " \t The default day is the current day."
<< std::endl;
+
+ *ostream
+ << "log <num> event_count <index> <component> <duration> <count> <day>"
+ << std::endl
+ << " \tLog <num> independent copies of an "
+ "EVENT_COUNT event."
+ << std::endl
+ << " \t- The <index> is the event_code of "
+ "the EVENT_COUNT event."
+ << std::endl
+ << " \t- The <component> is the component "
+ "name. Pass in \"\" if your metric does not use this field."
+ << std::endl
+ << " \t- The <duration> specifies the "
+ "period of time over which <count> EVENT_COUNT events occurred. "
+ << "Pass in 0 if your metric does not use this field." << std::endl
+ << " \t- The <count> specifies the number "
+ "of times an EVENT_COUNT event occurred."
+ << std::endl
+ << " \t- The optional argument <day> is the "
+ "day for which the event should be logged."
+ << std::endl
+ << " \t If provided, it "
+ "should be of the form \"day=<day index>\", \"day=today\", "
+ "\"day=today+<number of days>\", or \"day=today-<number of "
+ "days>\"."
+ << std::endl
+ << " \t The default day is the current day."
+ << std::endl;
*ostream << "log <num> elapsed_time <index> <component> <elapsed_micros>"
<< std::endl
<< " \tLog <num> independent copies of an "
@@ -144,8 +177,7 @@
<< " \t- The <elapsed_micros> specifies how "
"many microseconds have elapsed for the given ELAPSED_TIME event."
<< std::endl;
- *ostream << "log <num> frame_rate <index> <component> <fps>"
- << std::endl
+ *ostream << "log <num> frame_rate <index> <component> <fps>" << std::endl
<< " \tLog <num> independent copies of a "
"FRAME_RATE event."
<< std::endl
@@ -157,8 +189,7 @@
<< std::endl
<< " \t- The <fps> specifies the frame rate."
<< std::endl;
- *ostream << "log <num> memory_usage <index> <component> <bytes>"
- << std::endl
+ *ostream << "log <num> memory_usage <index> <component> <bytes>" << std::endl
<< " \tLog <num> independent copies of a "
"MEMORY_USAGE event."
<< std::endl
@@ -196,6 +227,14 @@
*ostream << " \t- Each <val> is an int or string "
"value or an index <n> if <val>='index=<n>'."
<< std::endl;
+ *ostream << "generate <day> \tGenerate and send observations "
+ "for <day> for all locally aggregated reports. <day> may be a "
+ "day index, \'today\', \'today+N\', or \'today-N\'."
+ << std::endl;
+ *ostream
+ << "reset-aggregation \tDelete all state related to local "
+ "aggregation."
+ << std::endl;
*ostream << "ls \tList current values of "
"parameters."
<< std::endl;
@@ -313,6 +352,13 @@
return tokens;
}
+} // namespace
+
+namespace internal {
+
+// The number of seconds in a day.
+static const int kDay = 86400;
+
class RealLoggerFactory : public LoggerFactory {
public:
virtual ~RealLoggerFactory() = default;
@@ -327,7 +373,11 @@
std::unique_ptr<ConsistentProtoStore> obs_history_proto_store,
std::unique_ptr<SystemDataInterface> system_data);
- std::unique_ptr<LoggerInterface> NewLogger() override;
+ std::unique_ptr<LoggerInterface> NewLogger(uint32_t day_index = 0u) override;
+ size_t ObservationCount() override;
+ void ResetObservationCount() override;
+ void ResetLocalAggregation() override;
+ bool GenerateAggregatedObservations(uint32_t day_index) override;
bool SendAccumulatedObservations() override;
const ProjectContext* project_context() override {
return project_context_.get();
@@ -363,9 +413,7 @@
shipping_manager_(std::move(shipping_manager)),
local_aggregate_proto_store_(std::move(local_aggregate_proto_store)),
obs_history_proto_store_(std::move(obs_history_proto_store)),
- system_data_(std::move(system_data)) {}
-
-std::unique_ptr<LoggerInterface> RealLoggerFactory::NewLogger() {
+ system_data_(std::move(system_data)) {
encoder_.reset(
new Encoder(ClientSecret::GenerateNewSecret(), system_data_.get()));
observation_writer_.reset(
@@ -374,9 +422,43 @@
event_aggregator_.reset(new EventAggregator(
encoder_.get(), observation_writer_.get(),
local_aggregate_proto_store_.get(), obs_history_proto_store_.get()));
- return std::unique_ptr<LoggerInterface>(
- new Logger(encoder_.get(), event_aggregator_.get(),
- observation_writer_.get(), project_context_.get()));
+}
+
+std::unique_ptr<LoggerInterface> RealLoggerFactory::NewLogger(
+ uint32_t day_index) {
+ std::unique_ptr<Logger> logger = std::make_unique<Logger>(
+ encoder_.get(), event_aggregator_.get(), observation_writer_.get(),
+ project_context_.get());
+ if (day_index != 0u) {
+ auto mock_clock = new IncrementingClock();
+ mock_clock->set_time(std::chrono::system_clock::time_point(
+ std::chrono::seconds(kDay * day_index)));
+ logger->SetClock(mock_clock);
+ }
+ return std::move(logger);
+}
+
+size_t RealLoggerFactory::ObservationCount() {
+ return observation_store_->num_observations_added();
+}
+
+void RealLoggerFactory::ResetObservationCount() {
+ observation_store_->ResetObservationCounter();
+}
+
+// TODO(pesk): also clear the contents of the ConsistentProtoStores if we
+// implement a mode which uses them.
+void RealLoggerFactory::ResetLocalAggregation() {
+ event_aggregator_.reset(new EventAggregator(
+ encoder_.get(), observation_writer_.get(),
+ local_aggregate_proto_store_.get(), obs_history_proto_store_.get()));
+}
+
+bool RealLoggerFactory::GenerateAggregatedObservations(uint32_t day_index) {
+ if (kOK == event_aggregator_->GenerateObservationsNoWorker(day_index)) {
+ return true;
+ }
+ return false;
}
bool RealLoggerFactory::SendAccumulatedObservations() {
@@ -386,7 +468,7 @@
return status.ok();
}
-} // namespace
+} // namespace internal
std::unique_ptr<TestApp> TestApp::CreateFromFlagsOrDie(int argc, char* argv[]) {
std::string config_bin_proto_path = FLAGS_config_bin_proto_path;
@@ -451,7 +533,7 @@
std::make_unique<util::clearcut::CurlHTTPClient>()));
shipping_manager->Start();
- std::unique_ptr<LoggerFactory> logger_factory(new RealLoggerFactory(
+ std::unique_ptr<LoggerFactory> logger_factory(new internal::RealLoggerFactory(
std::move(observation_encrypter), std::move(envelope_encrypter),
std::move(project_context), std::move(observation_store),
std::move(shipping_manager), std::move(local_aggregate_proto_store),
@@ -472,6 +554,7 @@
CHECK(logger_factory_->project_context());
CHECK(ostream_);
CHECK(SetMetric(initial_metric_name));
+ clock_ = new SystemClock();
}
bool TestApp::SetMetric(const std::string& metric_name) {
@@ -480,6 +563,8 @@
(*ostream_) << "There is no metric named '" << metric_name
<< "' in project "
<< logger_factory_->project_context()->DebugString() << "."
+ << std::endl
+ << "You may need to run `./cobaltb.py update_config`."
<< std::endl;
return false;
}
@@ -527,6 +612,16 @@
return true;
}
+ if (command[0] == "generate") {
+ GenerateAggregatedObservations(command);
+ return true;
+ }
+
+ if (command[0] == "reset-aggregation") {
+ ResetLocalAggregation();
+ return true;
+ }
+
if (command[0] == "ls") {
ListParameters();
return true;
@@ -620,14 +715,24 @@
return;
}
+uint32_t TestApp::CurrentDayIndex() {
+ return TimeToDayIndex(std::chrono::system_clock::to_time_t(clock_->now()),
+ MetricDefinition::UTC);
+}
+
// We know that command[0] = "log", command[1] = <num_clients>
void TestApp::LogEvent(uint64_t num_clients,
const std::vector<std::string>& command) {
- if (command.size() != 4) {
- *ostream_ << "Malformed log event command. Expected exactly one more "
+ auto command_size = command.size();
+ if (command_size < 4) {
+ *ostream_ << "Malformed log event command. Expected one more "
"argument for <event_code>."
<< std::endl;
return;
+ } else if (command_size > 5) {
+ *ostream_ << "Malformed log event command: too many arguments."
+ << std::endl;
+ return;
}
int64_t event_code;
@@ -635,23 +740,32 @@
return;
}
- LogEvent(num_clients, event_code);
+ uint32_t day_index = 0;
+ if (command_size == 5 && !ParseDay(command[4], &day_index)) {
+ *ostream_ << "Unable to parse <day> from log command: " << command[4]
+ << std::endl;
+ return;
+ }
+ LogEvent(num_clients, event_code, day_index);
}
-void TestApp::LogEvent(size_t num_clients, uint32_t event_code) {
+void TestApp::LogEvent(size_t num_clients, uint32_t event_code,
+ uint32_t day_index) {
if (!current_metric_) {
*ostream_ << "Cannot LogEvent. There is no current metric set."
<< std::endl;
return;
}
- VLOG(6) << "TestApp::LogEvents(" << num_clients << ", " << event_code << ").";
+ VLOG(6) << "TestApp::LogEvents(" << num_clients << ", " << event_code << ", "
+ << day_index << ").";
for (size_t i = 0; i < num_clients; i++) {
- auto logger = logger_factory_->NewLogger();
+ auto logger = logger_factory_->NewLogger(day_index);
auto status = logger->LogEvent(current_metric_->id(), event_code);
if (status != logger::kOK) {
LOG(ERROR) << "LogEvent() failed with status " << status
<< ". metric=" << current_metric_->metric_name()
- << ". event_code=" << event_code;
+ << ". event_code=" << event_code
+ << ". day_index=" << day_index;
break;
}
}
@@ -659,14 +773,20 @@
}
// We know that command[0] = "log", command[1] = <num_clients>,
-// command[2] = "event_count"
+// command[2] = "event_count".
void TestApp::LogEventCount(uint64_t num_clients,
const std::vector<std::string>& command) {
- if (command.size() != 7) {
- *ostream_ << "Malformed log event_count command. Expected 4 additional "
- << "parameters." << std::endl;
+ auto command_size = command.size();
+ if (command_size < 7) {
+ *ostream_ << "Malformed log event_count command: missing at least one "
+ "required argument."
+ << std::endl;
return;
}
+ if (command_size > 8) {
+ *ostream_ << "Malformed log event_count command: too many arguments."
+ << std::endl;
+ }
int64_t event_code;
if (!ParseNonNegativeInt(command[3], true, &event_code)) {
@@ -688,29 +808,38 @@
<< std::endl;
return;
}
+ uint32_t day_index = 0u;
+ if (command_size == 8 && !ParseDay(command[7], &day_index)) {
+ *ostream_ << "Unable to parse <day> from log command: " << command[7]
+ << std::endl;
+ return;
+ }
- LogEventCount(num_clients, event_code, command[4], duration, count);
+ LogEventCount(num_clients, event_code, command[4], duration, count,
+ day_index);
}
void TestApp::LogEventCount(size_t num_clients, uint32_t event_code,
const std::string& component, int64_t duration,
- int64_t count) {
+ int64_t count, uint32_t day_index) {
if (!current_metric_) {
*ostream_ << "Cannot LogEventCount. There is no current metric set."
<< std::endl;
return;
}
VLOG(6) << "TestApp::LogEventCount(" << num_clients << ", " << event_code
- << ", " << component << ", " << duration << ", " << count << ").";
+ << ", " << component << ", " << duration << ", " << count << ", "
+ << day_index << ").";
for (size_t i = 0; i < num_clients; i++) {
- auto logger = logger_factory_->NewLogger();
+ auto logger = logger_factory_->NewLogger(day_index);
auto status = logger->LogEventCount(current_metric_->id(), event_code,
component, duration, count);
if (status != logger::kOK) {
LOG(ERROR) << "LogEventCount() failed with status " << status
<< ". metric=" << current_metric_->metric_name()
<< ". event_code=" << event_code << ". component=" << component
- << ". duration=" << duration << ". count=" << count;
+ << ". duration=" << duration << ". count=" << count
+ << ". day_index=" << day_index;
break;
}
}
@@ -789,8 +918,8 @@
float fps;
if (!ParseFloat(command[5], true, &fps)) {
- *ostream_ << "Unable to parse <fps> from log command: "
- << command[5] << std::endl;
+ *ostream_ << "Unable to parse <fps> from log command: " << command[5]
+ << std::endl;
return;
}
@@ -809,8 +938,8 @@
<< ", " << component << ", " << fps << ").";
for (size_t i = 0; i < num_clients; i++) {
auto logger = logger_factory_->NewLogger();
- auto status = logger->LogFrameRate(current_metric_->id(), event_code,
- component, fps);
+ auto status =
+ logger->LogFrameRate(current_metric_->id(), event_code, component, fps);
if (status != logger::kOK) {
LOG(ERROR) << "LogFrameRate() failed with status " << status
<< ". metric=" << current_metric_->metric_name()
@@ -841,8 +970,8 @@
int64_t bytes;
if (!ParseNonNegativeInt(command[5], true, &bytes)) {
- *ostream_ << "Unable to parse <bytes> from log command: "
- << command[5] << std::endl;
+ *ostream_ << "Unable to parse <bytes> from log command: " << command[5]
+ << std::endl;
return;
}
@@ -862,7 +991,7 @@
for (size_t i = 0; i < num_clients; i++) {
auto logger = logger_factory_->NewLogger();
auto status = logger->LogMemoryUsage(current_metric_->id(), event_code,
- component, bytes);
+ component, bytes);
if (status != logger::kOK) {
LOG(ERROR) << "LogMemoryUsage() failed with status " << status
<< ". metric=" << current_metric_->metric_name()
@@ -893,15 +1022,15 @@
int64_t bucket;
if (!ParseNonNegativeInt(command[5], true, &bucket)) {
- *ostream_ << "Unable to parse <bucket> from log command: "
- << command[5] << std::endl;
+ *ostream_ << "Unable to parse <bucket> from log command: " << command[5]
+ << std::endl;
return;
}
int64_t count;
if (!ParseNonNegativeInt(command[6], true, &count)) {
- *ostream_ << "Unable to parse <count> from log command: "
- << command[6] << std::endl;
+ *ostream_ << "Unable to parse <count> from log command: " << command[6]
+ << std::endl;
return;
}
@@ -909,7 +1038,8 @@
}
void TestApp::LogIntHistogram(uint64_t num_clients, uint32_t event_code,
- const std::string& component, int64_t bucket, int64_t count) {
+ const std::string& component, int64_t bucket,
+ int64_t count) {
if (!current_metric_) {
*ostream_ << "Cannot LogIntHistogram. There is no current metric set."
<< std::endl;
@@ -928,7 +1058,7 @@
auto logger = logger_factory_->NewLogger();
auto status = logger->LogIntHistogram(current_metric_->id(), event_code,
- component, std::move(histogram_ptr));
+ component, std::move(histogram_ptr));
if (status != logger::kOK) {
LOG(ERROR) << "LogIntHistogram() failed with status " << status
<< ". metric=" << current_metric_->metric_name()
@@ -992,6 +1122,49 @@
*ostream_ << "Done." << std::endl;
}
+// We know that command[0] = "generate"
+void TestApp::GenerateAggregatedObservations(
+ const std::vector<std::string>& command) {
+ if (command.size() > 2) {
+ *ostream_ << "Malformed generate command: too many arguments." << std::endl;
+ return;
+ }
+ uint32_t day_index;
+ if (command.size() < 2) {
+ day_index = CurrentDayIndex();
+ } else if (!ParseDay(command[1], &day_index)) {
+ *ostream_ << "Could not parse argument " << command[1] << " to a day index"
+ << std::endl;
+ return;
+ }
+ GenerateAggregatedObservationsAndSend(day_index);
+ return;
+}
+
+void TestApp::GenerateAggregatedObservationsAndSend(uint32_t day_index) {
+ logger_factory_->NewLogger();
+ logger_factory_->ResetObservationCount();
+ if (logger_factory_->GenerateAggregatedObservations(day_index)) {
+ *ostream_ << "Generated " << logger_factory_->ObservationCount()
+ << " locally aggregated observations for day index " << day_index
+ << std::endl;
+ } else {
+ *ostream_
+ << "Failed to generate locally aggregated observations for day index "
+ << day_index << std::endl;
+ return;
+ }
+ if (!logger_factory_->SendAccumulatedObservations()) {
+ *ostream_ << "Failed to send locally aggregated observations" << std::endl;
+ }
+}
+
+void TestApp::ResetLocalAggregation() {
+ logger_factory_->NewLogger();
+ logger_factory_->ResetLocalAggregation();
+ *ostream_ << "Reset local aggregation." << std::endl;
+}
+
void TestApp::ListParameters() {
std::string metric_name = "No metric set";
if (current_metric_) {
@@ -1159,6 +1332,66 @@
return true;
}
+bool TestApp::ParseDay(const std::string& str, uint32_t* day_index) {
+ CHECK(index);
+ if (str.size() < 5 || str.substr(0, 4) != "day=") {
+ *ostream_ << "Expected prefix 'day='." << std::endl;
+ return false;
+ }
+ auto day_string = str.substr(4);
+
+ // Handle the case where |day_string| is "today", "today+N", or "today-N".
+ if (day_string.size() >= 5 && day_string.substr(0, 5) == "today") {
+ auto current_day_index = CurrentDayIndex();
+ if (day_string.size() == 5) {
+ *day_index = current_day_index;
+ return true;
+ }
+
+ if (day_string.size() > 6) {
+ int64_t offset;
+ if (!ParseNonNegativeInt(day_string.substr(6), true, &offset)) {
+ return false;
+ }
+ auto modifier = day_string.substr(5, 1);
+ if (modifier == "+") {
+ *day_index = (current_day_index + offset);
+ return true;
+ } else if (modifier == "-") {
+ if (offset > current_day_index) {
+ *ostream_
+ << "Negative offset cannot be larger than the current day index."
+ << std::endl;
+ return false;
+ }
+ *day_index = (current_day_index - offset);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ // Handle the case where |day_string| is an integer.
+ std::istringstream iss(day_string);
+ int64_t possible_day_index;
+ iss >> possible_day_index;
+ char c;
+ if (iss.fail() || iss.get(c) || possible_day_index < 0 ||
+ possible_day_index > UINT32_MAX) {
+ if (mode_ == kInteractive) {
+ *ostream_ << "Expected small non-negative integer instead of "
+ << day_string << "." << std::endl;
+ } else {
+ LOG(ERROR) << "Expected small non-negative integer instead of "
+ << day_string;
+ }
+ return false;
+ }
+ *day_index = possible_day_index;
+ return true;
+}
+
// Parses a string of the form <part>:<value> and writes <part> into |part_name|
// and <value> into |value|.
// Returns true if and only if this succeeds.
diff --git a/tools/test_app2/test_app.h b/tools/test_app2/test_app.h
index 1c49e81..48f4710 100644
--- a/tools/test_app2/test_app.h
+++ b/tools/test_app2/test_app.h
@@ -17,6 +17,7 @@
#include "logger/logger.h"
#include "logger/project_context.h"
#include "third_party/googletest/googletest/include/gtest/gtest.h"
+#include "util/clock.h"
namespace cobalt {
@@ -24,10 +25,21 @@
public:
virtual ~LoggerFactory() = default;
- virtual std::unique_ptr<logger::LoggerInterface> NewLogger() = 0;
+ // Creates a new Logger. If a nonzero day index is provided, then the Logger
+ // will log events with that day index.
+ virtual std::unique_ptr<logger::LoggerInterface> NewLogger(
+ uint32_t day_index = 0) = 0;
virtual const logger::ProjectContext* project_context() = 0;
+ virtual size_t ObservationCount() = 0;
+
+ virtual void ResetObservationCount() = 0;
+
+ virtual void ResetLocalAggregation() = 0;
+
+ virtual bool GenerateAggregatedObservations(uint32_t day_index) = 0;
+
virtual bool SendAccumulatedObservations() = 0;
};
@@ -37,9 +49,9 @@
static std::unique_ptr<TestApp> CreateFromFlagsOrDie(int argc, char* argv[]);
// Modes of operation of the Cobalt test application. An instance of
- // TestApp is in interactive mode unless set_mode() is invoked. set_mode()
- // is invoked from CreateFromFlagsOrDie() in order to set the mode to the
- // one specified by the -mode flag.
+ // TestApp is in interactive mode unless set_mode() is invoked.
+ // set_mode() is invoked from CreateFromFlagsOrDie() in order to set the
+ // mode to the one specified by the -mode flag.
enum Mode {
// In this mode the TestApp is controlled via an interactive command-line
// loop.
@@ -75,6 +87,9 @@
// Returns false if an only if the specified command is "quit".
bool ProcessCommandLine(const std::string command_line);
+ // Returns the current day index in UTC according to the test app's clock.
+ uint32_t CurrentDayIndex();
+
private:
// Implements interactive mode.
void CommandLoop();
@@ -89,12 +104,13 @@
void Log(const std::vector<std::string>& command);
void LogEvent(uint64_t num_clients, const std::vector<std::string>& command);
- void LogEvent(size_t num_clients, uint32_t event_code);
+ void LogEvent(size_t num_clients, uint32_t event_code,
+ uint32_t day_index = 0u);
void LogEventCount(uint64_t num_clients,
const std::vector<std::string>& command);
void LogEventCount(size_t num_clients, uint32_t event_code,
const std::string& component, int64_t duration,
- int64_t count);
+ int64_t count, uint32_t day_index = 0u);
void LogElapsedTime(uint64_t num_clients,
const std::vector<std::string>& command);
void LogElapsedTime(uint64_t num_clients, uint32_t event_code,
@@ -118,6 +134,13 @@
const std::vector<std::string>& metric_parts,
const std::vector<std::string>& values);
+ // Generates all aggregated observations for a day index specified by
+ // |command|.
+ void GenerateAggregatedObservations(const std::vector<std::string>& command);
+
+ // Deletes the local aggregates and the history of aggregated observations.
+ void ResetLocalAggregation();
+
void ListParameters();
void SetParameter(const std::vector<std::string>& command);
@@ -132,12 +155,23 @@
bool ParseIndex(const std::string& str, uint32_t* index);
+ // Parses strings of the following forms:
+ // day=today
+ // day=today+N, where N is a nonnegative number
+ // day=today-N, where N is a nonnegative number <= the current day index
+ // day=K, where K is a day index
+ // Computes the day index of that day in UTC, using |clock_| to get the
+ // current day index if |str| begins with "day=today", and writes it to
+ // |day_index|.
+ bool ParseDay(const std::string& str, uint32_t* day_index);
+
bool ParseNonNegativeInt(const std::string& str, bool complain, int64_t* x);
FRIEND_TEST(TestAppTest, ParseInt);
FRIEND_TEST(TestAppTest, ParseFloat);
FRIEND_TEST(TestAppTest, ParseIndex);
FRIEND_TEST(TestAppTest, ParseNonNegativeInt);
+ FRIEND_TEST(TestAppTest, ParseDay);
// Parses a string of the form <part>:<value> and writes <part> into
// |part_name| and <value> into |value|.
@@ -151,12 +185,20 @@
std::vector<std::string> dimension_names,
std::vector<std::string> values);
+ void GenerateAggregatedObservationsAndSend(uint32_t day_index);
+
+ void ResetLocalAggregateStore();
+
+ void ResetAggregatedObservationHistory();
+
const MetricDefinition* current_metric_;
// The TestApp is in interactive mode unless set_mode() is invoked.
Mode mode_ = kInteractive;
std::unique_ptr<LoggerFactory> logger_factory_;
std::ostream* ostream_;
+ util::ClockInterface* clock_;
};
} // namespace cobalt
+
#endif // COBALT_TOOLS_TEST_APP2_TEST_APP_H_
diff --git a/tools/test_app2/test_app_test.cc b/tools/test_app2/test_app_test.cc
index f890c00..2fece9a 100644
--- a/tools/test_app2/test_app_test.cc
+++ b/tools/test_app2/test_app_test.cc
@@ -13,6 +13,7 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
+#include "logger/logger_test_utils.h"
#include "logger/project_context.h"
#include "third_party/googletest/googletest/include/gtest/gtest.h"
@@ -20,6 +21,7 @@
using logger::LoggerInterface;
using logger::ProjectContext;
+using logger::testing::FakeObservationStore;
DECLARE_uint32(num_clients);
DECLARE_string(values);
@@ -139,8 +141,31 @@
}
}
+metric {
+ metric_name: "FeaturesActive"
+ metric_type: EVENT_OCCURRED
+ customer_id: 1
+ project_id: 1
+ id: 7
+ max_event_code: 9
+ reports: {
+ report_name: "FeaturesActiveUniqueDevices"
+ id: 301
+ report_type: UNIQUE_N_DAY_ACTIVES
+ local_privacy_noise_level: SMALL
+ window_size: 1
+ window_size: 7
+ }
+}
+
)";
+// The number of locally aggregated Observations that should be generated for
+// each day. Since Observation generation is faked here, this number does not
+// need to correspond to a test Metric registry. It just needs to be a positive
+// number.
+static const int kNumAggregatedObservations = 20;
+
bool PopulateMetricDefinitions(MetricDefinitions* metric_definitions) {
google::protobuf::TextFormat::Parser parser;
return parser.ParseFromString(kMetricDefinitions, metric_definitions);
@@ -150,28 +175,67 @@
public:
explicit TestLoggerFactory(const ProjectContext* project_context);
- std::unique_ptr<LoggerInterface> NewLogger() override;
+ std::unique_ptr<LoggerInterface> NewLogger(uint32_t day_index) override;
const ProjectContext* project_context() override;
+
+ size_t ObservationCount() override;
+
+ void ResetObservationCount() override;
+
+ void ResetLocalAggregation() override;
+
+ bool GenerateAggregatedObservations(uint32_t day_index) override;
+
bool SendAccumulatedObservations() override;
private:
const ProjectContext* project_context_;
+ std::unique_ptr<FakeObservationStore> observation_store_;
+ uint32_t last_obs_generation_;
};
TestLoggerFactory::TestLoggerFactory(const ProjectContext* project_context)
- : project_context_(project_context) {}
+ : project_context_(project_context),
+ observation_store_(new FakeObservationStore),
+ last_obs_generation_(0) {}
-std::unique_ptr<LoggerInterface> TestLoggerFactory::NewLogger() {
+std::unique_ptr<LoggerInterface> TestLoggerFactory::NewLogger(
+ uint32_t day_index) {
return nullptr;
}
+size_t TestLoggerFactory::ObservationCount() {
+ return observation_store_->num_observations_added();
+}
+
+void TestLoggerFactory::ResetObservationCount() {
+ return observation_store_->ResetObservationCounter();
+}
+
+void TestLoggerFactory::ResetLocalAggregation() { last_obs_generation_ = 0; }
+
+bool TestLoggerFactory::GenerateAggregatedObservations(uint32_t day_index) {
+ if (day_index > last_obs_generation_) {
+ for (int i = 0; i < kNumAggregatedObservations; i++) {
+ if (encoder::ObservationStore::kOk !=
+ observation_store_->AddEncryptedObservation(
+ std::make_unique<EncryptedMessage>(),
+ std::make_unique<ObservationMetadata>())) {
+ return false;
+ }
+ }
+ last_obs_generation_ = day_index;
+ }
+ return true;
+}
+
+bool TestLoggerFactory::SendAccumulatedObservations() { return true; }
+
const ProjectContext* TestLoggerFactory::project_context() {
return project_context_;
}
-bool TestLoggerFactory::SendAccumulatedObservations() { return true; }
-
} // namespace
// Tests of the TestApp class.
@@ -326,11 +390,11 @@
uint32_t x;
// Test basic valid inputs.
EXPECT_TRUE(test_app_->ParseIndex("index=1", &x));
- EXPECT_EQ(x, (uint32_t) 1);
+ EXPECT_EQ(x, (uint32_t)1);
EXPECT_TRUE(test_app_->ParseIndex("index=503", &x));
- EXPECT_EQ(x, (uint32_t) 503);
+ EXPECT_EQ(x, (uint32_t)503);
EXPECT_TRUE(test_app_->ParseIndex("index=1534", &x));
- EXPECT_EQ(x, (uint32_t) 1534);
+ EXPECT_EQ(x, (uint32_t)1534);
ClearOutput();
// Input should contain 'index='.
@@ -358,6 +422,49 @@
ClearOutput();
}
+// Tests ParseDay utility function.
+TEST_F(TestAppTest, ParseDay) {
+ uint32_t d;
+ uint32_t today = test_app_->CurrentDayIndex();
+ // Test basic valid inputs.
+ EXPECT_TRUE(test_app_->ParseDay("day=42", &d));
+ EXPECT_EQ(d, (uint32_t)42);
+ EXPECT_TRUE(test_app_->ParseDay("day=today", &d));
+ EXPECT_EQ(d, today);
+ EXPECT_TRUE(test_app_->ParseDay("day=today-1", &d));
+ EXPECT_EQ(d, today - 1);
+ EXPECT_TRUE(test_app_->ParseDay("day=today+1", &d));
+ EXPECT_EQ(d, today + 1);
+ ClearOutput();
+
+ // Input should start with 'day='.
+ EXPECT_FALSE(test_app_->ParseDay("1", &d));
+ EXPECT_TRUE(OutputContains("Expected prefix 'day='."));
+ ClearOutput();
+
+ // Input should contain only one element.
+ EXPECT_FALSE(test_app_->ParseDay("day=1 2", &d));
+ EXPECT_FALSE(test_app_->ParseDay("day=today 2", &d));
+ ClearOutput();
+
+ // Input shouldn't contain non-numerical characters other than prefixes
+ // "day=", "day=today", "day=today+", or "day=today-"
+ EXPECT_FALSE(test_app_->ParseDay("day=yesterday", &d));
+ EXPECT_TRUE(OutputContains(
+ "Expected small non-negative integer instead of yesterday."));
+ ClearOutput();
+ EXPECT_FALSE(test_app_->ParseDay("day=today+two", &d));
+ EXPECT_TRUE(OutputContains("Expected non-negative integer instead of two."));
+ ClearOutput();
+
+ // Disallow input of the form "day=today-N" with N greater than today's day
+ // index.
+ EXPECT_FALSE(test_app_->ParseDay("day=today-20000000000", &d));
+ EXPECT_TRUE(OutputContains(
+ "Negative offset cannot be larger than the current day index."));
+ ClearOutput();
+}
+
// Tests processing a bad command line.
TEST_F(TestAppTest, ProcessCommandLineBad) {
EXPECT_TRUE(test_app_->ProcessCommandLine("this is not a command"));
@@ -549,14 +656,25 @@
ClearOutput();
EXPECT_TRUE(test_app_->ProcessCommandLine("log 100 event"));
EXPECT_TRUE(
- OutputContains("Malformed log event command. Expected exactly one more "
+ OutputContains("Malformed log event command. Expected one more "
"argument for <event_code>."));
ClearOutput();
- EXPECT_TRUE(test_app_->ProcessCommandLine("log 100 event foo bar"));
+ EXPECT_TRUE(test_app_->ProcessCommandLine("log 100 event foo bar baz"));
EXPECT_TRUE(
- OutputContains("Malformed log event command. Expected exactly one more "
- "argument for <event_code>."));
+ OutputContains("Malformed log event command: too many arguments."));
+
+ ClearOutput();
+ EXPECT_TRUE(test_app_->ProcessCommandLine("log 100 event_count"));
+ EXPECT_TRUE(
+ OutputContains("Malformed log event_count command: missing at least one "
+ "required argument."));
+
+ ClearOutput();
+ EXPECT_TRUE(test_app_->ProcessCommandLine(
+ "log 100 event_count foo bar baz two three four"));
+ EXPECT_TRUE(
+ OutputContains("Malformed log event_count command: too many arguments."));
ClearOutput();
EXPECT_TRUE(test_app_->ProcessCommandLine("log 100 event foo"));
@@ -603,11 +721,89 @@
"additional parameters."));
}
+// Tests processing of valid generate command lines.
+TEST_F(TestAppTest, ProcessCommandLineGenerate) {
+ EXPECT_TRUE(test_app_->ProcessCommandLine("generate"));
+ EXPECT_TRUE(OutputContains("Generated"));
+ EXPECT_TRUE(OutputContains("locally aggregated observations for day index"));
+ ClearOutput();
+
+ EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=today"));
+ EXPECT_TRUE(OutputContains("Generated"));
+ EXPECT_TRUE(OutputContains("locally aggregated observations for day index"));
+ ClearOutput();
+
+ EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=today-5"));
+ EXPECT_TRUE(OutputContains("Generated"));
+ EXPECT_TRUE(OutputContains("locally aggregated observations for day index"));
+ ClearOutput();
+
+ EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=today+5"));
+ EXPECT_TRUE(OutputContains("Generated"));
+ EXPECT_TRUE(OutputContains("locally aggregated observations for day index"));
+ ClearOutput();
+
+ EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=150000"));
+ EXPECT_TRUE(OutputContains("Generated"));
+ EXPECT_TRUE(
+ OutputContains("locally aggregated observations for day index 150000"));
+ ClearOutput();
+}
+
+// Tests processing a bad generate command line.
+TEST_F(TestAppTest, ProcessCommandLineGenerateBad) {
+ // generate takes at most 1 argument.
+ EXPECT_TRUE(test_app_->ProcessCommandLine("generate foo bar"));
+ EXPECT_TRUE(
+ OutputContains("Malformed generate command: too many arguments."));
+ ClearOutput();
+}
+
+// Tests processing of a valid reset-aggregation command line.
+TEST_F(TestAppTest, ProcessCommandLineResetAggregation) {
+ EXPECT_TRUE(test_app_->ProcessCommandLine("reset-aggregation"));
+ EXPECT_TRUE(OutputContains("Reset local aggregation."));
+ ClearOutput();
+}
+
// Tests processing a bad send command line.
TEST_F(TestAppTest, ProcessCommandLineSendBad) {
EXPECT_TRUE(test_app_->ProcessCommandLine("send foo"));
EXPECT_TRUE(OutputContains("The send command doesn't take any arguments."));
}
+
+// Tests some minimal behavior related to local aggregation:
+// (1) The response to a valid generate command should include the number of
+// generated observations and the day index
+// (2) generate should not produce observations twice for a given day index
+// unless reset-aggregation is run before the second attempt
+TEST_F(TestAppTest, GenerateAndReset) {
+ EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=15000"));
+ std::ostringstream stream;
+ stream << "Generated " << kNumAggregatedObservations
+ << " locally aggregated observations for day index 15000";
+ EXPECT_TRUE(OutputContains(stream.str()));
+ ClearOutput();
+
+ EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=15000"));
+ EXPECT_TRUE(OutputContains(
+ "Generated 0 locally aggregated observations for day index 15000"));
+ ClearOutput();
+
+ EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=14999"));
+ EXPECT_TRUE(OutputContains(
+ "Generated 0 locally aggregated observations for day index 14999"));
+ ClearOutput();
+
+ EXPECT_TRUE(test_app_->ProcessCommandLine("reset-aggregation"));
+ EXPECT_TRUE(OutputContains("Reset local aggregation."));
+ ClearOutput();
+
+ EXPECT_TRUE(test_app_->ProcessCommandLine("generate day=15000"));
+ EXPECT_TRUE(OutputContains(stream.str()));
+ ClearOutput();
+}
+
} // namespace cobalt
int main(int argc, char** argv) {