// Copyright 2017 The Fuchsia Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
	"fmt"
	"os"
	"runtime"
	"strings"
	"sync"

	"github.com/golang/glog"
	"github.com/golang/protobuf/proto"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/opt"
	leveldb_util "github.com/syndtr/goleveldb/leveldb/util"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"

	"cobalt"
	"shuffler"
	"util/stackdriver"
)

const (
	initializeFailed         = "leveldb-store-initialize-failed"
	addAllObservationsFailed = "leveldb-store-add-all-observations-failed"
)

// LevelDBStore is a persistent implementation of the Store interface.
type LevelDBStore struct {
	// Path to the leveldb database folder.
	dbDir string

	// Observation database consisting of ObservationVals as values for each
	// rowkey (|ObservationMetadata|_<random_identifier>).
	db *leveldb.DB

	// bucketSizes is a map from active buckets to their current sizes. A
	// "bucket" is a set of ObservationVals with a common ObservationMetadata.
	// The keys to this map are string representations of ObservationMetadata and
	// the values are the number of entries in |db| corresponding to that
	// ObservationMetadata. Note that a single bucket is represented by many rows
	// of |db|.
	//
	// We use a signed integer to represent bucket sizes because it is possible
	// for the value to temporarily be negative. This might occur in the
	// following situation:
	//   (a) Thread 1 adds Observations to the Store.
	//   (b) Thread 2 reads these Observations from the Store, deletes them, and
	//       decrements the bucketSizes value.
	//   (c) Thread 1 increments the bucketSizes value.
	// Between (b) and (c) the value may be negative.
	bucketSizes map[string]int64

	// mu is the mutex that protects the in-memory |bucketSizes| map.
	mu sync.RWMutex
}

// NewLevelDBStore returns an implementation of Store using LevelDB
// (https://github.com/google/leveldb).
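//
// A minimal usage sketch; the path, |batches| and |dayIndex| below are
// illustrative placeholders, not values defined in this package:
//
//	store, err := NewLevelDBStore("/tmp/shuffler_observation_db")
//	if err != nil {
//		glog.Fatal(err)
//	}
//	if err := store.AddAllObservations(batches, dayIndex); err != nil {
//		glog.Error(err)
//	}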
func NewLevelDBStore(dbDirPath string) (*LevelDBStore, error) {
	db, err := leveldb.OpenFile(dbDirPath, nil)
	if err != nil {
		if db != nil {
			db.Close()
		}
		return nil, err
	}

	store := &LevelDBStore{
		dbDir:       dbDirPath,
		db:          db,
		bucketSizes: make(map[string]int64),
	}

	if err := store.initialize(); err != nil {
		return nil, err
	}
	return store, nil
}

// initialize populates the in-memory |bucketSizes| map by parsing the rows of
// an existing leveldb store.
func (store *LevelDBStore) initialize() error {
	iter := store.db.NewIterator(nil, nil)
	for iter.Next() {
		dbKey := string(iter.Key())
		bKey, err := ExtractBKey(dbKey)
		if err != nil {
			stackdriver.LogCountMetricln(initializeFailed, "Existing DB key [", dbKey, "] found corrupted: ", err)
			continue
		}
		store.bucketSizes[bKey]++
	}
	iter.Release()
	if err := iter.Error(); err != nil {
		return err
	}
	return nil
}

// close closes the database files and releases any resources held by leveldb,
// including its lock on the database directory.
func (store *LevelDBStore) close() error {
	if store.db != nil {
		if err := store.db.Close(); err != nil {
			return err
		}
		store.db = nil
	}
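	// Run a garbage collection to encourage prompt release of the resources
	// held by the now-unreachable DB handle.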
	runtime.GC()
	return nil
}

// rowKeyPrefix returns the leveldb |prefixRange| for the given
// ObservationMetadata |om|, or an error. The row key prefix is used both in
// generating unique row keys and as an index into the LevelDBStore's
// |bucketSizes| map.
func rowKeyPrefix(om *cobalt.ObservationMetadata) (prefixRange *leveldb_util.Range, err error) {
	if om == nil {
		panic("Metadata is nil")
	}
	bKey, err := BKey(om)
	if err != nil {
		return nil, err
	}
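	// The prefix is the bucket key itself; every row for this bucket has a key
	// of the form <bKey>_<random_identifier>, so a BytesPrefix range over bKey
	// covers the rows of this bucket.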
	prefix := strings.Join([]string{bKey}, "_")
	return leveldb_util.BytesPrefix([]byte(prefix)), nil
}

// makeDBVal returns a serialized |ObservationVal| generated from the given
// |encryptedObservation|, |id| and |arrivalDayIndex|.
func makeDBVal(encryptedObservation *cobalt.EncryptedMessage, id string, arrivalDayIndex uint32) ([]byte, error) {
	if encryptedObservation == nil {
		panic("encryptedObservation is nil")
	}
	valBytes, err := proto.Marshal(NewObservationVal(encryptedObservation, id, arrivalDayIndex))
	if err != nil {
		return []byte(""), err
	}
	return valBytes, nil
}

// AddAllObservations adds all of the encrypted observations in all of the
// ObservationBatches in |envelopeBatch| to the store. New |ObservationVal|s
// are created to hold the values and the given |arrivalDayIndex|. Returns a
// non-nil error if the arguments are invalid or the operation fails.
func (store *LevelDBStore) AddAllObservations(envelopeBatch []*cobalt.ObservationBatch, arrivalDayIndex uint32) error {
	dbBatch := new(leveldb.Batch)
	tmpBucketSizes := make(map[string]int64)

	// Process all observations into a temporary |dbBatch|.
	for _, batch := range envelopeBatch {
		if batch == nil {
			return grpc.Errorf(codes.InvalidArgument, "One of the ObservationBatches in the Envelope is not set.")
		}
		om := batch.GetMetaData()
		if om == nil {
			return grpc.Errorf(codes.InvalidArgument, "The meta_data field is unset for one of the ObservationBatches.")
		}
		bKey, err := BKey(om)
		if err != nil {
			return grpc.Errorf(codes.Internal, "Error in making bucket key for metadata [%v]: [%v]", om, err)
		}
		glog.V(3).Infoln(fmt.Sprintf("Received a batch of %d encrypted Observations.", len(batch.GetEncryptedObservation())))
		for _, encryptedObservation := range batch.GetEncryptedObservation() {
			if encryptedObservation == nil {
				return grpc.Errorf(codes.InvalidArgument, "One of the encrypted_observations in one of the ObservationBatches with metadata [%v] was null", om)
			}

			// Generate a new random row key for this encrypted observation.
			key, id, err := NewRowKey(bKey)
			if err != nil {
				stackdriver.LogCountMetricln(addAllObservationsFailed, "AddAllObservations() failed in generating PKey for metadata [", om, "]: ", err)
				return grpc.Errorf(codes.Internal, "Error in processing observation metadata for batch [%v]", om)
			}

			// Generate an |ObservationVal| for this encrypted observation.
			val, err := makeDBVal(encryptedObservation, id, arrivalDayIndex)
			if err != nil {
				stackdriver.LogCountMetricln(addAllObservationsFailed, "AddAllObservations() failed in parsing observation value for metadata [", *om, "]: ", err)
				return grpc.Errorf(codes.Internal, "Error in processing one of the observations for metadata [%v]", *om)
			}
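			// Stage the write in the batch and record the pending size
			// increase for this bucket; both are applied only after the whole
			// envelope has been validated.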
			dbBatch.Put(key, val)
			tmpBucketSizes[bKey]++
		}
	}

	// Set the write option |Sync| so that each write is flushed from the OS
	// buffer cache through to disk before the call returns; this makes writes
	// durable at the cost of some speed. |NoWriteMerge| is left false so that
	// concurrent writes may still be merged. If the same key is written more
	// than once, the most recent update wins.
	woptions := &opt.WriteOptions{
		NoWriteMerge: false,
		Sync:         true,
	}

	// Commit |dbBatch|.
	if err := store.db.Write(dbBatch, woptions); err != nil {
		stackdriver.LogCountMetricln(addAllObservationsFailed, "AddAllObservations failed with error:", err)
		return grpc.Errorf(codes.Internal, "Internal error in processing the ObservationBatch.")
	}

	// Update the in-memory bucket counts now that the batch has been committed.
	store.mu.Lock()
	defer store.mu.Unlock()
	for k := range tmpBucketSizes {
		store.bucketSizes[k] += tmpBucketSizes[k]
	}
	return nil
}

// GetObservations returns a LevelDBStoreIterator to iterate through the
// shuffled list of ObservationVals from the data store for the given
// |ObservationMetadata| key or returns an error.
func (store *LevelDBStore) GetObservations(om *cobalt.ObservationMetadata) (Iterator, error) {
	if om == nil {
		panic("observation metadata is nil")
	}
	keyPrefix, err := rowKeyPrefix(om)
	if err != nil {
		return nil, grpc.Errorf(codes.InvalidArgument, "Error in generating rowkey prefix for observation metadata [%v]: [%v]", *om, err)
	}
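	// Iterate only over the rows whose keys fall within this bucket's prefix
	// range.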
	iter := store.db.NewIterator(keyPrefix, nil)
	return NewLevelDBStoreIterator(iter), nil
}

// GetKeys returns the list of all |ObservationMetadata| keys stored in the
// data store or returns an error.
func (store *LevelDBStore) GetKeys() ([]*cobalt.ObservationMetadata, error) {
	store.mu.RLock()
	defer store.mu.RUnlock()

	keys := []*cobalt.ObservationMetadata{}
	for bKey := range store.bucketSizes {
		om, err := UnmarshalBKey(bKey)
		if err != nil {
			return nil, grpc.Errorf(codes.Internal, "Error in parsing bucket key [%v]: [%v]", bKey, err)
		}
		keys = append(keys, om)
	}
	return keys, nil
}

// DeleteValues deletes the given |ObservationVal|s for the |ObservationMetadata|
// key from the data store or returns an error.
func (store *LevelDBStore) DeleteValues(om *cobalt.ObservationMetadata, obVals []*shuffler.ObservationVal) error {
	if om == nil {
		panic("observation metadata is nil")
	}
	if len(obVals) == 0 {
		return nil
	}
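	// Collect all of the row deletions into a single batch so that they are
	// applied to leveldb atomically.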
	batch := new(leveldb.Batch)
	for _, obVal := range obVals {
		rowKey, err := RowKeyFromMetadata(om, obVal.Id)
		if err != nil {
			return grpc.Errorf(codes.InvalidArgument, "Error in making rowkey from observation metadata [%v]: [%v]", om, err)
		}
		batch.Delete([]byte(rowKey))
	}
	if err := store.db.Write(batch, nil); err != nil {
		return grpc.Errorf(codes.Internal, "LevelDB write error: [%v]", err)
	}

	// Update the bucketSizes map for the deleted rows.
	store.mu.Lock()
	defer store.mu.Unlock()
	bKey, err := BKey(om)
	if err != nil {
		return grpc.Errorf(codes.InvalidArgument, "Error in parsing observation metadata [%v]: [%v]", om, err)
	}
	// Note that this decrement may cause the value of bucketSizes[bKey] to
	// temporarily become negative. See the explanation of how this might occur
	// in the comment on |bucketSizes| above.
	store.bucketSizes[bKey] -= int64(len(obVals))
	return nil
}

// GetNumObservations returns the total count of ObservationVals in the data
// store for the given |ObservationMetadata| key or returns an error.
func (store *LevelDBStore) GetNumObservations(om *cobalt.ObservationMetadata) (int, error) {
	if om == nil {
		panic("observation metadata is nil")
	}
	bKey, err := BKey(om)
	if err != nil {
		return 0, grpc.Errorf(codes.InvalidArgument, "Error in parsing observation metadata [%v]: [%v]", om, err)
	}

	store.mu.RLock()
	defer store.mu.RUnlock()
	count, present := store.bucketSizes[bKey]
	if !present {
		return 0, grpc.Errorf(codes.InvalidArgument, "Observation metadata [%v] not found.", om)
	}
	return int(count), nil
}

// Reset clears the in-memory caches and, if |destroy| is true, permanently
// deletes all data from the |store|.
func (store *LevelDBStore) Reset(destroy bool) {
	// Clear the bucketSizes map.
	store.mu.Lock()
	defer store.mu.Unlock()
	store.bucketSizes = make(map[string]int64)

	// Close the db instance.
	store.close()

	if destroy {
		store.EraseAllData()
	}
}

// EraseAllData erases all data in the LevelDB backend.
func (store *LevelDBStore) EraseAllData() {
	os.RemoveAll(store.dbDir)
}