blob: 2e161fd1be551d10f7cf5f9df58afae0f0ddeb99 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//     http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
// Command-line flags for the Shuffler binary. Each variable is a pointer that
// holds the flag's default until the command line has been parsed.
var (
	// tls selects TLS (true) or plain TCP (false). main passes the same value
	// to both the server config and the analyzer client config.
	tls = flag.Bool("tls", false, "Connection uses TLS if true, else plain TCP")

	// Shuffler server configuration flags.
	certFile          = flag.String("cert_file", "", "The TLS cert file")
	keyFile           = flag.String("key_file", "", "The TLS key file")
	port              = flag.Int("port", 50051, "The server port")
	privateKeyPemFile = flag.String("private_key_pem_file", "",
		"Path to a file containing a PEM encoding of the private key of "+
			"the Shuffler used for Cobalt's internal encryption scheme. If "+
			"not specified then the Shuffler will not support encrypted Envelopes.")

	// Shuffler client configuration flags, used to connect to the analyzer.
	caFile = flag.String("ca_file", "", "The file containing the CA root certificate")
	// timeout is expressed in whole seconds; main converts it to a time.Duration.
	timeout = flag.Int("timeout", 30, "Grpc connection timeout in seconds")
	// analyzerURL, when non-empty, overrides the analyzer URL taken from the
	// Shuffler config. Note that the flag itself is spelled "analyzer_uri".
	analyzerURL = flag.String("analyzer_uri", "", "The URL for analyzer service")

	// Shuffler dispatch configuration flags.
	configFile = flag.String("config_file", "", "The Shuffler config file")
	batchSize  = flag.Int("batch_size", 1000, "The size of ObservationBatch to be sent to Analyzer")

	// Shuffler datastore configuration flags.
	useMemStore   = flag.Bool("use_memstore", false, "Shuffler uses in memory store if true, else persistent store")
	dbDir         = flag.String("db_dir", "", "Path to the Shuffler local datastore")
	deleteAllData = flag.Bool("danger_danger_delete_all_data_at_startup", false,
		"If true then upon startup all data from previous executions of the Shuffler will be deleted. "+
			"This should not be set true in normal shuffler operation.")
)
func main() {
// Initialize Shuffler configuration
var sConfig *shuffler.ShufflerConfig
var err error
if *configFile == "" {
glog.Warning("Using Shuffler default configuration. Pass -config_file to specify custom config options.")
// Use the default config
sConfig = &shuffler.ShufflerConfig{}
sConfig.GlobalConfig = &shuffler.Policy{
FrequencyInHours: 24,
PObservationDrop: 0.0,
Threshold: 500,
DisposalAgeDays: 4,
} else {
if sConfig, err = config.LoadConfig(*configFile); err != nil {
glog.Fatal("Error loading shuffler config file: [", *configFile, "]: ", err)
// Read the private key PEM file
privateKeyPem := ""
if *privateKeyPemFile != "" {
if fileContents, err := ioutil.ReadFile(*privateKeyPemFile); err != nil {
glog.Errorf("Error attempting to read private key PEM file %s: %v. "+
"The shuffler will not be able to decrypt EncryptedMessages.", *privateKeyPemFile, err)
} else {
glog.Infof("Successfully read private key PEM file %s.", *privateKeyPemFile)
privateKeyPem = string(fileContents)
} else {
glog.Warning("The flag -private_key_pem_file was not provided. The shuffler will not be able to decrypt EncryptedMessages.")
// Initialize Shuffler data store
var store storage.Store
if *useMemStore {
glog.Warning("Using MemStore--data will not be persistent. All data will be lost when the Shufler restarts!")
store = storage.NewMemStore()
} else {
if *dbDir == "" {
glog.Fatal("Either -use_memstore or -db_dir are required.")
observationsDBpath, err := filepath.Abs(filepath.Join(*dbDir, "observations_db"))
if err != nil {
glog.Fatal("%v", err)
glog.Infof("Using LevelDB store located at %s.", observationsDBpath)
if store, err = storage.NewLevelDBStore(observationsDBpath); err != nil || store == nil {
glog.Fatal("Error initializing shuffler datastore: [", *dbDir, "]: ", err)
if *deleteAllData {
glog.Warning("The flag -danger_danger_delete_all_data_at_startup was passed.")
// Override analyzer client's url if |analyzerURL| flag is set
url := sConfig.GetGlobalConfig().AnalyzerUrl
if *analyzerURL != "" {
url = *analyzerURL
grpcAnalyzerClient := dispatcher.NewGrpcAnalyzerTransport(&dispatcher.GrpcClientConfig{
EnableTLS: *tls,
CAFile: *caFile,
Timeout: time.Duration(*timeout) * time.Second,
URL: url,
// Start dispatcher and keep polling for dispatch events
go dispatcher.Start(sConfig, store, *batchSize, grpcAnalyzerClient)
// Start listening on receiver for incoming requests from Encoder
receiver.Run(store, &receiver.ServerConfig{
EnableTLS: *tls,
CertFile: *certFile,
KeyFile: *keyFile,
Port: *port,
PrivateKeyPem: privateKeyPem,