blob: 3eaaac6d987461b32b304d9684e8890c7d144c7c [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"fmt"
"math/rand"
"strconv"
"sync"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"cobalt"
"shuffler"
rand_util "util"
)
// randGen is the package-level random source from which AddAllObservations
// draws the identifiers of newly stored observations. It is assigned in
// NewMemStore and, being a package-level variable, is shared by every
// MemStore created in this package.
var randGen rand_util.Random
// MemStore is an in-memory implementation of the Store interface.
type MemStore struct {
	// mu guards observationsMap. The methods of MemStore in this file take
	// it (read or write) before touching the map.
	mu sync.RWMutex

	// observationsMap stores the observations. The outer keys are serialized
	// |ObservationMetadata| strings, each pointing to an inner map of
	// |ObservationVal|s.
	//
	// The inner keys are the same identifiers that uniquely represent each
	// |ObservationVal| in the data store.
	observationsMap map[string]map[string]*shuffler.ObservationVal
}
// randGenInit ensures that the package-level randGen is created exactly once,
// no matter how many stores are constructed.
var randGenInit sync.Once

// NewMemStore creates an empty MemStore.
//
// The shared identifier generator randGen is set up on the first call only.
// Replacing it on every call would swap in a generator built from the same
// fixed seed, which presumably restarts the same identifier sequence: a store
// that is already in use would then be handed identifiers it has issued
// before, and AddAllObservations would overwrite the entries stored under
// them. It would also write randGen without any synchronization.
func NewMemStore() *MemStore {
	randGenInit.Do(func() {
		randGen = rand_util.NewDeterministicRandom(int64(1))
	})
	return &MemStore{
		observationsMap: make(map[string]map[string]*shuffler.ObservationVal),
	}
}
// key returns the compact text representation of the given
// |ObservationMetadata|, which the store uses as its map key. A nil |om|
// yields the empty string.
func key(om *cobalt.ObservationMetadata) string {
	if om != nil {
		return proto.CompactTextString(om)
	}
	return ""
}
// shuffle returns a new slice holding the elements of |obVals| in a random
// order. The input slice is left untouched.
func shuffle(obVals []*shuffler.ObservationVal) []*shuffler.ObservationVal {
	// Draw a random permutation of the indices. The package-level math/rand
	// source is used and is assumed to be appropriately seeded.
	order := rand.Perm(len(obVals))

	result := make([]*shuffler.ObservationVal, 0, len(order))
	for _, src := range order {
		result = append(result, obVals[src])
	}
	return result
}
// AddAllObservations adds all of the encrypted observations in all of the
// ObservationBatches in |envelopeBatch| to the store. A new |ObservationVal|
// is created for each observation from the encrypted value, a freshly
// generated identifier and the given |dayIndex|. Nil batches are skipped.
//
// Returns an InvalidArgument error if a batch has no metadata or contains a
// nil encrypted observation, and an Internal error if an identifier cannot be
// generated. Processing stops at the first error; observations added before
// that point remain in the store.
func (store *MemStore) AddAllObservations(envelopeBatch []*cobalt.ObservationBatch, dayIndex uint32) error {
	store.mu.Lock()
	defer store.mu.Unlock()

	for _, batch := range envelopeBatch {
		if batch == nil {
			continue
		}

		om := batch.GetMetaData()
		if om == nil {
			return grpc.Errorf(codes.InvalidArgument, "One of the ObservationBatches did not have meta_data set")
		}

		observations := batch.GetEncryptedObservation()
		glog.V(3).Infoln(fmt.Sprintf("Received a batch of %d encrypted Observations.", len(observations)))

		// Serialize the metadata once per batch instead of on every
		// observation.
		omKey := key(om)
		valMap := store.observationsMap[omKey]

		for _, encryptedObservation := range observations {
			if encryptedObservation == nil {
				return grpc.Errorf(codes.InvalidArgument, "The ObservationBatch with key %v contained a Null encrypted_observation", om)
			}

			id, err := randGen.RandomUint63(1<<63 - 1)
			if err != nil {
				return grpc.Errorf(codes.Internal, "Error in generating unique identifier for key [%v]: %v", om, err)
			}

			// The inner map is created lazily so that a batch without any
			// storable observation does not leave an empty entry behind.
			if valMap == nil {
				valMap = make(map[string]*shuffler.ObservationVal)
				store.observationsMap[omKey] = valMap
			}

			// Format the identifier at its full 64-bit width. Converting it
			// to int first would truncate it on platforms where int is 32
			// bits wide.
			idStr := strconv.FormatUint(uint64(id), 10)
			valMap[idStr] = NewObservationVal(encryptedObservation, idStr, dayIndex)
		}
	}
	return nil
}
// GetObservations returns an Iterator over the |ObservationVal|s stored under
// the given |ObservationMetadata| key, in shuffled order. An InvalidArgument
// error is returned if the key is not present in the store. Panics if |om| is
// nil.
func (store *MemStore) GetObservations(om *cobalt.ObservationMetadata) (Iterator, error) {
	store.mu.RLock()
	defer store.mu.RUnlock()

	if om == nil {
		panic("om is nil")
	}

	// Look up the ObservationVal map for the given key.
	stored, found := store.observationsMap[key(om)]
	if !found {
		return nil, grpc.Errorf(codes.InvalidArgument, "Key %v not found", om)
	}

	// Flatten the map into a slice. The slice holds the same pointers that
	// are kept in the store; the values are not copied.
	collected := make([]*shuffler.ObservationVal, 0, len(stored))
	for _, obVal := range stored {
		collected = append(collected, obVal)
	}

	// The shuffler data store layer guarantees that the list returned by a
	// Get() call is always shuffled. In MemStore this is achieved by
	// shuffling the |ObservationVal| result set before handing it out.
	return NewMemStoreIterator(shuffle(collected)), nil
}
// GetKeys returns all |ObservationMetadata| keys currently held in the data
// store, in no particular order. Each key is rebuilt by parsing its stored
// text form; an Internal error is returned if any of them fails to parse.
func (store *MemStore) GetKeys() ([]*cobalt.ObservationMetadata, error) {
	store.mu.RLock()
	defer store.mu.RUnlock()

	result := make([]*cobalt.ObservationMetadata, 0, len(store.observationsMap))
	for serialized := range store.observationsMap {
		parsed := new(cobalt.ObservationMetadata)
		if err := proto.UnmarshalText(serialized, parsed); err != nil {
			return nil, grpc.Errorf(codes.Internal, "Error in parsing keys: %v", err)
		}
		result = append(result, parsed)
	}
	return result, nil
}
// DeleteValues deletes the given |ObservationVal|s, matched by their Id, from
// the entries stored under the |ObservationMetadata| key |om|. Nil elements
// of |deleteObVals| and Ids that are not in the store are ignored. When the
// last value for a key has been removed, the key itself is dropped from the
// store.
//
// Returns an InvalidArgument error if the key is not present in the store.
// Panics if |om| is nil.
func (store *MemStore) DeleteValues(om *cobalt.ObservationMetadata, deleteObVals []*shuffler.ObservationVal) error {
	store.mu.Lock()
	defer store.mu.Unlock()

	if om == nil {
		panic("om is nil")
	}

	// Serialize the metadata once; it is needed for the lookup and possibly
	// again for removing the emptied key.
	omKey := key(om)
	valMap, present := store.observationsMap[omKey]
	if !present {
		return grpc.Errorf(codes.InvalidArgument, "Key %v not found", om)
	}

	for _, obVal := range deleteObVals {
		// Skip nil entries instead of dereferencing them.
		if obVal == nil {
			continue
		}
		delete(valMap, obVal.Id)
	}

	if len(valMap) == 0 {
		delete(store.observationsMap, omKey)
	}
	return nil
}
// GetNumObservations returns the number of |ObservationVal|s held in the data
// store for the given |ObservationMetadata| key. An InvalidArgument error is
// returned if the key is not present in the store. Panics if |om| is nil.
func (store *MemStore) GetNumObservations(om *cobalt.ObservationMetadata) (int, error) {
	store.mu.RLock()
	defer store.mu.RUnlock()

	if om == nil {
		panic("om is nil")
	}

	if stored, found := store.observationsMap[key(om)]; found {
		return len(stored), nil
	}
	return 0, grpc.Errorf(codes.InvalidArgument, "Key %v not found", om)
}
// Reset discards everything held in |store| by replacing its observation map
// with a new, empty one.
func (store *MemStore) Reset() {
	empty := make(map[string]map[string]*shuffler.ObservationVal)

	store.mu.Lock()
	store.observationsMap = empty
	store.mu.Unlock()
}