blob: b0b22212b54a8fa7bb14579e12f5fc3255840b19 [file] [log] [blame]
package metadata
import (
"bytes"
"encoding/json"
"strings"
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
)
const (
mainBucket = "_main"
indexBucket = "_index"
externalBucket = "_external"
)
var errNotFound = errors.Errorf("not found")
type Store struct {
db *bolt.DB
}
func NewStore(dbPath string) (*Store, error) {
db, err := bolt.Open(dbPath, 0600, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to open database file %s", dbPath)
}
return &Store{db: db}, nil
}
func (s *Store) DB() *bolt.DB {
return s.db
}
func (s *Store) All() ([]*StorageItem, error) {
var out []*StorageItem
err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(mainBucket))
if b == nil {
return nil
}
return b.ForEach(func(key, _ []byte) error {
b := b.Bucket(key)
if b == nil {
return nil
}
si, err := newStorageItem(string(key), b, s)
if err != nil {
return err
}
out = append(out, si)
return nil
})
})
return out, errors.WithStack(err)
}
func (s *Store) Probe(index string) (bool, error) {
var exists bool
err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(indexBucket))
if b == nil {
return nil
}
main := tx.Bucket([]byte(mainBucket))
if main == nil {
return nil
}
search := []byte(indexKey(index, ""))
c := b.Cursor()
k, _ := c.Seek(search)
if k != nil && bytes.HasPrefix(k, search) {
exists = true
}
return nil
})
return exists, errors.WithStack(err)
}
func (s *Store) Search(index string) ([]*StorageItem, error) {
var out []*StorageItem
err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(indexBucket))
if b == nil {
return nil
}
main := tx.Bucket([]byte(mainBucket))
if main == nil {
return nil
}
index = indexKey(index, "")
c := b.Cursor()
k, _ := c.Seek([]byte(index))
for {
if k != nil && strings.HasPrefix(string(k), index) {
itemID := strings.TrimPrefix(string(k), index)
k, _ = c.Next()
b := main.Bucket([]byte(itemID))
if b == nil {
logrus.Errorf("index pointing to missing record %s", itemID)
continue
}
si, err := newStorageItem(itemID, b, s)
if err != nil {
return err
}
out = append(out, si)
} else {
break
}
}
return nil
})
return out, errors.WithStack(err)
}
func (s *Store) View(id string, fn func(b *bolt.Bucket) error) error {
return s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(mainBucket))
if b == nil {
return errors.WithStack(errNotFound)
}
b = b.Bucket([]byte(id))
if b == nil {
return errors.WithStack(errNotFound)
}
return fn(b)
})
}
func (s *Store) Clear(id string) error {
return errors.WithStack(s.db.Update(func(tx *bolt.Tx) error {
external := tx.Bucket([]byte(externalBucket))
if external != nil {
external.DeleteBucket([]byte(id))
}
main := tx.Bucket([]byte(mainBucket))
if main == nil {
return nil
}
b := main.Bucket([]byte(id))
if b == nil {
return nil
}
si, err := newStorageItem(id, b, s)
if err != nil {
return err
}
if indexes := si.Indexes(); len(indexes) > 0 {
b := tx.Bucket([]byte(indexBucket))
if b != nil {
for _, index := range indexes {
if err := b.Delete([]byte(indexKey(index, id))); err != nil {
return err
}
}
}
}
return main.DeleteBucket([]byte(id))
}))
}
func (s *Store) Update(id string, fn func(b *bolt.Bucket) error) error {
return errors.WithStack(s.db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte(mainBucket))
if err != nil {
return errors.WithStack(err)
}
b, err = b.CreateBucketIfNotExists([]byte(id))
if err != nil {
return errors.WithStack(err)
}
return fn(b)
}))
}
func (s *Store) Get(id string) (*StorageItem, bool) {
empty := func() *StorageItem {
si, _ := newStorageItem(id, nil, s)
return si
}
tx, err := s.db.Begin(false)
if err != nil {
return empty(), false
}
defer tx.Rollback()
b := tx.Bucket([]byte(mainBucket))
if b == nil {
return empty(), false
}
b = b.Bucket([]byte(id))
if b == nil {
return empty(), false
}
si, _ := newStorageItem(id, b, s)
return si, true
}
func (s *Store) Close() error {
return errors.WithStack(s.db.Close())
}
type StorageItem struct {
id string
values map[string]*Value
queue []func(*bolt.Bucket) error
storage *Store
mu sync.RWMutex
}
func newStorageItem(id string, b *bolt.Bucket, s *Store) (*StorageItem, error) {
si := &StorageItem{
id: id,
storage: s,
values: make(map[string]*Value),
}
if b != nil {
if err := b.ForEach(func(k, v []byte) error {
var sv Value
if len(v) > 0 {
if err := json.Unmarshal(v, &sv); err != nil {
return errors.WithStack(err)
}
si.values[string(k)] = &sv
}
return nil
}); err != nil {
return si, errors.WithStack(err)
}
}
return si, nil
}
func (s *StorageItem) Storage() *Store { // TODO: used in local source. how to remove this?
return s.storage
}
func (s *StorageItem) ID() string {
return s.id
}
func (s *StorageItem) View(fn func(b *bolt.Bucket) error) error {
return s.storage.View(s.id, fn)
}
func (s *StorageItem) Update(fn func(b *bolt.Bucket) error) error {
return s.storage.Update(s.id, fn)
}
func (s *StorageItem) Metadata() *StorageItem {
return s
}
func (s *StorageItem) Keys() []string {
keys := make([]string, 0, len(s.values))
for k := range s.values {
keys = append(keys, k)
}
return keys
}
func (s *StorageItem) Get(k string) *Value {
s.mu.RLock()
v := s.values[k]
s.mu.RUnlock()
return v
}
func (s *StorageItem) GetExternal(k string) ([]byte, error) {
var dt []byte
err := s.storage.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(externalBucket))
if b == nil {
return errors.WithStack(errNotFound)
}
b = b.Bucket([]byte(s.id))
if b == nil {
return errors.WithStack(errNotFound)
}
dt = b.Get([]byte(k))
if dt == nil {
return errors.WithStack(errNotFound)
}
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return dt, nil
}
func (s *StorageItem) SetExternal(k string, dt []byte) error {
return errors.WithStack(s.storage.db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte(externalBucket))
if err != nil {
return errors.WithStack(err)
}
b, err = b.CreateBucketIfNotExists([]byte(s.id))
if err != nil {
return errors.WithStack(err)
}
return b.Put([]byte(k), dt)
}))
}
func (s *StorageItem) Queue(fn func(b *bolt.Bucket) error) {
s.mu.Lock()
defer s.mu.Unlock()
s.queue = append(s.queue, fn)
}
func (s *StorageItem) Commit() error {
s.mu.Lock()
defer s.mu.Unlock()
return errors.WithStack(s.Update(func(b *bolt.Bucket) error {
for _, fn := range s.queue {
if err := fn(b); err != nil {
return errors.WithStack(err)
}
}
s.queue = s.queue[:0]
return nil
}))
}
func (s *StorageItem) Indexes() (out []string) {
for _, v := range s.values {
if v.Index != "" {
out = append(out, v.Index)
}
}
return
}
func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v *Value) error {
if v == nil {
if old, ok := s.values[key]; ok {
if old.Index != "" {
b, err := b.Tx().CreateBucketIfNotExists([]byte(indexBucket))
if err != nil {
return errors.WithStack(err)
}
b.Delete([]byte(indexKey(old.Index, s.ID()))) // ignore error
}
}
if err := b.Put([]byte(key), nil); err != nil {
return err
}
delete(s.values, key)
return nil
}
dt, err := json.Marshal(v)
if err != nil {
return errors.WithStack(err)
}
if err := b.Put([]byte(key), dt); err != nil {
return errors.WithStack(err)
}
if v.Index != "" {
b, err := b.Tx().CreateBucketIfNotExists([]byte(indexBucket))
if err != nil {
return errors.WithStack(err)
}
if err := b.Put([]byte(indexKey(v.Index, s.ID())), []byte{}); err != nil {
return errors.WithStack(err)
}
}
s.values[key] = v
return nil
}
var ErrSkipSetValue = errors.New("skip setting metadata value")
func (s *StorageItem) GetAndSetValue(key string, fn func(*Value) (*Value, error)) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.Update(func(b *bolt.Bucket) error {
v, err := fn(s.values[key])
if errors.Is(err, ErrSkipSetValue) {
return nil
} else if err != nil {
return err
}
return s.SetValue(b, key, v)
})
}
type Value struct {
Value json.RawMessage `json:"value,omitempty"`
Index string `json:"index,omitempty"`
}
func NewValue(v interface{}) (*Value, error) {
dt, err := json.Marshal(v)
if err != nil {
return nil, errors.WithStack(err)
}
return &Value{Value: json.RawMessage(dt)}, nil
}
func (v *Value) Unmarshal(target interface{}) error {
return errors.WithStack(json.Unmarshal(v.Value, target))
}
func indexKey(index, target string) string {
return index + "::" + target
}