blob: 3ce212948c2d556310dddea0216637b3c1613dac [file] [log] [blame]
package progress
import (
"context"
"io"
"sort"
"sync"
"time"
"github.com/pkg/errors"
)
// Progress package provides utility functions for using the context to capture
// progress of a running function. All progress items written contain an ID
// that is used to collapse unread messages.
type contextKeyT string
var contextKey = contextKeyT("buildkit/util/progress")
// FromContext returns a progress writer from a context.
func FromContext(ctx context.Context, opts ...WriterOption) (Writer, bool, context.Context) {
v := ctx.Value(contextKey)
pw, ok := v.(*progressWriter)
if !ok {
if pw, ok := v.(*MultiWriter); ok {
return pw, true, ctx
}
return &noOpWriter{}, false, ctx
}
pw = newWriter(pw)
for _, o := range opts {
o(pw)
}
ctx = context.WithValue(ctx, contextKey, pw)
return pw, true, ctx
}
type WriterOption func(Writer)
// NewContext returns a new context and a progress reader that captures all
// progress items writtern to this context. Last returned parameter is a closer
// function to signal that no new writes will happen to this context.
func NewContext(ctx context.Context) (Reader, context.Context, func()) {
pr, pw, cancel := pipe()
ctx = WithProgress(ctx, pw)
return pr, ctx, cancel
}
func WithProgress(ctx context.Context, pw Writer) context.Context {
return context.WithValue(ctx, contextKey, pw)
}
func WithMetadata(key string, val interface{}) WriterOption {
return func(w Writer) {
if pw, ok := w.(*progressWriter); ok {
pw.meta[key] = val
}
if pw, ok := w.(*MultiWriter); ok {
pw.meta[key] = val
}
}
}
type Controller interface {
Start(context.Context) (context.Context, func(error))
Status(id string, action string) func()
}
type Writer interface {
Write(id string, value interface{}) error
Close() error
}
type Reader interface {
Read(context.Context) ([]*Progress, error)
}
type Progress struct {
ID string
Timestamp time.Time
Sys interface{}
meta map[string]interface{}
}
type Status struct {
Action string
Current int
Total int
Started *time.Time
Completed *time.Time
}
type progressReader struct {
ctx context.Context
cond *sync.Cond
mu sync.Mutex
writers map[*progressWriter]struct{}
dirty map[string]*Progress
}
func (pr *progressReader) Read(ctx context.Context) ([]*Progress, error) {
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-done:
case <-ctx.Done():
pr.mu.Lock()
pr.cond.Broadcast()
pr.mu.Unlock()
}
}()
pr.mu.Lock()
for {
select {
case <-ctx.Done():
pr.mu.Unlock()
return nil, ctx.Err()
default:
}
dmap := pr.dirty
if len(dmap) == 0 {
select {
case <-pr.ctx.Done():
if len(pr.writers) == 0 {
pr.mu.Unlock()
return nil, io.EOF
}
default:
}
pr.cond.Wait()
continue
}
pr.dirty = make(map[string]*Progress)
pr.mu.Unlock()
out := make([]*Progress, 0, len(dmap))
for _, p := range dmap {
out = append(out, p)
}
sort.Slice(out, func(i, j int) bool {
return out[i].Timestamp.Before(out[j].Timestamp)
})
return out, nil
}
}
func (pr *progressReader) append(pw *progressWriter) {
pr.mu.Lock()
defer pr.mu.Unlock()
select {
case <-pr.ctx.Done():
return
default:
pr.writers[pw] = struct{}{}
}
}
func pipe() (*progressReader, *progressWriter, func()) {
ctx, cancel := context.WithCancel(context.Background())
pr := &progressReader{
ctx: ctx,
writers: make(map[*progressWriter]struct{}),
dirty: make(map[string]*Progress),
}
pr.cond = sync.NewCond(&pr.mu)
go func() {
<-ctx.Done()
pr.mu.Lock()
pr.cond.Broadcast()
pr.mu.Unlock()
}()
pw := &progressWriter{
reader: pr,
}
return pr, pw, cancel
}
func newWriter(pw *progressWriter) *progressWriter {
meta := make(map[string]interface{})
for k, v := range pw.meta {
meta[k] = v
}
pw = &progressWriter{
reader: pw.reader,
meta: meta,
}
pw.reader.append(pw)
return pw
}
type progressWriter struct {
done bool
reader *progressReader
meta map[string]interface{}
}
func (pw *progressWriter) Write(id string, v interface{}) error {
if pw.done {
return errors.Errorf("writing %s to closed progress writer", id)
}
return pw.writeRawProgress(&Progress{
ID: id,
Timestamp: time.Now(),
Sys: v,
meta: pw.meta,
})
}
func (pw *progressWriter) WriteRawProgress(p *Progress) error {
meta := p.meta
if len(pw.meta) > 0 {
meta = map[string]interface{}{}
for k, v := range p.meta {
meta[k] = v
}
for k, v := range pw.meta {
if _, ok := meta[k]; !ok {
meta[k] = v
}
}
}
p.meta = meta
return pw.writeRawProgress(p)
}
func (pw *progressWriter) writeRawProgress(p *Progress) error {
pw.reader.mu.Lock()
pw.reader.dirty[p.ID] = p
pw.reader.cond.Broadcast()
pw.reader.mu.Unlock()
return nil
}
func (pw *progressWriter) Close() error {
pw.reader.mu.Lock()
delete(pw.reader.writers, pw)
pw.reader.mu.Unlock()
pw.reader.cond.Broadcast()
pw.done = true
return nil
}
func (p *Progress) Meta(key string) (interface{}, bool) {
v, ok := p.meta[key]
return v, ok
}
type noOpWriter struct{}
func (pw *noOpWriter) Write(_ string, _ interface{}) error {
return nil
}
func (pw *noOpWriter) Close() error {
return nil
}