blob: 1744321800e00034b4c4e9e15238eb7aff5f602f [file] [log] [blame]
package metadata
import (
"context"
"encoding/binary"
"strings"
"sync"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/snapshots"
"github.com/pkg/errors"
)
const (
// schemaVersion represents the schema version of
// the database. This schema version represents the
// structure of the data in the database. The schema
// can envolve at any time but any backwards
// incompatible changes or structural changes require
// bumping the schema version.
schemaVersion = "v1"
// dbVersion represents updates to the schema
// version which are additions and compatible with
// prior version of the same schema.
dbVersion = 1
)
// DB represents a metadata database backed by a bolt
// database. The database is fully namespaced and stores
// image, container, namespace, snapshot, and content data
// while proxying data shared across namespaces to backend
// datastores for content and snapshots.
type DB struct {
db *bolt.DB
ss map[string]*snapshotter
cs *contentStore
// wlock is used to protect access to the data structures during garbage
// collection. While the wlock is held no writable transactions can be
// opened, preventing changes from occurring between the mark and
// sweep phases without preventing read transactions.
wlock sync.RWMutex
// dirty flags and lock keeps track of datastores which have had deletions
// since the last garbage collection. These datastores will will be garbage
// collected during the next garbage collection.
dirtyL sync.Mutex
dirtySS map[string]struct{}
dirtyCS bool
// mutationCallbacks are called after each mutation with the flag
// set indicating whether any dirty flags are set
mutationCallbacks []func(bool)
}
// NewDB creates a new metadata database using the provided
// bolt database, content store, and snapshotters.
func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshots.Snapshotter) *DB {
m := &DB{
db: db,
ss: make(map[string]*snapshotter, len(ss)),
dirtySS: map[string]struct{}{},
}
// Initialize data stores
m.cs = newContentStore(m, cs)
for name, sn := range ss {
m.ss[name] = newSnapshotter(m, name, sn)
}
return m
}
// Init ensures the database is at the correct version
// and performs any needed migrations.
func (m *DB) Init(ctx context.Context) error {
// errSkip is used when no migration or version needs to be written
// to the database and the transaction can be immediately rolled
// back rather than performing a much slower and unnecessary commit.
var errSkip = errors.New("skip update")
err := m.db.Update(func(tx *bolt.Tx) error {
var (
// current schema and version
schema = "v0"
version = 0
)
// i represents the index of the first migration
// which must be run to get the database up to date.
// The migration's version will be checked in reverse
// order, decrementing i for each migration which
// represents a version newer than the current
// database version
i := len(migrations)
for ; i > 0; i-- {
migration := migrations[i-1]
bkt := tx.Bucket([]byte(migration.schema))
if bkt == nil {
// Hasn't encountered another schema, go to next migration
if schema == "v0" {
continue
}
break
}
if schema == "v0" {
schema = migration.schema
vb := bkt.Get(bucketKeyDBVersion)
if vb != nil {
v, _ := binary.Varint(vb)
version = int(v)
}
}
if version >= migration.version {
break
}
}
// Previous version fo database found
if schema != "v0" {
updates := migrations[i:]
// No migration updates, return immediately
if len(updates) == 0 {
return errSkip
}
for _, m := range updates {
t0 := time.Now()
if err := m.migrate(tx); err != nil {
return errors.Wrapf(err, "failed to migrate to %s.%d", m.schema, m.version)
}
log.G(ctx).WithField("d", time.Now().Sub(t0)).Debugf("database migration to %s.%d finished", m.schema, m.version)
}
}
bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
if err != nil {
return err
}
versionEncoded, err := encodeInt(dbVersion)
if err != nil {
return err
}
return bkt.Put(bucketKeyDBVersion, versionEncoded)
})
if err == errSkip {
err = nil
}
return err
}
// ContentStore returns a namespaced content store
// proxied to a content store.
func (m *DB) ContentStore() content.Store {
if m.cs == nil {
return nil
}
return m.cs
}
// Snapshotter returns a namespaced content store for
// the requested snapshotter name proxied to a snapshotter.
func (m *DB) Snapshotter(name string) snapshots.Snapshotter {
sn, ok := m.ss[name]
if !ok {
return nil
}
return sn
}
// View runs a readonly transaction on the metadata store.
func (m *DB) View(fn func(*bolt.Tx) error) error {
return m.db.View(fn)
}
// Update runs a writable transaction on the metadata store.
func (m *DB) Update(fn func(*bolt.Tx) error) error {
m.wlock.RLock()
defer m.wlock.RUnlock()
err := m.db.Update(fn)
if err == nil {
m.dirtyL.Lock()
dirty := m.dirtyCS || len(m.dirtySS) > 0
for _, fn := range m.mutationCallbacks {
fn(dirty)
}
m.dirtyL.Unlock()
}
return err
}
// RegisterMutationCallback registers a function to be called after a metadata
// mutations has been performed.
//
// The callback function in an argument for whether a deletion has occurred
// since the last garbage collection.
func (m *DB) RegisterMutationCallback(fn func(bool)) {
m.dirtyL.Lock()
m.mutationCallbacks = append(m.mutationCallbacks, fn)
m.dirtyL.Unlock()
}
// GCStats holds the duration for the different phases of the garbage collector
type GCStats struct {
MetaD time.Duration
ContentD time.Duration
SnapshotD map[string]time.Duration
}
// GarbageCollect starts garbage collection
func (m *DB) GarbageCollect(ctx context.Context) (stats GCStats, err error) {
m.wlock.Lock()
t1 := time.Now()
marked, err := m.getMarked(ctx)
if err != nil {
m.wlock.Unlock()
return GCStats{}, err
}
m.dirtyL.Lock()
if err := m.db.Update(func(tx *bolt.Tx) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
rm := func(ctx context.Context, n gc.Node) error {
if _, ok := marked[n]; ok {
return nil
}
if n.Type == ResourceSnapshot {
if idx := strings.IndexRune(n.Key, '/'); idx > 0 {
m.dirtySS[n.Key[:idx]] = struct{}{}
}
} else if n.Type == ResourceContent {
m.dirtyCS = true
}
return remove(ctx, tx, n)
}
if err := scanAll(ctx, tx, rm); err != nil {
return errors.Wrap(err, "failed to scan and remove")
}
return nil
}); err != nil {
m.dirtyL.Unlock()
m.wlock.Unlock()
return GCStats{}, err
}
var wg sync.WaitGroup
if len(m.dirtySS) > 0 {
var sl sync.Mutex
stats.SnapshotD = map[string]time.Duration{}
wg.Add(len(m.dirtySS))
for snapshotterName := range m.dirtySS {
log.G(ctx).WithField("snapshotter", snapshotterName).Debug("scheduling snapshotter cleanup")
go func(snapshotterName string) {
st1 := time.Now()
m.cleanupSnapshotter(snapshotterName)
sl.Lock()
stats.SnapshotD[snapshotterName] = time.Now().Sub(st1)
sl.Unlock()
wg.Done()
}(snapshotterName)
}
m.dirtySS = map[string]struct{}{}
}
if m.dirtyCS {
wg.Add(1)
log.G(ctx).Debug("scheduling content cleanup")
go func() {
ct1 := time.Now()
m.cleanupContent()
stats.ContentD = time.Now().Sub(ct1)
wg.Done()
}()
m.dirtyCS = false
}
m.dirtyL.Unlock()
stats.MetaD = time.Now().Sub(t1)
m.wlock.Unlock()
wg.Wait()
return
}
func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
var marked map[gc.Node]struct{}
if err := m.db.View(func(tx *bolt.Tx) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var (
nodes []gc.Node
wg sync.WaitGroup
roots = make(chan gc.Node)
)
wg.Add(1)
go func() {
defer wg.Done()
for n := range roots {
nodes = append(nodes, n)
}
}()
// Call roots
if err := scanRoots(ctx, tx, roots); err != nil {
cancel()
return err
}
close(roots)
wg.Wait()
refs := func(n gc.Node) ([]gc.Node, error) {
var sn []gc.Node
if err := references(ctx, tx, n, func(nn gc.Node) {
sn = append(sn, nn)
}); err != nil {
return nil, err
}
return sn, nil
}
reachable, err := gc.Tricolor(nodes, refs)
if err != nil {
return err
}
marked = reachable
return nil
}); err != nil {
return nil, err
}
return marked, nil
}
func (m *DB) cleanupSnapshotter(name string) (time.Duration, error) {
ctx := context.Background()
sn, ok := m.ss[name]
if !ok {
return 0, nil
}
d, err := sn.garbageCollect(ctx)
logger := log.G(ctx).WithField("snapshotter", name)
if err != nil {
logger.WithError(err).Warn("snapshot garbage collection failed")
} else {
logger.WithField("d", d).Debugf("snapshot garbage collected")
}
return d, err
}
func (m *DB) cleanupContent() (time.Duration, error) {
ctx := context.Background()
if m.cs == nil {
return 0, nil
}
d, err := m.cs.garbageCollect(ctx)
if err != nil {
log.G(ctx).WithError(err).Warn("content garbage collection failed")
} else {
log.G(ctx).WithField("d", d).Debugf("content garbage collected")
}
return d, err
}