| // Copyright 2012 SocialCode. All rights reserved. |
| // Use of this source code is governed by the MIT |
| // license that can be found in the LICENSE file. |
| |
| package gelf |
| |
| import ( |
| "bytes" |
| "compress/flate" |
| "compress/gzip" |
| "compress/zlib" |
| "crypto/rand" |
| "encoding/json" |
| "fmt" |
| "io" |
| "net" |
| "os" |
| "path" |
| "runtime" |
| "strings" |
| "sync" |
| "time" |
| ) |
| |
| // Writer implements io.Writer and is used to send both discrete |
| // messages to a graylog2 server, or data from a stream-oriented |
| // interface (like the functions in log). |
| type Writer struct { |
| mu sync.Mutex |
| conn net.Conn |
| hostname string |
| Facility string // defaults to current process name |
| CompressionLevel int // one of the consts from compress/flate |
| CompressionType CompressType |
| } |
| |
// CompressType selects the compression a Writer applies to a message
// before sending it to the graylog2 server.
type CompressType int

// Supported compression types. CompressGzip is the zero value, so a
// Writer whose CompressionType was never assigned uses gzip.
const (
	CompressGzip CompressType = 0
	CompressZlib CompressType = 1
	CompressNone CompressType = 2
)
| |
// Message holds the fields of a single GELF message.
//
// Extra and RawExtra are excluded from the default JSON encoding
// (tag "-"); MarshalJSONBuf appends their contents to the encoded
// object instead.
type Message struct {
	Version  string  `json:"version"`
	Host     string  `json:"host"`
	Short    string  `json:"short_message"`
	Full     string  `json:"full_message,omitempty"`
	TimeUnix float64 `json:"timestamp"`
	Level    int32   `json:"level,omitempty"`
	Facility string  `json:"facility,omitempty"`

	// Extra carries additional fields as decoded values.
	Extra map[string]interface{} `json:"-"`
	// RawExtra carries additional fields as an already-encoded JSON object.
	RawExtra json.RawMessage `json:"-"`
}
| |
// Constants controlling GELF chunking. ChunkSize should be less than
// (MTU - len(UDP header)).
//
// TODO: generate dynamically using Path MTU Discovery?
const (
	// ChunkSize is the largest number of bytes written per chunk,
	// header included.
	ChunkSize = 1420
	// chunkedHeaderLen is the size of the chunk header: 2-byte magic,
	// 8-byte message id, 1-byte sequence number, 1-byte chunk count.
	chunkedHeaderLen = 2 + 8 + 1 + 1
	// chunkedDataLen is the payload capacity of one chunk.
	chunkedDataLen = ChunkSize - chunkedHeaderLen
)
| |
// magicChunked is the 2-byte marker written at the start of every
// chunk header.
var magicChunked = []byte{0x1e, 0x0f}

// magicZlib and magicGzip are not referenced in this part of the file;
// presumably they identify zlib and gzip payloads on the receiving
// side (TODO confirm against the reader).
var magicZlib = []byte{0x78}

