| package client |
| |
| import ( |
| "encoding/json" |
| "fmt" |
| "io" |
| "sort" |
| "strings" |
| "sync" |
| "time" |
| |
| "golang.org/x/net/context" |
| |
| "github.com/Sirupsen/logrus" |
| Cli "github.com/docker/docker/cli" |
| "github.com/docker/docker/opts" |
| "github.com/docker/docker/pkg/jsonlog" |
| flag "github.com/docker/docker/pkg/mflag" |
| "github.com/docker/engine-api/types" |
| eventtypes "github.com/docker/engine-api/types/events" |
| "github.com/docker/engine-api/types/filters" |
| ) |
| |
| // CmdEvents prints a live stream of real time events from the server. |
| // |
| // Usage: docker events [OPTIONS] |
| func (cli *DockerCli) CmdEvents(args ...string) error { |
| cmd := Cli.Subcmd("events", nil, Cli.DockerCommands["events"].Description, true) |
| since := cmd.String([]string{"-since"}, "", "Show all events created since timestamp") |
| until := cmd.String([]string{"-until"}, "", "Stream events until this timestamp") |
| flFilter := opts.NewListOpts(nil) |
| cmd.Var(&flFilter, []string{"f", "-filter"}, "Filter output based on conditions provided") |
| cmd.Require(flag.Exact, 0) |
| |
| cmd.ParseFlags(args, true) |
| |
| eventFilterArgs := filters.NewArgs() |
| |
| // Consolidate all filter flags, and sanity check them early. |
| // They'll get process in the daemon/server. |
| for _, f := range flFilter.GetAll() { |
| var err error |
| eventFilterArgs, err = filters.ParseFlag(f, eventFilterArgs) |
| if err != nil { |
| return err |
| } |
| } |
| |
| options := types.EventsOptions{ |
| Since: *since, |
| Until: *until, |
| Filters: eventFilterArgs, |
| } |
| |
| responseBody, err := cli.client.Events(context.Background(), options) |
| if err != nil { |
| return err |
| } |
| defer responseBody.Close() |
| |
| return streamEvents(responseBody, cli.out) |
| } |
| |
| // streamEvents decodes prints the incoming events in the provided output. |
| func streamEvents(input io.Reader, output io.Writer) error { |
| return decodeEvents(input, func(event eventtypes.Message, err error) error { |
| if err != nil { |
| return err |
| } |
| printOutput(event, output) |
| return nil |
| }) |
| } |
| |
| type eventProcessor func(event eventtypes.Message, err error) error |
| |
| func decodeEvents(input io.Reader, ep eventProcessor) error { |
| dec := json.NewDecoder(input) |
| for { |
| var event eventtypes.Message |
| err := dec.Decode(&event) |
| if err != nil && err == io.EOF { |
| break |
| } |
| |
| if procErr := ep(event, err); procErr != nil { |
| return procErr |
| } |
| } |
| return nil |
| } |
| |
| // printOutput prints all types of event information. |
| // Each output includes the event type, actor id, name and action. |
| // Actor attributes are printed at the end if the actor has any. |
| func printOutput(event eventtypes.Message, output io.Writer) { |
| if event.TimeNano != 0 { |
| fmt.Fprintf(output, "%s ", time.Unix(0, event.TimeNano).Format(jsonlog.RFC3339NanoFixed)) |
| } else if event.Time != 0 { |
| fmt.Fprintf(output, "%s ", time.Unix(event.Time, 0).Format(jsonlog.RFC3339NanoFixed)) |
| } |
| |
| fmt.Fprintf(output, "%s %s %s", event.Type, event.Action, event.Actor.ID) |
| |
| if len(event.Actor.Attributes) > 0 { |
| var attrs []string |
| var keys []string |
| for k := range event.Actor.Attributes { |
| keys = append(keys, k) |
| } |
| sort.Strings(keys) |
| for _, k := range keys { |
| v := event.Actor.Attributes[k] |
| attrs = append(attrs, fmt.Sprintf("%s=%s", k, v)) |
| } |
| fmt.Fprintf(output, " (%s)", strings.Join(attrs, ", ")) |
| } |
| fmt.Fprint(output, "\n") |
| } |
| |
| type eventHandler struct { |
| handlers map[string]func(eventtypes.Message) |
| mu sync.Mutex |
| } |
| |
| func (w *eventHandler) Handle(action string, h func(eventtypes.Message)) { |
| w.mu.Lock() |
| w.handlers[action] = h |
| w.mu.Unlock() |
| } |
| |
| // Watch ranges over the passed in event chan and processes the events based on the |
| // handlers created for a given action. |
| // To stop watching, close the event chan. |
| func (w *eventHandler) Watch(c <-chan eventtypes.Message) { |
| for e := range c { |
| w.mu.Lock() |
| h, exists := w.handlers[e.Action] |
| w.mu.Unlock() |
| if !exists { |
| continue |
| } |
| logrus.Debugf("event handler: received event: %v", e) |
| go h(e) |
| } |
| } |