blob: 3ab38d9fe715b0ec35135573b8f5e890a53e6eb8 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
An end-to-end test of Cobalt. This test assumes the existence of a running
Cobalt system. The URIs of the Shuffler, the Analyzer Service and the
ReportMaster are passed in to the test as flags. Usually this test is
invoked via the command "cobaltb.py test --tests=e2e" which invokes
"tools/test_runner.py" which uses "tools/process_starter.py" to start the
Cobalt processes prior to invoking this test.
The test uses the |cobalt_test_app| program in order to encode values into
observations and send those observations to the Shuffler. It then uses
the |query_observations| tool to query the Observation Store and it waits
until all of the observations have arrived at the Observation Store. It
then uses the report_client to contact the ReportMaster and request
that a report be generated and wait for the report to be generated. It then
validates the report.
The test assumes particular contents of the Cobalt registration system.
By default "tools/process_starter.py" will cause the Cobalt processes to
use the configuration files located in <source root>/config/registered. Thus
this test must be kept in sync with the contents of those files. Here
we include a copy of the relevant parts of those files for reference:
#### Metric (1, 1, 1)
element {
customer_id: 1
project_id: 1
id: 1
name: "Fuchsia Popular URLs"
description: "This is a fictional metric used for the development of Cobalt."
time_zone_policy: LOCAL
parts {
key: "url"
value {
description: "A URL."
data_type: STRING
}
}
}
#### Metric (1, 1, 2)
element {
customer_id: 1
project_id: 1
id: 2
name: "Fuchsia Usage by Hour"
description: "This is a fictional metric used for the development of Cobalt."
time_zone_policy: LOCAL
parts {
key: "hour"
value {
description: "An integer from 0 to 23 representing the hour of the day."
data_type: INT
}
}
}
#### Encoding (1, 1, 1)
element {
customer_id: 1
project_id: 1
id: 1
forculus {
threshold: 20
epoch_type: DAY
}
}
#### Encoding (1, 1, 2)
element {
customer_id: 1
project_id: 1
id: 2
basic_rappor {
prob_0_becomes_1: 0.1
prob_1_stays_1: 0.9
int_range_categories: {
first: 0
last: 23
}
}
}
#### ReportConfig (1, 1, 1)
element {
customer_id: 1
project_id: 1
id: 1
name: "Fuchsia Popular URLs"
description: "This is a fictional report used for the development of Cobalt."
metric_id: 1
variable {
metric_part: "url"
}
aggregation_epoch_type: DAY
report_delay_days: 1
}
#### ReportConfig (1, 1, 2)
element {
customer_id: 1
project_id: 1
id: 2
name: "Fuchsia Usage by Hour"
description: "This is a fictional report used for the development of Cobalt."
metric_id: 2
variable {
metric_part: "hour"
}
aggregation_epoch_type: WEEK
report_delay_days: 5
}
*/
package main
import (
	"bytes"
	"flag"
	"fmt"
	"os"
	"os/exec"
	"strconv"
	"testing"
	"time"

	"analyzer/report_master"
	"github.com/golang/glog"
	"report_client"
)
// Identifiers of the Cobalt registry entries exercised by this test. They
// mirror the Metric, Encoding and ReportConfig entries quoted in the comment
// at the top of this file and must be kept in sync with them.
const (
// Every registry entry used by this test has customer_id 1 and project_id 1.
customerId = 1
projectId = 1
// Metric 1 is "Fuchsia Popular URLs"; Metric 2 is "Fuchsia Usage by Hour".
urlMetricId = 1
hourMetricId = 2
// Encoding 1 is the Forculus config; Encoding 2 is the Basic RAPPOR config.
forculusEncodingConfigId = 1
basicRapporEncodingConfigId = 2
// ReportConfig 1 reports on the "url" part of Metric 1; ReportConfig 2
// reports on the "hour" part of Metric 2.
forculusUrlReportConfigId = 1
basicRapporHourReportConfigId = 2
// Names of the single metric part of Metric 2 and Metric 1 respectively.
hourMetricPartName = "hour"
urlMetricPartName = "url"
)
// Command-line flags that tell the test where to find the helper binaries and
// the running Cobalt services, plus the ReportMaster client used by the tests.
var (
// Path of the binary that getNumObservations executes to count observations.
observationQuerierPath = flag.String("observation_querier_path", "", "The full path to the Observation querier binary.")
// Path of the binary that sendObservations executes to encode and send observations.
testAppPath = flag.String("test_app_path", "", "The full path to the Cobalt test app binary.")
// Forwarded to the test app as -analyzer_uri.
analyzerUri = flag.String("analyzer_uri", "", "The URI of the Analyzer Service.")
// Used to construct reportClient.
reportMasterUri = flag.String("report_master_uri", "", "The URI of the Report Master.")
// Forwarded to the test app as -shuffler_uri.
shufflerUri = flag.String("shuffler_uri", "", "The URI of the Shuffler.")
// Forwarded as -v=<n> to both the observation querier and the test app.
subProcessVerbosity = flag.Int("sub_process_v", 0, "-v verbosity level to pass to sub-processes")
// Client for the ReportMaster. It is constructed once the flags have been
// parsed and is used by getReport.
reportClient *report_client.ReportClient
)
// TestMain parses the command-line flags and constructs the package-level
// reportClient before any test runs, then runs the tests and exits with their
// result.
//
// This set-up lives in TestMain rather than in an init() function because,
// since Go 1.13, the testing package registers its own flags only after
// package initialization has completed. Calling flag.Parse() from init()
// therefore aborts the test binary with "flag provided but not defined" as
// soon as "go test" passes any of its -test.* flags.
func TestMain(m *testing.M) {
	flag.Parse()
	reportClient = report_client.NewReportClient(customerId, projectId, *reportMasterUri, false, "")
	// os.Exit is called explicitly so that the exit status is propagated on
	// toolchains older than Go 1.15 as well.
	os.Exit(m.Run())
}
// ValuePart describes one component of an input to the Cobalt encoder: a
// single integer or string value, the metric part it belongs to, and the
// EncodingConfig that should be used to encode it.
type ValuePart struct {
	// partName is the name of the metric part the value is associated with.
	partName string

	// repr is the value rendered as a string. Integer values should be
	// rendered the way strconv.Itoa renders them.
	repr string

	// encoding is the id of the EncodingConfig to encode the value with.
	encoding uint32
}