var magicGzip = []byte{0x1f, 0x8b}
| |
// Syslog severity levels, numbered 0 (LOG_EMERG) through 7 (LOG_DEBUG).
// All of them are typed int32, matching Message.Level.
const (
	LOG_EMERG int32 = iota
	LOG_ALERT
	LOG_CRIT
	LOG_ERR
	LOG_WARNING
	LOG_NOTICE
	LOG_INFO
	LOG_DEBUG
)
| |
| // numChunks returns the number of GELF chunks necessary to transmit |
| // the given compressed buffer. |
| func numChunks(b []byte) int { |
| lenB := len(b) |
| if lenB <= ChunkSize { |
| return 1 |
| } |
| return len(b)/chunkedDataLen + 1 |
| } |
| |
| // New returns a new GELF Writer. This writer can be used to send the |
| // output of the standard Go log functions to a central GELF server by |
| // passing it to log.SetOutput() |
| func NewWriter(addr string) (*Writer, error) { |
| var err error |
| w := new(Writer) |
| w.CompressionLevel = flate.BestSpeed |
| |
| if w.conn, err = net.Dial("udp", addr); err != nil { |
| return nil, err |
| } |
| if w.hostname, err = os.Hostname(); err != nil { |
| return nil, err |
| } |
| |
| w.Facility = path.Base(os.Args[0]) |
| |
| return w, nil |
| } |
| |
| // writes the gzip compressed byte array to the connection as a series |
| // of GELF chunked messages. The header format is documented at |
| // https://github.com/Graylog2/graylog2-docs/wiki/GELF as: |
| // |
| // 2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte |
| // total, chunk-data |
| func (w *Writer) writeChunked(zBytes []byte) (err error) { |
| b := make([]byte, 0, ChunkSize) |
| buf := bytes.NewBuffer(b) |
| nChunksI := numChunks(zBytes) |
| if nChunksI > 255 { |
| return fmt.Errorf("msg too large, would need %d chunks", nChunksI) |
| } |
| nChunks := uint8(nChunksI) |
| // use urandom to get a unique message id |
| msgId := make([]byte, 8) |
| n, err := io.ReadFull(rand.Reader, msgId) |
| if err != nil || n != 8 { |
| return fmt.Errorf("rand.Reader: %d/%s", n, err) |
| } |
| |
| bytesLeft := len(zBytes) |
| for i := uint8(0); i < nChunks; i++ { |
| buf.Reset() |
| // manually write header. Don't care about |
| // host/network byte order, because the spec only |
| // deals in individual bytes. |
| buf.Write(magicChunked) //magic |
| buf.Write(msgId) |
| buf.WriteByte(i) |
| buf.WriteByte(nChunks) |
| // slice out our chunk from zBytes |
| chunkLen := chunkedDataLen |
| if chunkLen > bytesLeft { |
| chunkLen = bytesLeft |
| } |
| off := int(i) * chunkedDataLen |
| chunk := zBytes[off : off+chunkLen] |
| buf.Write(chunk) |
| |
| // write this chunk, and make sure the write was good |
| n, err := w.conn.Write(buf.Bytes()) |
| if err != nil { |
| return fmt.Errorf("Write (chunk %d/%d): %s", i, |
| nChunks, err) |
| } |
| if n != len(buf.Bytes()) { |
| return fmt.Errorf("Write len: (chunk %d/%d) (%d/%d)", |
| i, nChunks, n, len(buf.Bytes())) |
| } |
| |
| bytesLeft -= chunkLen |
| } |
| |
| if bytesLeft != 0 { |
| return fmt.Errorf("error: %d bytes left after sending", bytesLeft) |
| } |
| return nil |
| } |
| |
// bufPool recycles the scratch buffers used while encoding and
// compressing messages. Fresh buffers start with 1k bytes of capacity.
var bufPool = sync.Pool{
	New: func() interface{} {
		return bytes.NewBuffer(make([]byte, 0, 1024))
	},
}

