blob: f16a06998d200d476ef3fda582c8c81782abf788 [file] [log] [blame]
package pullprogress
import (
"context"
"io"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/moby/buildkit/util/progress"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type PullManager interface {
content.IngestManager
content.Manager
}
type ProviderWithProgress struct {
Provider content.Provider
Manager PullManager
}
func (p *ProviderWithProgress) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
ra, err := p.Provider.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
doneCh := make(chan struct{})
go trackProgress(ctx, desc, p.Manager, doneCh)
return readerAtWithCancel{ReaderAt: ra, cancel: cancel, doneCh: doneCh}, nil
}
type readerAtWithCancel struct {
content.ReaderAt
cancel func()
doneCh <-chan struct{}
}
func (ra readerAtWithCancel) Close() error {
ra.cancel()
select {
case <-ra.doneCh:
case <-time.After(time.Second):
logrus.Warn("timeout waiting for pull progress to complete")
}
return ra.ReaderAt.Close()
}
type FetcherWithProgress struct {
Fetcher remotes.Fetcher
Manager PullManager
}
func (f *FetcherWithProgress) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
rc, err := f.Fetcher.Fetch(ctx, desc)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
doneCh := make(chan struct{})
go trackProgress(ctx, desc, f.Manager, doneCh)
return readerWithCancel{ReadCloser: rc, cancel: cancel, doneCh: doneCh}, nil
}
type readerWithCancel struct {
io.ReadCloser
cancel func()
doneCh <-chan struct{}
}
func (r readerWithCancel) Close() error {
r.cancel()
select {
case <-r.doneCh:
case <-time.After(time.Second):
logrus.Warn("timeout waiting for pull progress to complete")
}
return r.ReadCloser.Close()
}
func trackProgress(ctx context.Context, desc ocispec.Descriptor, manager PullManager, doneCh chan<- struct{}) {
defer close(doneCh)
ticker := time.NewTicker(150 * time.Millisecond)
defer ticker.Stop()
go func() {
<-ctx.Done()
ticker.Stop()
}()
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
ingestRef := remotes.MakeRefKey(ctx, desc)
started := time.Now()
onFinalStatus := false
for !onFinalStatus {
select {
case <-ctx.Done():
onFinalStatus = true
case <-ticker.C:
}
status, err := manager.Status(ctx, ingestRef)
if err == nil {
pw.Write(desc.Digest.String(), progress.Status{
Current: int(status.Offset),
Total: int(status.Total),
Started: &started,
})
continue
} else if !errors.Is(err, errdefs.ErrNotFound) {
logrus.Errorf("unexpected error getting ingest status of %q: %v", ingestRef, err)
return
}
info, err := manager.Info(ctx, desc.Digest)
if err == nil {
pw.Write(desc.Digest.String(), progress.Status{
Current: int(info.Size),
Total: int(info.Size),
Started: &started,
Completed: &info.CreatedAt,
})
return
}
}
}