| package metadata |
| |
| import ( |
| "context" |
| "encoding/binary" |
| "strings" |
| "sync" |
| "time" |
| |
| "github.com/boltdb/bolt" |
| "github.com/containerd/containerd/content" |
| "github.com/containerd/containerd/gc" |
| "github.com/containerd/containerd/log" |
| "github.com/containerd/containerd/snapshots" |
| "github.com/pkg/errors" |
| ) |
| |
const (
	// schemaVersion represents the schema version of
	// the database. This schema version represents the
	// structure of the data in the database. The schema
	// can evolve at any time but any backwards
	// incompatible changes or structural changes require
	// bumping the schema version.
	schemaVersion = "v1"

	// dbVersion represents updates to the schema
	// version which are additions and compatible with
	// prior versions of the same schema.
	dbVersion = 1
)
| |
| // DB represents a metadata database backed by a bolt |
| // database. The database is fully namespaced and stores |
| // image, container, namespace, snapshot, and content data |
| // while proxying data shared across namespaces to backend |
| // datastores for content and snapshots. |
| type DB struct { |
| db *bolt.DB |
| ss map[string]*snapshotter |
| cs *contentStore |
| |
| // wlock is used to protect access to the data structures during garbage |
| // collection. While the wlock is held no writable transactions can be |
| // opened, preventing changes from occurring between the mark and |
| // sweep phases without preventing read transactions. |
| wlock sync.RWMutex |
| |
| // dirty flags and lock keeps track of datastores which have had deletions |
| // since the last garbage collection. These datastores will will be garbage |
| // collected during the next garbage collection. |
| dirtyL sync.Mutex |
| dirtySS map[string]struct{} |
| dirtyCS bool |
| |
| // mutationCallbacks are called after each mutation with the flag |
| // set indicating whether any dirty flags are set |
| mutationCallbacks []func(bool) |
| } |
| |
| // NewDB creates a new metadata database using the provided |
| // bolt database, content store, and snapshotters. |
| func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshots.Snapshotter) *DB { |
| m := &DB{ |
| db: db, |
| ss: make(map[string]*snapshotter, len(ss)), |
| dirtySS: map[string]struct{}{}, |
| } |
| |
| // Initialize data stores |
| m.cs = newContentStore(m, cs) |
| for name, sn := range ss { |
| m.ss[name] = newSnapshotter(m, name, sn) |
| } |
| |
| return m |
| } |
| |
| // Init ensures the database is at the correct version |
| // and performs any needed migrations. |
| func (m *DB) Init(ctx context.Context) error { |
| // errSkip is used when no migration or version needs to be written |
| // to the database and the transaction can be immediately rolled |
| // back rather than performing a much slower and unnecessary commit. |
| var errSkip = errors.New("skip update") |
| |
| err := m.db.Update(func(tx *bolt.Tx) error { |
| var ( |
| // current schema and version |
| schema = "v0" |
| version = 0 |
| ) |
| |
| // i represents the index of the first migration |
| // which must be run to get the database up to date. |
| // The migration's version will be checked in reverse |
| // order, decrementing i for each migration which |
| // represents a version newer than the current |
| // database version |
| i := len(migrations) |
| |
| for ; i > 0; i-- { |
| migration := migrations[i-1] |
| |
| bkt := tx.Bucket([]byte(migration.schema)) |
| if bkt == nil { |
| // Hasn't encountered another schema, go to next migration |
| if schema == "v0" { |
| continue |
| } |
| break |
| } |
| if schema == "v0" { |
| schema = migration.schema |
| vb := bkt.Get(bucketKeyDBVersion) |
| if vb != nil { |
| v, _ := binary.Varint(vb) |
| version = int(v) |
| } |
| } |
| |
| if version >= migration.version { |
| break |
| } |
| } |
| |
| // Previous version fo database found |
| if schema != "v0" { |
| updates := migrations[i:] |
| |
| // No migration updates, return immediately |
| if len(updates) == 0 { |
| return errSkip |
| } |
| |
| for _, m := range updates { |
| t0 := time.Now() |
| if err := m.migrate(tx); err != nil { |
| return errors.Wrapf(err, "failed to migrate to %s.%d", m.schema, m.version) |
| } |
| log.G(ctx).WithField("d", time.Now().Sub(t0)).Debugf("finished database migration to %s.%d", m.schema, m.version) |
| } |
| } |
| |
| bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion) |
| if err != nil { |
| return err |
| } |
| |
| versionEncoded, err := encodeInt(dbVersion) |
| if err != nil { |
| return err |
| } |
| |
| return bkt.Put(bucketKeyDBVersion, versionEncoded) |
| }) |
| if err == errSkip { |
| err = nil |
| } |
| return err |
| } |
| |
| // ContentStore returns a namespaced content store |
| // proxied to a content store. |
| func (m *DB) ContentStore() content.Store { |
| if m.cs == nil { |
| return nil |
| } |
| return m.cs |
| } |
| |
| // Snapshotter returns a namespaced content store for |
| // the requested snapshotter name proxied to a snapshotter. |
| func (m *DB) Snapshotter(name string) snapshots.Snapshotter { |
| sn, ok := m.ss[name] |
| if !ok { |
| return nil |
| } |
| return sn |
| } |
| |
| // View runs a readonly transaction on the metadata store. |
| func (m *DB) View(fn func(*bolt.Tx) error) error { |
| return m.db.View(fn) |
| } |
| |
| // Update runs a writable transaction on the metadata store. |
| func (m *DB) Update(fn func(*bolt.Tx) error) error { |
| m.wlock.RLock() |
| defer m.wlock.RUnlock() |
| err := m.db.Update(fn) |
| if err == nil { |
| m.dirtyL.Lock() |
| dirty := m.dirtyCS || len(m.dirtySS) > 0 |
| for _, fn := range m.mutationCallbacks { |
| fn(dirty) |
| } |
| m.dirtyL.Unlock() |
| } |
| |
| return err |
| } |
| |
| // RegisterMutationCallback registers a function to be called after a metadata |
| // mutations has been performed. |
| // |
| // The callback function in an argument for whether a deletion has occurred |
| // since the last garbage collection. |
| func (m *DB) RegisterMutationCallback(fn func(bool)) { |
| m.dirtyL.Lock() |
| m.mutationCallbacks = append(m.mutationCallbacks, fn) |
| m.dirtyL.Unlock() |
| } |
| |
// GCStats reports how long each phase of a garbage collection run took.
type GCStats struct {
	// MetaD is the time spent on the metadata mark and sweep, measured
	// while the garbage collection write lock is held.
	MetaD time.Duration

	// ContentD is the time spent cleaning up the backing content store;
	// it is zero when no content cleanup was run.
	ContentD time.Duration

	// SnapshotD maps each cleaned-up snapshotter name to the time spent on
	// its cleanup; it is nil when no snapshotter cleanup was run.
	SnapshotD map[string]time.Duration
}
| |
| // GarbageCollect starts garbage collection |
| func (m *DB) GarbageCollect(ctx context.Context) (stats GCStats, err error) { |
| m.wlock.Lock() |
| t1 := time.Now() |
| |
| marked, err := m.getMarked(ctx) |
| if err != nil { |
| m.wlock.Unlock() |
| return GCStats{}, err |
| } |
| |
| m.dirtyL.Lock() |
| |
| if err := m.db.Update(func(tx *bolt.Tx) error { |
| ctx, cancel := context.WithCancel(ctx) |
| defer cancel() |
| |
| rm := func(ctx context.Context, n gc.Node) error { |
| if _, ok := marked[n]; ok { |
| return nil |
| } |
| |
| if n.Type == ResourceSnapshot { |
| if idx := strings.IndexRune(n.Key, '/'); idx > 0 { |
| m.dirtySS[n.Key[:idx]] = struct{}{} |
| } |
| } else if n.Type == ResourceContent { |
| m.dirtyCS = true |
| } |
| return remove(ctx, tx, n) |
| } |
| |
| if err := scanAll(ctx, tx, rm); err != nil { |
| return errors.Wrap(err, "failed to scan and remove") |
| } |
| |
| return nil |
| }); err != nil { |
| m.dirtyL.Unlock() |
| m.wlock.Unlock() |
| return GCStats{}, err |
| } |
| |
| var wg sync.WaitGroup |
| |
| if len(m.dirtySS) > 0 { |
| var sl sync.Mutex |
| stats.SnapshotD = map[string]time.Duration{} |
| wg.Add(len(m.dirtySS)) |
| for snapshotterName := range m.dirtySS { |
| log.G(ctx).WithField("snapshotter", snapshotterName).Debug("schedule snapshotter cleanup") |
| go func(snapshotterName string) { |
| st1 := time.Now() |
| m.cleanupSnapshotter(snapshotterName) |
| |
| sl.Lock() |
| stats.SnapshotD[snapshotterName] = time.Now().Sub(st1) |
| sl.Unlock() |
| |
| wg.Done() |
| }(snapshotterName) |
| } |
| m.dirtySS = map[string]struct{}{} |
| } |
| |
| if m.dirtyCS { |
| wg.Add(1) |
| log.G(ctx).Debug("schedule content cleanup") |
| go func() { |
| ct1 := time.Now() |
| m.cleanupContent() |
| stats.ContentD = time.Now().Sub(ct1) |
| wg.Done() |
| }() |
| m.dirtyCS = false |
| } |
| |
| m.dirtyL.Unlock() |
| |
| stats.MetaD = time.Now().Sub(t1) |
| m.wlock.Unlock() |
| |
| wg.Wait() |
| |
| return |
| } |
| |
| func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) { |
| var marked map[gc.Node]struct{} |
| if err := m.db.View(func(tx *bolt.Tx) error { |
| ctx, cancel := context.WithCancel(ctx) |
| defer cancel() |
| |
| var ( |
| nodes []gc.Node |
| wg sync.WaitGroup |
| roots = make(chan gc.Node) |
| ) |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| for n := range roots { |
| nodes = append(nodes, n) |
| } |
| }() |
| // Call roots |
| if err := scanRoots(ctx, tx, roots); err != nil { |
| cancel() |
| return err |
| } |
| close(roots) |
| wg.Wait() |
| |
| refs := func(n gc.Node) ([]gc.Node, error) { |
| var sn []gc.Node |
| if err := references(ctx, tx, n, func(nn gc.Node) { |
| sn = append(sn, nn) |
| }); err != nil { |
| return nil, err |
| } |
| return sn, nil |
| } |
| |
| reachable, err := gc.Tricolor(nodes, refs) |
| if err != nil { |
| return err |
| } |
| marked = reachable |
| return nil |
| }); err != nil { |
| return nil, err |
| } |
| return marked, nil |
| } |
| |
| func (m *DB) cleanupSnapshotter(name string) (time.Duration, error) { |
| ctx := context.Background() |
| sn, ok := m.ss[name] |
| if !ok { |
| return 0, nil |
| } |
| |
| d, err := sn.garbageCollect(ctx) |
| logger := log.G(ctx).WithField("snapshotter", name) |
| if err != nil { |
| logger.WithError(err).Warn("snapshot garbage collection failed") |
| } else { |
| logger.WithField("d", d).Debugf("snapshot garbage collected") |
| } |
| return d, err |
| } |
| |
| func (m *DB) cleanupContent() (time.Duration, error) { |
| ctx := context.Background() |
| if m.cs == nil { |
| return 0, nil |
| } |
| |
| d, err := m.cs.garbageCollect(ctx) |
| if err != nil { |
| log.G(ctx).WithError(err).Warn("content garbage collection failed") |
| } else { |
| log.G(ctx).WithField("d", d).Debugf("content garbage collected") |
| } |
| |
| return d, err |
| } |