| package metadata |
| |
| import ( |
| "context" |
| "encoding/binary" |
| "strings" |
| "sync" |
| "time" |
| |
| "github.com/boltdb/bolt" |
| "github.com/containerd/containerd/content" |
| "github.com/containerd/containerd/gc" |
| "github.com/containerd/containerd/log" |
| "github.com/containerd/containerd/snapshot" |
| "github.com/pkg/errors" |
| ) |
| |
const (
	// schemaVersion represents the schema version of
	// the database. This schema version represents the
	// structure of the data in the database. The schema
	// can evolve at any time but any backwards
	// incompatible changes or structural changes require
	// bumping the schema version.
	schemaVersion = "v1"

	// dbVersion represents updates to the schema
	// version which are additions and compatible with
	// prior version of the same schema.
	dbVersion = 1
)
| |
// DB represents a metadata database backed by a bolt
// database. The database is fully namespaced and stores
// image, container, namespace, snapshot, and content data
// while proxying data shared across namespaces to backend
// datastores for content and snapshots.
type DB struct {
	db *bolt.DB
	ss map[string]*snapshotter
	cs *contentStore

	// wlock is used to protect access to the data structures during garbage
	// collection. While the wlock is held no writable transactions can be
	// opened, preventing changes from occurring between the mark and
	// sweep phases without preventing read transactions.
	wlock sync.RWMutex

	// dirty flags and lock keep track of datastores which have had deletions
	// since the last garbage collection. These datastores will be garbage
	// collected during the next garbage collection.
	dirtyL  sync.Mutex
	dirtySS map[string]struct{}
	dirtyCS bool

	// lastCollection records when the most recent garbage collection
	// finished; currently only informational.
	// TODO: Keep track of stats such as pause time, number of collected objects, errors
	lastCollection time.Time
}
| |
| // NewDB creates a new metadata database using the provided |
| // bolt database, content store, and snapshotters. |
| func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshot.Snapshotter) *DB { |
| m := &DB{ |
| db: db, |
| ss: make(map[string]*snapshotter, len(ss)), |
| dirtySS: map[string]struct{}{}, |
| } |
| |
| // Initialize data stores |
| m.cs = newContentStore(m, cs) |
| for name, sn := range ss { |
| m.ss[name] = newSnapshotter(m, name, sn) |
| } |
| |
| return m |
| } |
| |
| // Init ensures the database is at the correct version |
| // and performs any needed migrations. |
| func (m *DB) Init(ctx context.Context) error { |
| // errSkip is used when no migration or version needs to be written |
| // to the database and the transaction can be immediately rolled |
| // back rather than performing a much slower and unnecessary commit. |
| var errSkip = errors.New("skip update") |
| |
| err := m.db.Update(func(tx *bolt.Tx) error { |
| var ( |
| // current schema and version |
| schema = "v0" |
| version = 0 |
| ) |
| |
| // i represents the index of the first migration |
| // which must be run to get the database up to date. |
| // The migration's version will be checked in reverse |
| // order, decrementing i for each migration which |
| // represents a version newer than the current |
| // database version |
| i := len(migrations) |
| |
| for ; i > 0; i-- { |
| migration := migrations[i-1] |
| |
| bkt := tx.Bucket([]byte(migration.schema)) |
| if bkt == nil { |
| // Hasn't encountered another schema, go to next migration |
| if schema == "v0" { |
| continue |
| } |
| break |
| } |
| if schema == "v0" { |
| schema = migration.schema |
| vb := bkt.Get(bucketKeyDBVersion) |
| if vb != nil { |
| v, _ := binary.Varint(vb) |
| version = int(v) |
| } |
| } |
| |
| if version >= migration.version { |
| break |
| } |
| } |
| |
| // Previous version fo database found |
| if schema != "v0" { |
| updates := migrations[i:] |
| |
| // No migration updates, return immediately |
| if len(updates) == 0 { |
| return errSkip |
| } |
| |
| for _, m := range updates { |
| t0 := time.Now() |
| if err := m.migrate(tx); err != nil { |
| return errors.Wrapf(err, "failed to migrate to %s.%d", m.schema, m.version) |
| } |
| log.G(ctx).WithField("d", time.Now().Sub(t0)).Debugf("database migration to %s.%d finished", m.schema, m.version) |
| } |
| } |
| |
| bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion) |
| if err != nil { |
| return err |
| } |
| |
| versionEncoded, err := encodeInt(dbVersion) |
| if err != nil { |
| return err |
| } |
| |
| return bkt.Put(bucketKeyDBVersion, versionEncoded) |
| }) |
| if err == errSkip { |
| err = nil |
| } |
| return err |
| } |
| |
| // ContentStore returns a namespaced content store |
| // proxied to a content store. |
| func (m *DB) ContentStore() content.Store { |
| if m.cs == nil { |
| return nil |
| } |
| return m.cs |
| } |
| |
| // Snapshotter returns a namespaced content store for |
| // the requested snapshotter name proxied to a snapshotter. |
| func (m *DB) Snapshotter(name string) snapshot.Snapshotter { |
| sn, ok := m.ss[name] |
| if !ok { |
| return nil |
| } |
| return sn |
| } |
| |
// View runs a readonly transaction on the metadata store.
// Read transactions are never blocked by garbage collection.
func (m *DB) View(fn func(*bolt.Tx) error) error {
	return m.db.View(fn)
}
| |
// Update runs a writable transaction on the metadata store.
//
// The read lock on wlock allows any number of concurrent writers
// while blocking garbage collection, which takes the corresponding
// write lock to quiesce all writers between mark and sweep.
func (m *DB) Update(fn func(*bolt.Tx) error) error {
	m.wlock.RLock()
	defer m.wlock.RUnlock()
	return m.db.Update(fn)
}
| |
| func (m *DB) GarbageCollect(ctx context.Context) error { |
| lt1 := time.Now() |
| m.wlock.Lock() |
| defer func() { |
| m.wlock.Unlock() |
| log.G(ctx).WithField("d", time.Now().Sub(lt1)).Debug("metadata garbage collected") |
| }() |
| |
| var marked map[gc.Node]struct{} |
| |
| if err := m.db.View(func(tx *bolt.Tx) error { |
| ctx, cancel := context.WithCancel(ctx) |
| defer cancel() |
| |
| roots := make(chan gc.Node) |
| errChan := make(chan error) |
| go func() { |
| defer close(errChan) |
| defer close(roots) |
| |
| // Call roots |
| if err := scanRoots(ctx, tx, roots); err != nil { |
| cancel() |
| errChan <- err |
| } |
| }() |
| |
| refs := func(ctx context.Context, n gc.Node, fn func(gc.Node)) error { |
| return references(ctx, tx, n, fn) |
| } |
| |
| reachable, err := gc.ConcurrentMark(ctx, roots, refs) |
| if rerr := <-errChan; rerr != nil { |
| return rerr |
| } |
| if err != nil { |
| return err |
| } |
| marked = reachable |
| return nil |
| }); err != nil { |
| return err |
| } |
| |
| m.dirtyL.Lock() |
| defer m.dirtyL.Unlock() |
| |
| if err := m.db.Update(func(tx *bolt.Tx) error { |
| ctx, cancel := context.WithCancel(ctx) |
| defer cancel() |
| |
| nodeC := make(chan gc.Node) |
| var scanErr error |
| |
| go func() { |
| defer close(nodeC) |
| scanErr = scanAll(ctx, tx, nodeC) |
| }() |
| |
| rm := func(n gc.Node) error { |
| if n.Type == ResourceSnapshot { |
| if idx := strings.IndexRune(n.Key, '/'); idx > 0 { |
| m.dirtySS[n.Key[:idx]] = struct{}{} |
| } |
| } else if n.Type == ResourceContent { |
| m.dirtyCS = true |
| } |
| return remove(ctx, tx, n) |
| } |
| |
| if err := gc.Sweep(marked, nodeC, rm); err != nil { |
| return errors.Wrap(err, "failed to sweep") |
| } |
| |
| if scanErr != nil { |
| return errors.Wrap(scanErr, "failed to scan all") |
| } |
| |
| return nil |
| }); err != nil { |
| return err |
| } |
| |
| m.lastCollection = time.Now() |
| |
| if len(m.dirtySS) > 0 { |
| for snapshotterName := range m.dirtySS { |
| log.G(ctx).WithField("snapshotter", snapshotterName).Debug("scheduling snapshotter cleanup") |
| go m.cleanupSnapshotter(snapshotterName) |
| } |
| m.dirtySS = map[string]struct{}{} |
| } |
| |
| if m.dirtyCS { |
| log.G(ctx).Debug("scheduling content cleanup") |
| go m.cleanupContent() |
| m.dirtyCS = false |
| } |
| |
| return nil |
| } |
| |
| func (m *DB) cleanupSnapshotter(name string) { |
| ctx := context.Background() |
| sn, ok := m.ss[name] |
| if !ok { |
| return |
| } |
| |
| err := sn.garbageCollect(ctx) |
| if err != nil { |
| log.G(ctx).WithError(err).WithField("snapshotter", name).Warn("garbage collection failed") |
| } |
| } |
| |
| func (m *DB) cleanupContent() { |
| ctx := context.Background() |
| if m.cs == nil { |
| return |
| } |
| |
| err := m.cs.garbageCollect(ctx) |
| if err != nil { |
| log.G(ctx).WithError(err).Warn("content garbage collection failed") |
| } |
| } |