| package events |
| |
| import ( |
| "context" |
| "strings" |
| "time" |
| |
| events "github.com/containerd/containerd/api/services/events/v1" |
| "github.com/containerd/containerd/errdefs" |
| "github.com/containerd/containerd/filters" |
| "github.com/containerd/containerd/identifiers" |
| "github.com/containerd/containerd/log" |
| "github.com/containerd/containerd/namespaces" |
| "github.com/containerd/typeurl" |
| goevents "github.com/docker/go-events" |
| "github.com/gogo/protobuf/types" |
| "github.com/pkg/errors" |
| "github.com/sirupsen/logrus" |
| ) |
| |
| // Exchange broadcasts events |
| type Exchange struct { |
| broadcaster *goevents.Broadcaster |
| } |
| |
| // NewExchange returns a new event Exchange |
| func NewExchange() *Exchange { |
| return &Exchange{ |
| broadcaster: goevents.NewBroadcaster(), |
| } |
| } |
| |
| // Forward accepts an envelope to be direcly distributed on the exchange. |
| // |
| // This is useful when an event is forwaded on behalf of another namespace or |
| // when the event is propagated on behalf of another publisher. |
| func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) { |
| if err := validateEnvelope(envelope); err != nil { |
| return err |
| } |
| |
| defer func() { |
| logger := log.G(ctx).WithFields(logrus.Fields{ |
| "topic": envelope.Topic, |
| "ns": envelope.Namespace, |
| "type": envelope.Event.TypeUrl, |
| }) |
| |
| if err != nil { |
| logger.WithError(err).Error("error forwarding event") |
| } else { |
| logger.Debug("event forwarded") |
| } |
| }() |
| |
| return e.broadcaster.Write(envelope) |
| } |
| |
| // Publish packages and sends an event. The caller will be considered the |
| // initial publisher of the event. This means the timestamp will be calculated |
| // at this point and this method may read from the calling context. |
| func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err error) { |
| var ( |
| namespace string |
| encoded *types.Any |
| envelope events.Envelope |
| ) |
| |
| namespace, err = namespaces.NamespaceRequired(ctx) |
| if err != nil { |
| return errors.Wrapf(err, "failed publishing event") |
| } |
| if err := validateTopic(topic); err != nil { |
| return errors.Wrapf(err, "envelope topic %q", topic) |
| } |
| |
| encoded, err = typeurl.MarshalAny(event) |
| if err != nil { |
| return err |
| } |
| |
| envelope.Timestamp = time.Now().UTC() |
| envelope.Namespace = namespace |
| envelope.Topic = topic |
| envelope.Event = encoded |
| |
| defer func() { |
| logger := log.G(ctx).WithFields(logrus.Fields{ |
| "topic": envelope.Topic, |
| "ns": envelope.Namespace, |
| "type": envelope.Event.TypeUrl, |
| }) |
| |
| if err != nil { |
| logger.WithError(err).Error("error publishing event") |
| } else { |
| logger.Debug("event published") |
| } |
| }() |
| |
| return e.broadcaster.Write(&envelope) |
| } |
| |
| // Subscribe to events on the exchange. Events are sent through the returned |
| // channel ch. If an error is encountered, it will be sent on channel errs and |
| // errs will be closed. To end the subscription, cancel the provided context. |
| // |
| // Zero or more filters may be provided as strings. Only events that match |
| // *any* of the provided filters will be sent on the channel. The filters use |
| // the standard containerd filters package syntax. |
| func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) { |
| var ( |
| evch = make(chan *events.Envelope) |
| errq = make(chan error, 1) |
| channel = goevents.NewChannel(0) |
| queue = goevents.NewQueue(channel) |
| dst goevents.Sink = queue |
| ) |
| |
| closeAll := func() { |
| defer close(errq) |
| defer e.broadcaster.Remove(dst) |
| defer queue.Close() |
| defer channel.Close() |
| } |
| |
| ch = evch |
| errs = errq |
| |
| if len(fs) > 0 { |
| filter, err := filters.ParseAll(fs...) |
| if err != nil { |
| errq <- errors.Wrapf(err, "failed parsing subscription filters") |
| closeAll() |
| return |
| } |
| |
| dst = goevents.NewFilter(queue, goevents.MatcherFunc(func(gev goevents.Event) bool { |
| return filter.Match(adapt(gev)) |
| })) |
| } |
| |
| e.broadcaster.Add(dst) |
| |
| go func() { |
| defer closeAll() |
| |
| var err error |
| loop: |
| for { |
| select { |
| case ev := <-channel.C: |
| env, ok := ev.(*events.Envelope) |
| if !ok { |
| // TODO(stevvooe): For the most part, we are well protected |
| // from this condition. Both Forward and Publish protect |
| // from this. |
| err = errors.Errorf("invalid envelope encountered %#v; please file a bug", ev) |
| break |
| } |
| |
| select { |
| case evch <- env: |
| case <-ctx.Done(): |
| break loop |
| } |
| case <-ctx.Done(): |
| break loop |
| } |
| } |
| |
| if err == nil { |
| if cerr := ctx.Err(); cerr != context.Canceled { |
| err = cerr |
| } |
| } |
| |
| errq <- err |
| }() |
| |
| return |
| } |
| |
| func validateTopic(topic string) error { |
| if topic == "" { |
| return errors.Wrap(errdefs.ErrInvalidArgument, "must not be empty") |
| } |
| |
| if topic[0] != '/' { |
| return errors.Wrapf(errdefs.ErrInvalidArgument, "must start with '/'") |
| } |
| |
| if len(topic) == 1 { |
| return errors.Wrapf(errdefs.ErrInvalidArgument, "must have at least one component") |
| } |
| |
| components := strings.Split(topic[1:], "/") |
| for _, component := range components { |
| if err := identifiers.Validate(component); err != nil { |
| return errors.Wrapf(err, "failed validation on component %q", component) |
| } |
| } |
| |
| return nil |
| } |
| |
| func validateEnvelope(envelope *events.Envelope) error { |
| if err := namespaces.Validate(envelope.Namespace); err != nil { |
| return errors.Wrapf(err, "event envelope has invalid namespace") |
| } |
| |
| if err := validateTopic(envelope.Topic); err != nil { |
| return errors.Wrapf(err, "envelope topic %q", envelope.Topic) |
| } |
| |
| if envelope.Timestamp.IsZero() { |
| return errors.Wrapf(errdefs.ErrInvalidArgument, "timestamp must be set on forwarded event") |
| } |
| |
| return nil |
| } |
| |
| func adapt(ev interface{}) filters.Adaptor { |
| if adaptor, ok := ev.(filters.Adaptor); ok { |
| return adaptor |
| } |
| |
| return filters.AdapterFunc(func(fieldpath []string) (string, bool) { |
| return "", false |
| }) |
| } |