New version of end-to-end test.

We delete cgen as it is no longer needed.

We introduce a new version of the end-to-end test that
uses the same tools as are used manually when
running the demo.

We start running this new end-to-end test by default as part of
cobaltb.py test.

Change-Id: Ifc8783eb580caf87ed5fb45cf174d462cfcd115a
diff --git a/cobaltb.py b/cobaltb.py
index 257ea8d..69f1cd4 100755
--- a/cobaltb.py
+++ b/cobaltb.py
@@ -157,6 +157,16 @@
           "--bigtable_project_name=%s" % args.bigtable_project_name,
           "--bigtable_instance_name=%s" % args.bigtable_instance_name
       ]
+    if (test_dir == 'e2e_tests'):
+      test_args = [
+          "-analyzer_uri=localhost:%d" % DEFAULT_ANALYZER_SERVICE_PORT,
+          "-shuffler_uri=localhost:%d" % DEFAULT_SHUFFLER_PORT,
+          "-report_master_uri=localhost:%d" % DEFAULT_REPORT_MASTER_PORT,
+          ("-observation_querier_path=%s" %
+              process_starter.OBSERVATION_QUERIER_PATH),
+          "-test_app_path=%s" % process_starter.TEST_APP_PATH,
+          "-sub_process_v=%d"%_verbose_count
+      ]
     print '********************************************************'
     success = (test_runner.run_all_tests(
         test_dir, start_bt_emulator=start_bt_emulator,
@@ -361,8 +371,8 @@
     help='Runs Cobalt tests. You must build first.')
   sub_parser.set_defaults(func=_test)
   sub_parser.add_argument('--tests', choices=TEST_FILTERS,
-      help='Specify a subset of tests to run. Default=noe2e',
-      default='noe2e')
+      help='Specify a subset of tests to run. Default=all',
+      default='all')
   sub_parser.add_argument('--bigtable_project_name',
       help='Specify a Cloud project against which to run the cloud_bt tests.'
       ' Optional. Default=google.com:shuffler-test.'
diff --git a/config/registered/registered_reports.txt b/config/registered/registered_reports.txt
index be1c16e..9006643 100644
--- a/config/registered/registered_reports.txt
+++ b/config/registered/registered_reports.txt
@@ -39,7 +39,7 @@
   customer_id: 1
   project_id: 1
   id: 2
-  name: "Fuschsia Usage by Hour"
+  name: "Fuchsia Usage by Hour"
   description: "This is a fictional report used for the development of Cobalt."
   metric_id: 2
   variable {
@@ -54,7 +54,7 @@
   customer_id: 1
   project_id: 1
   id: 3
-  name: "Fuschsia Fruit Consumption and Rating Joint Analysis"
+  name: "Fuchsia Fruit Consumption and Rating Joint Analysis"
   description: "This is a fictional report used for the development of Cobalt."
   metric_id: 3
   variable {
diff --git a/end_to_end_tests/CMakeLists.txt b/end_to_end_tests/CMakeLists.txt
index db7e838..b96bf71 100644
--- a/end_to_end_tests/CMakeLists.txt
+++ b/end_to_end_tests/CMakeLists.txt
@@ -12,30 +12,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+set(REPORT_MASTER_PB_GO "${CMAKE_BINARY_DIR}/go-proto-gen/src/analyzer/report_master/report_master.pb.go")
+set_source_files_properties(${REPORT_MASTER_PB_GO} PROPERTIES GENERATED TRUE)
+
 # Build end-to-end tests
-set(E2E_TESTS_SRC "${CMAKE_CURRENT_SOURCE_DIR}/src")
-set(COBALT_TEST_SRC "${E2E_TESTS_SRC}/cobalt_test.go")
-set(COBALT_TEST_BIN ${DIR_END_TO_END_TESTS}/cobalt_test)
+set(REPORT_CLIENT_SRC "${CMAKE_SOURCE_DIR}/tools/go/src/report_client/report_client.go")
+set(COBALT_E2E_TEST_SRC "${CMAKE_CURRENT_SOURCE_DIR}/src/cobalt_e2e_test.go")
+set(COBALT_E2E_TEST_BINARY ${DIR_END_TO_END_TESTS}/cobalt_e2e_test)
 
-set(COBALT_SHUFFLER_CONF_SRC
-    "${CMAKE_CURRENT_SOURCE_DIR}/config/shuffler_default.conf")
-set(COBALT_TEST_CONF_BIN ${CMAKE_BINARY_DIR}/config/shuffler_default.conf)
-
-add_custom_command(OUTPUT ${COBALT_TEST_CONF_BIN}
-    COMMAND cp ${COBALT_SHUFFLER_CONF_SRC} ${CMAKE_BINARY_DIR}/config
-    DEPENDS ${COBALT_SHUFFLER_CONF_SRC}
+add_custom_command(OUTPUT ${COBALT_E2E_TEST_BINARY}
+    COMMAND ${GO_BIN} test -c -o ${COBALT_E2E_TEST_BINARY}
+            ${COBALT_E2E_TEST_SRC}
+    DEPENDS ${COBALT_E2E_TEST_SRC}
+    DEPENDS ${REPORT_CLIENT_SRC}
+    DEPENDS ${REPORT_MASTER_PB_GO}
     WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
 )
 
-add_custom_command(OUTPUT ${COBALT_TEST_BIN}
-    COMMAND ${GO_BIN} test -c -o ${COBALT_TEST_BIN}
-            ${COBALT_TEST_SRC}
-    DEPENDS ${COBALT_TEST_SRC}
-    DEPENDS ${COBALT_SHUFFLER_CONF_SRC}
-    WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
-)
-
-add_custom_target(test_all ALL
-      DEPENDS ${COBALT_TEST_BIN}
-      DEPENDS ${COBALT_TEST_CONF_BIN}
+add_custom_target(build_e2e_tests ALL
+      DEPENDS ${COBALT_E2E_TEST_BINARY}
 )
diff --git a/end_to_end_tests/src/cobalt_e2e_test.go b/end_to_end_tests/src/cobalt_e2e_test.go
new file mode 100644
index 0000000..3ab38d9
--- /dev/null
+++ b/end_to_end_tests/src/cobalt_e2e_test.go
@@ -0,0 +1,467 @@
+// Copyright 2017 The Fuchsia Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/*
+An end-to-end test of Cobalt. This test assumes the existence of a running
+Cobalt system. The URIs of the Shuffler, the Analyzer Service and the
+ReportMaster are passed in to the test as flags. Usually this test is
+invoked via the command "cobaltb.py test --tests=e2e" which invokes
+"tools/test_runner.py" which uses "tools/process_starter.py" to start the
+Cobalt processes prior to invoking this test.
+
+The test uses the |cobalt_test_app| program in order to encode values into
+observations and send those observations to the Shuffler. It then uses
+the |query_observations| tool to query the Observation Store and it waits
+until all of the observations have arrived at the Observation Store. It
+then uses the report_client to contact the ReportMaster and request
+that a report be generated and wait for the report to be generated. It then
+validates the report.
+
+The test assumes particular contents of the Cobalt registration system.
+By default "tools/process_starter.py" will cause the Cobalt processes to
+use the configuration files located in <source root>/config/registered. Thus
+this test must be kept in sync with the contents of those files. Here
+we include a copy of the relevant parts of those files for reference:
+
+#### Metric (1, 1, 1)
+element {
+  customer_id: 1
+  project_id: 1
+  id: 1
+  name: "Fuchsia Popular URLs"
+  description: "This is a fictional metric used for the development of Cobalt."
+  time_zone_policy: LOCAL
+  parts {
+    key: "url"
+    value {
+      description: "A URL."
+      data_type: STRING
+    }
+  }
+}
+
+#### Metric (1, 1, 2)
+element {
+  customer_id: 1
+  project_id: 1
+  id: 2
+  name: "Fuchsia Usage by Hour"
+  description: "This is a fictional metric used for the development of Cobalt."
+  time_zone_policy: LOCAL
+  parts {
+    key: "hour"
+    value {
+      description: "An integer from 0 to 23 representing the hour of the day."
+      data_type: INT
+    }
+  }
+}
+
+#### Encoding (1, 1, 1)
+element {
+  customer_id: 1
+  project_id: 1
+  id: 1
+  forculus {
+    threshold: 20
+    epoch_type: DAY
+  }
+}
+
+#### Encoding (1, 1, 2)
+element {
+  customer_id: 1
+  project_id: 1
+  id: 2
+  basic_rappor {
+    prob_0_becomes_1: 0.1
+    prob_1_stays_1: 0.9
+    int_range_categories: {
+      first: 0
+      last:  23
+    }
+  }
+}
+
+#### ReportConfig (1, 1, 1)
+element {
+  customer_id: 1
+  project_id: 1
+  id: 1
+  name: "Fuchsia Popular URLs"
+  description: "This is a fictional report used for the development of Cobalt."
+  metric_id: 1
+  variable {
+    metric_part: "url"
+  }
+  aggregation_epoch_type: DAY
+  report_delay_days: 1
+}
+
+#### ReportConfig (1, 1, 2)
+element {
+  customer_id: 1
+  project_id: 1
+  id: 2
+  name: "Fuchsia Usage by Hour"
+  description: "This is a fictional report used for the development of Cobalt."
+  metric_id: 2
+  variable {
+    metric_part: "hour"
+  }
+  aggregation_epoch_type: WEEK
+  report_delay_days: 5
+}
+
+*/
+
+package main
+
+import (
+	"bytes"
+	"flag"
+	"fmt"
+	"os/exec"
+	"strconv"
+	"testing"
+	"time"
+
+	"analyzer/report_master"
+	"github.com/golang/glog"
+	"report_client"
+)
+
+const (
+	customerId = 1
+	projectId  = 1
+
+	urlMetricId  = 1
+	hourMetricId = 2
+
+	forculusEncodingConfigId    = 1
+	basicRapporEncodingConfigId = 2
+
+	forculusUrlReportConfigId     = 1
+	basicRapporHourReportConfigId = 2
+
+	hourMetricPartName = "hour"
+	urlMetricPartName  = "url"
+)
+
+var (
+	observationQuerierPath = flag.String("observation_querier_path", "", "The full path to the Observation querier binary.")
+	testAppPath            = flag.String("test_app_path", "", "The full path to the Cobalt test app binary.")
+
+	analyzerUri     = flag.String("analyzer_uri", "", "The URI of the Analyzer Service.")
+	reportMasterUri = flag.String("report_master_uri", "", "The URI of the Report Master.")
+	shufflerUri     = flag.String("shuffler_uri", "", "The URI of the Shuffler.")
+
+	subProcessVerbosity = flag.Int("sub_process_v", 0, "-v verbosity level to pass to sub-processes")
+
+	reportClient *report_client.ReportClient
+)
+
+func init() {
+	flag.Parse()
+
+	reportClient = report_client.NewReportClient(customerId, projectId, *reportMasterUri, false, "")
+}
+
+// A ValuePart represents part of an input to the Cobalt encoder. It specifies
+// that the given integer or string should be encoded using the given
+// EncodingConfig and associated with the given metric part name.
+type ValuePart struct {
+	// The name of the metric part this value is for.
+	partName string
+
+	// The string representation of the value. If the value is of integer
+	// type this should be the representation using strconv.Itoa.
+	repr string
+
+	// The EncodingConfig id.
+	encoding uint32
+}
+
+// String returns a string representation of the ValuePart in the form
+// <partName>:<repr>:<encoding>. This is the form accepted as a flag to
+// the Cobalt test application.
+func (p *ValuePart) String() string {
+	return p.partName + ":" + p.repr + ":" + strconv.Itoa(int(p.encoding))
+}
+
+// FlagString builds a string appropriate to use as a flag value to the Cobalt
+// test application.
+func flagString(values []ValuePart) string {
+	var buffer bytes.Buffer
+	for i := 0; i < len(values); i++ {
+		if i > 0 {
+			buffer.WriteString(",")
+		}
+		buffer.WriteString(values[i].String())
+	}
+	return buffer.String()
+}
+
+// Invokes the "query_observations" command in order to query the ObservationStore
+// to determine the number of Observations contained in the store for the
+// given metric. |maxNum| bounds the query so that the returned value will
+// always be less than or equal to maxNum.
+func getNumObservations(metricId uint32, maxNum uint32) (uint32, error) {
+	cmd := exec.Command(*observationQuerierPath,
+		"-nointeractive",
+		"-for_testing_only_use_bigtable_emulator",
+		"-logtostderr", fmt.Sprintf("-v=%d", *subProcessVerbosity),
+		"-metric", strconv.Itoa(int(metricId)),
+		"-max_num", strconv.Itoa(int(maxNum)))
+	out, err := cmd.Output()
+	if err != nil {
+		stdErrMessage := ""
+		if exitError, ok := err.(*exec.ExitError); ok {
+			stdErrMessage = string(exitError.Stderr)
+		}
+		return 0, fmt.Errorf("Error returned from query_observations process: [%v] %s", err, stdErrMessage)
+	}
+	num, err := strconv.Atoi(string(out))
+	if err != nil {
+		return 0, fmt.Errorf("Unable to parse output of query_observations as an integer: error=[%v] output=[%v]", err, out)
+	}
+	if num < 0 {
+		return 0, fmt.Errorf("Expected non-negative integer received %d", num)
+	}
+	return uint32(num), nil
+}
+
+// Queries the Observation Store for the number of observations for the given
+// metric. If there is an error or the number of observations found is greater
+// than the |expectedNum| returns a non-nil error. If the number of observations
+// found is equal to the |expectedNum| returns nil (indicating success.) Otherwise
+// the number of observations found is less than the |expectedNum|. In this case
+// this function sleeps for one second and then tries again, repeating that for
+// up to 30 attempts and finally returns a non-nil error.
+func waitForObservations(metricId uint32, expectedNum uint32) error {
+	for trial := 0; trial < 30; trial++ {
+		num, err := getNumObservations(metricId, expectedNum+1)
+		if err != nil {
+			return err
+		}
+		if num == expectedNum {
+			return nil
+		}
+		if num > expectedNum {
+			return fmt.Errorf("Expected %d got %d", expectedNum, num)
+		}
+		glog.V(3).Infof("Observation store has %d observations. Waiting for %d...", num, expectedNum)
+		time.Sleep(time.Second)
+	}
+	return fmt.Errorf("After 30 attempts the number of observations was still not the expected number of %d", expectedNum)
+}
+
+// sendObservations uses the cobalt_test_app to encode the given values into observations and send the
+// observations to the Shuffler or the Analyzer.
+func sendObservations(metricId uint32, values []ValuePart, skipShuffler bool, numClients uint) error {
+	cmd := exec.Command(*testAppPath,
+		"-mode", "send-once",
+		"-analyzer_uri", *analyzerUri,
+		"-shuffler_uri", *shufflerUri,
+		"-logtostderr", fmt.Sprintf("-v=%d", *subProcessVerbosity),
+		"-metric", strconv.Itoa(int(metricId)),
+		"-num_clients", strconv.Itoa(int(numClients)),
+		"-values", flagString(values))
+	stdoutStderr, err := cmd.CombinedOutput()
+	message := string(stdoutStderr)
+	if len(message) > 0 {
+		fmt.Printf("%s", stdoutStderr)
+	}
+	return err
+}
+
+// sendForculusUrlObservations sends Observations containing a Forculus encryption of the
+// given |url| to the Shuffler. |numClients| different, independent
+// observations will be sent.
+func sendForculusUrlObservations(url string, numClients uint, t *testing.T) {
+	const skipShuffler = false
+	values := []ValuePart{
+		ValuePart{
+			urlMetricPartName,
+			url,
+			forculusEncodingConfigId,
+		},
+	}
+	if err := sendObservations(urlMetricId, values, skipShuffler, numClients); err != nil {
+		t.Fatalf("url=%s, numClient=%d, err=%v", url, numClients, err)
+	}
+}
+
+// sendBasicRapporHourObservations sends Observations containing a Basic RAPPOR encoding of the
+// given |hour| to the Shuffler. |numClients| different, independent observations
+// will be sent.
+func sendBasicRapporHourObservations(hour int, numClients uint, t *testing.T) {
+	const skipShuffler = false
+	values := []ValuePart{
+		ValuePart{
+			hourMetricPartName,
+			strconv.Itoa(hour),
+			basicRapporEncodingConfigId,
+		},
+	}
+	if err := sendObservations(hourMetricId, values, skipShuffler, numClients); err != nil {
+		t.Fatalf("hour=%d, numClient=%d, err=%v", hour, numClients, err)
+	}
+}
+
+// getReport asks the ReportMaster to start a new report for the given |reportConfigId|
+// that spans all day indices. It then waits for the report generation to complete
+// and returns the Report.
+func getReport(reportConfigId uint32, includeStdErr bool, t *testing.T) *report_master.Report {
+	reportId, err := reportClient.StartCompleteReport(reportConfigId)
+	if err != nil {
+		t.Fatalf("reportConfigId=%d, err=%v", reportConfigId, err)
+	}
+
+	report, err := reportClient.GetReport(reportId, 10*time.Second)
+	if err != nil {
+		t.Fatalf("reportConfigId=%d, err=%v", reportConfigId, err)
+	}
+
+	return report
+}
+
+// getCSVReport asks the ReportMaster to start a new report for the given |reportConfigId|
+// that spans all day indices. It then waits for the report generation to complete
+// and returns the report in CSV format.
+func getCSVReport(reportConfigId uint32, includeStdErr bool, t *testing.T) string {
+	report := getReport(reportConfigId, includeStdErr, t)
+
+	csv, err := report_client.WriteCSVReportToString(report, includeStdErr)
+	if err != nil {
+		t.Fatalf("reportConfigId=%d, err=%v", reportConfigId, err)
+	}
+	return csv
+}
+
+// We run the full Cobalt pipeline using Metric 1, Encoding Config 1 and
+// Report Config 1. This uses Forculus with a threshold of 20 to count
+// URLs.
+func TestForculusEncodingOfUrls(t *testing.T) {
+	// We send some observations to the Shuffler.
+	sendForculusUrlObservations("www.AAAA.com", 18, t)
+	sendForculusUrlObservations("www.BBBB.com", 19, t)
+	sendForculusUrlObservations("www.CCCC.com", 20, t)
+	sendForculusUrlObservations("www.DDDD.com", 21, t)
+
+	// We have not yet sent 100 observations and the Shuffler's threshold is
+	// set to 100 so we expect no observations to have been sent to the
+	// Analyzer yet.
+	numObservations, err := getNumObservations(1, 10)
+	if err != nil {
+		t.Fatalf("Error returned from getNumObservations[%v]", err)
+	}
+	if numObservations != 0 {
+		t.Fatalf("Expected no observations in the Observation store yet but got %d", numObservations)
+	}
+
+	// We send additional observations to the Shuffler. This crosses the Shuffler's
+	// threshold and so all observations should now be sent to the Analyzer.
+	sendForculusUrlObservations("www.EEEE.com", 22, t)
+	sendForculusUrlObservations("www.FFFF.com", 23, t)
+
+	// There should now be 123 Observations sent to the Analyzer for metric 1.
+	// We wait for them.
+	if err := waitForObservations(1, 123); err != nil {
+		t.Fatalf("%s", err)
+	}
+
+	// Finally we will run a report. This is the expected output of the report.
+	const expectedCSV = `www.CCCC.com,20.000
+www.DDDD.com,21.000
+www.EEEE.com,22.000
+www.FFFF.com,23.000
+`
+
+	// Generate the report, fetch it as a CSV, check it.
+	csv := getCSVReport(forculusUrlReportConfigId, false, t)
+	if csv != expectedCSV {
+		t.Errorf("Got csv:[%s]", csv)
+	}
+}
+
+// We run the full Cobalt pipeline using Metric 2, Encoding Config 2 and
+// Report Config 2. This uses Basic RAPPOR with integer categories for the
+// 24 hours of the day.
+func TestBasicRapporEncodingOfHours(t *testing.T) {
+	sendBasicRapporHourObservations(8, 501, t)
+	sendBasicRapporHourObservations(9, 1002, t)
+	sendBasicRapporHourObservations(10, 503, t)
+	sendBasicRapporHourObservations(16, 504, t)
+	sendBasicRapporHourObservations(17, 1005, t)
+	sendBasicRapporHourObservations(18, 506, t)
+
+	// There should now be 4021 Observations sent to the Analyzer for metric 2.
+	// We wait for them.
+	if err := waitForObservations(2, 4021); err != nil {
+		t.Fatalf("%s", err)
+	}
+
+	report := getReport(basicRapporHourReportConfigId, true, t)
+	if report.Metadata.State != report_master.ReportState_COMPLETED_SUCCESSFULLY {
+		t.Fatalf("report.Metadata.State=%v", report.Metadata.State)
+	}
+	rows := report_client.ReportToStrings(report, true)
+	if rows == nil {
+		t.Fatalf("rows is nil")
+	}
+	if len(rows) != 24 {
+		t.Fatalf("len(rows)=%d", len(rows))
+	}
+
+	for hour := 0; hour <= 23; hour++ {
+		if len(rows[hour]) != 3 {
+			t.Fatalf("len(rows[hour])=%d", len(rows[hour]))
+		}
+		if rows[hour][0] != strconv.Itoa(hour) {
+			t.Errorf("Expected %s, got %s", strconv.Itoa(hour), rows[hour][0])
+		}
+		val, err := strconv.ParseFloat(rows[hour][1], 32)
+		if err != nil {
+			t.Errorf("Error parsing %s as float: %v", rows[hour][1], err)
+			continue
+		}
+		switch hour {
+		case 8:
+			fallthrough
+		case 10:
+			fallthrough
+		case 16:
+			fallthrough
+		case 18:
+			if val < 10.0 || val > 1000.0 {
+				t.Errorf("For hour %d unexpected val: %v", hour, val)
+			}
+		case 9:
+			fallthrough
+		case 17:
+			if val < 500.0 || val > 2000.0 {
+				t.Errorf("For hour %d unexpected val: %v", hour, val)
+			}
+		default:
+			if val > 100.0 {
+				t.Errorf("Val larger than expected: %v", val)
+				continue
+			}
+		}
+		if rows[hour][2] != "23.779" {
+			t.Errorf("rows[hour][2]=%s", rows[hour][2])
+		}
+	}
+}
diff --git a/end_to_end_tests/src/cobalt_test.go b/end_to_end_tests/src/cobalt_test.go
deleted file mode 100644
index 1db777f..0000000
--- a/end_to_end_tests/src/cobalt_test.go
+++ /dev/null
@@ -1,338 +0,0 @@
-// Copyright 2016 The Fuchsia Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// This test stands up the Bigtable emulator, the analzyer, and uses cgen to
-// generate fake reports.  It then verifies the presence of the reports in
-// Bigtable.
-
-package main
-
-import (
-	"bufio"
-	"bytes"
-	"errors"
-	"log"
-	"net"
-	"os"
-	"os/exec"
-	"path/filepath"
-	"strconv"
-	"strings"
-	"syscall"
-	"testing"
-	"time"
-
-	"cloud.google.com/go/bigtable"
-	"golang.org/x/net/context"
-	"google.golang.org/api/option"
-	"google.golang.org/grpc"
-)
-
-// This fixture stands up the bigtable emulator.
-type BigtableFixture struct {
-	cmd      *exec.Cmd
-	host     string
-	project  string
-	instance string
-	table    string
-}
-
-func NewBigtableFixture() (*BigtableFixture, error) {
-	f := new(BigtableFixture)
-	// These strings must coincide with the ones in bigtable_emulator_helper.h
-	f.project = "TestProject"
-	f.instance = "TestInstance"
-	// This name must coincide with the one in bigtable_names.h
-	f.table = "observations"
-
-	sysRootDir, _ := filepath.Abs(filepath.Join(filepath.Dir(os.Args[0]), "../../sysroot"))
-	bin := filepath.Join(sysRootDir, "gcloud", "google-cloud-sdk", "platform", "bigtable-emulator", "cbtemulator")
-	f.cmd = exec.Command(bin)
-
-	stdout, _ := f.cmd.StdoutPipe()
-	reader := bufio.NewReader(stdout)
-
-	// Create a process group so we can kill children
-	f.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
-	log.Printf("Starting Bigtable Emulator: %v", bin)
-	f.cmd.Start()
-
-	// Wait for bigtable to start
-	for {
-		line, err := reader.ReadString('\n')
-		if err != nil {
-			return nil, err
-		}
-
-		if strings.Contains(line, "running") {
-			fields := strings.Fields(line)
-			f.host = fields[len(fields)-1]
-			break
-		}
-	}
-	log.Printf("Bigtable Emulator started with host: %v", f.host)
-
-	return f, nil
-}
-
-func (f *BigtableFixture) Close() {
-	// Kill process group
-	pgid, _ := syscall.Getpgid(f.cmd.Process.Pid)
-	syscall.Kill(-pgid, syscall.SIGTERM)
-
-	f.cmd.Wait()
-}
-
-func (f *BigtableFixture) CountRows() int {
-	conn, err := grpc.Dial(f.host, grpc.WithInsecure())
-	if err != nil {
-		log.Printf("Error while attempting to connect to the Bigtable Emulator: %v", err)
-		return -1
-	}
-
-	ctx := context.Background()
-	client, err := bigtable.NewClient(ctx, f.project, f.instance, option.WithGRPCConn(conn))
-	if err != nil {
-		log.Printf("Error while attempting to create a Bigtable Client: %v", err)
-		return -1
-	}
-	defer client.Close()
-	tbl := client.Open(f.table)
-
-	count := 0
-	err = tbl.ReadRows(ctx, bigtable.InfiniteRange(""),
-		func(row bigtable.Row) bool {
-			count++
-			return true
-		})
-	if err != nil {
-		log.Printf("Error while attempting to count rows in Bigtable: %v", err)
-		return -1
-	}
-
-	return count
-}
-
-// This fixture stands up the Analyzer.  It depends on Bigtable being up.
-type AnalyzerFixture struct {
-	cmd      *exec.Cmd
-	cgen     string
-	outdir   string
-	bigtable *BigtableFixture
-}
-
-func DaemonStarted(dst string) bool {
-	for i := 0; i < 10; i++ {
-		conn, err := net.Dial("tcp", dst)
-		if err == nil {
-			conn.Close()
-			return true
-		}
-		time.Sleep(10 * time.Millisecond)
-	}
-
-	return false
-}
-
-func NewAnalyzerFixture() (*AnalyzerFixture, error) {
-	// Create Bigtable first
-	bigtable, err := NewBigtableFixture()
-	if err != nil {
-		return nil, err
-	}
-
-	// Create the Analyzer
-	f := new(AnalyzerFixture)
-	f.bigtable = bigtable
-
-	f.outdir, _ = filepath.Abs(filepath.Join(filepath.Dir(os.Args[0]), "../../out"))
-	abin := filepath.Join(f.outdir, "analyzer", "analyzer")
-
-	configDir, _ := filepath.Abs(filepath.Join(f.outdir, "..", "config", "registered"))
-	f.cmd = exec.Command(abin, "--port=8080", "-for_testing_only_use_bigtable_emulator",
-		"--cobalt_config_dir", configDir, "-logtostderr")
-	var out bytes.Buffer
-	f.cmd.Stdout = &out
-	var serr bytes.Buffer
-	f.cmd.Stderr = &serr
-
-	log.Printf("Starting Analyzer: %v", strings.Join(f.cmd.Args, " "))
-	err = f.cmd.Start()
-	if err != nil {
-		log.Printf("Command finished with error:[%v] with stdout:[%s] and stderr:[%s]", err, out.String(), serr.String())
-		log.Fatal(err)
-	}
-
-	// Wait for it to start
-	if !DaemonStarted("127.0.0.1:8080") {
-		f.Close()
-		return nil, errors.New("Unable to start the analyzer")
-	}
-
-	// get path to cgen
-	f.cgen = filepath.Join(f.outdir, "/tools/cgen")
-
-	return f, nil
-}
-
-func (f *AnalyzerFixture) Close() {
-	f.cmd.Process.Kill()
-	f.cmd.Wait()
-	f.bigtable.Close()
-}
-
-// Fixture to start the Shuffler.  It depends on the Analyzer fixture, which in
-// turn depends on the Bigtable fixture.  The Shuffler fixture will start the
-// entire backend system.
-type ShufflerFixture struct {
-	cmd      *exec.Cmd
-	analyzer *AnalyzerFixture
-	outdir   string
-}
-
-func NewShufflerFixture() (*ShufflerFixture, error) {
-	// Create Analyzer first
-	analyzer, err := NewAnalyzerFixture()
-	if err != nil {
-		return nil, err
-	}
-
-	// Create the Shuffler
-	f := new(ShufflerFixture)
-	f.analyzer = analyzer
-	f.outdir = filepath.Join(filepath.Dir(os.Args[0]), "../../out")
-
-	bin, _ := filepath.Abs(filepath.Join(f.analyzer.outdir, "shuffler", "shuffler"))
-
-	shufflerTestConfig, _ := filepath.Abs(filepath.Join(f.outdir, "config", "shuffler_default.conf"))
-	f.cmd = exec.Command(bin,
-		"-config_file", shufflerTestConfig,
-		"-batch_size", strconv.Itoa(100),
-		"-vmodule=receiver=2,dispatcher=2,store=2",
-		"-logtostderr")
-
-	var out bytes.Buffer
-	f.cmd.Stdout = &out
-	var serr bytes.Buffer
-	f.cmd.Stderr = &serr
-
-	log.Printf("Starting Shuffler: %v", strings.Join(f.cmd.Args, " "))
-	err = f.cmd.Start()
-	if err != nil {
-		log.Printf("Command finished with error:[%v] with stdout:[%s] and stderr:[%s]", err, out.String(), serr.String())
-		log.Fatal(err)
-	}
-
-	if !DaemonStarted("127.0.0.1:50051") {
-		f.Close()
-		return nil, errors.New("Unable to start the shuffler")
-	}
-
-	return f, nil
-}
-
-func (f *ShufflerFixture) Close() {
-	f.cmd.Process.Kill()
-	f.cmd.Wait()
-	f.analyzer.Close()
-}
-
-// This test depends on the AnalyzerFixture.
-//
-// It uses cgen to create 2 fake reports.
-// It then asserts that 2 reports exist in Bigtable.
-func OTestAnalyzerAddObservations(t *testing.T) {
-	// Start the Analyzer and Bigtable emulator
-	f, err := NewAnalyzerFixture()
-	if err != nil {
-		t.Error("Fixture failed:", err)
-		return
-	}
-	defer f.Close()
-
-	// Run cgen on the analyzer to create 2 observations
-	num := 2
-	cmd := exec.Command(f.cgen,
-		"-analyzer_uri", "localhost:8080",
-		"-num_observations", strconv.Itoa(num))
-	if cmd.Run() != nil {
-		t.Error("cgen failed")
-		return
-	}
-
-	// Grab the observations from bigtable
-	count := f.bigtable.CountRows()
-	if count == -1 {
-		t.Error("Can't read rows")
-		return
-	}
-
-	if count != num {
-		t.Errorf("Unexpected number of rows got %v want %v", count, num)
-		return
-	}
-}
-
-// This test depends on the ShufflerFixture.
-//
-// It uses cgen to create 2 fake reports.
-// It then asserts that 2 reports exist in Bigtable.
-func TestShufflerProcess(t *testing.T) {
-	// Start the entire system.
-	f, err := NewShufflerFixture()
-	if err != nil {
-		t.Error("Fixture failed:", err)
-		return
-	}
-	defer f.Close()
-
-	// Run cgen on the analyzer to create 2 observations
-	num := 2
-	cmd := exec.Command(f.analyzer.cgen,
-		"-shuffler_uri", "localhost:50051",
-		"-analyzer_uri", "localhost:8080",
-		"-num_observations", strconv.Itoa(num),
-		"-num_rpcs", strconv.Itoa(num))
-
-	log.Printf("Running cgen: %v", strings.Join(cmd.Args, " "))
-	if cmd.Run() != nil {
-		t.Error("cgen failed")
-		return
-	}
-	log.Printf("cgen completed")
-
-	var rows int
-
-	// The shuffler RPC is async so it could take a while before the data
-	// reaches bigtable.  Try multiple times.
-	for i := 0; i < 5; i++ {
-		rows = f.analyzer.bigtable.CountRows()
-		if rows == -1 {
-			t.Error("Can't read rows")
-			return
-		}
-
-		if rows == num {
-			break
-		}
-
-		time.Sleep(10 * time.Millisecond)
-	}
-
-	if rows != num {
-		t.Errorf("Unexpected number of rows got %v want %v", rows, num)
-		return
-	}
-
-}
diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt
index 0b5a46a..0f54e18 100644
--- a/tools/CMakeLists.txt
+++ b/tools/CMakeLists.txt
@@ -12,14 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-add_executable(cgen cgen.cc)
-
-target_link_libraries(cgen
-                      analyzer_service_lib
-                      shuffler_grpc_client
-                      forculus_encrypter
-                      gflags)
-
 add_subdirectory(go/src)
 add_subdirectory(observation_querier)
 add_subdirectory(test_app)
diff --git a/tools/cgen.cc b/tools/cgen.cc
deleted file mode 100644
index d80a89e..0000000
--- a/tools/cgen.cc
+++ /dev/null
@@ -1,352 +0,0 @@
-// Copyright 2016 The Fuchsia Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Cobalt traffic generator.
-//
-// This is a test and debug client to send data to cobalt components
-
-#include <assert.h>
-#include <err.h>
-#include <libgen.h>
-#include <limits.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <sys/time.h>
-#include <unistd.h>
-
-#include <gflags/gflags.h>
-#include <glog/logging.h>
-
-#include "./observation.pb.h"
-#include "algorithms/forculus/forculus_encrypter.h"
-#include "analyzer/analyzer_service/analyzer_service.h"
-#include "config/encoding_config.h"
-#include "config/encodings.pb.h"
-#include "shuffler/shuffler.grpc.pb.h"
-
-using cobalt::Envelope;
-using cobalt::ObservationBatch;
-using cobalt::analyzer::Analyzer;
-using cobalt::config::EncodingRegistry;
-using cobalt::encoder::ClientSecret;
-using cobalt::forculus::ForculusEncrypter;
-using cobalt::shuffler::Shuffler;
-using grpc::Channel;
-using grpc::ClientContext;
-using grpc::Status;
-using google::protobuf::Empty;
-
-namespace cobalt {
-namespace cgen {
-
-DEFINE_string(analyzer_uri, "", "The URI of the Analyzer");
-DEFINE_string(shuffler_uri, "", "The URI of the Shuffler");
-DEFINE_int32(num_rpcs, 1, "Number of RPCs to send");
-DEFINE_int32(num_observations, 1, "Number of Observations to generate");
-DEFINE_uint32(customer, 1, "Customer ID");
-DEFINE_uint32(project, 1, "Project ID");
-DEFINE_uint32(metric, 1, "Metric ID");
-DEFINE_uint32(encoding, 1, "Encoding ID");
-DEFINE_string(registry, "", "Registry path for registered_encodings.txt etc.");
-DEFINE_string(part, "city", "Observation part name");
-DEFINE_string(payload, "hello", "Observation payload");
-
-// Measures time between start and stop.  Useful for benchmarking.
-class Timer {
- public:
-  Timer() : start_(0), stop_(0) {}
-
-  void start() { start_ = now(); }
-
-  void stop() { stop_ = now(); }
-
-  // returns time in microseconds
-  uint64_t now() {
-    struct timeval tv;
-
-    gettimeofday(&tv, NULL);
-
-    return tv.tv_sec * 1000UL * 1000UL + tv.tv_usec;
-  }
-
-  uint64_t elapsed() {
-    assert(start_ && stop_);
-    assert(stop_ >= start_);
-
-    return stop_ - start_;
-  }
-
- private:
-  uint64_t start_;
-  uint64_t stop_;
-};  // class Timer
-
-// Encapsulates Observations.
-struct GenObservation {
-  Observation observation;
-  EncryptedMessage encrypted;
-  ObservationMetadata metadata;
-};
-
-// Generates observations and RPCs to Cobalt components
-class CGen {
- public:
-  void setup(int argc, char* argv[]) {
-    google::SetUsageMessage("Cobalt gRPC generator");
-    google::ParseCommandLineFlags(&argc, &argv, true);
-
-    customer_id_ = FLAGS_customer;
-    project_id_ = FLAGS_project;
-    metric_id_ = FLAGS_metric;
-    part_name_ = FLAGS_part;
-
-    std::string registry_path = FLAGS_registry;
-
-    // If no path is given, try to deduce it from the cgen location.
-    if (registry_path == "") {
-      char path[PATH_MAX], path2[PATH_MAX];
-
-      // Get the directory of cgen.
-      if (!realpath(argv[0], path)) LOG(FATAL) << "realpath(): " << argv[0];
-
-      char* dir = dirname(path);
-
-      // Set the relative path to the registry.
-      snprintf(path2, sizeof(path2), "%s/../../config/registered", dir);
-
-      // Get the absolute path to the registry.
-      if (!realpath(path2, path)) LOG(FATAL) << "realpath(): " << path2;
-
-      registry_path = path;
-    }
-
-    load_registries(registry_path);
-  }
-
-  void load_registries(const std::string& path) {
-    char fname[PATH_MAX];
-
-    snprintf(fname, sizeof(fname), "%s/registered_encodings.txt", path.c_str());
-
-    auto encodings = EncodingRegistry::FromFile(fname, nullptr);
-    if (encodings.second != config::kOK)
-      LOG(FATAL) << "Can't load encodings configuration";
-
-    encodings_ = std::move(encodings.first);
-  }
-
-  void start() {
-    generate_observations();
-
-    if (!FLAGS_shuffler_uri.empty() && !FLAGS_analyzer_uri.empty()) {
-      send_shuffler();
-    } else if (!FLAGS_analyzer_uri.empty()) {
-      send_analyzer();
-    }
-  }
-
- private:
-  // Creates a bunch of fake observations that can be sent to shufflers or
-  // analyzers.
-  void generate_observations() {
-    // Metadata setup.
-    ObservationMetadata metadata;
-
-    metadata.set_customer_id(customer_id_);
-    metadata.set_project_id(project_id_);
-    metadata.set_metric_id(metric_id_);
-    metadata.set_day_index(4);
-
-    // encode the observation.
-    const EncodingConfig* const enc =
-        encodings_->Get(customer_id_, project_id_, FLAGS_encoding);
-
-    if (!enc) LOG(FATAL) << "Unkown encoding: " << FLAGS_encoding;
-
-    // TODO(bittau): add support for algorithms other than forculus.
-    if (!enc->has_forculus()) LOG(FATAL) << "Unsupported encoding";
-
-    ForculusConfig config;
-    ClientSecret client_secret = ClientSecret::GenerateNewSecret();
-
-    config.set_threshold(enc->forculus().threshold());
-
-    ForculusEncrypter forculus(config, customer_id_, project_id_, metric_id_,
-                               part_name_, client_secret);
-
-    for (int i = 0; i < FLAGS_num_observations; i++) {
-      Observation obs;
-      ObservationPart part;
-
-      part.set_encoding_config_id(FLAGS_encoding);
-
-      ForculusObservation* forc_obs = part.mutable_forculus();
-      uint32_t day_index = 0;
-
-      if (forculus.Encrypt(FLAGS_payload, day_index, forc_obs) !=
-          forculus::ForculusEncrypter::kOK) {
-        LOG(FATAL) << "Forculus encryption failed";
-      }
-
-      // TODO(bittau): need to specify what key-value to use for
-      // single-dimension metrics.  Using DEFAULT for now.
-      (*obs.mutable_parts())[part_name_] = part;
-
-      // Encrypt the observation.
-      std::string cleartext, encrypted;
-
-      obs.SerializeToString(&cleartext);
-      encrypt(cleartext, &encrypted);
-
-      EncryptedMessage em;
-
-      em.set_ciphertext(encrypted);
-
-      // Add this to the list of fake observations.
-      GenObservation o;
-      o.observation = obs;
-      o.encrypted = em;
-      o.metadata = metadata;
-
-      observations_.push_back(o);
-    }
-  }
-
-  // Send observations to the analyzer.
-  void send_analyzer() {
-    CHECK(!FLAGS_analyzer_uri.empty());
-    std::shared_ptr<Channel> chan = grpc::CreateChannel(
-        FLAGS_analyzer_uri, grpc::InsecureChannelCredentials());
-
-    std::unique_ptr<Analyzer::Stub> analyzer(Analyzer::NewStub(chan));
-
-    // Generate the observation batch.
-    ObservationBatch req;
-    ObservationMetadata* metadata = req.mutable_meta_data();
-
-    for (const GenObservation& observation : observations_) {
-      // Assume all observations have the same metadata.
-      *metadata = observation.metadata;
-
-      EncryptedMessage* msg = req.add_encrypted_observation();
-      *msg = observation.encrypted;
-    }
-
-    // send RPCs
-    Timer t;
-    t.start();
-
-    for (int i = 0; i < FLAGS_num_rpcs; i++) {
-      ClientContext context;
-      Empty resp;
-
-      Status status = analyzer->AddObservations(&context, req, &resp);
-
-      if (!status.ok())
-        errx(1, "error sending RPC: %s", status.error_message().c_str());
-    }
-
-    t.stop();
-
-    printf("Took %lu ms for %d requests\n", t.elapsed() / 1000UL,
-           FLAGS_num_rpcs);
-  }
-
-  void send_shuffler() {
-    CHECK(!FLAGS_shuffler_uri.empty());
-
-    std::shared_ptr<Channel> chan = grpc::CreateChannel(
-        FLAGS_shuffler_uri, grpc::InsecureChannelCredentials());
-
-    std::unique_ptr<Shuffler::Stub> shuffler(Shuffler::NewStub(chan));
-
-    // Build the messages to send to the shuffler.
-    std::vector<EncryptedMessage> messages;
-
-    for (GenObservation& observation : observations_) {
-      // TODO(rudominer) Use the fact that an Envelope can hold
-      // multpile ObservationBatches and an ObservationBatch can hold
-      // multiple observations. For now we are using an Envelope per
-      // Observation.
-      Envelope envelope;
-      auto* observation_batch = envelope.add_batch();
-      observation_batch->set_allocated_meta_data(
-          new ObservationMetadata(observation.metadata));
-
-      auto* encrypted_observation =
-          observation_batch->add_encrypted_observation();
-      encrypted_observation->Swap(&observation.encrypted);
-
-      // Encrypt the envelope.
-      std::string cleartext, encrypted;
-
-      envelope.SerializeToString(&cleartext);
-      encrypt(cleartext, &encrypted);
-
-      EncryptedMessage em;
-      em.set_ciphertext(encrypted);
-
-      messages.push_back(em);
-    }
-
-    // send RPCs.
-    Timer t;
-    t.start();
-
-    auto msg_iter = messages.begin();
-
-    if (msg_iter == messages.end()) LOG(FATAL) << "Need messags";
-
-    for (int i = 0; i < FLAGS_num_rpcs; i++) {
-      ClientContext context;
-      Empty resp;
-
-      Status status = shuffler->Process(&context, *msg_iter, &resp);
-
-      if (!status.ok())
-        errx(1, "error sending RPC: %s", status.error_message().c_str());
-
-      if (++msg_iter == messages.end()) msg_iter = messages.begin();
-    }
-
-    t.stop();
-
-    printf("Took %lu ms for %d requests\n", t.elapsed() / 1000UL,
-           FLAGS_num_rpcs);
-  }
-
-  void encrypt(const std::string& in, std::string* out) {
-    // TODO(pseudorandom): please implement
-    *out = in;
-  }
-
-  int customer_id_;
-  int project_id_;
-  int metric_id_;
-  std::string part_name_;
-  std::vector<GenObservation> observations_;
-  std::unique_ptr<EncodingRegistry> encodings_;
-};  // class CGen
-
-}  // namespace cgen
-}  // namespace cobalt
-
-int main(int argc, char* argv[]) {
-  cobalt::cgen::CGen cgen;
-
-  cgen.setup(argc, argv);
-  cgen.start();
-
-  exit(0);
-}
diff --git a/tools/go/src/report_client/report_client.go b/tools/go/src/report_client/report_client.go
index a19f176..ff9a387 100644
--- a/tools/go/src/report_client/report_client.go
+++ b/tools/go/src/report_client/report_client.go
@@ -19,6 +19,7 @@
 package report_client
 
 import (
+	"bytes"
 	"encoding/csv"
 	"fmt"
 	"io"
@@ -322,15 +323,25 @@
 	}
 }
 
+// ReportRowsSortedByValues returns a sorted slice of ReportRows.
+// The rows are sorted in increasing order of their values.
+// It is possible for nil to be returned if there are no ReportRows.
+func ReportRowsSortedByValues(report *report_master.Report, includeStdErr bool) []*report_master.ReportRow {
+	rows := report.GetRows().GetRows()
+	if rows != nil {
+		sort.Sort(ByValues(rows))
+	}
+	return rows
+}
+
 // ReportToStrings returns a sorted list of human-readable report rows.
 // Each element of the returned list represents  a row of the report.
 // The rows of are sorted in increasing order of their values.
 // Each row is itself a list of strings as specified by ReportRowToStrings.
 func ReportToStrings(report *report_master.Report, includeStdErr bool) [][]string {
 	result := [][]string{}
-	rows := report.GetRows().GetRows()
+	rows := ReportRowsSortedByValues(report, includeStdErr)
 	if rows != nil {
-		sort.Sort(ByValues(rows))
 		for _, row := range rows {
 			result = append(result, ReportRowToStrings(row, includeStdErr))
 		}
@@ -354,3 +365,15 @@
 	csvWriter.Flush()
 	return nil
 }
+
+// WriteCSVReportToString writes a comma-separated values representation of the
+// given |report| and returns it as a string. See comments at WriteCSVReport
+// for more details.
+func WriteCSVReportToString(report *report_master.Report, includeStdErr bool) (csv string, err error) {
+	var buffer bytes.Buffer
+	if err = WriteCSVReport(&buffer, report, includeStdErr); err != nil {
+		return
+	}
+	csv = buffer.String()
+	return
+}