// String renders the ValuePart as <partName>:<repr>:<encoding>, which is the
// syntax the Cobalt test application accepts as a flag value.
func (p *ValuePart) String() string {
	encodingID := strconv.Itoa(int(p.encoding))
	return p.partName + ":" + p.repr + ":" + encodingID
}
// flagString joins the String() forms of |values| with commas, producing the
// text that sendObservations passes to the Cobalt test application as the
// value of its -values flag. An empty slice yields the empty string.
func flagString(values []ValuePart) string {
	var joined bytes.Buffer
	for i := range values {
		if i != 0 {
			joined.WriteString(",")
		}
		joined.WriteString(values[i].String())
	}
	return joined.String()
}
// getNumObservations runs the "query_observations" tool to find out how many
// Observations the ObservationStore currently holds for the metric |metricId|.
// |maxNum| is passed to the tool as -max_num to bound the query, so the
// returned count is expected to be at most |maxNum|.
//
// The tool is expected to print a single non-negative decimal integer on
// stdout; surrounding whitespace (such as a trailing newline) is ignored.
//
// Returns an error if the tool cannot be run or exits unsuccessfully (the
// error then includes whatever the tool wrote to stderr), if its output is not
// an integer, or if the integer is negative.
func getNumObservations(metricId uint32, maxNum uint32) (uint32, error) {
	cmd := exec.Command(*observationQuerierPath,
		"-nointeractive",
		"-for_testing_only_use_bigtable_emulator",
		"-logtostderr", fmt.Sprintf("-v=%d", *subProcessVerbosity),
		"-metric", strconv.Itoa(int(metricId)),
		"-max_num", strconv.Itoa(int(maxNum)))
	out, err := cmd.Output()
	if err != nil {
		// Because cmd.Stderr is unset, Output() stores the child's stderr in
		// the ExitError; include it so that failures are diagnosable.
		stdErrMessage := ""
		if exitError, ok := err.(*exec.ExitError); ok {
			stdErrMessage = string(exitError.Stderr)
		}
		return 0, fmt.Errorf("Error returned from query_observations process: [%v] %s", err, stdErrMessage)
	}
	// strconv.Atoi rejects any whitespace, so strip it first; otherwise a
	// trailing newline in the tool's output would be reported as a parse error.
	num, err := strconv.Atoi(string(bytes.TrimSpace(out)))
	if err != nil {
		// %q prints the raw output as quoted text rather than as a list of
		// byte values, and makes stray whitespace visible.
		return 0, fmt.Errorf("Unable to parse output of query_observations as an integer: error=[%v] output=%q", err, out)
	}
	if num < 0 {
		return 0, fmt.Errorf("Expected non-negative integer received %d", num)
	}
	return uint32(num), nil
}
// waitForObservations polls the Observation Store until it holds exactly
// |expectedNum| observations for the metric |metricId|.
//
// Each poll asks for at most expectedNum+1 observations so that an overshoot
// can be detected. The outcomes of a poll are:
//   - the query fails: that error is returned immediately;
//   - exactly |expectedNum| observations are found: nil is returned;
//   - more than |expectedNum| are found: a non-nil error is returned;
//   - fewer than |expectedNum| are found: the function sleeps for one second
//     and polls again.
//
// After 30 polls that all found too few observations a non-nil error is
// returned.
func waitForObservations(metricId uint32, expectedNum uint32) error {
	for attempt := 1; attempt <= 30; attempt++ {
		found, err := getNumObservations(metricId, expectedNum+1)
		switch {
		case err != nil:
			return err
		case found == expectedNum:
			return nil
		case found > expectedNum:
			return fmt.Errorf("Expected %d got %d", expectedNum, found)
		}
		// Fewer than expected so far: give the pipeline time to catch up.
		glog.V(3).Infof("Observation store has %d observations. Waiting for %d...", found, expectedNum)
		time.Sleep(time.Second)
	}
	return fmt.Errorf("After 30 attempts the number of observations was still not the expected number of %d", expectedNum)
}
// sendObservations runs the cobalt_test_app in "send-once" mode so that it
// encodes |values| into observations for the metric |metricId| and sends them
// on behalf of |numClients| clients. Anything the test app writes to stdout or
// stderr is echoed to this process's stdout. The returned error is the one
// reported by running the test app, or nil on success.
//
// NOTE(review): |skipShuffler| is not read by this function; both
// -analyzer_uri and -shuffler_uri are always passed to the test app. Confirm
// whether a flag is supposed to be forwarded for it.
func sendObservations(metricId uint32, values []ValuePart, skipShuffler bool, numClients uint) error {
	args := []string{
		"-mode", "send-once",
		"-analyzer_uri", *analyzerUri,
		"-shuffler_uri", *shufflerUri,
		"-logtostderr", fmt.Sprintf("-v=%d", *subProcessVerbosity),
		"-metric", strconv.Itoa(int(metricId)),
		"-num_clients", strconv.Itoa(int(numClients)),
		"-values", flagString(values),
	}
	output, err := exec.Command(*testAppPath, args...).CombinedOutput()
	if len(output) > 0 {
		fmt.Printf("%s", output)
	}
	return err
}
// sendForculusUrlObservations sends |numClients| independent Observations for
// the URL metric, each holding |url| as the "url" metric part encoded with the
// Forculus EncodingConfig. The test is failed immediately via t.Fatalf if
// sending fails.
func sendForculusUrlObservations(url string, numClients uint, t *testing.T) {
	part := ValuePart{
		partName: urlMetricPartName,
		repr:     url,
		encoding: forculusEncodingConfigId,
	}
	// The third argument is skipShuffler; this helper always passes false.
	err := sendObservations(urlMetricId, []ValuePart{part}, false, numClients)
	if err != nil {
		t.Fatalf("url=%s, numClient=%d, err=%v", url, numClients, err)
	}
}
// sendBasicRapporHourObservations sends |numClients| independent Observations
// for the hour metric, each holding |hour| as the "hour" metric part encoded
// with the Basic RAPPOR EncodingConfig. The test is failed immediately via
// t.Fatalf if sending fails.
func sendBasicRapporHourObservations(hour int, numClients uint, t *testing.T) {
	part := ValuePart{
		partName: hourMetricPartName,
		repr:     strconv.Itoa(hour),
		encoding: basicRapporEncodingConfigId,
	}
	// The third argument is skipShuffler; this helper always passes false.
	err := sendObservations(hourMetricId, []ValuePart{part}, false, numClients)
	if err != nil {
		t.Fatalf("hour=%d, numClient=%d, err=%v", hour, numClients, err)
	}
}
// getReport asks the ReportMaster, through reportClient, to start a complete
// report for |reportConfigId|, then fetches the resulting report and returns
// it. If either request fails the test is failed immediately via t.Fatalf.
//
// NOTE(review): |includeStdErr| is accepted but never read by this function.
func getReport(reportConfigId uint32, includeStdErr bool, t *testing.T) *report_master.Report {
	// Duration handed to GetReport; presumably the maximum time to wait for
	// the report to be generated. Confirm against report_client.
	const fetchWait = 10 * time.Second

	id, startErr := reportClient.StartCompleteReport(reportConfigId)
	if startErr != nil {
		t.Fatalf("reportConfigId=%d, err=%v", reportConfigId, startErr)
	}
	fetched, fetchErr := reportClient.GetReport(id, fetchWait)
	if fetchErr != nil {
		t.Fatalf("reportConfigId=%d, err=%v", reportConfigId, fetchErr)
	}
	return fetched
}
// getCSVReport asks the ReportMaster to start a new report for the given |reportConfigId|
// that spans all day indices. It then waits for the report generation to complete
// and returns the report in CSV format.
func getCSVReport(reportConfigId uint32, includeStdErr bool, t *testing.T) string {
report := getReport(reportConfigId, includeStdErr, t)
csv, err := report_client.WriteCSVReportToString(report, includeStdErr)
if err != nil {
t.Fatalf("reportConfigId=%d, err=%v", reportConfigId, err)
}
return csv
}
// TestForculusEncodingOfUrls runs the full Cobalt pipeline using Metric 1,
// Encoding Config 1 and Report Config 1, i.e. it uses Forculus with a
// threshold of 20 to count URLs.
func TestForculusEncodingOfUrls(t *testing.T) {
	type batch struct {
		url        string
		numClients uint
	}
	send := func(batches ...batch) {
		for _, b := range batches {
			sendForculusUrlObservations(b.url, b.numClients, t)
		}
	}

	// First wave: 18+19+20+21 = 78 observations sent to the Shuffler.
	send(
		batch{"www.AAAA.com", 18},
		batch{"www.BBBB.com", 19},
		batch{"www.CCCC.com", 20},
		batch{"www.DDDD.com", 21},
	)

	// Fewer than 100 observations have been sent and the Shuffler's threshold
	// is set to 100, so we expect that nothing has been forwarded to the
	// Analyzer yet.
	stored, err := getNumObservations(urlMetricId, 10)
	if err != nil {
		t.Fatalf("Error returned from getNumObservations[%v]", err)
	}
	if stored != 0 {
		t.Fatalf("Expected no observations in the Observation store yet but got %d", stored)
	}

	// Second wave: 22+23 more observations bring the total to 123, which
	// crosses the Shuffler's threshold, so everything should now be forwarded
	// to the Analyzer.
	send(
		batch{"www.EEEE.com", 22},
		batch{"www.FFFF.com", 23},
	)

	// Wait until all 123 observations for metric 1 have arrived.
	if err := waitForObservations(urlMetricId, 123); err != nil {
		t.Fatalf("%s", err)
	}

	// Only URLs sent by at least 20 clients (the Forculus threshold) are
	// expected in the report, so www.AAAA.com and www.BBBB.com are absent.
	const expectedCSV = `www.CCCC.com,20.000
www.DDDD.com,21.000
www.EEEE.com,22.000
www.FFFF.com,23.000
`

	// Generate the report, fetch it as CSV and compare.
	if csv := getCSVReport(forculusUrlReportConfigId, false, t); csv != expectedCSV {
		t.Errorf("Got csv:[%s]", csv)
	}
}
// TestBasicRapporEncodingOfHours runs the full Cobalt pipeline using Metric 2,
// Encoding Config 2 and Report Config 2, i.e. it uses Basic RAPPOR with one
// integer category for each of the 24 hours of the day.
func TestBasicRapporEncodingOfHours(t *testing.T) {
	batches := []struct {
		hour       int
		numClients uint
	}{
		{8, 501},
		{9, 1002},
		{10, 503},
		{16, 504},
		{17, 1005},
		{18, 506},
	}
	for _, b := range batches {
		sendBasicRapporHourObservations(b.hour, b.numClients, t)
	}

	// 501+1002+503+504+1005+506 = 4021 observations should now reach the
	// Analyzer for metric 2. Wait for them.
	if err := waitForObservations(hourMetricId, 4021); err != nil {
		t.Fatalf("%s", err)
	}

	report := getReport(basicRapporHourReportConfigId, true, t)
	if state := report.Metadata.State; state != report_master.ReportState_COMPLETED_SUCCESSFULLY {
		t.Fatalf("report.Metadata.State=%v", state)
	}

	// NOTE(review): the second argument presumably asks for a standard-error
	// column to be included; confirm against report_client.ReportToStrings.
	rows := report_client.ReportToStrings(report, true)
	if rows == nil {
		t.Fatalf("rows is nil")
	}
	if len(rows) != 24 {
		t.Fatalf("len(rows)=%d", len(rows))
	}

	// Row i is expected to describe hour i with three columns: the hour, the
	// estimated count, and a third value that is the same for every row.
	for hour := 0; hour < 24; hour++ {
		row := rows[hour]
		if len(row) != 3 {
			t.Fatalf("len(rows[hour])=%d", len(row))
		}
		if want := strconv.Itoa(hour); row[0] != want {
			t.Errorf("Expected %s, got %s", want, row[0])
		}
		val, err := strconv.ParseFloat(row[1], 32)
		if err != nil {
			t.Errorf("Error parsing %s as float: %v", row[1], err)
			continue
		}

		// The encoding is randomized, so only loose bounds on the estimated
		// counts are checked.
		switch hour {
		case 8, 10, 16, 18:
			// Roughly 500 observations were sent for each of these hours.
			if val < 10.0 || val > 1000.0 {
				t.Errorf("For hour %d unexpected val: %v", hour, val)
			}
		case 9, 17:
			// Roughly 1000 observations were sent for each of these hours.
			if val < 500.0 || val > 2000.0 {
				t.Errorf("For hour %d unexpected val: %v", hour, val)
			}
		default:
			// No observations were sent for the remaining hours.
			if val > 100.0 {
				t.Errorf("Val larger than expected: %v", val)
				continue
			}
		}

		if row[2] != "23.779" {
			t.Errorf("rows[hour][2]=%s", row[2])
		}
	}
}