| package events |
| |
| import ( |
| "container/list" |
| "sync" |
| |
| "github.com/sirupsen/logrus" |
| ) |
| |
| // Queue accepts all messages into a queue for asynchronous consumption |
| // by a sink. It is unbounded and thread safe but the sink must be reliable or |
| // events will be dropped. |
| type Queue struct { |
| dst Sink |
| events *list.List |
| cond *sync.Cond |
| mu sync.Mutex |
| closed bool |
| } |
| |
| // NewQueue returns a queue to the provided Sink dst. |
| func NewQueue(dst Sink) *Queue { |
| eq := Queue{ |
| dst: dst, |
| events: list.New(), |
| } |
| |
| eq.cond = sync.NewCond(&eq.mu) |
| go eq.run() |
| return &eq |
| } |
| |
| // Write accepts the events into the queue, only failing if the queue has |
| // been closed. |
| func (eq *Queue) Write(event Event) error { |
| eq.mu.Lock() |
| defer eq.mu.Unlock() |
| |
| if eq.closed { |
| return ErrSinkClosed |
| } |
| |
| eq.events.PushBack(event) |
| eq.cond.Signal() // signal waiters |
| |
| return nil |
| } |
| |
| // Close shutsdown the event queue, flushing |
| func (eq *Queue) Close() error { |
| eq.mu.Lock() |
| defer eq.mu.Unlock() |
| |
| if eq.closed { |
| return nil |
| } |
| |
| // set closed flag |
| eq.closed = true |
| eq.cond.Signal() // signal flushes queue |
| eq.cond.Wait() // wait for signal from last flush |
| return eq.dst.Close() |
| } |
| |
| // run is the main goroutine to flush events to the target sink. |
| func (eq *Queue) run() { |
| for { |
| event := eq.next() |
| |
| if event == nil { |
| return // nil block means event queue is closed. |
| } |
| |
| if err := eq.dst.Write(event); err != nil { |
| // TODO(aaronl): Dropping events could be bad depending |
| // on the application. We should have a way of |
| // communicating this condition. However, logging |
| // at a log level above debug may not be appropriate. |
| // Eventually, go-events should not use logrus at all, |
| // and should bubble up conditions like this through |
| // error values. |
| logrus.WithFields(logrus.Fields{ |
| "event": event, |
| "sink": eq.dst, |
| }).WithError(err).Debug("eventqueue: dropped event") |
| } |
| } |
| } |
| |
| // next encompasses the critical section of the run loop. When the queue is |
| // empty, it will block on the condition. If new data arrives, it will wake |
| // and return a block. When closed, a nil slice will be returned. |
| func (eq *Queue) next() Event { |
| eq.mu.Lock() |
| defer eq.mu.Unlock() |
| |
| for eq.events.Len() < 1 { |
| if eq.closed { |
| eq.cond.Broadcast() |
| return nil |
| } |
| |
| eq.cond.Wait() |
| } |
| |
| front := eq.events.Front() |
| block := front.Value.(Event) |
| eq.events.Remove(front) |
| |
| return block |
| } |