blob: 0bf105f54dff9b22066e8d06be95b7ce73da3d08 [file] [log] [blame]
package events
import (
"sync"
"time"
eventtypes "github.com/docker/docker/api/types/events"
"github.com/docker/docker/pkg/pubsub"
)
const (
eventsLimit = 64
bufferSize = 1024
)
// Events is pubsub channel for events generated by the engine.
type Events struct {
mu sync.Mutex
events []eventtypes.Message
pub *pubsub.Publisher
}
// New returns new *Events instance
func New() *Events {
return &Events{
events: make([]eventtypes.Message, 0, eventsLimit),
pub: pubsub.NewPublisher(100*time.Millisecond, bufferSize),
}
}
// Subscribe adds new listener to events, returns slice of 64 stored
// last events, a channel in which you can expect new events (in form
// of interface{}, so you need type assertion), and a function to call
// to stop the stream of events.
func (e *Events) Subscribe() ([]eventtypes.Message, chan interface{}, func()) {
eventSubscribers.Inc()
e.mu.Lock()
current := make([]eventtypes.Message, len(e.events))
copy(current, e.events)
l := e.pub.Subscribe()
e.mu.Unlock()
cancel := func() {
e.Evict(l)
}
return current, l, cancel
}
// SubscribeTopic adds new listener to events, returns slice of 64 stored
// last events, a channel in which you can expect new events (in form
// of interface{}, so you need type assertion).
func (e *Events) SubscribeTopic(since, until time.Time, ef *Filter) ([]eventtypes.Message, chan interface{}) {
eventSubscribers.Inc()
e.mu.Lock()
var topic func(m interface{}) bool
if ef != nil && ef.filter.Len() > 0 {
topic = func(m interface{}) bool { return ef.Include(m.(eventtypes.Message)) }
}
buffered := e.loadBufferedEvents(since, until, topic)
var ch chan interface{}
if topic != nil {
ch = e.pub.SubscribeTopic(topic)
} else {
// Subscribe to all events if there are no filters
ch = e.pub.Subscribe()
}
e.mu.Unlock()
return buffered, ch
}
// Evict evicts listener from pubsub
func (e *Events) Evict(l chan interface{}) {
eventSubscribers.Dec()
e.pub.Evict(l)
}
// Log broadcasts event to listeners. Each listener has 100 millisecond for
// receiving event or it will be skipped.
func (e *Events) Log(action, eventType string, actor eventtypes.Actor) {
eventsCounter.Inc()
now := time.Now().UTC()
jm := eventtypes.Message{
Action: action,
Type: eventType,
Actor: actor,
Time: now.Unix(),
TimeNano: now.UnixNano(),
}
// fill deprecated fields for container and images
switch eventType {
case eventtypes.ContainerEventType:
jm.ID = actor.ID
jm.Status = action
jm.From = actor.Attributes["image"]
case eventtypes.ImageEventType:
jm.ID = actor.ID
jm.Status = action
}
e.mu.Lock()
if len(e.events) == cap(e.events) {
// discard oldest event
copy(e.events, e.events[1:])
e.events[len(e.events)-1] = jm
} else {
e.events = append(e.events, jm)
}
e.mu.Unlock()
e.pub.Publish(jm)
}
// SubscribersCount returns number of event listeners
func (e *Events) SubscribersCount() int {
return e.pub.Len()
}
// loadBufferedEvents iterates over the cached events in the buffer
// and returns those that were emitted between two specific dates.
// It uses `time.Unix(seconds, nanoseconds)` to generate valid dates with those arguments.
// It filters those buffered messages with a topic function if it's not nil, otherwise it adds all messages.
func (e *Events) loadBufferedEvents(since, until time.Time, topic func(interface{}) bool) []eventtypes.Message {
var buffered []eventtypes.Message
if since.IsZero() && until.IsZero() {
return buffered
}
var sinceNanoUnix int64
if !since.IsZero() {
sinceNanoUnix = since.UnixNano()
}
var untilNanoUnix int64
if !until.IsZero() {
untilNanoUnix = until.UnixNano()
}
for i := len(e.events) - 1; i >= 0; i-- {
ev := e.events[i]
if ev.TimeNano < sinceNanoUnix {
break
}
if untilNanoUnix > 0 && ev.TimeNano > untilNanoUnix {
continue
}
if topic == nil || topic(ev) {
buffered = append([]eventtypes.Message{ev}, buffered...)
}
}
return buffered
}