| // Copyright 2012 SocialCode. All rights reserved. |
| // Use of this source code is governed by the MIT |
| // license that can be found in the LICENSE file. |
| |
| package gelf |
| |
| import ( |
| "bytes" |
| "compress/flate" |
| "compress/gzip" |
| "compress/zlib" |
| "crypto/rand" |
| "fmt" |
| "io" |
| "net" |
| "os" |
| "path" |
| "sync" |
| ) |
| |
| type UDPWriter struct { |
| GelfWriter |
| CompressionLevel int // one of the consts from compress/flate |
| CompressionType CompressType |
| } |
| |
// CompressType selects the compression the writer applies to a message
// before sending it to the graylog2 server.
type CompressType int

// Supported compression types. CompressGzip is the zero value, so it is
// what a writer uses when CompressionType is left unset.
const (
	CompressGzip CompressType = iota // compress with compress/gzip
	CompressZlib                     // compress with compress/zlib
	CompressNone                     // send the JSON payload uncompressed
)
| |
// Constants that control GELF chunking.
//
// TODO: generate dynamically using Path MTU Discovery?
const (
	// ChunkSize is the largest payload written to the connection in one
	// piece. Should be less than (MTU - len(UDP header)).
	ChunkSize = 1420
	// chunkedHeaderLen is the size of a chunk header: 2-byte magic,
	// 8-byte message id, 1-byte sequence number and 1-byte chunk count.
	chunkedHeaderLen = 12
	// chunkedDataLen is the room left for message data in one chunk once
	// the header has been accounted for.
	chunkedDataLen = ChunkSize - chunkedHeaderLen
)
| |
// Leading bytes that identify the kind of payload in a datagram.
var (
	// magicChunked opens every GELF chunk header.
	magicChunked = []byte{0x1e, 0x0f}
	// magicZlib is the first byte of a zlib stream.
	magicZlib = []byte{0x78}
	// magicGzip is the two-byte gzip member header.
	magicGzip = []byte{0x1f, 0x8b}
)
| |
| // numChunks returns the number of GELF chunks necessary to transmit |
| // the given compressed buffer. |
| func numChunks(b []byte) int { |
| lenB := len(b) |
| if lenB <= ChunkSize { |
| return 1 |
| } |
| return len(b)/chunkedDataLen + 1 |
| } |
| |
| // New returns a new GELF Writer. This writer can be used to send the |
| // output of the standard Go log functions to a central GELF server by |
| // passing it to log.SetOutput() |
| func NewUDPWriter(addr string) (*UDPWriter, error) { |
| var err error |
| w := new(UDPWriter) |
| w.CompressionLevel = flate.BestSpeed |
| |
| if w.conn, err = net.Dial("udp", addr); err != nil { |
| return nil, err |
| } |
| if w.hostname, err = os.Hostname(); err != nil { |
| return nil, err |
| } |
| |
| w.Facility = path.Base(os.Args[0]) |
| |
| return w, nil |
| } |
| |
| // writes the gzip compressed byte array to the connection as a series |
| // of GELF chunked messages. The format is documented at |
| // http://docs.graylog.org/en/2.1/pages/gelf.html as: |
| // |
| // 2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte |
| // total, chunk-data |
| func (w *GelfWriter) writeChunked(zBytes []byte) (err error) { |
| b := make([]byte, 0, ChunkSize) |
| buf := bytes.NewBuffer(b) |
| nChunksI := numChunks(zBytes) |
| if nChunksI > 128 { |
| return fmt.Errorf("msg too large, would need %d chunks", nChunksI) |
| } |
| nChunks := uint8(nChunksI) |
| // use urandom to get a unique message id |
| msgId := make([]byte, 8) |
| n, err := io.ReadFull(rand.Reader, msgId) |
| if err != nil || n != 8 { |
| return fmt.Errorf("rand.Reader: %d/%s", n, err) |
| } |
| |
| bytesLeft := len(zBytes) |
| for i := uint8(0); i < nChunks; i++ { |
| buf.Reset() |
| // manually write header. Don't care about |
| // host/network byte order, because the spec only |
| // deals in individual bytes. |
| buf.Write(magicChunked) //magic |
| buf.Write(msgId) |
| buf.WriteByte(i) |
| buf.WriteByte(nChunks) |
| // slice out our chunk from zBytes |
| chunkLen := chunkedDataLen |
| if chunkLen > bytesLeft { |
| chunkLen = bytesLeft |
| } |
| off := int(i) * chunkedDataLen |
| chunk := zBytes[off : off+chunkLen] |
| buf.Write(chunk) |
| |
| // write this chunk, and make sure the write was good |
| n, err := w.conn.Write(buf.Bytes()) |
| if err != nil { |
| return fmt.Errorf("Write (chunk %d/%d): %s", i, |
| nChunks, err) |
| } |
| if n != len(buf.Bytes()) { |
| return fmt.Errorf("Write len: (chunk %d/%d) (%d/%d)", |
| i, nChunks, n, len(buf.Bytes())) |
| } |
| |
| bytesLeft -= chunkLen |
| } |
| |
| if bytesLeft != 0 { |
| return fmt.Errorf("error: %d bytes left after sending", bytesLeft) |
| } |
| return nil |
| } |
| |
// bufPool recycles the scratch buffers used while encoding messages.
// Buffers created by the pool start empty with 1k bytes of capacity.
var bufPool = sync.Pool{
	New: func() interface{} {
		backing := make([]byte, 0, 1024)
		return bytes.NewBuffer(backing)
	},
}
| |
| func newBuffer() *bytes.Buffer { |
| b := bufPool.Get().(*bytes.Buffer) |
| if b != nil { |
| b.Reset() |
| return b |
| } |
| return bytes.NewBuffer(nil) |
| } |
| |
| // WriteMessage sends the specified message to the GELF server |
| // specified in the call to New(). It assumes all the fields are |
| // filled out appropriately. In general, clients will want to use |
| // Write, rather than WriteMessage. |
| func (w *UDPWriter) WriteMessage(m *Message) (err error) { |
| mBuf := newBuffer() |
| defer bufPool.Put(mBuf) |
| if err = m.MarshalJSONBuf(mBuf); err != nil { |
| return err |
| } |
| mBytes := mBuf.Bytes() |
| |
| var ( |
| zBuf *bytes.Buffer |
| zBytes []byte |
| ) |
| |
| var zw io.WriteCloser |
| switch w.CompressionType { |
| case CompressGzip: |
| zBuf = newBuffer() |
| defer bufPool.Put(zBuf) |
| zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel) |
| case CompressZlib: |
| zBuf = newBuffer() |
| defer bufPool.Put(zBuf) |
| zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel) |
| case CompressNone: |
| zBytes = mBytes |
| default: |
| panic(fmt.Sprintf("unknown compression type %d", |
| w.CompressionType)) |
| } |
| if zw != nil { |
| if err != nil { |
| return |
| } |
| if _, err = zw.Write(mBytes); err != nil { |
| zw.Close() |
| return |
| } |
| zw.Close() |
| zBytes = zBuf.Bytes() |
| } |
| |
| if numChunks(zBytes) > 1 { |
| return w.writeChunked(zBytes) |
| } |
| n, err := w.conn.Write(zBytes) |
| if err != nil { |
| return |
| } |
| if n != len(zBytes) { |
| return fmt.Errorf("bad write (%d/%d)", n, len(zBytes)) |
| } |
| |
| return nil |
| } |
| |
| // Write encodes the given string in a GELF message and sends it to |
| // the server specified in New(). |
| func (w *UDPWriter) Write(p []byte) (n int, err error) { |
| // 1 for the function that called us. |
| file, line := getCallerIgnoringLogMulti(1) |
| |
| m := constructMessage(p, w.hostname, w.Facility, file, line) |
| |
| if err = w.WriteMessage(m); err != nil { |
| return 0, err |
| } |
| |
| return len(p), nil |
| } |