| package broadcaster |
| |
| import ( |
| "errors" |
| "io" |
| "sync" |
| ) |
| |
// Buffered fans the progress output of a single operation out to any number
// of observers. For example, when several clients pull the same image, they
// all share one Buffered for that download.
type Buffered struct {
	// The embedded mutex serializes writers, observers and closers, and is
	// also the Locker that backs cond.
	sync.Mutex
	// c is closed once the operation has finished; anything waiting for
	// completion blocks on it.
	c chan struct{}
	// cond wakes the observer goroutines whenever new data is appended to
	// history or the broadcaster is closed.
	cond *sync.Cond
	// history holds every chunk written so far, so that an observer added
	// late can catch up. Chunks are kept as separate byte slices rather
	// than being concatenated so that, if the observer is a WriteFlusher,
	// the flushes happen in the same places as the original writes.
	history [][]byte
	// wg counts the running observer goroutines, so that closing and
	// waiting can block until all of them have finished writing.
	wg sync.WaitGroup
	// result is the value handed to the first CloseWithError call; it is
	// what Wait returns.
	result error
}
| |
| // NewBuffered returns an initialized Buffered structure. |
| func NewBuffered() *Buffered { |
| b := &Buffered{ |
| c: make(chan struct{}), |
| } |
| b.cond = sync.NewCond(b) |
| return b |
| } |
| |
| // closed returns true if and only if the broadcaster has been closed |
| func (broadcaster *Buffered) closed() bool { |
| select { |
| case <-broadcaster.c: |
| return true |
| default: |
| return false |
| } |
| } |
| |
| // receiveWrites runs as a goroutine so that writes don't block the Write |
| // function. It writes the new data in broadcaster.history each time there's |
| // activity on the broadcaster.cond condition variable. |
| func (broadcaster *Buffered) receiveWrites(observer io.Writer) { |
| n := 0 |
| |
| broadcaster.Lock() |
| |
| // The condition variable wait is at the end of this loop, so that the |
| // first iteration will write the history so far. |
| for { |
| newData := broadcaster.history[n:] |
| // Make a copy of newData so we can release the lock |
| sendData := make([][]byte, len(newData), len(newData)) |
| copy(sendData, newData) |
| broadcaster.Unlock() |
| |
| for len(sendData) > 0 { |
| _, err := observer.Write(sendData[0]) |
| if err != nil { |
| broadcaster.wg.Done() |
| return |
| } |
| n++ |
| sendData = sendData[1:] |
| } |
| |
| broadcaster.Lock() |
| |
| // If we are behind, we need to catch up instead of waiting |
| // or handling a closure. |
| if len(broadcaster.history) != n { |
| continue |
| } |
| |
| // detect closure of the broadcast writer |
| if broadcaster.closed() { |
| broadcaster.Unlock() |
| broadcaster.wg.Done() |
| return |
| } |
| |
| broadcaster.cond.Wait() |
| |
| // Mutex is still locked as the loop continues |
| } |
| } |
| |
| // Write adds data to the history buffer, and also writes it to all current |
| // observers. |
| func (broadcaster *Buffered) Write(p []byte) (n int, err error) { |
| broadcaster.Lock() |
| defer broadcaster.Unlock() |
| |
| // Is the broadcaster closed? If so, the write should fail. |
| if broadcaster.closed() { |
| return 0, errors.New("attempted write to a closed broadcaster.Buffered") |
| } |
| |
| // Add message in p to the history slice |
| newEntry := make([]byte, len(p), len(p)) |
| copy(newEntry, p) |
| broadcaster.history = append(broadcaster.history, newEntry) |
| |
| broadcaster.cond.Broadcast() |
| |
| return len(p), nil |
| } |
| |
| // Add adds an observer to the broadcaster. The new observer receives the |
| // data from the history buffer, and also all subsequent data. |
| func (broadcaster *Buffered) Add(w io.Writer) error { |
| // The lock is acquired here so that Add can't race with Close |
| broadcaster.Lock() |
| defer broadcaster.Unlock() |
| |
| if broadcaster.closed() { |
| return errors.New("attempted to add observer to a closed broadcaster.Buffered") |
| } |
| |
| broadcaster.wg.Add(1) |
| go broadcaster.receiveWrites(w) |
| |
| return nil |
| } |
| |
| // CloseWithError signals to all observers that the operation has finished. Its |
| // argument is a result that should be returned to waiters blocking on Wait. |
| func (broadcaster *Buffered) CloseWithError(result error) { |
| broadcaster.Lock() |
| if broadcaster.closed() { |
| broadcaster.Unlock() |
| return |
| } |
| broadcaster.result = result |
| close(broadcaster.c) |
| broadcaster.cond.Broadcast() |
| broadcaster.Unlock() |
| |
| // Don't return until all writers have caught up. |
| broadcaster.wg.Wait() |
| } |
| |
| // Close signals to all observers that the operation has finished. It causes |
| // all calls to Wait to return nil. |
| func (broadcaster *Buffered) Close() { |
| broadcaster.CloseWithError(nil) |
| } |
| |
| // Wait blocks until the operation is marked as completed by the Close method, |
| // and all writer goroutines have completed. It returns the argument that was |
| // passed to Close. |
| func (broadcaster *Buffered) Wait() error { |
| <-broadcaster.c |
| broadcaster.wg.Wait() |
| return broadcaster.result |
| } |