| //go:build linux && cgo && !static_build && journald |
| |
| package journald // import "github.com/docker/docker/daemon/logger/journald" |
| |
| import ( |
| "context" |
| "errors" |
| "runtime" |
| "strconv" |
| "sync/atomic" |
| "time" |
| |
| "github.com/containerd/log" |
| "github.com/coreos/go-systemd/v22/journal" |
| "github.com/docker/docker/api/types/backend" |
| "github.com/docker/docker/daemon/logger" |
| "github.com/docker/docker/daemon/logger/journald/internal/sdjournal" |
| ) |
| |
// closedDrainTimeout is how long readJournal keeps trying to catch up with the
// journal after the log driver has been closed, when no drain timeout was
// already armed (which is the case when following logs).
const closedDrainTimeout = 5 * time.Second
| |
// wellKnownFields is the set of journal field names which we know are not
// user-provided attribute fields. getAttrs consults it so that these fields
// are never reported as log attributes.
var wellKnownFields = map[string]bool{
	"MESSAGE":             true,
	"MESSAGE_ID":          true,
	"PRIORITY":            true,
	"CODE_FILE":           true,
	"CODE_LINE":           true,
	"CODE_FUNC":           true,
	"ERRNO":               true,
	"SYSLOG_FACILITY":     true,
	fieldSyslogIdentifier: true,
	"SYSLOG_PID":          true,
	fieldSyslogTimestamp:  true,
	fieldContainerName:    true,
	fieldContainerID:      true,
	fieldContainerIDFull:  true,
	fieldContainerTag:     true,
	fieldImageName:        true,
	fieldPLogID:           true,
	fieldPLogOrdinal:      true,
	fieldPLogLast:         true,
	fieldPartialMessage:   true,
	fieldLogEpoch:         true,
	fieldLogOrdinal:       true,
}
| |
// reader holds the state of a single ReadLogs request: the journal handle
// being read and the log watcher the entries are delivered to.
type reader struct {
	// s is the log driver instance that ReadLogs was called on.
	s *journald

	// j is the journal handle. It is opened and closed by readLogs.
	j *sdjournal.Journal

	// logWatcher receives the messages (Msg) and errors (Err) produced by
	// the reader.
	logWatcher *logger.LogWatcher

	// config is the read configuration passed to ReadLogs.
	config logger.ReadConfig

	// maxOrdinal is the highest fieldLogOrdinal value seen so far among
	// entries whose fieldLogEpoch matches s.epoch.
	maxOrdinal uint64

	// initialized is set once the first drainJournal call has returned.
	initialized bool

	// ready is closed by signalReady; ReadLogs blocks on it before
	// returning.
	ready chan struct{}
}
| |
| func getMessage(d map[string]string) (line []byte, ok bool) { |
| m, ok := d["MESSAGE"] |
| if ok { |
| line = []byte(m) |
| if d[fieldPartialMessage] != "true" { |
| line = append(line, "\n"...) |
| } |
| } |
| return line, ok |
| } |
| |
| func getPriority(d map[string]string) (journal.Priority, bool) { |
| if pri, ok := d["PRIORITY"]; ok { |
| i, err := strconv.Atoi(pri) |
| return journal.Priority(i), err == nil |
| } |
| return -1, false |
| } |
| |
| // getSource recovers the stream name from the entry data by mapping from the |
| // journal priority field back to the stream that we would have assigned that |
| // value. |
| func getSource(d map[string]string) string { |
| source := "" |
| if priority, ok := getPriority(d); ok { |
| if priority == journal.PriErr { |
| source = "stderr" |
| } else if priority == journal.PriInfo { |
| source = "stdout" |
| } |
| } |
| return source |
| } |
| |
| func getAttrs(d map[string]string) []backend.LogAttr { |
| var attrs []backend.LogAttr |
| for k, v := range d { |
| if k[0] != '_' && !wellKnownFields[k] { |
| attrs = append(attrs, backend.LogAttr{Key: k, Value: v}) |
| } |
| } |
| return attrs |
| } |
| |
// errDrainDone is the sentinel error returned by drainJournal to signal that
// there are no more log entries to send to the log watcher. readJournal treats
// it as a normal end of reading and returns nil in its place.
var errDrainDone = errors.New("journald drain done")
| |
| // drainJournal reads and sends log messages from the journal. |
| // |
| // drainJournal returns errDrainDone when a terminal stopping condition has been |
| // reached: the watch consumer is gone, a log entry is read which has a |
| // timestamp after until (if until is nonzero), or the log driver is closed and |
| // the last message logged has been sent from the journal. If the end of the |
| // journal is reached without encountering a terminal stopping condition, a nil |
| // error is returned. |
| func (r *reader) drainJournal() error { |
| if !r.initialized { |
| defer func() { |
| r.signalReady() |
| r.initialized = true |
| }() |
| |
| var ( |
| err error |
| seekedToTail bool |
| ) |
| if r.config.Tail >= 0 { |
| if r.config.Until.IsZero() { |
| err = r.j.SeekTail() |
| seekedToTail = true |
| } else { |
| err = r.j.SeekRealtime(r.config.Until) |
| } |
| } else { |
| if r.config.Since.IsZero() { |
| err = r.j.SeekHead() |
| } else { |
| err = r.j.SeekRealtime(r.config.Since) |
| } |
| } |
| if err != nil { |
| return err |
| } |
| |
| // SeekTail() followed by Next() behaves incorrectly, so we need |
| // to work around the bug by ensuring the first discrete |
| // movement of the read pointer is Previous() or PreviousSkip(). |
| // PreviousSkip() is called inside the loop when config.Tail > 0 |
| // so the only special case requiring special handling is |
| // config.Tail == 0. |
| // https://github.com/systemd/systemd/issues/9934 |
| if seekedToTail && r.config.Tail == 0 { |
| // Resolve the read pointer to the last entry in the |
| // journal so that the call to Next() inside the loop |
| // advances past it. |
| if ok, err := r.j.Previous(); err != nil || !ok { |
| return err |
| } |
| } |
| } |
| |
| for i := 0; ; i++ { |
| if !r.initialized && i == 0 && r.config.Tail > 0 { |
| if n, err := r.j.PreviousSkip(uint(r.config.Tail)); err != nil || n == 0 { |
| return err |
| } |
| } else if ok, err := r.j.Next(); err != nil || !ok { |
| return err |
| } |
| |
| if !r.initialized && i == 0 { |
| // The cursor is in a position which will be unaffected |
| // by subsequent logging. |
| r.signalReady() |
| } |
| |
| // Read the entry's timestamp. |
| timestamp, err := r.j.Realtime() |
| if err != nil { |
| return err |
| } |
| // Check if the PreviousSkip went too far back. Check only the |
| // initial position as we are comparing wall-clock timestamps, |
| // which may not be monotonic. We don't want to skip over |
| // messages sent later in time just because the clock moved |
| // backwards. |
| if !r.initialized && i == 0 && r.config.Tail > 0 && timestamp.Before(r.config.Since) { |
| r.j.SeekRealtime(r.config.Since) |
| continue |
| } |
| if !r.config.Until.IsZero() && r.config.Until.Before(timestamp) { |
| return errDrainDone |
| } |
| |
| // Read and send the logged message, if there is one to read. |
| data, err := r.j.Data() |
| if err != nil { |
| return err |
| } |
| |
| if data[fieldLogEpoch] == r.s.epoch { |
| seq, err := strconv.ParseUint(data[fieldLogOrdinal], 10, 64) |
| if err == nil && seq > r.maxOrdinal { |
| r.maxOrdinal = seq |
| } |
| } |
| |
| if line, ok := getMessage(data); ok { |
| // Send the log message, unless the consumer is gone |
| msg := &logger.Message{ |
| Line: line, |
| Source: getSource(data), |
| Timestamp: timestamp.In(time.UTC), |
| Attrs: getAttrs(data), |
| } |
| // The daemon timestamp will differ from the "trusted" |
| // timestamp of when the event was received by journald. |
| // We can efficiently seek around the journal by the |
| // event timestamp, and the event timestamp is what |
| // journalctl displays. The daemon timestamp is just an |
| // application-supplied field with no special |
| // significance; libsystemd won't help us seek to the |
| // entry with the closest timestamp. |
| /* |
| if sts := data["SYSLOG_TIMESTAMP"]; sts != "" { |
| if tv, err := time.Parse(time.RFC3339Nano, sts); err == nil { |
| msg.Timestamp = tv |
| } |
| } |
| */ |
| select { |
| case <-r.logWatcher.WatchConsumerGone(): |
| return errDrainDone |
| case r.logWatcher.Msg <- msg: |
| } |
| } |
| |
| // Call sd_journal_process() periodically during the processing loop |
| // to close any opened file descriptors for rotated (deleted) journal files. |
| if i != 0 && i%1024 == 0 { |
| if _, err := r.j.Process(); err != nil { |
| // log a warning but ignore it for now |
| log.G(context.TODO()).WithField("container", r.s.vars[fieldContainerIDFull]). |
| WithField("error", err). |
| Warn("journald: error processing journal") |
| } |
| } |
| } |
| } |
| |
| func (r *reader) readJournal() error { |
| caughtUp := atomic.LoadUint64(&r.s.ordinal) |
| if err := r.drainJournal(); err != nil { |
| if err != errDrainDone { |
| return err |
| } |
| return nil |
| } |
| |
| var drainTimeout <-chan time.Time |
| if !r.config.Follow { |
| if r.s.readSyncTimeout == 0 { |
| return nil |
| } |
| tmr := time.NewTimer(r.s.readSyncTimeout) |
| defer tmr.Stop() |
| drainTimeout = tmr.C |
| } |
| |
| for { |
| status, err := r.j.Wait(250 * time.Millisecond) |
| if err != nil { |
| return err |
| } |
| select { |
| case <-r.logWatcher.WatchConsumerGone(): |
| return nil // won't be able to write anything anymore |
| case <-drainTimeout: |
| // Container is gone but we haven't found the end of the |
| // logs within the timeout. Maybe it was dropped by |
| // journald, e.g. due to rate-limiting. |
| return nil |
| case <-r.s.closed: |
| // container is gone, drain journal |
| lastSeq := atomic.LoadUint64(&r.s.ordinal) |
| if r.maxOrdinal >= lastSeq { |
| // All caught up with the logger! |
| return nil |
| } |
| if drainTimeout == nil { |
| tmr := time.NewTimer(closedDrainTimeout) |
| defer tmr.Stop() |
| drainTimeout = tmr.C |
| } |
| default: |
| // container is still alive |
| if status == sdjournal.StatusNOP { |
| // no new data -- keep waiting |
| continue |
| } |
| } |
| err = r.drainJournal() |
| if err != nil { |
| if err != errDrainDone { |
| return err |
| } |
| return nil |
| } |
| if !r.config.Follow && r.s.readSyncTimeout > 0 && r.maxOrdinal >= caughtUp { |
| return nil |
| } |
| } |
| } |
| |
| func (r *reader) readLogs() { |
| defer close(r.logWatcher.Msg) |
| |
| // Make sure the ready channel is closed in the event of an early |
| // return. |
| defer r.signalReady() |
| |
| // Quoting https://www.freedesktop.org/software/systemd/man/sd-journal.html: |
| // Functions that operate on sd_journal objects are thread |
| // agnostic — given sd_journal pointer may only be used from one |
| // specific thread at all times (and it has to be the very same one |
| // during the entire lifetime of the object), but multiple, |
| // independent threads may use multiple, independent objects safely. |
| // |
| // sdjournal.Open returns a value which wraps an sd_journal pointer so |
| // we need to abide by those rules. |
| runtime.LockOSThread() |
| defer runtime.UnlockOSThread() |
| |
| // Get a handle to the journal. |
| var err error |
| if r.s.journalReadDir != "" { |
| r.j, err = sdjournal.OpenDir(r.s.journalReadDir, 0) |
| } else { |
| r.j, err = sdjournal.Open(0) |
| } |
| if err != nil { |
| r.logWatcher.Err <- err |
| return |
| } |
| defer r.j.Close() |
| |
| if r.config.Follow { |
| // Initialize library inotify watches early |
| if err := r.j.InitializeInotify(); err != nil { |
| r.logWatcher.Err <- err |
| return |
| } |
| } |
| |
| // Remove limits on the size of data items that we'll retrieve. |
| if err := r.j.SetDataThreshold(0); err != nil { |
| r.logWatcher.Err <- err |
| return |
| } |
| // Add a match to have the library do the searching for us. |
| if err := r.j.AddMatch(fieldContainerIDFull, r.s.vars[fieldContainerIDFull]); err != nil { |
| r.logWatcher.Err <- err |
| return |
| } |
| |
| if err := r.readJournal(); err != nil { |
| r.logWatcher.Err <- err |
| return |
| } |
| } |
| |
| func (r *reader) signalReady() { |
| select { |
| case <-r.ready: |
| default: |
| close(r.ready) |
| } |
| } |
| |
| func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { |
| r := &reader{ |
| s: s, |
| logWatcher: logger.NewLogWatcher(), |
| config: config, |
| ready: make(chan struct{}), |
| } |
| go r.readLogs() |
| // Block until the reader is in position to read from the current config |
| // location to prevent race conditions in tests. |
| <-r.ready |
| return r.logWatcher |
| } |
| |
| func waitUntilFlushedImpl(s *journald) error { |
| if s.readSyncTimeout == 0 { |
| return nil |
| } |
| |
| ordinal := atomic.LoadUint64(&s.ordinal) |
| if ordinal == 0 { |
| return nil // No logs were sent; nothing to wait for. |
| } |
| |
| flushed := make(chan error) |
| go func() { |
| defer close(flushed) |
| runtime.LockOSThread() |
| defer runtime.UnlockOSThread() |
| |
| var ( |
| j *sdjournal.Journal |
| err error |
| ) |
| if s.journalReadDir != "" { |
| j, err = sdjournal.OpenDir(s.journalReadDir, 0) |
| } else { |
| j, err = sdjournal.Open(0) |
| } |
| if err != nil { |
| flushed <- err |
| return |
| } |
| defer j.Close() |
| |
| if err := j.AddMatch(fieldContainerIDFull, s.vars[fieldContainerIDFull]); err != nil { |
| flushed <- err |
| return |
| } |
| if err := j.AddMatch(fieldLogEpoch, s.epoch); err != nil { |
| flushed <- err |
| return |
| } |
| if err := j.AddMatch(fieldLogOrdinal, strconv.FormatUint(ordinal, 10)); err != nil { |
| flushed <- err |
| return |
| } |
| |
| deadline := time.Now().Add(s.readSyncTimeout) |
| for time.Now().Before(deadline) { |
| if ok, err := j.Next(); ok { |
| // Found it! |
| return |
| } else if err != nil { |
| flushed <- err |
| return |
| } |
| if _, err := j.Wait(100 * time.Millisecond); err != nil { |
| flushed <- err |
| return |
| } |
| } |
| log.G(context.TODO()).WithField("container", s.vars[fieldContainerIDFull]). |
| Warn("journald: deadline exceeded waiting for logs to be committed to journal") |
| }() |
| return <-flushed |
| } |
| |
// init installs waitUntilFlushedImpl as the implementation behind
// waitUntilFlushed. The declaration of waitUntilFlushed is not visible in this
// part of the file; presumably it is a package-level function variable.
func init() {
	waitUntilFlushed = waitUntilFlushedImpl
}