| package events |
| |
| import ( |
| "fmt" |
| "sync" |
| ) |
| |
| // Channel provides a sink that can be listened on. The writer and channel |
| // listener must operate in separate goroutines. |
| // |
| // Consumers should listen on Channel.C until Closed is closed. |
| type Channel struct { |
| C chan Event |
| |
| closed chan struct{} |
| once sync.Once |
| } |
| |
| // NewChannel returns a channel. If buffer is zero, the channel is |
| // unbuffered. |
| func NewChannel(buffer int) *Channel { |
| return &Channel{ |
| C: make(chan Event, buffer), |
| closed: make(chan struct{}), |
| } |
| } |
| |
| // Done returns a channel that will always proceed once the sink is closed. |
| func (ch *Channel) Done() chan struct{} { |
| return ch.closed |
| } |
| |
| // Write the event to the channel. Must be called in a separate goroutine from |
| // the listener. |
| func (ch *Channel) Write(event Event) error { |
| select { |
| case ch.C <- event: |
| return nil |
| case <-ch.closed: |
| return ErrSinkClosed |
| } |
| } |
| |
| // Close the channel sink. |
| func (ch *Channel) Close() error { |
| ch.once.Do(func() { |
| close(ch.closed) |
| }) |
| |
| return nil |
| } |
| |
| func (ch *Channel) String() string { |
| // Serialize a copy of the Channel that doesn't contain the sync.Once, |
| // to avoid a data race. |
| ch2 := map[string]interface{}{ |
| "C": ch.C, |
| "closed": ch.closed, |
| } |
| return fmt.Sprint(ch2) |
| } |