blob: 01245a6966ba092ffc90819443b9734b5d817c10 [file] [log] [blame]
package store
import (
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/docker/go-events"
"github.com/docker/swarmkit/api"
pb "github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/watch"
gogotypes "github.com/gogo/protobuf/types"
memdb "github.com/hashicorp/go-memdb"
"golang.org/x/net/context"
)
const (
indexID = "id"
indexName = "name"
indexRuntime = "runtime"
indexServiceID = "serviceid"
indexNodeID = "nodeid"
indexSlot = "slot"
indexDesiredState = "desiredstate"
indexTaskState = "taskstate"
indexRole = "role"
indexMembership = "membership"
indexNetwork = "network"
indexSecret = "secret"
indexConfig = "config"
indexKind = "kind"
indexCustom = "custom"
prefix = "_prefix"
// MaxChangesPerTransaction is the number of changes after which a new
// transaction should be started within Batch.
MaxChangesPerTransaction = 200
// MaxTransactionBytes is the maximum serialized transaction size.
MaxTransactionBytes = 1.5 * 1024 * 1024
)
var (
// ErrExist is returned by create operations if the provided ID is already
// taken.
ErrExist = errors.New("object already exists")
// ErrNotExist is returned by altering operations (update, delete) if the
// provided ID is not found.
ErrNotExist = errors.New("object does not exist")
// ErrNameConflict is returned by create/update if the object name is
// already in use by another object.
ErrNameConflict = errors.New("name conflicts with an existing object")
// ErrInvalidFindBy is returned if an unrecognized type is passed to Find.
ErrInvalidFindBy = errors.New("invalid find argument type")
// ErrSequenceConflict is returned when trying to update an object
// whose sequence information does not match the object in the store's.
ErrSequenceConflict = errors.New("update out of sequence")
objectStorers []ObjectStoreConfig
schema = &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{},
}
errUnknownStoreAction = errors.New("unknown store action")
// WedgeTimeout is the maximum amount of time the store lock may be
// held before declaring a suspected deadlock.
WedgeTimeout = 30 * time.Second
)
func register(os ObjectStoreConfig) {
objectStorers = append(objectStorers, os)
schema.Tables[os.Table.Name] = os.Table
}
// timedMutex wraps a sync.Mutex, and keeps track of when it was locked.
type timedMutex struct {
sync.Mutex
lockedAt atomic.Value
}
func (m *timedMutex) Lock() {
m.Mutex.Lock()
m.lockedAt.Store(time.Now())
}
func (m *timedMutex) Unlock() {
m.Mutex.Unlock()
m.lockedAt.Store(time.Time{})
}
func (m *timedMutex) LockedAt() time.Time {
lockedTimestamp := m.lockedAt.Load()
if lockedTimestamp == nil {
return time.Time{}
}
return lockedTimestamp.(time.Time)
}
// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
// updateLock must be held during an update transaction.
updateLock timedMutex
memDB *memdb.MemDB
queue *watch.Queue
proposer state.Proposer
}
// NewMemoryStore returns an in-memory store. The argument is an optional
// Proposer which will be used to propagate changes to other members in a
// cluster.
func NewMemoryStore(proposer state.Proposer) *MemoryStore {
memDB, err := memdb.NewMemDB(schema)
if err != nil {
// This shouldn't fail
panic(err)
}
return &MemoryStore{
memDB: memDB,
queue: watch.NewQueue(),
proposer: proposer,
}
}
// Close closes the memory store and frees its associated resources.
func (s *MemoryStore) Close() error {
return s.queue.Close()
}
func fromArgs(args ...interface{}) ([]byte, error) {
if len(args) != 1 {
return nil, fmt.Errorf("must provide only a single argument")
}
arg, ok := args[0].(string)
if !ok {
return nil, fmt.Errorf("argument must be a string: %#v", args[0])
}
// Add the null character as a terminator
arg += "\x00"
return []byte(arg), nil
}
func prefixFromArgs(args ...interface{}) ([]byte, error) {
val, err := fromArgs(args...)
if err != nil {
return nil, err
}
// Strip the null terminator, the rest is a prefix
n := len(val)
if n > 0 {
return val[:n-1], nil
}
return val, nil
}
// ReadTx is a read transaction. Note that transaction does not imply
// any internal batching. It only means that the transaction presents a
// consistent view of the data that cannot be affected by other
// transactions.
type ReadTx interface {
lookup(table, index, id string) api.StoreObject
get(table, id string) api.StoreObject
find(table string, by By, checkType func(By) error, appendResult func(api.StoreObject)) error
}
type readTx struct {
memDBTx *memdb.Txn
}
// View executes a read transaction.
func (s *MemoryStore) View(cb func(ReadTx)) {
memDBTx := s.memDB.Txn(false)
readTx := readTx{
memDBTx: memDBTx,
}
cb(readTx)
memDBTx.Commit()
}
// Tx is a read/write transaction. Note that transaction does not imply
// any internal batching. The purpose of this transaction is to give the
// user a guarantee that its changes won't be visible to other transactions
// until the transaction is over.
type Tx interface {
ReadTx
create(table string, o api.StoreObject) error
update(table string, o api.StoreObject) error
delete(table, id string) error
}
type tx struct {
readTx
curVersion *api.Version
changelist []api.Event
}
// changelistBetweenVersions returns the changes after "from" up to and
// including "to".
func (s *MemoryStore) changelistBetweenVersions(from, to api.Version) ([]api.Event, error) {
if s.proposer == nil {
return nil, errors.New("store does not support versioning")
}
changes, err := s.proposer.ChangesBetween(from, to)
if err != nil {
return nil, err
}
var changelist []api.Event
for _, change := range changes {
for _, sa := range change.StoreActions {
event, err := api.EventFromStoreAction(sa, nil)
if err != nil {
return nil, err
}
changelist = append(changelist, event)
}
changelist = append(changelist, state.EventCommit{Version: change.Version.Copy()})
}
return changelist, nil
}
// ApplyStoreActions updates a store based on StoreAction messages.
func (s *MemoryStore) ApplyStoreActions(actions []api.StoreAction) error {
s.updateLock.Lock()
memDBTx := s.memDB.Txn(true)
tx := tx{
readTx: readTx{
memDBTx: memDBTx,
},
}
for _, sa := range actions {
if err := applyStoreAction(&tx, sa); err != nil {
memDBTx.Abort()
s.updateLock.Unlock()
return err
}
}
memDBTx.Commit()
for _, c := range tx.changelist {
s.queue.Publish(c)
}
if len(tx.changelist) != 0 {
s.queue.Publish(state.EventCommit{})
}
s.updateLock.Unlock()
return nil
}
func applyStoreAction(tx Tx, sa api.StoreAction) error {
for _, os := range objectStorers {
err := os.ApplyStoreAction(tx, sa)
if err != errUnknownStoreAction {
return err
}
}
return errors.New("unrecognized action type")
}
func (s *MemoryStore) update(proposer state.Proposer, cb func(Tx) error) error {
s.updateLock.Lock()
memDBTx := s.memDB.Txn(true)
var curVersion *api.Version
if proposer != nil {
curVersion = proposer.GetVersion()
}
var tx tx
tx.init(memDBTx, curVersion)
err := cb(&tx)
if err == nil {
if proposer == nil {
memDBTx.Commit()
} else {
var sa []api.StoreAction
sa, err = tx.changelistStoreActions()
if err == nil {
if len(sa) != 0 {
err = proposer.ProposeValue(context.Background(), sa, func() {
memDBTx.Commit()
})
} else {
memDBTx.Commit()
}
}
}
}
if err == nil {
for _, c := range tx.changelist {
s.queue.Publish(c)
}
if len(tx.changelist) != 0 {
if proposer != nil {
curVersion = proposer.GetVersion()
}
s.queue.Publish(state.EventCommit{Version: curVersion})
}
} else {
memDBTx.Abort()
}
s.updateLock.Unlock()
return err
}
func (s *MemoryStore) updateLocal(cb func(Tx) error) error {
return s.update(nil, cb)
}
// Update executes a read/write transaction.
func (s *MemoryStore) Update(cb func(Tx) error) error {
return s.update(s.proposer, cb)
}
// Batch provides a mechanism to batch updates to a store.
type Batch struct {
tx tx
store *MemoryStore
// applied counts the times Update has run successfully
applied int
// transactionSizeEstimate is the running count of the size of the
// current transaction.
transactionSizeEstimate int
// changelistLen is the last known length of the transaction's
// changelist.
changelistLen int
err error
}
// Update adds a single change to a batch. Each call to Update is atomic, but
// different calls to Update may be spread across multiple transactions to
// circumvent transaction size limits.
func (batch *Batch) Update(cb func(Tx) error) error {
if batch.err != nil {
return batch.err
}
if err := cb(&batch.tx); err != nil {
return err
}
batch.applied++
for batch.changelistLen < len(batch.tx.changelist) {
sa, err := api.NewStoreAction(batch.tx.changelist[batch.changelistLen])
if err != nil {
return err
}
batch.transactionSizeEstimate += sa.Size()
batch.changelistLen++
}
if batch.changelistLen >= MaxChangesPerTransaction || batch.transactionSizeEstimate >= (MaxTransactionBytes*3)/4 {
if err := batch.commit(); err != nil {
return err
}
// Yield the update lock
batch.store.updateLock.Unlock()
runtime.Gosched()
batch.store.updateLock.Lock()
batch.newTx()
}
return nil
}
func (batch *Batch) newTx() {
var curVersion *api.Version
if batch.store.proposer != nil {
curVersion = batch.store.proposer.GetVersion()
}
batch.tx.init(batch.store.memDB.Txn(true), curVersion)
batch.transactionSizeEstimate = 0
batch.changelistLen = 0
}
func (batch *Batch) commit() error {
if batch.store.proposer != nil {
var sa []api.StoreAction
sa, batch.err = batch.tx.changelistStoreActions()
if batch.err == nil {
if len(sa) != 0 {
batch.err = batch.store.proposer.ProposeValue(context.Background(), sa, func() {
batch.tx.memDBTx.Commit()
})
} else {
batch.tx.memDBTx.Commit()
}
}
} else {
batch.tx.memDBTx.Commit()
}
if batch.err != nil {
batch.tx.memDBTx.Abort()
return batch.err
}
for _, c := range batch.tx.changelist {
batch.store.queue.Publish(c)
}
if len(batch.tx.changelist) != 0 {
batch.store.queue.Publish(state.EventCommit{})
}
return nil
}
// Batch performs one or more transactions that allow reads and writes
// It invokes a callback that is passed a Batch object. The callback may
// call batch.Update for each change it wants to make as part of the
// batch. The changes in the batch may be split over multiple
// transactions if necessary to keep transactions below the size limit.
// Batch holds a lock over the state, but will yield this lock every
// it creates a new transaction to allow other writers to proceed.
// Thus, unrelated changes to the state may occur between calls to
// batch.Update.
//
// This method allows the caller to iterate over a data set and apply
// changes in sequence without holding the store write lock for an
// excessive time, or producing a transaction that exceeds the maximum
// size.
//
// If Batch returns an error, no guarantees are made about how many updates
// were committed successfully.
func (s *MemoryStore) Batch(cb func(*Batch) error) error {
s.updateLock.Lock()
batch := Batch{
store: s,
}
batch.newTx()
if err := cb(&batch); err != nil {
batch.tx.memDBTx.Abort()
s.updateLock.Unlock()
return err
}
err := batch.commit()
s.updateLock.Unlock()
return err
}
func (tx *tx) init(memDBTx *memdb.Txn, curVersion *api.Version) {
tx.memDBTx = memDBTx
tx.curVersion = curVersion
tx.changelist = nil
}
func (tx tx) changelistStoreActions() ([]api.StoreAction, error) {
var actions []api.StoreAction
for _, c := range tx.changelist {
sa, err := api.NewStoreAction(c)
if err != nil {
return nil, err
}
actions = append(actions, sa)
}
return actions, nil
}
// lookup is an internal typed wrapper around memdb.
func (tx readTx) lookup(table, index, id string) api.StoreObject {
j, err := tx.memDBTx.First(table, index, id)
if err != nil {
return nil
}
if j != nil {
return j.(api.StoreObject)
}
return nil
}
// create adds a new object to the store.
// Returns ErrExist if the ID is already taken.
func (tx *tx) create(table string, o api.StoreObject) error {
if tx.lookup(table, indexID, o.GetID()) != nil {
return ErrExist
}
copy := o.CopyStoreObject()
meta := copy.GetMeta()
if err := touchMeta(&meta, tx.curVersion); err != nil {
return err
}
copy.SetMeta(meta)
err := tx.memDBTx.Insert(table, copy)
if err == nil {
tx.changelist = append(tx.changelist, copy.EventCreate())
o.SetMeta(meta)
}
return err
}
// Update updates an existing object in the store.
// Returns ErrNotExist if the object doesn't exist.
func (tx *tx) update(table string, o api.StoreObject) error {
oldN := tx.lookup(table, indexID, o.GetID())
if oldN == nil {
return ErrNotExist
}
meta := o.GetMeta()
if tx.curVersion != nil {
if oldN.GetMeta().Version != meta.Version {
return ErrSequenceConflict
}
}
copy := o.CopyStoreObject()
if err := touchMeta(&meta, tx.curVersion); err != nil {
return err
}
copy.SetMeta(meta)
err := tx.memDBTx.Insert(table, copy)
if err == nil {
tx.changelist = append(tx.changelist, copy.EventUpdate(oldN))
o.SetMeta(meta)
}
return err
}
// Delete removes an object from the store.
// Returns ErrNotExist if the object doesn't exist.
func (tx *tx) delete(table, id string) error {
n := tx.lookup(table, indexID, id)
if n == nil {
return ErrNotExist
}
err := tx.memDBTx.Delete(table, n)
if err == nil {
tx.changelist = append(tx.changelist, n.EventDelete())
}
return err
}
// Get looks up an object by ID.
// Returns nil if the object doesn't exist.
func (tx readTx) get(table, id string) api.StoreObject {
o := tx.lookup(table, indexID, id)
if o == nil {
return nil
}
return o.CopyStoreObject()
}
// findIterators returns a slice of iterators. The union of items from these
// iterators provides the result of the query.
func (tx readTx) findIterators(table string, by By, checkType func(By) error) ([]memdb.ResultIterator, error) {
switch by.(type) {
case byAll, orCombinator: // generic types
default: // all other types
if err := checkType(by); err != nil {
return nil, err
}
}
switch v := by.(type) {
case byAll:
it, err := tx.memDBTx.Get(table, indexID)
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case orCombinator:
var iters []memdb.ResultIterator
for _, subBy := range v.bys {
it, err := tx.findIterators(table, subBy, checkType)
if err != nil {
return nil, err
}
iters = append(iters, it...)
}
return iters, nil
case byName:
it, err := tx.memDBTx.Get(table, indexName, strings.ToLower(string(v)))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byIDPrefix:
it, err := tx.memDBTx.Get(table, indexID+prefix, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byNamePrefix:
it, err := tx.memDBTx.Get(table, indexName+prefix, strings.ToLower(string(v)))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byRuntime:
it, err := tx.memDBTx.Get(table, indexRuntime, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byNode:
it, err := tx.memDBTx.Get(table, indexNodeID, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byService:
it, err := tx.memDBTx.Get(table, indexServiceID, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case bySlot:
it, err := tx.memDBTx.Get(table, indexSlot, v.serviceID+"\x00"+strconv.FormatUint(uint64(v.slot), 10))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byDesiredState:
it, err := tx.memDBTx.Get(table, indexDesiredState, strconv.FormatInt(int64(v), 10))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byTaskState:
it, err := tx.memDBTx.Get(table, indexTaskState, strconv.FormatInt(int64(v), 10))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byRole:
it, err := tx.memDBTx.Get(table, indexRole, strconv.FormatInt(int64(v), 10))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byMembership:
it, err := tx.memDBTx.Get(table, indexMembership, strconv.FormatInt(int64(v), 10))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byReferencedNetworkID:
it, err := tx.memDBTx.Get(table, indexNetwork, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byReferencedSecretID:
it, err := tx.memDBTx.Get(table, indexSecret, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byReferencedConfigID:
it, err := tx.memDBTx.Get(table, indexConfig, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byKind:
it, err := tx.memDBTx.Get(table, indexKind, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byCustom:
var key string
if v.objType != "" {
key = v.objType + "|" + v.index + "|" + v.value
} else {
key = v.index + "|" + v.value
}
it, err := tx.memDBTx.Get(table, indexCustom, key)
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byCustomPrefix:
var key string
if v.objType != "" {
key = v.objType + "|" + v.index + "|" + v.value
} else {
key = v.index + "|" + v.value
}
it, err := tx.memDBTx.Get(table, indexCustom+prefix, key)
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
default:
return nil, ErrInvalidFindBy
}
}
// find selects a set of objects calls a callback for each matching object.
func (tx readTx) find(table string, by By, checkType func(By) error, appendResult func(api.StoreObject)) error {
fromResultIterators := func(its ...memdb.ResultIterator) {
ids := make(map[string]struct{})
for _, it := range its {
for {
obj := it.Next()
if obj == nil {
break
}
o := obj.(api.StoreObject)
id := o.GetID()
if _, exists := ids[id]; !exists {
appendResult(o.CopyStoreObject())
ids[id] = struct{}{}
}
}
}
}
iters, err := tx.findIterators(table, by, checkType)
if err != nil {
return err
}
fromResultIterators(iters...)
return nil
}
// Save serializes the data in the store.
func (s *MemoryStore) Save(tx ReadTx) (*pb.StoreSnapshot, error) {
var snapshot pb.StoreSnapshot
for _, os := range objectStorers {
if err := os.Save(tx, &snapshot); err != nil {
return nil, err
}
}
return &snapshot, nil
}
// Restore sets the contents of the store to the serialized data in the
// argument.
func (s *MemoryStore) Restore(snapshot *pb.StoreSnapshot) error {
return s.updateLocal(func(tx Tx) error {
for _, os := range objectStorers {
if err := os.Restore(tx, snapshot); err != nil {
return err
}
}
return nil
})
}
// WatchQueue returns the publish/subscribe queue.
func (s *MemoryStore) WatchQueue() *watch.Queue {
return s.queue
}
// ViewAndWatch calls a callback which can observe the state of this
// MemoryStore. It also returns a channel that will return further events from
// this point so the snapshot can be kept up to date. The watch channel must be
// released with watch.StopWatch when it is no longer needed. The channel is
// guaranteed to get all events after the moment of the snapshot, and only
// those events.
func ViewAndWatch(store *MemoryStore, cb func(ReadTx) error, specifiers ...api.Event) (watch chan events.Event, cancel func(), err error) {
// Using Update to lock the store and guarantee consistency between
// the watcher and the the state seen by the callback. snapshotReadTx
// exposes this Tx as a ReadTx so the callback can't modify it.
err = store.Update(func(tx Tx) error {
if err := cb(tx); err != nil {
return err
}
watch, cancel = state.Watch(store.WatchQueue(), specifiers...)
return nil
})
if watch != nil && err != nil {
cancel()
cancel = nil
watch = nil
}
return
}
// WatchFrom returns a channel that will return past events from starting
// from "version", and new events until the channel is closed. If "version"
// is nil, this function is equivalent to
//
// state.Watch(store.WatchQueue(), specifiers...).
//
// If the log has been compacted and it's not possible to produce the exact
// set of events leading from "version" to the current state, this function
// will return an error, and the caller should re-sync.
//
// The watch channel must be released with watch.StopWatch when it is no
// longer needed.
func WatchFrom(store *MemoryStore, version *api.Version, specifiers ...api.Event) (chan events.Event, func(), error) {
if version == nil {
ch, cancel := state.Watch(store.WatchQueue(), specifiers...)
return ch, cancel, nil
}
if store.proposer == nil {
return nil, nil, errors.New("store does not support versioning")
}
var (
curVersion *api.Version
watch chan events.Event
cancelWatch func()
)
// Using Update to lock the store
err := store.Update(func(tx Tx) error {
// Get current version
curVersion = store.proposer.GetVersion()
// Start the watch with the store locked so events cannot be
// missed
watch, cancelWatch = state.Watch(store.WatchQueue(), specifiers...)
return nil
})
if watch != nil && err != nil {
cancelWatch()
return nil, nil, err
}
if curVersion == nil {
cancelWatch()
return nil, nil, errors.New("could not get current version from store")
}
changelist, err := store.changelistBetweenVersions(*version, *curVersion)
if err != nil {
cancelWatch()
return nil, nil, err
}
ch := make(chan events.Event)
stop := make(chan struct{})
cancel := func() {
close(stop)
}
go func() {
defer cancelWatch()
matcher := state.Matcher(specifiers...)
for _, change := range changelist {
if matcher(change) {
select {
case ch <- change:
case <-stop:
return
}
}
}
for {
select {
case <-stop:
return
case e := <-watch:
ch <- e
}
}
}()
return ch, cancel, nil
}
// touchMeta updates an object's timestamps when necessary and bumps the version
// if provided.
func touchMeta(meta *api.Meta, version *api.Version) error {
// Skip meta update if version is not defined as it means we're applying
// from raft or restoring from a snapshot.
if version == nil {
return nil
}
now, err := gogotypes.TimestampProto(time.Now())
if err != nil {
return err
}
meta.Version = *version
// Updated CreatedAt if not defined
if meta.CreatedAt == nil {
meta.CreatedAt = now
}
meta.UpdatedAt = now
return nil
}
// Wedged returns true if the store lock has been held for a long time,
// possibly indicating a deadlock.
func (s *MemoryStore) Wedged() bool {
lockedAt := s.updateLock.LockedAt()
if lockedAt.IsZero() {
return false
}
return time.Since(lockedAt) > WedgeTimeout
}