New version of end-to-end test.
We delete cgen as it is no longer needed.
We introduce a new version of the end-to-end test that
uses the same tools as are used manually when
running the demo.
We start running this new end-to-end test by default as part of
cobaltb.py test.
Change-Id: Ifc8783eb580caf87ed5fb45cf174d462cfcd115a
diff --git a/cobaltb.py b/cobaltb.py
index 257ea8d..69f1cd4 100755
--- a/cobaltb.py
+++ b/cobaltb.py
@@ -157,6 +157,16 @@
"--bigtable_project_name=%s" % args.bigtable_project_name,
"--bigtable_instance_name=%s" % args.bigtable_instance_name
]
+ if (test_dir == 'e2e_tests'):
+ test_args = [
+ "-analyzer_uri=localhost:%d" % DEFAULT_ANALYZER_SERVICE_PORT,
+ "-shuffler_uri=localhost:%d" % DEFAULT_SHUFFLER_PORT,
+ "-report_master_uri=localhost:%d" % DEFAULT_REPORT_MASTER_PORT,
+ ("-observation_querier_path=%s" %
+ process_starter.OBSERVATION_QUERIER_PATH),
+ "-test_app_path=%s" % process_starter.TEST_APP_PATH,
+ "-sub_process_v=%d"%_verbose_count
+ ]
print '********************************************************'
success = (test_runner.run_all_tests(
test_dir, start_bt_emulator=start_bt_emulator,
@@ -361,8 +371,8 @@
help='Runs Cobalt tests. You must build first.')
sub_parser.set_defaults(func=_test)
sub_parser.add_argument('--tests', choices=TEST_FILTERS,
- help='Specify a subset of tests to run. Default=noe2e',
- default='noe2e')
+ help='Specify a subset of tests to run. Default=all',
+ default='all')
sub_parser.add_argument('--bigtable_project_name',
help='Specify a Cloud project against which to run the cloud_bt tests.'
' Optional. Default=google.com:shuffler-test.'
diff --git a/config/registered/registered_reports.txt b/config/registered/registered_reports.txt
index be1c16e..9006643 100644
--- a/config/registered/registered_reports.txt
+++ b/config/registered/registered_reports.txt
@@ -39,7 +39,7 @@
customer_id: 1
project_id: 1
id: 2
- name: "Fuschsia Usage by Hour"
+ name: "Fuchsia Usage by Hour"
description: "This is a fictional report used for the development of Cobalt."
metric_id: 2
variable {
@@ -54,7 +54,7 @@
customer_id: 1
project_id: 1
id: 3
- name: "Fuschsia Fruit Consumption and Rating Joint Analysis"
+ name: "Fuchsia Fruit Consumption and Rating Joint Analysis"
description: "This is a fictional report used for the development of Cobalt."
metric_id: 3
variable {
diff --git a/end_to_end_tests/CMakeLists.txt b/end_to_end_tests/CMakeLists.txt
index db7e838..b96bf71 100644
--- a/end_to_end_tests/CMakeLists.txt
+++ b/end_to_end_tests/CMakeLists.txt
@@ -12,30 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+set(REPORT_MASTER_PB_GO "${CMAKE_BINARY_DIR}/go-proto-gen/src/analyzer/report_master/report_master.pb.go")
+set_source_files_properties(${REPORT_MASTER_PB_GO} PROPERTIES GENERATED TRUE)
+
# Build end-to-end tests
-set(E2E_TESTS_SRC "${CMAKE_CURRENT_SOURCE_DIR}/src")
-set(COBALT_TEST_SRC "${E2E_TESTS_SRC}/cobalt_test.go")
-set(COBALT_TEST_BIN ${DIR_END_TO_END_TESTS}/cobalt_test)
+set(REPORT_CLIENT_SRC "${CMAKE_SOURCE_DIR}/tools/go/src/report_client/report_client.go")
+set(COBALT_E2E_TEST_SRC "${CMAKE_CURRENT_SOURCE_DIR}/src/cobalt_e2e_test.go")
+set(COBALT_E2E_TEST_BINARY ${DIR_END_TO_END_TESTS}/cobalt_e2e_test)
-set(COBALT_SHUFFLER_CONF_SRC
- "${CMAKE_CURRENT_SOURCE_DIR}/config/shuffler_default.conf")
-set(COBALT_TEST_CONF_BIN ${CMAKE_BINARY_DIR}/config/shuffler_default.conf)
-
-add_custom_command(OUTPUT ${COBALT_TEST_CONF_BIN}
- COMMAND cp ${COBALT_SHUFFLER_CONF_SRC} ${CMAKE_BINARY_DIR}/config
- DEPENDS ${COBALT_SHUFFLER_CONF_SRC}
+add_custom_command(OUTPUT ${COBALT_E2E_TEST_BINARY}
+ COMMAND ${GO_BIN} test -c -o ${COBALT_E2E_TEST_BINARY}
+ ${COBALT_E2E_TEST_SRC}
+ DEPENDS ${COBALT_E2E_TEST_SRC}
+ DEPENDS ${REPORT_CLIENT_SRC}
+ DEPENDS ${REPORT_MASTER_PB_GO}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
)
-add_custom_command(OUTPUT ${COBALT_TEST_BIN}
- COMMAND ${GO_BIN} test -c -o ${COBALT_TEST_BIN}
- ${COBALT_TEST_SRC}
- DEPENDS ${COBALT_TEST_SRC}
- DEPENDS ${COBALT_SHUFFLER_CONF_SRC}
- WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
-)
-
-add_custom_target(test_all ALL
- DEPENDS ${COBALT_TEST_BIN}
- DEPENDS ${COBALT_TEST_CONF_BIN}
+add_custom_target(build_e2e_tests ALL
+ DEPENDS ${COBALT_E2E_TEST_BINARY}
)
diff --git a/end_to_end_tests/src/cobalt_e2e_test.go b/end_to_end_tests/src/cobalt_e2e_test.go
new file mode 100644
index 0000000..3ab38d9
--- /dev/null
+++ b/end_to_end_tests/src/cobalt_e2e_test.go
@@ -0,0 +1,467 @@
+// Copyright 2017 The Fuchsia Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/*
+An end-to-end test of Cobalt. This test assumes the existence of a running
+Cobalt system. The URIs of the Shuffler, the Analyzer Service and the
+ReportMaster are passed in to the test as flags. Usually this test is
+invoked via the command "cobaltb.py test --tests=e2e" which invokes
+"tools/test_runner.py" which uses "tools/process_starter.py" to start the
+Cobalt processes prior to invoking this test.
+
+The test uses the |cobalt_test_app| program in order to encode values into
+observations and send those observations to the Shuffler. It then uses
+the |query_observations| tool to query the Observation Store and it waits
+until all of the observations have arrived at the Observation Store. It
+then uses the report_client to contact the ReportMaster and request
+that a report be generated and wait for the report to be generated. It then
+validates the report.
+
+The test assumes particular contents of the Cobalt registration system.
+By default "tools/process_starter.py" will cause the Cobalt processes to
+use the configuration files located in <source root>/config/registered. Thus
+this test must be kept in sync with the contents of those files. Here
+we include a copy of the relevant parts of those files for reference:
+
+#### Metric (1, 1, 1)
+element {
+ customer_id: 1
+ project_id: 1
+ id: 1
+ name: "Fuchsia Popular URLs"
+ description: "This is a fictional metric used for the development of Cobalt."
+ time_zone_policy: LOCAL
+ parts {
+ key: "url"
+ value {
+ description: "A URL."
+ data_type: STRING
+ }
+ }
+}
+
+#### Metric (1, 1, 2)
+element {
+ customer_id: 1
+ project_id: 1
+ id: 2
+ name: "Fuchsia Usage by Hour"
+ description: "This is a fictional metric used for the development of Cobalt."
+ time_zone_policy: LOCAL
+ parts {
+ key: "hour"
+ value {
+ description: "An integer from 0 to 23 representing the hour of the day."
+ data_type: INT
+ }
+ }
+}
+
+#### Encoding (1, 1, 1)
+element {
+ customer_id: 1
+ project_id: 1
+ id: 1
+ forculus {
+ threshold: 20
+ epoch_type: DAY
+ }
+}
+
+#### Encoding (1, 1, 2)
+element {
+ customer_id: 1
+ project_id: 1
+ id: 2
+ basic_rappor {
+ prob_0_becomes_1: 0.1
+ prob_1_stays_1: 0.9
+ int_range_categories: {
+ first: 0
+ last: 23
+ }
+ }
+}
+
+#### ReportConfig (1, 1, 1)
+element {
+ customer_id: 1
+ project_id: 1
+ id: 1
+ name: "Fuchsia Popular URLs"
+ description: "This is a fictional report used for the development of Cobalt."
+ metric_id: 1
+ variable {
+ metric_part: "url"
+ }
+ aggregation_epoch_type: DAY
+ report_delay_days: 1
+}
+
+#### ReportConfig (1, 1, 2)
+element {
+ customer_id: 1
+ project_id: 1
+ id: 2
+ name: "Fuchsia Usage by Hour"
+ description: "This is a fictional report used for the development of Cobalt."
+ metric_id: 2
+ variable {
+ metric_part: "hour"
+ }
+ aggregation_epoch_type: WEEK
+ report_delay_days: 5
+}
+
+*/
+
+package main
+
+import (
+ "bytes"
+ "flag"
+ "fmt"
+ "os/exec"
+ "strconv"
+ "testing"
+ "time"
+
+ "analyzer/report_master"
+ "github.com/golang/glog"
+ "report_client"
+)
+
+const (
+ customerId = 1
+ projectId = 1
+
+ urlMetricId = 1
+ hourMetricId = 2
+
+ forculusEncodingConfigId = 1
+ basicRapporEncodingConfigId = 2
+
+ forculusUrlReportConfigId = 1
+ basicRapporHourReportConfigId = 2
+
+ hourMetricPartName = "hour"
+ urlMetricPartName = "url"
+)
+
+var (
+ observationQuerierPath = flag.String("observation_querier_path", "", "The full path to the Observation querier binary.")
+ testAppPath = flag.String("test_app_path", "", "The full path to the Cobalt test app binary.")
+
+ analyzerUri = flag.String("analyzer_uri", "", "The URI of the Analyzer Service.")
+ reportMasterUri = flag.String("report_master_uri", "", "The URI of the Report Master.")
+ shufflerUri = flag.String("shuffler_uri", "", "The URI of the Shuffler.")
+
+ subProcessVerbosity = flag.Int("sub_process_v", 0, "-v verbosity level to pass to sub-processes")
+
+ reportClient *report_client.ReportClient
+)
+
+func init() {
+ flag.Parse()
+
+ reportClient = report_client.NewReportClient(customerId, projectId, *reportMasterUri, false, "")
+}
+
+// A ValuePart represents part of an input to the Cobalt encoder. It specifies
+// that the given integer or string should be encoded using the given
+// EncodingConfig and associated with the given metric part name.
+type ValuePart struct {
+ // The name of the metric part this value is for.
+ partName string
+
+ // The string representation of the value. If the value is of integer
+ // type this should be the representation using strconv.Itoa.
+ repr string
+
+ // The EncodingConfig id.
+ encoding uint32
+}
+
+// String returns a string representation of the ValuePart in the form
+// <partName>:<repr>:<encoding>. This is the form accepted as a flag to
+// the Cobalt test application.
+func (p *ValuePart) String() string {
+ return p.partName + ":" + p.repr + ":" + strconv.Itoa(int(p.encoding))
+}
+
+// FlagString builds a string appropriate to use as a flag value to the Cobalt
+// test application.
+func flagString(values []ValuePart) string {
+ var buffer bytes.Buffer
+ for i := 0; i < len(values); i++ {
+ if i > 0 {
+ buffer.WriteString(",")
+ }
+ buffer.WriteString(values[i].String())
+ }
+ return buffer.String()
+}
+
+// Invokes the "query_observations" command in order to query the ObservationStore
+// to determine the number of Observations contained in the store for the
+// given metric. |maxNum| bounds the query so that the returned value will
+// always be less than or equal to maxNum.
+func getNumObservations(metricId uint32, maxNum uint32) (uint32, error) {
+ cmd := exec.Command(*observationQuerierPath,
+ "-nointeractive",
+ "-for_testing_only_use_bigtable_emulator",
+ "-logtostderr", fmt.Sprintf("-v=%d", *subProcessVerbosity),
+ "-metric", strconv.Itoa(int(metricId)),
+ "-max_num", strconv.Itoa(int(maxNum)))
+ out, err := cmd.Output()
+ if err != nil {
+ stdErrMessage := ""
+ if exitError, ok := err.(*exec.ExitError); ok {
+ stdErrMessage = string(exitError.Stderr)
+ }
+ return 0, fmt.Errorf("Error returned from query_observations process: [%v] %s", err, stdErrMessage)
+ }
+ num, err := strconv.Atoi(string(out))
+ if err != nil {
+ return 0, fmt.Errorf("Unable to parse output of query_observations as an integer: error=[%v] output=[%v]", err, out)
+ }
+ if num < 0 {
+ return 0, fmt.Errorf("Expected non-negative integer received %d", num)
+ }
+ return uint32(num), nil
+}
+
+// Queries the Observation Store for the number of observations for the given
+// metric. If there is an error or the number of observations found is greater
+// than the |expectedNum| returns a non-nil error. If the number of observations
+// found is equal to the |expectedNum| returns nil (indicating success.) Otherwise
+// the number of observations found is less than the |expectedNum|. In this case
+// this function sleeps for one second and then tries again, repeating that for
+// up to 30 attempts and finally returns a non-nil error.
+func waitForObservations(metricId uint32, expectedNum uint32) error {
+ for trial := 0; trial < 30; trial++ {
+ num, err := getNumObservations(metricId, expectedNum+1)
+ if err != nil {
+ return err
+ }
+ if num == expectedNum {
+ return nil
+ }
+ if num > expectedNum {
+ return fmt.Errorf("Expected %d got %d", expectedNum, num)
+ }
+ glog.V(3).Infof("Observation store has %d observations. Waiting for %d...", num, expectedNum)
+ time.Sleep(time.Second)
+ }
+ return fmt.Errorf("After 30 attempts the number of observations was still not the expected number of %d", expectedNum)
+}
+
+// sendObservations uses the cobalt_test_app to encode the given values into observations and send the
+// observations to the Shuffler or the Analyzer.
+func sendObservations(metricId uint32, values []ValuePart, skipShuffler bool, numClients uint) error {
+ cmd := exec.Command(*testAppPath,
+ "-mode", "send-once",
+ "-analyzer_uri", *analyzerUri,
+ "-shuffler_uri", *shufflerUri,
+ "-logtostderr", fmt.Sprintf("-v=%d", *subProcessVerbosity),
+ "-metric", strconv.Itoa(int(metricId)),
+ "-num_clients", strconv.Itoa(int(numClients)),
+ "-values", flagString(values))
+ stdoutStderr, err := cmd.CombinedOutput()
+ message := string(stdoutStderr)
+ if len(message) > 0 {
+ fmt.Printf("%s", stdoutStderr)
+ }
+ return err
+}
+
+// sendForculusUrlObservations sends Observations containing a Forculus encryption of the
+// given |url| to the Shuffler. |numClients| different, independent
+// observations will be sent.
+func sendForculusUrlObservations(url string, numClients uint, t *testing.T) {
+ const skipShuffler = false
+ values := []ValuePart{
+ ValuePart{
+ urlMetricPartName,
+ url,
+ forculusEncodingConfigId,
+ },
+ }
+ if err := sendObservations(urlMetricId, values, skipShuffler, numClients); err != nil {
+ t.Fatalf("url=%s, numClient=%d, err=%v", url, numClients, err)
+ }
+}
+
+// sendBasicRapporHourObservations sends Observations containing a Basic RAPPOR encoding of the
+// given |hour| to the Shuffler. |numClients| different, independent observations
+// will be sent.
+func sendBasicRapporHourObservations(hour int, numClients uint, t *testing.T) {
+ const skipShuffler = false
+ values := []ValuePart{
+ ValuePart{
+ hourMetricPartName,
+ strconv.Itoa(hour),
+ basicRapporEncodingConfigId,
+ },
+ }
+ if err := sendObservations(hourMetricId, values, skipShuffler, numClients); err != nil {
+ t.Fatalf("hour=%d, numClient=%d, err=%v", hour, numClients, err)
+ }
+}
+
+// getReport asks the ReportMaster to start a new report for the given |reportConfigId|
+// that spans all day indices. It then waits for the report generation to complete
+// and returns the Report.
+func getReport(reportConfigId uint32, includeStdErr bool, t *testing.T) *report_master.Report {
+ reportId, err := reportClient.StartCompleteReport(reportConfigId)
+ if err != nil {
+ t.Fatalf("reportConfigId=%d, err=%v", reportConfigId, err)
+ }
+
+ report, err := reportClient.GetReport(reportId, 10*time.Second)
+ if err != nil {
+ t.Fatalf("reportConfigId=%d, err=%v", reportConfigId, err)
+ }
+
+ return report
+}
+
+// getCSVReport asks the ReportMaster to start a new report for the given |reportConfigId|
+// that spans all day indices. It then waits for the report generation to complete
+// and returns the report in CSV format.
+func getCSVReport(reportConfigId uint32, includeStdErr bool, t *testing.T) string {
+ report := getReport(reportConfigId, includeStdErr, t)
+
+ csv, err := report_client.WriteCSVReportToString(report, includeStdErr)
+ if err != nil {
+ t.Fatalf("reportConfigId=%d, err=%v", reportConfigId, err)
+ }
+ return csv
+}
+
+// We run the full Cobalt pipeline using Metric 1, Encoding Config 1 and
+// Report Config 1. This uses Forculus with a threshold of 20 to count
+// URLs.
+func TestForculusEncodingOfUrls(t *testing.T) {
+ // We send some observations to the Shuffler.
+ sendForculusUrlObservations("www.AAAA.com", 18, t)
+ sendForculusUrlObservations("www.BBBB.com", 19, t)
+ sendForculusUrlObservations("www.CCCC.com", 20, t)
+ sendForculusUrlObservations("www.DDDD.com", 21, t)
+
+ // We have not yet sent 100 observations and the Shuffler's threshold is
+	// set to 100 so we expect no observations to have been sent to the
+ // Analyzer yet.
+ numObservations, err := getNumObservations(1, 10)
+ if err != nil {
+ t.Fatalf("Error returned from getNumObservations[%v]", err)
+ }
+ if numObservations != 0 {
+ t.Fatalf("Expected no observations in the Observation store yet but got %d", numObservations)
+ }
+
+ // We send additional observations to the Shuffler. This crosses the Shuffler's
+ // threshold and so all observations should now be sent to the Analyzer.
+ sendForculusUrlObservations("www.EEEE.com", 22, t)
+ sendForculusUrlObservations("www.FFFF.com", 23, t)
+
+ // There should now be 123 Observations sent to the Analyzer for metric 1.
+ // We wait for them.
+ if err := waitForObservations(1, 123); err != nil {
+ t.Fatalf("%s", err)
+ }
+
+ // Finally we will run a report. This is the expected output of the report.
+ const expectedCSV = `www.CCCC.com,20.000
+www.DDDD.com,21.000
+www.EEEE.com,22.000
+www.FFFF.com,23.000
+`
+
+ // Generate the report, fetch it as a CSV, check it.
+ csv := getCSVReport(forculusUrlReportConfigId, false, t)
+ if csv != expectedCSV {
+ t.Errorf("Got csv:[%s]", csv)
+ }
+}
+
+// We run the full Cobalt pipeline using Metric 2, Encoding Config 2 and
+// Report Config 2. This uses Basic RAPPOR with integer categories for the
+// 24 hours of the day.
+func TestBasicRapporEncodingOfHours(t *testing.T) {
+ sendBasicRapporHourObservations(8, 501, t)
+ sendBasicRapporHourObservations(9, 1002, t)
+ sendBasicRapporHourObservations(10, 503, t)
+ sendBasicRapporHourObservations(16, 504, t)
+ sendBasicRapporHourObservations(17, 1005, t)
+ sendBasicRapporHourObservations(18, 506, t)
+
+ // There should now be 4021 Observations sent to the Analyzer for metric 2.
+ // We wait for them.
+ if err := waitForObservations(2, 4021); err != nil {
+ t.Fatalf("%s", err)
+ }
+
+ report := getReport(basicRapporHourReportConfigId, true, t)
+ if report.Metadata.State != report_master.ReportState_COMPLETED_SUCCESSFULLY {
+ t.Fatalf("report.Metadata.State=%v", report.Metadata.State)
+ }
+ rows := report_client.ReportToStrings(report, true)
+ if rows == nil {
+ t.Fatalf("rows is nil")
+ }
+ if len(rows) != 24 {
+ t.Fatalf("len(rows)=%d", len(rows))
+ }
+
+ for hour := 0; hour <= 23; hour++ {
+ if len(rows[hour]) != 3 {
+ t.Fatalf("len(rows[hour])=%d", len(rows[hour]))
+ }
+ if rows[hour][0] != strconv.Itoa(hour) {
+ t.Errorf("Expected %s, got %s", strconv.Itoa(hour), rows[hour][0])
+ }
+ val, err := strconv.ParseFloat(rows[hour][1], 32)
+ if err != nil {
+ t.Errorf("Error parsing %s as float: %v", rows[hour][1], err)
+ continue
+ }
+ switch hour {
+ case 8:
+ fallthrough
+ case 10:
+ fallthrough
+ case 16:
+ fallthrough
+ case 18:
+ if val < 10.0 || val > 1000.0 {
+ t.Errorf("For hour %d unexpected val: %v", hour, val)
+ }
+ case 9:
+ fallthrough
+ case 17:
+ if val < 500.0 || val > 2000.0 {
+ t.Errorf("For hour %d unexpected val: %v", hour, val)
+ }
+ default:
+ if val > 100.0 {
+ t.Errorf("Val larger than expected: %v", val)
+ continue
+ }
+ }
+ if rows[hour][2] != "23.779" {
+ t.Errorf("rows[hour][2]=%s", rows[hour][2])
+ }
+ }
+}
diff --git a/end_to_end_tests/src/cobalt_test.go b/end_to_end_tests/src/cobalt_test.go
deleted file mode 100644
index 1db777f..0000000
--- a/end_to_end_tests/src/cobalt_test.go
+++ /dev/null
@@ -1,338 +0,0 @@
-// Copyright 2016 The Fuchsia Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// This test stands up the Bigtable emulator, the analzyer, and uses cgen to
-// generate fake reports. It then verifies the presence of the reports in
-// Bigtable.
-
-package main
-
-import (
- "bufio"
- "bytes"
- "errors"
- "log"
- "net"
- "os"
- "os/exec"
- "path/filepath"
- "strconv"
- "strings"
- "syscall"
- "testing"
- "time"
-
- "cloud.google.com/go/bigtable"
- "golang.org/x/net/context"
- "google.golang.org/api/option"
- "google.golang.org/grpc"
-)
-
-// This fixture stands up the bigtable emulator.
-type BigtableFixture struct {
- cmd *exec.Cmd
- host string
- project string
- instance string
- table string
-}
-
-func NewBigtableFixture() (*BigtableFixture, error) {
- f := new(BigtableFixture)
- // These strings must coincide with the ones in bigtable_emulator_helper.h
- f.project = "TestProject"
- f.instance = "TestInstance"
- // This name must coincide with the one in bigtable_names.h
- f.table = "observations"
-
- sysRootDir, _ := filepath.Abs(filepath.Join(filepath.Dir(os.Args[0]), "../../sysroot"))
- bin := filepath.Join(sysRootDir, "gcloud", "google-cloud-sdk", "platform", "bigtable-emulator", "cbtemulator")
- f.cmd = exec.Command(bin)
-
- stdout, _ := f.cmd.StdoutPipe()
- reader := bufio.NewReader(stdout)
-
- // Create a process group so we can kill children
- f.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
- log.Printf("Starting Bigtable Emulator: %v", bin)
- f.cmd.Start()
-
- // Wait for bigtable to start
- for {
- line, err := reader.ReadString('\n')
- if err != nil {
- return nil, err
- }
-
- if strings.Contains(line, "running") {
- fields := strings.Fields(line)
- f.host = fields[len(fields)-1]
- break
- }
- }
- log.Printf("Bigtable Emulator started with host: %v", f.host)
-
- return f, nil
-}
-
-func (f *BigtableFixture) Close() {
- // Kill process group
- pgid, _ := syscall.Getpgid(f.cmd.Process.Pid)
- syscall.Kill(-pgid, syscall.SIGTERM)
-
- f.cmd.Wait()
-}
-
-func (f *BigtableFixture) CountRows() int {
- conn, err := grpc.Dial(f.host, grpc.WithInsecure())
- if err != nil {
- log.Printf("Error while attempting to connect to the Bigtable Emulator: %v", err)
- return -1
- }
-
- ctx := context.Background()
- client, err := bigtable.NewClient(ctx, f.project, f.instance, option.WithGRPCConn(conn))
- if err != nil {
- log.Printf("Error while attempting to create a Bigtable Client: %v", err)
- return -1
- }
- defer client.Close()
- tbl := client.Open(f.table)
-
- count := 0
- err = tbl.ReadRows(ctx, bigtable.InfiniteRange(""),
- func(row bigtable.Row) bool {
- count++
- return true
- })
- if err != nil {
- log.Printf("Error while attempting to count rows in Bigtable: %v", err)
- return -1
- }
-
- return count
-}
-
-// This fixture stands up the Analyzer. It depends on Bigtable being up.
-type AnalyzerFixture struct {
- cmd *exec.Cmd
- cgen string
- outdir string
- bigtable *BigtableFixture
-}
-
-func DaemonStarted(dst string) bool {
- for i := 0; i < 10; i++ {
- conn, err := net.Dial("tcp", dst)
- if err == nil {
- conn.Close()
- return true
- }
- time.Sleep(10 * time.Millisecond)
- }
-
- return false
-}
-
-func NewAnalyzerFixture() (*AnalyzerFixture, error) {
- // Create Bigtable first
- bigtable, err := NewBigtableFixture()
- if err != nil {
- return nil, err
- }
-
- // Create the Analyzer
- f := new(AnalyzerFixture)
- f.bigtable = bigtable
-
- f.outdir, _ = filepath.Abs(filepath.Join(filepath.Dir(os.Args[0]), "../../out"))
- abin := filepath.Join(f.outdir, "analyzer", "analyzer")
-
- configDir, _ := filepath.Abs(filepath.Join(f.outdir, "..", "config", "registered"))
- f.cmd = exec.Command(abin, "--port=8080", "-for_testing_only_use_bigtable_emulator",
- "--cobalt_config_dir", configDir, "-logtostderr")
- var out bytes.Buffer
- f.cmd.Stdout = &out
- var serr bytes.Buffer
- f.cmd.Stderr = &serr
-
- log.Printf("Starting Analyzer: %v", strings.Join(f.cmd.Args, " "))
- err = f.cmd.Start()
- if err != nil {
- log.Printf("Command finished with error:[%v] with stdout:[%s] and stderr:[%s]", err, out.String(), serr.String())
- log.Fatal(err)
- }
-
- // Wait for it to start
- if !DaemonStarted("127.0.0.1:8080") {
- f.Close()
- return nil, errors.New("Unable to start the analyzer")
- }
-
- // get path to cgen
- f.cgen = filepath.Join(f.outdir, "/tools/cgen")
-
- return f, nil
-}
-
-func (f *AnalyzerFixture) Close() {
- f.cmd.Process.Kill()
- f.cmd.Wait()
- f.bigtable.Close()
-}
-
-// Fixture to start the Shuffler. It depends on the Analyzer fixture, which in
-// turn depends on the Bigtable fixture. The Shuffler fixture will start the
-// entire backend system.
-type ShufflerFixture struct {
- cmd *exec.Cmd
- analyzer *AnalyzerFixture
- outdir string
-}
-
-func NewShufflerFixture() (*ShufflerFixture, error) {
- // Create Analyzer first
- analyzer, err := NewAnalyzerFixture()
- if err != nil {
- return nil, err
- }
-
- // Create the Shuffler
- f := new(ShufflerFixture)
- f.analyzer = analyzer
- f.outdir = filepath.Join(filepath.Dir(os.Args[0]), "../../out")
-
- bin, _ := filepath.Abs(filepath.Join(f.analyzer.outdir, "shuffler", "shuffler"))
-
- shufflerTestConfig, _ := filepath.Abs(filepath.Join(f.outdir, "config", "shuffler_default.conf"))
- f.cmd = exec.Command(bin,
- "-config_file", shufflerTestConfig,
- "-batch_size", strconv.Itoa(100),
- "-vmodule=receiver=2,dispatcher=2,store=2",
- "-logtostderr")
-
- var out bytes.Buffer
- f.cmd.Stdout = &out
- var serr bytes.Buffer
- f.cmd.Stderr = &serr
-
- log.Printf("Starting Shuffler: %v", strings.Join(f.cmd.Args, " "))
- err = f.cmd.Start()
- if err != nil {
- log.Printf("Command finished with error:[%v] with stdout:[%s] and stderr:[%s]", err, out.String(), serr.String())
- log.Fatal(err)
- }
-
- if !DaemonStarted("127.0.0.1:50051") {
- f.Close()
- return nil, errors.New("Unable to start the shuffler")
- }
-
- return f, nil
-}
-
-func (f *ShufflerFixture) Close() {
- f.cmd.Process.Kill()
- f.cmd.Wait()
- f.analyzer.Close()
-}
-
-// This test depends on the AnalyzerFixture.
-//
-// It uses cgen to create 2 fake reports.
-// It then asserts that 2 reports exist in Bigtable.
-func OTestAnalyzerAddObservations(t *testing.T) {
- // Start the Analyzer and Bigtable emulator
- f, err := NewAnalyzerFixture()
- if err != nil {
- t.Error("Fixture failed:", err)
- return
- }
- defer f.Close()
-
- // Run cgen on the analyzer to create 2 observations
- num := 2
- cmd := exec.Command(f.cgen,
- "-analyzer_uri", "localhost:8080",
- "-num_observations", strconv.Itoa(num))
- if cmd.Run() != nil {
- t.Error("cgen failed")
- return
- }
-
- // Grab the observations from bigtable
- count := f.bigtable.CountRows()
- if count == -1 {
- t.Error("Can't read rows")
- return
- }
-
- if count != num {
- t.Errorf("Unexpected number of rows got %v want %v", count, num)
- return
- }
-}
-
-// This test depends on the ShufflerFixture.
-//
-// It uses cgen to create 2 fake reports.
-// It then asserts that 2 reports exist in Bigtable.
-func TestShufflerProcess(t *testing.T) {
- // Start the entire system.
- f, err := NewShufflerFixture()
- if err != nil {
- t.Error("Fixture failed:", err)
- return
- }
- defer f.Close()
-
- // Run cgen on the analyzer to create 2 observations
- num := 2
- cmd := exec.Command(f.analyzer.cgen,
- "-shuffler_uri", "localhost:50051",
- "-analyzer_uri", "localhost:8080",
- "-num_observations", strconv.Itoa(num),
- "-num_rpcs", strconv.Itoa(num))
-
- log.Printf("Running cgen: %v", strings.Join(cmd.Args, " "))
- if cmd.Run() != nil {
- t.Error("cgen failed")
- return
- }
- log.Printf("cgen completed")
-
- var rows int
-
- // The shuffler RPC is async so it could take a while before the data
- // reaches bigtable. Try multiple times.
- for i := 0; i < 5; i++ {
- rows = f.analyzer.bigtable.CountRows()
- if rows == -1 {
- t.Error("Can't read rows")
- return
- }
-
- if rows == num {
- break
- }
-
- time.Sleep(10 * time.Millisecond)
- }
-
- if rows != num {
- t.Errorf("Unexpected number of rows got %v want %v", rows, num)
- return
- }
-
-}
diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt
index 0b5a46a..0f54e18 100644
--- a/tools/CMakeLists.txt
+++ b/tools/CMakeLists.txt
@@ -12,14 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-add_executable(cgen cgen.cc)
-
-target_link_libraries(cgen
- analyzer_service_lib
- shuffler_grpc_client
- forculus_encrypter
- gflags)
-
add_subdirectory(go/src)
add_subdirectory(observation_querier)
add_subdirectory(test_app)
diff --git a/tools/cgen.cc b/tools/cgen.cc
deleted file mode 100644
index d80a89e..0000000
--- a/tools/cgen.cc
+++ /dev/null
@@ -1,352 +0,0 @@
-// Copyright 2016 The Fuchsia Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Cobalt traffic generator.
-//
-// This is a test and debug client to send data to cobalt components
-
-#include <assert.h>
-#include <err.h>
-#include <libgen.h>
-#include <limits.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <sys/time.h>
-#include <unistd.h>
-
-#include <gflags/gflags.h>
-#include <glog/logging.h>
-
-#include "./observation.pb.h"
-#include "algorithms/forculus/forculus_encrypter.h"
-#include "analyzer/analyzer_service/analyzer_service.h"
-#include "config/encoding_config.h"
-#include "config/encodings.pb.h"
-#include "shuffler/shuffler.grpc.pb.h"
-
-using cobalt::Envelope;
-using cobalt::ObservationBatch;
-using cobalt::analyzer::Analyzer;
-using cobalt::config::EncodingRegistry;
-using cobalt::encoder::ClientSecret;
-using cobalt::forculus::ForculusEncrypter;
-using cobalt::shuffler::Shuffler;
-using grpc::Channel;
-using grpc::ClientContext;
-using grpc::Status;
-using google::protobuf::Empty;
-
-namespace cobalt {
-namespace cgen {
-
-DEFINE_string(analyzer_uri, "", "The URI of the Analyzer");
-DEFINE_string(shuffler_uri, "", "The URI of the Shuffler");
-DEFINE_int32(num_rpcs, 1, "Number of RPCs to send");
-DEFINE_int32(num_observations, 1, "Number of Observations to generate");
-DEFINE_uint32(customer, 1, "Customer ID");
-DEFINE_uint32(project, 1, "Project ID");
-DEFINE_uint32(metric, 1, "Metric ID");
-DEFINE_uint32(encoding, 1, "Encoding ID");
-DEFINE_string(registry, "", "Registry path for registered_encodings.txt etc.");
-DEFINE_string(part, "city", "Observation part name");
-DEFINE_string(payload, "hello", "Observation payload");
-
-// Measures time between start and stop. Useful for benchmarking.
-class Timer {
- public:
- Timer() : start_(0), stop_(0) {}
-
- void start() { start_ = now(); }
-
- void stop() { stop_ = now(); }
-
- // returns time in microseconds
- uint64_t now() {
- struct timeval tv;
-
- gettimeofday(&tv, NULL);
-
- return tv.tv_sec * 1000UL * 1000UL + tv.tv_usec;
- }
-
- uint64_t elapsed() {
- assert(start_ && stop_);
- assert(stop_ >= start_);
-
- return stop_ - start_;
- }
-
- private:
- uint64_t start_;
- uint64_t stop_;
-}; // class Timer
-
-// Encapsulates Observations.
-struct GenObservation {
- Observation observation;
- EncryptedMessage encrypted;
- ObservationMetadata metadata;
-};
-
-// Generates observations and RPCs to Cobalt components
-class CGen {
- public:
- void setup(int argc, char* argv[]) {
- google::SetUsageMessage("Cobalt gRPC generator");
- google::ParseCommandLineFlags(&argc, &argv, true);
-
- customer_id_ = FLAGS_customer;
- project_id_ = FLAGS_project;
- metric_id_ = FLAGS_metric;
- part_name_ = FLAGS_part;
-
- std::string registry_path = FLAGS_registry;
-
- // If no path is given, try to deduce it from the cgen location.
- if (registry_path == "") {
- char path[PATH_MAX], path2[PATH_MAX];
-
- // Get the directory of cgen.
- if (!realpath(argv[0], path)) LOG(FATAL) << "realpath(): " << argv[0];
-
- char* dir = dirname(path);
-
- // Set the relative path to the registry.
- snprintf(path2, sizeof(path2), "%s/../../config/registered", dir);
-
- // Get the absolute path to the registry.
- if (!realpath(path2, path)) LOG(FATAL) << "realpath(): " << path2;
-
- registry_path = path;
- }
-
- load_registries(registry_path);
- }
-
- void load_registries(const std::string& path) {
- char fname[PATH_MAX];
-
- snprintf(fname, sizeof(fname), "%s/registered_encodings.txt", path.c_str());
-
- auto encodings = EncodingRegistry::FromFile(fname, nullptr);
- if (encodings.second != config::kOK)
- LOG(FATAL) << "Can't load encodings configuration";
-
- encodings_ = std::move(encodings.first);
- }
-
- void start() {
- generate_observations();
-
- if (!FLAGS_shuffler_uri.empty() && !FLAGS_analyzer_uri.empty()) {
- send_shuffler();
- } else if (!FLAGS_analyzer_uri.empty()) {
- send_analyzer();
- }
- }
-
- private:
- // Creates a bunch of fake observations that can be sent to shufflers or
- // analyzers.
- void generate_observations() {
- // Metadata setup.
- ObservationMetadata metadata;
-
- metadata.set_customer_id(customer_id_);
- metadata.set_project_id(project_id_);
- metadata.set_metric_id(metric_id_);
- metadata.set_day_index(4);
-
- // encode the observation.
- const EncodingConfig* const enc =
- encodings_->Get(customer_id_, project_id_, FLAGS_encoding);
-
- if (!enc) LOG(FATAL) << "Unkown encoding: " << FLAGS_encoding;
-
- // TODO(bittau): add support for algorithms other than forculus.
- if (!enc->has_forculus()) LOG(FATAL) << "Unsupported encoding";
-
- ForculusConfig config;
- ClientSecret client_secret = ClientSecret::GenerateNewSecret();
-
- config.set_threshold(enc->forculus().threshold());
-
- ForculusEncrypter forculus(config, customer_id_, project_id_, metric_id_,
- part_name_, client_secret);
-
- for (int i = 0; i < FLAGS_num_observations; i++) {
- Observation obs;
- ObservationPart part;
-
- part.set_encoding_config_id(FLAGS_encoding);
-
- ForculusObservation* forc_obs = part.mutable_forculus();
- uint32_t day_index = 0;
-
- if (forculus.Encrypt(FLAGS_payload, day_index, forc_obs) !=
- forculus::ForculusEncrypter::kOK) {
- LOG(FATAL) << "Forculus encryption failed";
- }
-
- // TODO(bittau): need to specify what key-value to use for
- // single-dimension metrics. Using DEFAULT for now.
- (*obs.mutable_parts())[part_name_] = part;
-
- // Encrypt the observation.
- std::string cleartext, encrypted;
-
- obs.SerializeToString(&cleartext);
- encrypt(cleartext, &encrypted);
-
- EncryptedMessage em;
-
- em.set_ciphertext(encrypted);
-
- // Add this to the list of fake observations.
- GenObservation o;
- o.observation = obs;
- o.encrypted = em;
- o.metadata = metadata;
-
- observations_.push_back(o);
- }
- }
-
- // Send observations to the analyzer.
- void send_analyzer() {
- CHECK(!FLAGS_analyzer_uri.empty());
- std::shared_ptr<Channel> chan = grpc::CreateChannel(
- FLAGS_analyzer_uri, grpc::InsecureChannelCredentials());
-
- std::unique_ptr<Analyzer::Stub> analyzer(Analyzer::NewStub(chan));
-
- // Generate the observation batch.
- ObservationBatch req;
- ObservationMetadata* metadata = req.mutable_meta_data();
-
- for (const GenObservation& observation : observations_) {
- // Assume all observations have the same metadata.
- *metadata = observation.metadata;
-
- EncryptedMessage* msg = req.add_encrypted_observation();
- *msg = observation.encrypted;
- }
-
- // send RPCs
- Timer t;
- t.start();
-
- for (int i = 0; i < FLAGS_num_rpcs; i++) {
- ClientContext context;
- Empty resp;
-
- Status status = analyzer->AddObservations(&context, req, &resp);
-
- if (!status.ok())
- errx(1, "error sending RPC: %s", status.error_message().c_str());
- }
-
- t.stop();
-
- printf("Took %lu ms for %d requests\n", t.elapsed() / 1000UL,
- FLAGS_num_rpcs);
- }
-
- void send_shuffler() {
- CHECK(!FLAGS_shuffler_uri.empty());
-
- std::shared_ptr<Channel> chan = grpc::CreateChannel(
- FLAGS_shuffler_uri, grpc::InsecureChannelCredentials());
-
- std::unique_ptr<Shuffler::Stub> shuffler(Shuffler::NewStub(chan));
-
- // Build the messages to send to the shuffler.
- std::vector<EncryptedMessage> messages;
-
- for (GenObservation& observation : observations_) {
- // TODO(rudominer) Use the fact that an Envelope can hold
- // multpile ObservationBatches and an ObservationBatch can hold
- // multiple observations. For now we are using an Envelope per
- // Observation.
- Envelope envelope;
- auto* observation_batch = envelope.add_batch();
- observation_batch->set_allocated_meta_data(
- new ObservationMetadata(observation.metadata));
-
- auto* encrypted_observation =
- observation_batch->add_encrypted_observation();
- encrypted_observation->Swap(&observation.encrypted);
-
- // Encrypt the envelope.
- std::string cleartext, encrypted;
-
- envelope.SerializeToString(&cleartext);
- encrypt(cleartext, &encrypted);
-
- EncryptedMessage em;
- em.set_ciphertext(encrypted);
-
- messages.push_back(em);
- }
-
- // send RPCs.
- Timer t;
- t.start();
-
- auto msg_iter = messages.begin();
-
- if (msg_iter == messages.end()) LOG(FATAL) << "Need messags";
-
- for (int i = 0; i < FLAGS_num_rpcs; i++) {
- ClientContext context;
- Empty resp;
-
- Status status = shuffler->Process(&context, *msg_iter, &resp);
-
- if (!status.ok())
- errx(1, "error sending RPC: %s", status.error_message().c_str());
-
- if (++msg_iter == messages.end()) msg_iter = messages.begin();
- }
-
- t.stop();
-
- printf("Took %lu ms for %d requests\n", t.elapsed() / 1000UL,
- FLAGS_num_rpcs);
- }
-
- void encrypt(const std::string& in, std::string* out) {
- // TODO(pseudorandom): please implement
- *out = in;
- }
-
- int customer_id_;
- int project_id_;
- int metric_id_;
- std::string part_name_;
- std::vector<GenObservation> observations_;
- std::unique_ptr<EncodingRegistry> encodings_;
-}; // class CGen
-
-} // namespace cgen
-} // namespace cobalt
-
-int main(int argc, char* argv[]) {
- cobalt::cgen::CGen cgen;
-
- cgen.setup(argc, argv);
- cgen.start();
-
- exit(0);
-}
diff --git a/tools/go/src/report_client/report_client.go b/tools/go/src/report_client/report_client.go
index a19f176..ff9a387 100644
--- a/tools/go/src/report_client/report_client.go
+++ b/tools/go/src/report_client/report_client.go
@@ -19,6 +19,7 @@
package report_client
import (
+ "bytes"
"encoding/csv"
"fmt"
"io"
@@ -322,15 +323,25 @@
}
}
+// ReportRowsSortedByValues returns a sorted slice of ReportRows.
+// The rows are sorted in increasing order of their values.
+// It is possible for nil to be returned if there are no ReportRows.
+func ReportRowsSortedByValues(report *report_master.Report, includeStdErr bool) []*report_master.ReportRow {
+ rows := report.GetRows().GetRows()
+ if rows != nil {
+ sort.Sort(ByValues(rows))
+ }
+ return rows
+}
+
// ReportToStrings returns a sorted list of human-readable report rows.
// Each element of the returned list represents a row of the report.
// The rows of are sorted in increasing order of their values.
// Each row is itself a list of strings as specified by ReportRowToStrings.
func ReportToStrings(report *report_master.Report, includeStdErr bool) [][]string {
result := [][]string{}
- rows := report.GetRows().GetRows()
+ rows := ReportRowsSortedByValues(report, includeStdErr)
if rows != nil {
- sort.Sort(ByValues(rows))
for _, row := range rows {
result = append(result, ReportRowToStrings(row, includeStdErr))
}
@@ -354,3 +365,15 @@
csvWriter.Flush()
return nil
}
+
+// WriteCSVReportToString writes a comma-separated values representation of the
+// given |report| and returns it as a string. See comments at WriteCSVReport
+// for more details.
+func WriteCSVReportToString(report *report_master.Report, includeStdErr bool) (csv string, err error) {
+ var buffer bytes.Buffer
+ if err = WriteCSVReport(&buffer, report, includeStdErr); err != nil {
+ return
+ }
+ csv = buffer.String()
+ return
+}