blob: a3c1701d8addf594aa7651d5af3667a5915ddd89 [file] [log] [blame]
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metadata
import (
"context"
"time"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
)
// LeaseManager manages the create/delete lifecycle of leases
// and also returns existing leases. Every operation is performed
// through the bolt transaction held by the manager.
type LeaseManager struct {
// tx is the bolt transaction used by Create, Delete and List.
tx *bolt.Tx
}
// NewLeaseManager returns a LeaseManager bound to tx. All lease operations
// issued through the returned manager use that transaction.
func NewLeaseManager(tx *bolt.Tx) *LeaseManager {
	lm := new(LeaseManager)
	lm.tx = tx
	return lm
}
// Create stores a new lease in the namespace carried by ctx and returns it.
//
// The lease is built by applying opts in order; the first option that fails
// aborts the call with its error. A lease without an ID is rejected, and a
// missing namespace in ctx is reported by namespaces.NamespaceRequired. If a
// bucket for the lease ID already exists, the returned error wraps
// errdefs.ErrAlreadyExists. On success the lease's CreatedAt is set to the
// current UTC time, which is also written to the lease bucket together with
// any labels. On failure the zero-value lease is returned.
func (lm *LeaseManager) Create(ctx context.Context, opts ...leases.Opt) (leases.Lease, error) {
	var (
		lease leases.Lease
		none  leases.Lease // zero value handed back on every error path
	)

	for i := range opts {
		if err := opts[i](&lease); err != nil {
			return none, err
		}
	}
	if lease.ID == "" {
		return none, errors.New("lease id must be provided")
	}

	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return none, err
	}

	leasesBkt, err := createBucketIfNotExists(lm.tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases)
	if err != nil {
		return none, err
	}

	// CreateBucket (rather than CreateBucketIfNotExists) so that a duplicate
	// lease ID surfaces as an error.
	leaseBkt, err := leasesBkt.CreateBucket([]byte(lease.ID))
	switch {
	case err == bolt.ErrBucketExists:
		return none, errors.Wrapf(errdefs.ErrAlreadyExists, "lease %q", lease.ID)
	case err != nil:
		return none, errors.Wrapf(err, "lease %q", lease.ID)
	}

	now := time.Now().UTC()
	encoded, err := now.MarshalBinary()
	if err != nil {
		return none, err
	}
	if err := leaseBkt.Put(bucketKeyCreatedAt, encoded); err != nil {
		return none, err
	}

	if lease.Labels != nil {
		if err := boltutil.WriteLabels(leaseBkt, lease.Labels); err != nil {
			return none, err
		}
	}

	lease.CreatedAt = now
	return lease, nil
}
// Delete deletes the lease identified by lease.ID from the namespace carried
// by ctx. Any delete options passed in are ignored.
//
// The returned error wraps errdefs.ErrNotFound when either the namespace's
// leases bucket or the bucket of the lease itself does not exist. Any other
// error from the bucket deletion is returned unchanged.
func (lm *LeaseManager) Delete(ctx context.Context, lease leases.Lease, _ ...leases.DeleteOpt) error {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return err
	}

	leasesBkt := getBucket(lm.tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases)
	if leasesBkt == nil {
		return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
	}

	switch err := leasesBkt.DeleteBucket([]byte(lease.ID)); err {
	case nil:
		return nil
	case bolt.ErrBucketNotFound:
		return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
	default:
		return err
	}
}
// List returns the leases stored in the namespace carried by ctx that match
// the filter expressions in fs.
//
// Filter expressions that fail to parse yield an error wrapping
// errdefs.ErrInvalidArgument. If the namespace has no leases bucket, a nil
// slice and a nil error are returned. For every lease the creation time (when
// one is stored) and the labels are read back from its bucket before the
// filter is applied.
func (lm *LeaseManager) List(ctx context.Context, fs ...string) ([]leases.Lease, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}

	flt, err := filters.ParseAll(fs...)
	if err != nil {
		return nil, errors.Wrap(errdefs.ErrInvalidArgument, err.Error())
	}

	var found []leases.Lease

	leasesBkt := getBucket(lm.tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases)
	if leasesBkt == nil {
		return found, nil
	}

	collect := func(name, value []byte) error {
		// Entries carrying a value are skipped; only entries with a nil
		// value are opened as per-lease buckets below.
		if value != nil {
			return nil
		}

		leaseBkt := leasesBkt.Bucket(name)
		entry := leases.Lease{ID: string(name)}

		if raw := leaseBkt.Get(bucketKeyCreatedAt); raw != nil {
			if err := entry.CreatedAt.UnmarshalBinary(raw); err != nil {
				return err
			}
		}

		labels, err := boltutil.ReadLabels(leaseBkt)
		if err != nil {
			return err
		}
		entry.Labels = labels

		if !flt.Match(adaptLease(entry)) {
			return nil
		}
		found = append(found, entry)
		return nil
	}

	if err := leasesBkt.ForEach(collect); err != nil {
		return nil, err
	}
	return found, nil
}
// addSnapshotLease records the snapshot key for the given snapshotter under
// the lease found in ctx. It does nothing when ctx carries no lease, and
// returns an error wrapping errdefs.ErrNotFound when the lease's bucket does
// not exist. It panics if ctx has no namespace, which callers are expected to
// have checked beforehand.
func addSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key string) error {
	leaseID, hasLease := leases.FromContext(ctx)
	if !hasLease {
		return nil
	}

	ns, hasNamespace := namespaces.Namespace(ctx)
	if !hasNamespace {
		panic("namespace must already be checked")
	}

	leaseBkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(leaseID))
	if leaseBkt == nil {
		return errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
	}

	// Layout below the lease bucket: snapshots bucket -> snapshotter bucket -> key.
	snapshotsBkt, err := leaseBkt.CreateBucketIfNotExists(bucketKeyObjectSnapshots)
	if err != nil {
		return err
	}

	snapshotterBkt, err := snapshotsBkt.CreateBucketIfNotExists([]byte(snapshotter))
	if err != nil {
		return err
	}

	return snapshotterBkt.Put([]byte(key), nil)
}
// removeSnapshotLease drops the snapshot key for the given snapshotter from
// the lease found in ctx. It does nothing when ctx carries no lease or when
// the snapshotter bucket under that lease does not exist. It panics if ctx
// has no namespace, which callers are expected to have checked beforehand.
func removeSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key string) error {
	leaseID, hasLease := leases.FromContext(ctx)
	if !hasLease {
		return nil
	}

	ns, hasNamespace := namespaces.Namespace(ctx)
	if !hasNamespace {
		panic("namespace must already be checked")
	}

	snapshotterBkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(leaseID), bucketKeyObjectSnapshots, []byte(snapshotter))
	if snapshotterBkt != nil {
		return snapshotterBkt.Delete([]byte(key))
	}

	// Without the bucket the key cannot exist, so there is nothing to remove.
	return nil
}
// addContentLease records the content digest under the lease found in ctx.
// It does nothing when ctx carries no lease, and returns an error wrapping
// errdefs.ErrNotFound when the lease's bucket does not exist. It panics if
// ctx has no namespace, which callers are expected to have required
// beforehand.
func addContentLease(ctx context.Context, tx *bolt.Tx, dgst digest.Digest) error {
	leaseID, hasLease := leases.FromContext(ctx)
	if !hasLease {
		return nil
	}

	ns, hasNamespace := namespaces.Namespace(ctx)
	if !hasNamespace {
		panic("namespace must already be required")
	}

	leaseBkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(leaseID))
	if leaseBkt == nil {
		return errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
	}

	contentBkt, err := leaseBkt.CreateBucketIfNotExists(bucketKeyObjectContent)
	if err != nil {
		return err
	}

	// The digest string is the key; the value is left nil.
	digestKey := []byte(dgst.String())
	return contentBkt.Put(digestKey, nil)
}
// removeContentLease drops the content digest from the lease found in ctx.
// It does nothing when ctx carries no lease or when the content bucket under
// that lease does not exist. It panics if ctx has no namespace, which callers
// are expected to have checked beforehand.
func removeContentLease(ctx context.Context, tx *bolt.Tx, dgst digest.Digest) error {
	leaseID, hasLease := leases.FromContext(ctx)
	if !hasLease {
		return nil
	}

	ns, hasNamespace := namespaces.Namespace(ctx)
	if !hasNamespace {
		panic("namespace must already be checked")
	}

	contentBkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(leaseID), bucketKeyObjectContent)
	if contentBkt != nil {
		digestKey := []byte(dgst.String())
		return contentBkt.Delete(digestKey)
	}

	// Without the bucket the key cannot exist, so there is nothing to remove.
	return nil
}
// addIngestLease records the ingest ref under the lease found in ctx.
//
// The boolean result is true only when the ref was written. It is false with
// a nil error when ctx carries no lease, and false with an error wrapping
// errdefs.ErrNotFound when the lease's bucket does not exist. It panics if
// ctx has no namespace, which callers are expected to have required
// beforehand.
func addIngestLease(ctx context.Context, tx *bolt.Tx, ref string) (bool, error) {
	leaseID, hasLease := leases.FromContext(ctx)
	if !hasLease {
		return false, nil
	}

	ns, hasNamespace := namespaces.Namespace(ctx)
	if !hasNamespace {
		panic("namespace must already be required")
	}

	leaseBkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(leaseID))
	if leaseBkt == nil {
		return false, errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
	}

	ingestsBkt, err := leaseBkt.CreateBucketIfNotExists(bucketKeyObjectIngests)
	if err != nil {
		return false, err
	}

	err = ingestsBkt.Put([]byte(ref), nil)
	if err != nil {
		return false, err
	}
	return true, nil
}
// removeIngestLease drops the ingest ref from the lease found in ctx. It does
// nothing when ctx carries no lease or when the ingests bucket under that
// lease does not exist. It panics if ctx has no namespace, which callers are
// expected to have checked beforehand.
func removeIngestLease(ctx context.Context, tx *bolt.Tx, ref string) error {
	leaseID, hasLease := leases.FromContext(ctx)
	if !hasLease {
		return nil
	}

	ns, hasNamespace := namespaces.Namespace(ctx)
	if !hasNamespace {
		panic("namespace must already be checked")
	}

	ingestsBkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(leaseID), bucketKeyObjectIngests)
	if ingestsBkt != nil {
		return ingestsBkt.Delete([]byte(ref))
	}

	// Without the bucket the key cannot exist, so there is nothing to remove.
	return nil
}