// newBuffer takes a buffer from bufPool and returns it emptied. A
// brand-new buffer is returned if the pool hands back a nil pointer.
func newBuffer() *bytes.Buffer {
	buf := bufPool.Get().(*bytes.Buffer)
	if buf == nil {
		return bytes.NewBuffer(nil)
	}
	buf.Reset()
	return buf
}
| |
| // WriteMessage sends the specified message to the GELF server |
| // specified in the call to New(). It assumes all the fields are |
| // filled out appropriately. In general, clients will want to use |
| // Write, rather than WriteMessage. |
| func (w *Writer) WriteMessage(m *Message) (err error) { |
| mBuf := newBuffer() |
| defer bufPool.Put(mBuf) |
| if err = m.MarshalJSONBuf(mBuf); err != nil { |
| return err |
| } |
| mBytes := mBuf.Bytes() |
| |
| var ( |
| zBuf *bytes.Buffer |
| zBytes []byte |
| ) |
| |
| var zw io.WriteCloser |
| switch w.CompressionType { |
| case CompressGzip: |
| zBuf = newBuffer() |
| defer bufPool.Put(zBuf) |
| zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel) |
| case CompressZlib: |
| zBuf = newBuffer() |
| defer bufPool.Put(zBuf) |
| zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel) |
| case CompressNone: |
| zBytes = mBytes |
| default: |
| panic(fmt.Sprintf("unknown compression type %d", |
| w.CompressionType)) |
| } |
| if zw != nil { |
| if err != nil { |
| return |
| } |
| if _, err = zw.Write(mBytes); err != nil { |
| zw.Close() |
| return |
| } |
| zw.Close() |
| zBytes = zBuf.Bytes() |
| } |
| |
| if numChunks(zBytes) > 1 { |
| return w.writeChunked(zBytes) |
| } |
| n, err := w.conn.Write(zBytes) |
| if err != nil { |
| return |
| } |
| if n != len(zBytes) { |
| return fmt.Errorf("bad write (%d/%d)", n, len(zBytes)) |
| } |
| |
| return nil |
| } |
| |
| // Close connection and interrupt blocked Read or Write operations |
| func (w *Writer) Close() error { |
| return w.conn.Close() |
| } |
| |
| /* |
| func (w *Writer) Alert(m string) (err error) |
| func (w *Writer) Close() error |
| func (w *Writer) Crit(m string) (err error) |
| func (w *Writer) Debug(m string) (err error) |
| func (w *Writer) Emerg(m string) (err error) |
| func (w *Writer) Err(m string) (err error) |
| func (w *Writer) Info(m string) (err error) |
| func (w *Writer) Notice(m string) (err error) |
| func (w *Writer) Warning(m string) (err error) |
| */ |
| |
// getCaller reports the file name and line number of a frame further
// up the call stack. A callDepth of 0 selects the function calling
// getCaller, 1 that function's caller, and so on. suffixesToIgnore are
// path fragments like "/pkg/log/log.go"; a frame whose file name ends
// in one of them is skipped and the search moves one frame further up.
// When the stack runs out, getCaller returns "???" and 0.
func getCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) {
	// start one frame up so that getCaller itself is not counted
	for depth := callDepth + 1; ; depth++ {
		_, name, lineNo, ok := runtime.Caller(depth)
		if !ok {
			return "???", 0
		}

		skip := false
		for _, suffix := range suffixesToIgnore {
			if strings.HasSuffix(name, suffix) {
				skip = true
				break
			}
		}
		if !skip {
			return name, lineNo
		}
	}
}
| |
| func getCallerIgnoringLogMulti(callDepth int) (string, int) { |
| // the +1 is to ignore this (getCallerIgnoringLogMulti) frame |
| return getCaller(callDepth+1, "/pkg/log/log.go", "/pkg/io/multi.go") |
| } |
| |
| // Write encodes the given string in a GELF message and sends it to |
| // the server specified in New(). |
| func (w *Writer) Write(p []byte) (n int, err error) { |
| |
| // 1 for the function that called us. |
| file, line := getCallerIgnoringLogMulti(1) |
| |
| // remove trailing and leading whitespace |
| p = bytes.TrimSpace(p) |
| |
| // If there are newlines in the message, use the first line |
| // for the short message and set the full message to the |
| // original input. If the input has no newlines, stick the |
| // whole thing in Short. |
| short := p |
| full := []byte("") |
| if i := bytes.IndexRune(p, '\n'); i > 0 { |
| short = p[:i] |
| full = p |
| } |
| |
| m := Message{ |
| Version: "1.1", |
| Host: w.hostname, |
| Short: string(short), |
| Full: string(full), |
| TimeUnix: float64(time.Now().Unix()), |
| Level: 6, // info |
| Facility: w.Facility, |
| Extra: map[string]interface{}{ |
| "_file": file, |
| "_line": line, |
| }, |
| } |
| |
| if err = w.WriteMessage(&m); err != nil { |
| return 0, err |
| } |
| |
| return len(p), nil |
| } |
| |
| func (m *Message) MarshalJSONBuf(buf *bytes.Buffer) error { |
| b, err := json.Marshal(m) |
| if err != nil { |
| return err |
| } |
| // write up until the final } |
| if _, err = buf.Write(b[:len(b)-1]); err != nil { |
| return err |
| } |
| if len(m.Extra) > 0 { |
| eb, err := json.Marshal(m.Extra) |
| if err != nil { |
| return err |
| } |
| // merge serialized message + serialized extra map |
| if err = buf.WriteByte(','); err != nil { |
| return err |
| } |
| // write serialized extra bytes, without enclosing quotes |
| if _, err = buf.Write(eb[1 : len(eb)-1]); err != nil { |
| return err |
| } |
| } |
| |
| if len(m.RawExtra) > 0 { |
| if err := buf.WriteByte(','); err != nil { |
| return err |
| } |
| |
| // write serialized extra bytes, without enclosing quotes |
| if _, err = buf.Write(m.RawExtra[1 : len(m.RawExtra)-1]); err != nil { |
| return err |
| } |
| } |
| |
| // write final closing quotes |
| return buf.WriteByte('}') |
| } |
| |
| func (m *Message) UnmarshalJSON(data []byte) error { |
| i := make(map[string]interface{}, 16) |
| if err := json.Unmarshal(data, &i); err != nil { |
| return err |
| } |
| for k, v := range i { |
| if k[0] == '_' { |
| if m.Extra == nil { |
| m.Extra = make(map[string]interface{}, 1) |
| } |
| m.Extra[k] = v |
| continue |
| } |
| switch k { |
| case "version": |
| m.Version = v.(string) |
| case "host": |
| m.Host = v.(string) |
| case "short_message": |
| m.Short = v.(string) |
| case "full_message": |
| m.Full = v.(string) |
| case "timestamp": |
| m.TimeUnix = v.(float64) |
| case "level": |
| m.Level = int32(v.(float64)) |
| case "facility": |
| m.Facility = v.(string) |
| } |
| } |
| return nil |
| } |