// Copyright 2016 The Fuchsia Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"flag"
"io/ioutil"
"path/filepath"
"receiver"
"time"
"dispatcher"
"shuffler"
"shuffler_config"
"storage"
"util/stackdriver"
"github.com/golang/glog"
)
// Command-line flags for the Shuffler binary. Each variable holds the pointer
// returned by the flag package, so it must only be dereferenced after
// flag.Parse() has run (main does this first).
var (
// If true, the receiver server that accepts incoming requests uses TLS.
// This does not affect the connection to the analyzer, which is controlled
// separately by -tls_to_analyzer.
tls = flag.Bool("tls", false, "Connection uses TLS if true, else plain TCP")
// If true, the gRPC client used to reach the analyzer is configured with TLS.
tls_to_analyzer = flag.Bool("tls_to_analyzer", false, "Use TLS to connect to the analyzer")
// shuffler server configuration flags (passed to receiver.ServerConfig)
certFile = flag.String("cert_file", "", "The TLS cert file")
keyFile = flag.String("key_file", "", "The TLS key file")
port = flag.Int("port", 50051, "The server port")
privateKeyPemFile = flag.String("private_key_pem_file", "",
"Path to a file containing a PEM encoding of the private key of "+
"the Shuffler used for Cobalt's internal encryption scheme. If "+
"not specified then the Shuffler will not support encrypted Envelopes.")
// shuffler client configuration flags to connect to analyzer
// (passed to dispatcher.GrpcClientConfig)
caFile = flag.String("ca_file", "", "The file containing the CA root certificate")
// Interpreted as a whole number of seconds when building the client config.
timeout = flag.Int("timeout", 30, "Grpc connection timeout in seconds")
// Note that the flag itself is spelled -analyzer_uri. When non-empty it
// overrides the analyzer URL taken from the Shuffler config.
analyzerURL = flag.String("analyzer_uri", "", "The URL for analyzer service")
// shuffler dispatch configuration flags
// When -config_file is empty, main falls back to a built-in default config.
configFile = flag.String("config_file", "", "The Shuffler config file")
batchSize = flag.Int("batch_size", 1000, "The size of ObservationBatch to be sent to Analyzer")
// shuffler db configuration flags
// Exactly one of -use_memstore or -db_dir is needed: main aborts if the
// in-memory store is not requested and -db_dir is empty.
useMemStore = flag.Bool("use_memstore", false, "Shuffler uses in memory store if true, else persistent store")
dbDir = flag.String("db_dir", "", "Path to the Shuffler local datastore")
// Only consulted when the persistent (LevelDB) store is in use.
deleteAllData = flag.Bool("danger_danger_delete_all_data_at_startup", false,
"If true then upon startup all data from previous executions of the Shuffler will be deleted. "+
"This should not be set true in normal shuffler operation.")
)
const (
// Metric identifier passed to stackdriver.LogCountMetricf when the file named
// by -private_key_pem_file cannot be read.
readPrivateKeyPemFileFailure = "shuffler-main-read-private-key-pem-file-failure"
)
// main is the entry point of the Shuffler process. In order, it:
//
//  1. parses the command-line flags;
//  2. loads the Shuffler config from -config_file, or builds a default config
//     when that flag is empty;
//  3. reads the optional private key PEM file (a read failure is reported but
//     is not fatal; the key is simply left empty);
//  4. creates the observation store, either in-memory or LevelDB under
//     -db_dir, optionally erasing existing data;
//  5. starts the dispatcher on its own goroutine;
//  6. calls receiver.Run on the main goroutine (presumably blocking while it
//     serves requests -- confirm in the receiver package).
//
// Unrecoverable startup problems terminate the process via glog.Fatal/Fatalf.
func main() {
	flag.Parse()

	// Initialize Shuffler configuration.
	var sConfig *shuffler.ShufflerConfig
	var err error
	if *configFile == "" {
		glog.Warning("Using Shuffler default configuration. Pass -config_file to specify custom config options.")
		// Use the default config.
		sConfig = &shuffler.ShufflerConfig{
			GlobalConfig: &shuffler.Policy{
				FrequencyInHours: 24,
				PObservationDrop: 0.0,
				Threshold:        500,
				DisposalAgeDays:  4,
			},
		}
	} else if sConfig, err = shuffler_config.LoadConfig(*configFile); err != nil {
		glog.Fatal("Error loading shuffler config file: [", *configFile, "]: ", err)
	}

	// Read the private key PEM file. Failing to read it is deliberately
	// non-fatal: the failure is counted and logged, and the receiver is
	// started with an empty key.
	privateKeyPem := ""
	if *privateKeyPemFile != "" {
		if fileContents, err := ioutil.ReadFile(*privateKeyPemFile); err != nil {
			stackdriver.LogCountMetricf(readPrivateKeyPemFileFailure,
				"Error attempting to read private key PEM file %s: %v. "+
					"The shuffler will not be able to decrypt EncryptedMessages.", *privateKeyPemFile, err)
		} else {
			glog.Infof("Successfully read private key PEM file %s.", *privateKeyPemFile)
			privateKeyPem = string(fileContents)
		}
	} else {
		glog.Warning("The flag -private_key_pem_file was not provided. The shuffler will not be able to decrypt EncryptedMessages.")
	}

	// Initialize Shuffler data store.
	var store storage.Store
	if *useMemStore {
		glog.Warning("Using MemStore--data will not be persistent. All data will be lost when the Shufler restarts!")
		store = storage.NewMemStore()
	} else {
		if *dbDir == "" {
			glog.Fatal("Either -use_memstore or -db_dir are required.")
		}
		// Assign to the function-level err instead of shadowing it with :=.
		var observationsDBpath string
		observationsDBpath, err = filepath.Abs(filepath.Join(*dbDir, "observations_db"))
		if err != nil {
			// Fatalf, not Fatal: the message is a format string.
			glog.Fatalf("Error resolving absolute path of the observations DB under [%s]: %v", *dbDir, err)
		}
		glog.Infof("Using LevelDB store located at %s.", observationsDBpath)
		if store, err = storage.NewLevelDBStore(observationsDBpath); err != nil || store == nil {
			glog.Fatal("Error initializing shuffler datastore: [", *dbDir, "]: ", err)
		}
		if *deleteAllData {
			glog.Warning("*** WARNING: DELETING ALL DATA FROM SHUFFLER'S DATA STORE!!! ***")
			glog.Warning("The flag -danger_danger_delete_all_data_at_startup was passed.")
			// The store was created by NewLevelDBStore just above, so it is
			// expected to hold a *storage.LevelDBStore here.
			store.(*storage.LevelDBStore).EraseAllData()
		}
	}

	// Override analyzer client's url if |analyzerURL| flag is set.
	targetURL := sConfig.GetGlobalConfig().AnalyzerUrl
	if *analyzerURL != "" {
		targetURL = *analyzerURL
	}
	grpcAnalyzerClient := dispatcher.NewGrpcAnalyzerTransport(&dispatcher.GrpcClientConfig{
		EnableTLS: *tls_to_analyzer,
		CAFile:    *caFile,
		Timeout:   time.Duration(*timeout) * time.Second,
		URL:       targetURL,
	})

	// Start dispatcher and keep polling for dispatch events.
	go dispatcher.Start(sConfig, store, *batchSize, grpcAnalyzerClient)

	// Start listening on receiver for incoming requests from Encoder.
	receiver.Run(store, &receiver.ServerConfig{
		EnableTLS:     *tls,
		CertFile:      *certFile,
		KeyFile:       *keyFile,
		Port:          *port,
		PrivateKeyPem: privateKeyPem,
	})
}