| package events |
| |
| import ( |
| "fmt" |
| "sync" |
| |
| "github.com/sirupsen/logrus" |
| ) |
| |
| // Broadcaster sends events to multiple, reliable Sinks. The goal of this |
| // component is to dispatch events to configured endpoints. Reliability can be |
| // provided by wrapping incoming sinks. |
| type Broadcaster struct { |
| sinks []Sink |
| events chan Event |
| adds chan configureRequest |
| removes chan configureRequest |
| |
| shutdown chan struct{} |
| closed chan struct{} |
| once sync.Once |
| } |
| |
| // NewBroadcaster appends one or more sinks to the list of sinks. The |
| // broadcaster behavior will be affected by the properties of the sink. |
| // Generally, the sink should accept all messages and deal with reliability on |
| // its own. Use of EventQueue and RetryingSink should be used here. |
| func NewBroadcaster(sinks ...Sink) *Broadcaster { |
| b := Broadcaster{ |
| sinks: sinks, |
| events: make(chan Event), |
| adds: make(chan configureRequest), |
| removes: make(chan configureRequest), |
| shutdown: make(chan struct{}), |
| closed: make(chan struct{}), |
| } |
| |
| // Start the broadcaster |
| go b.run() |
| |
| return &b |
| } |
| |
| // Write accepts an event to be dispatched to all sinks. This method will never |
| // fail and should never block (hopefully!). The caller cedes the memory to the |
| // broadcaster and should not modify it after calling write. |
| func (b *Broadcaster) Write(event Event) error { |
| select { |
| case b.events <- event: |
| case <-b.closed: |
| return ErrSinkClosed |
| } |
| return nil |
| } |
| |
| // Add the sink to the broadcaster. |
| // |
| // The provided sink must be comparable with equality. Typically, this just |
| // works with a regular pointer type. |
| func (b *Broadcaster) Add(sink Sink) error { |
| return b.configure(b.adds, sink) |
| } |
| |
| // Remove the provided sink. |
| func (b *Broadcaster) Remove(sink Sink) error { |
| return b.configure(b.removes, sink) |
| } |
| |
| type configureRequest struct { |
| sink Sink |
| response chan error |
| } |
| |
| func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error { |
| response := make(chan error, 1) |
| |
| for { |
| select { |
| case ch <- configureRequest{ |
| sink: sink, |
| response: response}: |
| ch = nil |
| case err := <-response: |
| return err |
| case <-b.closed: |
| return ErrSinkClosed |
| } |
| } |
| } |
| |
| // Close the broadcaster, ensuring that all messages are flushed to the |
| // underlying sink before returning. |
| func (b *Broadcaster) Close() error { |
| b.once.Do(func() { |
| close(b.shutdown) |
| }) |
| |
| <-b.closed |
| return nil |
| } |
| |
| // run is the main broadcast loop, started when the broadcaster is created. |
| // Under normal conditions, it waits for events on the event channel. After |
| // Close is called, this goroutine will exit. |
| func (b *Broadcaster) run() { |
| defer close(b.closed) |
| remove := func(target Sink) { |
| for i, sink := range b.sinks { |
| if sink == target { |
| b.sinks = append(b.sinks[:i], b.sinks[i+1:]...) |
| break |
| } |
| } |
| } |
| |
| for { |
| select { |
| case event := <-b.events: |
| for _, sink := range b.sinks { |
| if err := sink.Write(event); err != nil { |
| if err == ErrSinkClosed { |
| // remove closed sinks |
| remove(sink) |
| continue |
| } |
| logrus.WithField("event", event).WithField("events.sink", sink).WithError(err). |
| Errorf("broadcaster: dropping event") |
| } |
| } |
| case request := <-b.adds: |
| // while we have to iterate for add/remove, common iteration for |
| // send is faster against slice. |
| |
| var found bool |
| for _, sink := range b.sinks { |
| if request.sink == sink { |
| found = true |
| break |
| } |
| } |
| |
| if !found { |
| b.sinks = append(b.sinks, request.sink) |
| } |
| // b.sinks[request.sink] = struct{}{} |
| request.response <- nil |
| case request := <-b.removes: |
| remove(request.sink) |
| request.response <- nil |
| case <-b.shutdown: |
| // close all the underlying sinks |
| for _, sink := range b.sinks { |
| if err := sink.Close(); err != nil && err != ErrSinkClosed { |
| logrus.WithField("events.sink", sink).WithError(err). |
| Errorf("broadcaster: closing sink failed") |
| } |
| } |
| return |
| } |
| } |
| } |
| |
| func (b *Broadcaster) String() string { |
| // Serialize copy of this broadcaster without the sync.Once, to avoid |
| // a data race. |
| |
| b2 := map[string]interface{}{ |
| "sinks": b.sinks, |
| "events": b.events, |
| "adds": b.adds, |
| "removes": b.removes, |
| |
| "shutdown": b.shutdown, |
| "closed": b.closed, |
| } |
| |
| return fmt.Sprint(b2) |
| } |