| package datastore |
| |
| import ( |
| "errors" |
| "path" |
| "strings" |
| "sync" |
| |
| store "github.com/moby/moby/v2/daemon/libnetwork/internal/kvstore" |
| "github.com/moby/moby/v2/daemon/libnetwork/internal/kvstore/boltdb" |
| "github.com/moby/moby/v2/daemon/libnetwork/types" |
| ) |
| |
| // ErrKeyModified is raised for an atomic update when the update is working on a stale state |
| var ( |
| ErrKeyModified = store.ErrKeyModified |
| ErrKeyNotFound = store.ErrKeyNotFound |
| ) |
| |
| type Store struct { |
| mu sync.Mutex |
| store store.Store |
| cache *cache |
| } |
| |
| // KVObject is Key/Value interface used by objects to be part of the Store. |
| type KVObject interface { |
| // Key method lets an object provide the Key to be used in KV Store |
| Key() []string |
| // KeyPrefix method lets an object return immediate parent key that can be used for tree walk |
| KeyPrefix() []string |
| // Value method lets an object marshal its content to be stored in the KV store |
| Value() []byte |
| // SetValue is used by the datastore to set the object's value when loaded from the data store. |
| SetValue([]byte) error |
| // Index method returns the latest DB Index as seen by the object |
| Index() uint64 |
| // SetIndex method allows the datastore to store the latest DB Index into the object |
| SetIndex(uint64) |
| // Exists returns true if the object exists in the datastore, false if it hasn't been stored yet. |
| // When SetIndex() is called, the object has been stored. |
| Exists() bool |
| // Skip provides a way for a KV Object to avoid persisting it in the KV Store |
| Skip() bool |
| // New returns a new object which is created based on the |
| // source object |
| New() KVObject |
| // CopyTo deep copies the contents of the implementing object |
| // to the passed destination object |
| CopyTo(KVObject) error |
| } |
| |
| const ( |
| // NetworkKeyPrefix is the prefix for network key in the kv store |
| NetworkKeyPrefix = "network" |
| // EndpointKeyPrefix is the prefix for endpoint key in the kv store |
| EndpointKeyPrefix = "endpoint" |
| ) |
| |
| var ( |
| defaultRootChain = []string{"docker", "network", "v1.0"} |
| rootChain = defaultRootChain |
| ) |
| |
| const DefaultBucket = "libnetwork" |
| |
| // Key provides convenient method to create a Key |
| func Key(key ...string) string { |
| var b strings.Builder |
| for _, parts := range [][]string{rootChain, key} { |
| for _, part := range parts { |
| b.WriteString(part) |
| b.WriteString("/") |
| } |
| } |
| return b.String() |
| } |
| |
| // New creates a new Store instance. |
| func New(dir, bucket string) (*Store, error) { |
| if dir == "" { |
| return nil, errors.New("empty dir") |
| } |
| if bucket == "" { |
| return nil, errors.New("empty bucket") |
| } |
| |
| s, err := boltdb.New(path.Join(dir, "local-kv.db"), bucket) |
| if err != nil { |
| return nil, err |
| } |
| |
| return &Store{store: s, cache: newCache(s)}, nil |
| } |
| |
| // Close closes the data store. |
| func (ds *Store) Close() { |
| ds.store.Close() |
| } |
| |
| // PutObjectAtomic provides an atomic add and update operation for a Record. |
| func (ds *Store) PutObjectAtomic(kvObject KVObject) error { |
| ds.mu.Lock() |
| defer ds.mu.Unlock() |
| |
| if kvObject == nil { |
| return types.InvalidParameterErrorf("invalid KV Object: nil") |
| } |
| |
| kvObjValue := kvObject.Value() |
| |
| if kvObjValue == nil { |
| return types.InvalidParameterErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...)) |
| } |
| |
| if !kvObject.Skip() { |
| var previous *store.KVPair |
| if kvObject.Exists() { |
| previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()} |
| } |
| |
| pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous) |
| if err != nil { |
| if errors.Is(err, store.ErrKeyExists) { |
| return ErrKeyModified |
| } |
| return err |
| } |
| |
| kvObject.SetIndex(pair.LastIndex) |
| } |
| |
| // If persistent store is skipped, sequencing needs to |
| // happen in cache. |
| return ds.cache.add(kvObject, kvObject.Skip()) |
| } |
| |
| // GetObject gets data from the store and unmarshals to the specified object. |
| func (ds *Store) GetObject(o KVObject) error { |
| ds.mu.Lock() |
| defer ds.mu.Unlock() |
| |
| return ds.cache.get(o) |
| } |
| |
| func (ds *Store) ensureParent(parent string) error { |
| exists, err := ds.store.Exists(parent) |
| if err != nil { |
| return err |
| } |
| if exists { |
| return nil |
| } |
| return ds.store.Put(parent, []byte{}) |
| } |
| |
| // List returns of a list of KVObjects belonging to the parent key. The caller |
| // must pass a KVObject of the same type as the objects that need to be listed. |
| func (ds *Store) List(kvObject KVObject) ([]KVObject, error) { |
| ds.mu.Lock() |
| defer ds.mu.Unlock() |
| |
| return ds.cache.list(kvObject) |
| } |
| |
| func (ds *Store) iterateKVPairsFromStore(key string, ctor KVObject, callback func(string, KVObject)) error { |
| // Make sure the parent key exists |
| if err := ds.ensureParent(key); err != nil { |
| return err |
| } |
| |
| kvList, err := ds.store.List(key) |
| if err != nil { |
| return err |
| } |
| |
| for _, kvPair := range kvList { |
| if len(kvPair.Value) == 0 { |
| continue |
| } |
| |
| dstO := ctor.New() |
| if err := dstO.SetValue(kvPair.Value); err != nil { |
| return err |
| } |
| |
| // Make sure the object has a correct view of the DB index in |
| // case we need to modify it and update the DB. |
| dstO.SetIndex(kvPair.LastIndex) |
| callback(kvPair.Key, dstO) |
| } |
| |
| return nil |
| } |
| |
| // Map returns a Map of KVObjects. |
| func (ds *Store) Map(key string, kvObject KVObject) (map[string]KVObject, error) { |
| ds.mu.Lock() |
| defer ds.mu.Unlock() |
| |
| results := map[string]KVObject{} |
| err := ds.iterateKVPairsFromStore(key, kvObject, func(key string, val KVObject) { |
| // Trim the leading & trailing "/" to make it consistent across all stores |
| results[strings.Trim(key, "/")] = val |
| }) |
| if err != nil { |
| return nil, err |
| } |
| return results, nil |
| } |
| |
| // DeleteObject deletes a kvObject from the on-disk DB and the in-memory cache. |
| // Unlike DeleteObjectAtomic, it doesn't check the optimistic lock of the |
| // passed kvObject. |
| func (ds *Store) DeleteObject(kvObject KVObject) error { |
| ds.mu.Lock() |
| defer ds.mu.Unlock() |
| |
| if kvObject == nil { |
| return types.InvalidParameterErrorf("invalid KV Object: nil") |
| } |
| |
| if !kvObject.Skip() { |
| if err := ds.store.Delete(Key(kvObject.Key()...)); err != nil { |
| return err |
| } |
| } |
| |
| // cleanup the cache only if AtomicDelete went through successfully |
| // If persistent store is skipped, sequencing needs to |
| // happen in cache. |
| return ds.cache.del(kvObject, false) |
| } |
| |
| // DeleteObjectAtomic performs atomic delete on a record. |
| func (ds *Store) DeleteObjectAtomic(kvObject KVObject) error { |
| ds.mu.Lock() |
| defer ds.mu.Unlock() |
| |
| if kvObject == nil { |
| return types.InvalidParameterErrorf("invalid KV Object: nil") |
| } |
| |
| previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()} |
| |
| if !kvObject.Skip() { |
| if err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil { |
| if errors.Is(err, store.ErrKeyExists) { |
| return ErrKeyModified |
| } |
| return err |
| } |
| } |
| |
| // cleanup the cache only if AtomicDelete went through successfully |
| // If persistent store is skipped, sequencing needs to |
| // happen in cache. |
| return ds.cache.del(kvObject, kvObject.Skip()) |
| } |