blob: de5d179acfdb4198c8c7f555384d5e5414cfbd12 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
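// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and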
// limitations under the License.
// Package dispatcher implements the sending side of the Shuffler.
//
// The Shuffler receives ciphertexts from Encoders (end users), buffers them
// according to a policy, and then batch sends them in a random order to an
// Analyzer. The purpose is to break linkability between end users and
// ciphertexts from the Analyzer's point of view: the Analyzer does not know
// which end user produced which ciphertext.
package dispatcher
import (
// dispatchDelay is how long the dispatcher sleeps between buckets, and between
// batches within a bucket.
const dispatchDelay = 1 * time.Second

// minWaitTime is the shortest sleep between two invocations of dispatch(). It
// is also what the dispatcher sleeps when FrequencyInHours has been set to
// zero.
const minWaitTime = 1 * time.Second

// Names of the count metrics reported via stackdriver when a dispatcher step
// fails.
const (
	dispatchFailed              = "dispatcher-dispatch-failed"
	dispatchBucketFailed        = "dispatcher-dispatch-bucket-failed"
	deleteOldObservationsFailed = "dispatcher-delete-old-observations-failed"
	makeBatchFailed             = "dispatcher-make-batch-failed"
)
// AnalyzerTransport is an interface for Analyzer where the observations get
// collected, analyzed and reported.
type AnalyzerTransport interface {
send(obBatch *cobalt.ObservationBatch) error
// GrpcClientConfig lists the grpc client configuration parameters required to
// connect to Analyzer.
//
// If |EnableTLS| is false an insecure connection is used and |CAFile| is
// ignored; otherwise TLS is used. |URL| and |Timeout| apply in both cases.
type GrpcClientConfig struct {
	// EnableTLS selects a TLS connection when true, an insecure one otherwise.
	EnableTLS bool
	// CAFile is optional and only used when EnableTLS is true. If non-empty it
	// is the path to a file containing a PEM encoding of root certificates to
	// use for TLS.
	CAFile string
	// Timeout bounds how long the initial, blocking dial to the analyzer may
	// take.
	Timeout time.Duration
	// URL is the address of the analyzer.
	URL string
}
// GrpcAnalyzerTransport sends data to Analyzer specified by Grpc |clientConfig|
// using the |client| interface.
// |conn| handle is used for closing and re-establishing grpc connections when
// dispatcher toggles between send and wait modes.
type GrpcAnalyzerTransport struct {
clientConfig *GrpcClientConfig
conn *grpc.ClientConn
client analyzer_service.AnalyzerClient
// NewGrpcAnalyzerTransport establishes a Grpc connection to the Analyzer
// specified by |clientConfig|, and returns a new |GrpcAnalyzerTransport|.
// Panics if |clientConfig| is nil or if the underlying grpc connection cannot
// be established.
func NewGrpcAnalyzerTransport(clientConfig *GrpcClientConfig) *GrpcAnalyzerTransport {
conn := connect(clientConfig)
return &GrpcAnalyzerTransport{
clientConfig: clientConfig,
conn: conn,
client: analyzer_service.NewAnalyzerClient(conn),
// connect returns a Grpc |ClientConn| handle after successfully establishing
// a connection to the analyzer endpoint using |cc| config parameters.
// If |cc.EnableTLS| is false an insecure connection is used, and the remaining
// parameters or ignored, otherwise TLS is used.
// |cc.CAFile| is optional. If non-empty it should specify the path to a file
// containing a PEM encoding of root certificates to use for TLS.
// Logs and crashes on any grpc failure, and panics if |cc| is not set.
func connect(cc *GrpcClientConfig) *grpc.ClientConn {
if cc == nil {
panic("Grpc client configuration is not set.")
glog.V(3).Infoln("Connecting to analyzer at:", cc.URL)
var opts []grpc.DialOption
if cc.EnableTLS {
var creds credentials.TransportCredentials
if cc.CAFile != "" {
var err error
creds, err = credentials.NewClientTLSFromFile(cc.CAFile, "")
if err != nil {
glog.Fatalf("Failed to create TLS credentials %v", err)
} else {
creds = credentials.NewClientTLSFromCert(nil, "")
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
opts = append(opts, grpc.WithBlock())
opts = append(opts, grpc.WithTimeout(cc.Timeout))
glog.V(4).Infoln("Dialing", cc.URL, "...")
conn, err := grpc.Dial(cc.URL, opts...)
if err != nil {
glog.Fatalf("Error in establishing connection to Analyzer [%v]: %v", cc.URL, err)
return conn
// close closes all the grpc underlying connections to Analyzer.
func (g *GrpcAnalyzerTransport) close() {
if g == nil {
panic("GrpcAnalyzerTransport is not set.")
if g.conn != nil {
g.conn = nil
// reconnect re-establishes the Grpc client connection to the Analyzer using the
// existing parameters from |g.clientConfig| and updates |g| accordingly.
func (g *GrpcAnalyzerTransport) reconnect() {
if g == nil {
panic("GrpcAnalyzerTransport is not set.")
if g.conn == nil {
g.conn = connect(g.clientConfig)
g.client = analyzer_service.NewAnalyzerClient(g.conn)
// send forwards a given ObservationBatch to Analyzer using the AddObservations
// interface.
func (g *GrpcAnalyzerTransport) send(obBatch *cobalt.ObservationBatch) error {
if g == nil {
panic("GrpcAnalyzerTransport is not set.")
if obBatch == nil {
return grpc.Errorf(codes.InvalidArgument, "ObservationBatch is not set.")
// Analyzer forwards a new context, so as to break the context correlation
// between originating request and the shuffled request that is being
// forwarded.
glog.V(3).Infof("Sending batch of %d observations to the analyzer.", len(obBatch.GetEncryptedObservation()))
_, err := g.client.AddObservations(context.Background(), obBatch)
if err != nil {
return grpc.Errorf(codes.Internal, "AddObservations call failed with error: %v", err)
glog.V(4).Infoln("ObservationBatch dispatched successfully.")
return nil
// Dispatcher stores and forwards encoder requests to |analyzer|s based on the
// type of |store|, |config|, |batchSize| and the |lastDispatchTime|.
type Dispatcher struct {
store storage.Store
config *shuffler.ShufflerConfig
batchSize int
analyzerTransport AnalyzerTransport
lastDispatchTime time.Time
var dispatcherSingleton *Dispatcher
// Start validates its arguments and builds the Dispatcher that forwards
// Observations buffered in |store| to the Analyzer through |analyzerTransport|,
// according to the policy in |config| and in batches of at most |batchSize|.
// A nil |store|, |config| or |analyzerTransport|, a non-positive |batchSize|,
// or a second invocation is reported with glog.Fatal, which terminates the
// process.
func Start(config *shuffler.ShufflerConfig, store storage.Store, batchSize int, analyzerTransport AnalyzerTransport) {
if store == nil {
glog.Fatal("Invalid data store handle, exiting.")
if config == nil {
glog.Fatal("Invalid server config, exiting.")
if analyzerTransport == nil {
glog.Fatal("Invalid Analyzer client.")
if batchSize <= 0 {
glog.Fatal("Invalid batch size.")
// Guard against a second call to Start.
// NOTE(review): the declaration below uses `:=`, which creates a new local
// named dispatcherSingleton that shadows the package-level variable. The
// package-level variable is therefore never assigned by Start and this guard
// cannot fire; `dispatcherSingleton = &Dispatcher{...}` was probably intended.
if dispatcherSingleton != nil {
glog.Fatal("Start() must not be invoked twice, exiting.")
// invoke dispatcher
dispatcherSingleton := &Dispatcher{
store: store,
config: config,
batchSize: batchSize,
analyzerTransport: analyzerTransport,
// Zero time: computeWaitTime() then yields a negative wait, so the first
// dispatch attempt is not held back by the configured frequency.
lastDispatchTime: time.Time{},
// Run is the dispatcher's main loop and loops indefinitely. Each iteration
// waits until the next dispatch is due according to computeWaitTime(), but
// never less than minWaitTime. It then dispatches stored observations to the
// Analyzer per each ObservationMetadata key if threshold and dispatch
// frequency are met, and finally records the current time in
// |d.lastDispatchTime|. If the criteria is not met, dispatcher goes back to
// wait mode until the next dispatch attempt.
// The underlying grpc connection to analyzer is closed while the dispatcher
// sleeps, and re-established afterwards, only when the sleep is longer than
// minWaitTime.
func (d *Dispatcher) Run() {
for {
waitTime := d.computeWaitTime(time.Now())
shouldDisconnectWhileSleeping := true
// computeWaitTime() may be zero or negative when a dispatch is already due;
// clamp it so every iteration still waits at least minWaitTime.
if waitTime <= minWaitTime {
waitTime = minWaitTime
// Don't bother disconnecting and reconnecting for such a short sleep.
shouldDisconnectWhileSleeping = false
if shouldDisconnectWhileSleeping {
glog.V(3).Infoln("Close existing connection to Analyzer...")
glog.V(5).Infof("Dispatcher sleeping for [%v]...", waitTime)
if shouldDisconnectWhileSleeping {
glog.V(3).Infoln("Re-establish grpc connection to Analyzer before the next dispatch...")
// The next computeWaitTime() call measures the dispatch interval from here.
d.lastDispatchTime = time.Now()
// dispatch sends encoded observations to the Analyzer based on the following
// criteria:
// 1. The dispatch interval between attempts should be at least
// |frequency_in_hours| as specified in the Shuffler configuration. This is
// enforced by Run via computeWaitTime(); dispatch itself does not check it.
// 2. If frequency is met, Shuffler sends |ObservationBatch| to the Analyzer for
// each |ObservationMetadata| key if and only if:
// - The batch contains at least |threshold| number of Observations, and
// - For each eligible batch, the Observations in that batch will be
// dispatched to the Analyzer and deleted from the Shuffler, and
// - For each batch whose Observations are not dispatched to the Analyzer
// because the batch size is too small, the Shuffler will delete those
// Observations from the batch whose age is greater than |disposal_age_days|
// specified in the configuration.
// Between buckets, and between the batches of a single bucket, we sleep for
// |sleepDuration|.
// Panics if the store handle or the Shuffler config is nil. Failures for an
// individual bucket are reported as |dispatchFailed| count metrics and do not
// stop processing of the remaining buckets.
func (d *Dispatcher) dispatch(sleepDuration time.Duration) {
if == nil {
panic("Store handle is nil.")
if d.config == nil {
panic("Shuffler config is nil.")
glog.V(5).Infoln("Start dispatching ...")
keys, err :=
if err != nil {
stackdriver.LogCountMetricf(dispatchFailed, "GetKeys() failed with error: %v", err)
// Each bucket is either dispatched or disposed based on config and if there
// are errors, processing proceeds to the next bucket in the pipeline.
for _, key := range keys {
// Fetch bucket size for each key.
// We use the value returned from GetNumObservations() to determine whether
// or not to dispatch a bucket. But it's important to note that this value
// is not necessarily exactly equal to the number of Observations in the
// Store. This is because new Observations are being added to the store and
// the count is being incremented asynchronously with this method and
// non-transactionally. In particular note that it is possible that the
// value returned from GetNumObservations() may, temporarily, be negative.
// We do maintain the following invariant: Let n = the value returned from
// GetNumObservations(). Then an invocation of GetObservations() by this
// same thread immediately afterwards will find at least n Observations.
// (The reason this invariant holds is that this is the only thread that
// ever deletes from the store or decrements the count. All other threads
// first add to the store, commit, and then increment the count.) This
// allows us to use the result of GetNumObservations() for conservative
// thresholding: We will not dispatch a bucket unless GetNumObservations()
// returns a value at least as large as the threshold.
bucketSize, err :=
glog.V(5).Infof("Bucket size from store: [%d]", bucketSize)
if err != nil {
stackdriver.LogCountMetricf(dispatchFailed, "GetNumObservations() failed for key: %v with error: %v", key, err)
// Compare bucket size to the configured limit.
// NOTE(review): if bucketSize is negative (possible, per the comment above),
// uint32(bucketSize) wraps around to a very large value and passes this
// check, which breaks the conservative-thresholding guarantee. Consider
// testing `bucketSize >= 0` before converting.
if uint32(bucketSize) >= d.config.GetGlobalConfig().Threshold {
// Dispatch bucket associated with |key| and delete it after sending.
err := d.dispatchBucket(key, sleepDuration)
if err != nil {
stackdriver.LogCountMetricf(dispatchFailed, "dispatchBucket() failed for key: %v with error: %v", key, err)
} else {
// If threshold policy is not met, loop through the messages and check
// if any messages are in the queue for more than the allowed duration
// |disposal_age_days|. If found, discard them; otherwise they stay in the
// store for the next dispatch event.
err = d.deleteOldObservations(key, storage.GetDayIndexUtc(time.Now()), d.config.GetGlobalConfig().DisposalAgeDays)
if err != nil {
stackdriver.LogCountMetricf(dispatchFailed, "Error in filtering Observations for key [%v]: %v", key, err)
// dispatchBucket dispatches the observations stored under |key| to the
// Analyzer through |d.analyzerTransport|, in chunks of at most |d.batchSize|
// built by makeBatch(). Each chunk that is sent successfully is then deleted
// from the store. We sleep for |sleepDuration| between batches.
// Returns the store error if the bucket cannot be read. Send and delete
// failures are reported as |dispatchBucketFailed| count metrics, and the
// final statement returns nil.
// Panics if |key| or |d| is nil.
func (d *Dispatcher) dispatchBucket(key *cobalt.ObservationMetadata, sleepDuration time.Duration) error {
if key == nil {
panic("key is nil")
if d == nil {
panic("dispatcher is nil")
// Retrieve shuffled bucket from store for the given |key|
iterator, err :=
if err != nil {
stackdriver.LogCountMetricf(dispatchBucketFailed, "GetObservations() failed for key: %v with error: %v", key, err)
return err
// Send the shuffled bucket to Analyzer in chunks. If the bucket is too
// big, send it in multiple chunks of size |batchSize|.
batchID := 0
for {
glog.V(4).Infof("Sending observations to Analyzer in chunks, batch [%d] in progress...", batchID)
// obVals are the stored entries behind batchToSend; they are what gets
// deleted from the store once the send succeeds.
obVals, batchToSend := makeBatch(key, iterator, d.batchSize)
if len(obVals) == 0 {
// If makeBatch() returned an empty batch then the iteration is done.
sendErr := d.analyzerTransport.send(batchToSend)
if sendErr == nil {
// After successful send, delete the observations from the local
// datastore.
if err :=, obVals); err != nil {
// NOTE(review): the delete error itself is not included in this metric
// message. Also, a failed delete after a successful send leaves the sent
// observations in the store, so they may be sent to the Analyzer again by
// a later dispatch.
stackdriver.LogCountMetricf(dispatchBucketFailed, "Error in deleting dispatched observations from the store for key: %v", key)
} else {
// A chunk that fails to send is not deleted here, so it remains in the
// store for later dispatch attempts.
// TODO(ukode): Add retry behaviour for 3 or more attempts or use
// exponential backoff for errors relating to network issues.
stackdriver.LogCountMetricf(dispatchBucketFailed, "Error in transmitting data to Analyzer for key [%v]: %v", key, sendErr)
return nil
// deleteOldObservations deletes from the store the observations for a given
// |key| whose age in days (|currentDayIndex| minus the observation's
// ArrivalDayIndex) is strictly greater than |disposalAgeInDays|.
// Stale observations are deleted in batches of at most maxDeleteBatchSize.
// Returns an error if a delete fails. A failure to read the bucket is only
// reported as a |deleteOldObservationsFailed| count metric, and nil is
// returned in that case.
// Panics if |key| or |d| is nil.
func (d *Dispatcher) deleteOldObservations(key *cobalt.ObservationMetadata,
currentDayIndex uint32, disposalAgeInDays uint32) error {
if key == nil {
panic("key is nil")
if d == nil {
panic("dispatcher is nil")
iterator, err :=
if err != nil {
stackdriver.LogCountMetricf(deleteOldObservationsFailed, "GetObservation call failed for key: %v with error: %v", key, err)
// NOTE(review): the read error is swallowed here; callers cannot tell that
// no disposal happened for this key.
return nil
// We delete stale Observations iteratively in batches of size at most 1000.
const maxDeleteBatchSize = 1000
for {
// Each pass collects up to maxDeleteBatchSize stale entries from where the
// previous pass stopped in |iterator|.
var staleObVals []*shuffler.ObservationVal
for iterator.Next() {
obVal, err := iterator.Get()
if err != nil {
stackdriver.LogCountMetricf(deleteOldObservationsFailed, "deleteOldObservations: iterator.Get() returned an error: %v", err)
// NOTE(review): this subtraction is on uint32. If ArrivalDayIndex is later
// than currentDayIndex (e.g. clock skew between hosts), it wraps around to a
// huge age and the observation is treated as stale and deleted. Consider
// guarding with `obVal.ArrivalDayIndex <= currentDayIndex`.
if currentDayIndex-obVal.ArrivalDayIndex > disposalAgeInDays {
staleObVals = append(staleObVals, obVal)
if len(staleObVals) == maxDeleteBatchSize {
// A pass that collected nothing means |iterator| is exhausted.
if len(staleObVals) == 0 {
} else if err :=, staleObVals); err != nil {
return fmt.Errorf("Error [%v] in deleting old observations for metadata: %v", err, key)
return nil
// computeWaitTime returns the Duration until the next dispatch should occur.
// Note that this may be negative.
func (d *Dispatcher) computeWaitTime(currentTime time.Time) (waitTime time.Duration) {
if d == nil {
panic("Dispatcher is not set")
dispatchInterval := time.Duration(d.config.GetGlobalConfig().FrequencyInHours) * time.Hour
nextDispatchTime := d.lastDispatchTime.Add(dispatchInterval)
return nextDispatchTime.Sub(currentTime)
// makeBatch returns a new ObservationBatch for |key| consisting of the next
// chunk of observations from |iterator| of size at most |batchSize|. It also
// returns the ObservationVals the chunk was built from, so that the caller
// can delete them from the store after a successful send. An empty returned
// slice is what dispatchBucket treats as the end of the iteration.
// Entries for which iterator.Get() fails are reported as |makeBatchFailed|
// count metrics.
// Panics if |batchSize| is not positive.
func makeBatch(key *cobalt.ObservationMetadata, iterator storage.Iterator, batchSize int) ([]*shuffler.ObservationVal, *cobalt.ObservationBatch) {
if batchSize <= 0 {
panic("batchSize must be positive.")
// encryptedMessages[i] is obVals[i].EncryptedObservation; both grow together.
var encryptedMessages []*cobalt.EncryptedMessage
var obVals []*shuffler.ObservationVal
for iterator.Next() {
obVal, err := iterator.Get()
if err != nil {
stackdriver.LogCountMetricf(makeBatchFailed, "makeBatch: iterator.Get() returned an error: %v", err)
obVals = append(obVals, obVal)
encryptedMessages = append(encryptedMessages, obVal.EncryptedObservation)
// The chunk is full; |batchSize| caps the size of each batch.
if len(encryptedMessages) == batchSize {
batch := cobalt.ObservationBatch{
MetaData: key,
EncryptedObservation: encryptedMessages,
return obVals, &batch