blob: 3e501c51718697df42539c36ed9f8d94764f7dba [file] [log] [blame]
package metadata
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/labels"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/snapshots"
"github.com/pkg/errors"
)
type snapshotter struct {
snapshots.Snapshotter
name string
db *DB
l sync.RWMutex
}
// newSnapshotter returns a new Snapshotter which namespaces the given snapshot
// using the provided name and database.
func newSnapshotter(db *DB, name string, sn snapshots.Snapshotter) *snapshotter {
return &snapshotter{
Snapshotter: sn,
name: name,
db: db,
}
}
func createKey(id uint64, namespace, key string) string {
return fmt.Sprintf("%s/%d/%s", namespace, id, key)
}
func getKey(tx *bolt.Tx, ns, name, key string) string {
bkt := getSnapshotterBucket(tx, ns, name)
if bkt == nil {
return ""
}
bkt = bkt.Bucket([]byte(key))
if bkt == nil {
return ""
}
v := bkt.Get(bucketKeyName)
if len(v) == 0 {
return ""
}
return string(v)
}
func (s *snapshotter) resolveKey(ctx context.Context, key string) (string, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return "", err
}
var id string
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
id = getKey(tx, ns, s.name, key)
if id == "" {
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", key)
}
return nil
}); err != nil {
return "", err
}
return id, nil
}
func (s *snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return snapshots.Info{}, err
}
var (
bkey string
local = snapshots.Info{
Name: key,
}
)
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getSnapshotterBucket(tx, ns, s.name)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", key)
}
sbkt := bkt.Bucket([]byte(key))
if sbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", key)
}
local.Labels, err = boltutil.ReadLabels(sbkt)
if err != nil {
return errors.Wrap(err, "failed to read labels")
}
if err := boltutil.ReadTimestamps(sbkt, &local.Created, &local.Updated); err != nil {
return errors.Wrap(err, "failed to read timestamps")
}
bkey = string(sbkt.Get(bucketKeyName))
local.Parent = string(sbkt.Get(bucketKeyParent))
return nil
}); err != nil {
return snapshots.Info{}, err
}
info, err := s.Snapshotter.Stat(ctx, bkey)
if err != nil {
return snapshots.Info{}, err
}
return overlayInfo(info, local), nil
}
func (s *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) {
s.l.RLock()
defer s.l.RUnlock()
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return snapshots.Info{}, err
}
if info.Name == "" {
return snapshots.Info{}, errors.Wrap(errdefs.ErrInvalidArgument, "")
}
var (
bkey string
local = snapshots.Info{
Name: info.Name,
}
)
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getSnapshotterBucket(tx, ns, s.name)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", info.Name)
}
sbkt := bkt.Bucket([]byte(info.Name))
if sbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", info.Name)
}
local.Labels, err = boltutil.ReadLabels(sbkt)
if err != nil {
return errors.Wrap(err, "failed to read labels")
}
if err := boltutil.ReadTimestamps(sbkt, &local.Created, &local.Updated); err != nil {
return errors.Wrap(err, "failed to read timestamps")
}
// Handle field updates
if len(fieldpaths) > 0 {
for _, path := range fieldpaths {
if strings.HasPrefix(path, "labels.") {
if local.Labels == nil {
local.Labels = map[string]string{}
}
key := strings.TrimPrefix(path, "labels.")
local.Labels[key] = info.Labels[key]
continue
}
switch path {
case "labels":
local.Labels = info.Labels
default:
return errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on snapshot %q", path, info.Name)
}
}
} else {
local.Labels = info.Labels
}
if err := validateSnapshot(&local); err != nil {
return err
}
local.Updated = time.Now().UTC()
if err := boltutil.WriteTimestamps(sbkt, local.Created, local.Updated); err != nil {
return errors.Wrap(err, "failed to read timestamps")
}
if err := boltutil.WriteLabels(sbkt, local.Labels); err != nil {
return errors.Wrap(err, "failed to read labels")
}
bkey = string(sbkt.Get(bucketKeyName))
local.Parent = string(sbkt.Get(bucketKeyParent))
return nil
}); err != nil {
return snapshots.Info{}, err
}
info, err = s.Snapshotter.Stat(ctx, bkey)
if err != nil {
return snapshots.Info{}, err
}
return overlayInfo(info, local), nil
}
func overlayInfo(info, overlay snapshots.Info) snapshots.Info {
// Merge info
info.Name = overlay.Name
info.Created = overlay.Created
info.Updated = overlay.Updated
info.Parent = overlay.Parent
if info.Labels == nil {
info.Labels = overlay.Labels
} else {
for k, v := range overlay.Labels {
overlay.Labels[k] = v
}
}
return info
}
func (s *snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) {
bkey, err := s.resolveKey(ctx, key)
if err != nil {
return snapshots.Usage{}, err
}
return s.Snapshotter.Usage(ctx, bkey)
}
func (s *snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, error) {
bkey, err := s.resolveKey(ctx, key)
if err != nil {
return nil, err
}
return s.Snapshotter.Mounts(ctx, bkey)
}
func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
return s.createSnapshot(ctx, key, parent, false, opts)
}
func (s *snapshotter) View(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
return s.createSnapshot(ctx, key, parent, true, opts)
}
func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, readonly bool, opts []snapshots.Opt) ([]mount.Mount, error) {
s.l.RLock()
defer s.l.RUnlock()
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
var base snapshots.Info
for _, opt := range opts {
if err := opt(&base); err != nil {
return nil, err
}
}
if err := validateSnapshot(&base); err != nil {
return nil, err
}
var m []mount.Mount
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
bkt, err := createSnapshotterBucket(tx, ns, s.name)
if err != nil {
return err
}
bbkt, err := bkt.CreateBucket([]byte(key))
if err != nil {
if err == bolt.ErrBucketExists {
err = errors.Wrapf(errdefs.ErrAlreadyExists, "snapshot %q", key)
}
return err
}
if err := addSnapshotLease(ctx, tx, s.name, key); err != nil {
return err
}
var bparent string
if parent != "" {
pbkt := bkt.Bucket([]byte(parent))
if pbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", parent)
}
bparent = string(pbkt.Get(bucketKeyName))
cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren)
if err != nil {
return err
}
if err := cbkt.Put([]byte(key), nil); err != nil {
return err
}
if err := bbkt.Put(bucketKeyParent, []byte(parent)); err != nil {
return err
}
}
sid, err := bkt.NextSequence()
if err != nil {
return err
}
bkey := createKey(sid, ns, key)
if err := bbkt.Put(bucketKeyName, []byte(bkey)); err != nil {
return err
}
ts := time.Now().UTC()
if err := boltutil.WriteTimestamps(bbkt, ts, ts); err != nil {
return err
}
if err := boltutil.WriteLabels(bbkt, base.Labels); err != nil {
return err
}
// TODO: Consider doing this outside of transaction to lessen
// metadata lock time
if readonly {
m, err = s.Snapshotter.View(ctx, bkey, bparent)
} else {
m, err = s.Snapshotter.Prepare(ctx, bkey, bparent)
}
return err
}); err != nil {
return nil, err
}
return m, nil
}
func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
s.l.RLock()
defer s.l.RUnlock()
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
var base snapshots.Info
for _, opt := range opts {
if err := opt(&base); err != nil {
return err
}
}
if err := validateSnapshot(&base); err != nil {
return err
}
return update(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getSnapshotterBucket(tx, ns, s.name)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", key)
}
bbkt, err := bkt.CreateBucket([]byte(name))
if err != nil {
if err == bolt.ErrBucketExists {
err = errors.Wrapf(errdefs.ErrAlreadyExists, "snapshot %q", name)
}
return err
}
if err := addSnapshotLease(ctx, tx, s.name, name); err != nil {
return err
}
obkt := bkt.Bucket([]byte(key))
if obkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", key)
}
bkey := string(obkt.Get(bucketKeyName))
sid, err := bkt.NextSequence()
if err != nil {
return err
}
nameKey := createKey(sid, ns, name)
if err := bbkt.Put(bucketKeyName, []byte(nameKey)); err != nil {
return err
}
parent := obkt.Get(bucketKeyParent)
if len(parent) > 0 {
pbkt := bkt.Bucket(parent)
if pbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", string(parent))
}
cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren)
if err != nil {
return err
}
if err := cbkt.Delete([]byte(key)); err != nil {
return err
}
if err := cbkt.Put([]byte(name), nil); err != nil {
return err
}
if err := bbkt.Put(bucketKeyParent, parent); err != nil {
return err
}
}
ts := time.Now().UTC()
if err := boltutil.WriteTimestamps(bbkt, ts, ts); err != nil {
return err
}
if err := boltutil.WriteLabels(bbkt, base.Labels); err != nil {
return err
}
if err := bkt.DeleteBucket([]byte(key)); err != nil {
return err
}
if err := removeSnapshotLease(ctx, tx, s.name, key); err != nil {
return err
}
// TODO: Consider doing this outside of transaction to lessen
// metadata lock time
return s.Snapshotter.Commit(ctx, nameKey, bkey)
})
}
func (s *snapshotter) Remove(ctx context.Context, key string) error {
s.l.RLock()
defer s.l.RUnlock()
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
return update(ctx, s.db, func(tx *bolt.Tx) error {
var sbkt *bolt.Bucket
bkt := getSnapshotterBucket(tx, ns, s.name)
if bkt != nil {
sbkt = bkt.Bucket([]byte(key))
}
if sbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", key)
}
cbkt := sbkt.Bucket(bucketKeyChildren)
if cbkt != nil {
if child, _ := cbkt.Cursor().First(); child != nil {
return errors.Wrap(errdefs.ErrFailedPrecondition, "cannot remove snapshot with child")
}
}
parent := sbkt.Get(bucketKeyParent)
if len(parent) > 0 {
pbkt := bkt.Bucket(parent)
if pbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", string(parent))
}
cbkt := pbkt.Bucket(bucketKeyChildren)
if cbkt != nil {
if err := cbkt.Delete([]byte(key)); err != nil {
return errors.Wrap(err, "failed to remove child link")
}
}
}
if err := bkt.DeleteBucket([]byte(key)); err != nil {
return err
}
if err := removeSnapshotLease(ctx, tx, s.name, key); err != nil {
return err
}
// Mark snapshotter as dirty for triggering garbage collection
s.db.dirtyL.Lock()
s.db.dirtySS[s.name] = struct{}{}
s.db.dirtyL.Unlock()
return nil
})
}
type infoPair struct {
bkey string
info snapshots.Info
}
func (s *snapshotter) Walk(ctx context.Context, fn func(context.Context, snapshots.Info) error) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
var (
batchSize = 100
pairs = []infoPair{}
lastKey string
)
for {
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getSnapshotterBucket(tx, ns, s.name)
if bkt == nil {
return nil
}
c := bkt.Cursor()
var k, v []byte
if lastKey == "" {
k, v = c.First()
} else {
k, v = c.Seek([]byte(lastKey))
}
for k != nil {
if v == nil {
if len(pairs) >= batchSize {
break
}
sbkt := bkt.Bucket(k)
pair := infoPair{
bkey: string(sbkt.Get(bucketKeyName)),
info: snapshots.Info{
Name: string(k),
Parent: string(sbkt.Get(bucketKeyParent)),
},
}
err := boltutil.ReadTimestamps(sbkt, &pair.info.Created, &pair.info.Updated)
if err != nil {
return err
}
pair.info.Labels, err = boltutil.ReadLabels(sbkt)
if err != nil {
return err
}
pairs = append(pairs, pair)
}
k, v = c.Next()
}
lastKey = string(k)
return nil
}); err != nil {
return err
}
for _, pair := range pairs {
info, err := s.Snapshotter.Stat(ctx, pair.bkey)
if err != nil {
if errdefs.IsNotFound(err) {
continue
}
return err
}
if err := fn(ctx, overlayInfo(info, pair.info)); err != nil {
return err
}
}
if lastKey == "" {
break
}
pairs = pairs[:0]
}
return nil
}
func validateSnapshot(info *snapshots.Info) error {
for k, v := range info.Labels {
if err := labels.Validate(k, v); err != nil {
return errors.Wrapf(err, "info.Labels")
}
}
return nil
}
func (s *snapshotter) garbageCollect(ctx context.Context) (d time.Duration, err error) {
s.l.Lock()
t1 := time.Now()
defer func() {
if err == nil {
d = time.Now().Sub(t1)
}
s.l.Unlock()
}()
seen := map[string]struct{}{}
if err := s.db.View(func(tx *bolt.Tx) error {
v1bkt := tx.Bucket(bucketKeyVersion)
if v1bkt == nil {
return nil
}
// iterate through each namespace
v1c := v1bkt.Cursor()
for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
if v != nil {
continue
}
sbkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectSnapshots)
if sbkt == nil {
continue
}
// Load specific snapshotter
ssbkt := sbkt.Bucket([]byte(s.name))
if ssbkt == nil {
continue
}
if err := ssbkt.ForEach(func(sk, sv []byte) error {
if sv == nil {
bkey := ssbkt.Bucket(sk).Get(bucketKeyName)
if len(bkey) > 0 {
seen[string(bkey)] = struct{}{}
}
}
return nil
}); err != nil {
return err
}
}
return nil
}); err != nil {
return 0, err
}
roots, err := s.walkTree(ctx, seen)
if err != nil {
return 0, err
}
// TODO: Unlock before removal (once nodes are fully unavailable).
// This could be achieved through doing prune inside the lock
// and having a cleanup method which actually performs the
// deletions on the snapshotters which support it.
for _, node := range roots {
if err := s.pruneBranch(ctx, node); err != nil {
return 0, err
}
}
return
}
type treeNode struct {
info snapshots.Info
remove bool
children []*treeNode
}
func (s *snapshotter) walkTree(ctx context.Context, seen map[string]struct{}) ([]*treeNode, error) {
roots := []*treeNode{}
nodes := map[string]*treeNode{}
if err := s.Snapshotter.Walk(ctx, func(ctx context.Context, info snapshots.Info) error {
_, isSeen := seen[info.Name]
node, ok := nodes[info.Name]
if !ok {
node = &treeNode{}
nodes[info.Name] = node
}
node.remove = !isSeen
node.info = info
if info.Parent == "" {
roots = append(roots, node)
} else {
parent, ok := nodes[info.Parent]
if !ok {
parent = &treeNode{}
nodes[info.Parent] = parent
}
parent.children = append(parent.children, node)
}
return nil
}); err != nil {
return nil, err
}
return roots, nil
}
func (s *snapshotter) pruneBranch(ctx context.Context, node *treeNode) error {
for _, child := range node.children {
if err := s.pruneBranch(ctx, child); err != nil {
return err
}
}
if node.remove {
logger := log.G(ctx).WithField("snapshotter", s.name)
if err := s.Snapshotter.Remove(ctx, node.info.Name); err != nil {
if !errdefs.IsFailedPrecondition(err) {
return err
}
logger.WithError(err).WithField("key", node.info.Name).Warnf("snapshot removal failed")
} else {
logger.WithField("key", node.info.Name).Debug("removed snapshot")
}
}
return nil
}
// Close closes s.Snapshotter but not db
func (s *snapshotter) Close() error {
return s.Snapshotter.Close()
}