blob: ff719fc7143b777a7c826c6ff6fa50bfe44cf337 [file] [log] [blame]
// Copyright 2012 SocialCode. All rights reserved.
// Use of this source code is governed by the MIT
// license that can be found in the LICENSE file.
package gelf
import (
"bytes"
"compress/gzip"
"compress/zlib"
"encoding/json"
"fmt"
"io"
"net"
"strings"
"sync"
)
// Reader receives GELF messages from a UDP socket. Create one with NewReader.
type Reader struct {
// mu is not locked by any method defined in this file.
// NOTE(review): presumably intended to serialise reads - confirm or remove.
mu sync.Mutex
// conn is the listening socket; NewReader sets it to the result of
// net.ListenUDP.
conn net.Conn
}
// NewReader resolves addr as a UDP address, starts listening on it and
// returns a Reader bound to the resulting socket.
//
// An error is returned when addr cannot be resolved or the socket cannot be
// opened; in both cases the returned *Reader is nil.
func NewReader(addr string) (*Reader, error) {
	resolved, resolveErr := net.ResolveUDPAddr("udp", addr)
	if resolveErr != nil {
		return nil, fmt.Errorf("ResolveUDPAddr('%s'): %s", addr, resolveErr)
	}

	listener, listenErr := net.ListenUDP("udp", resolved)
	if listenErr != nil {
		return nil, fmt.Errorf("ListenUDP: %s", listenErr)
	}

	return &Reader{conn: listener}, nil
}
// Addr returns the local address of the reader's socket in string form.
func (r *Reader) Addr() string {
	local := r.conn.LocalAddr()
	return local.String()
}
// Read receives the next GELF message via ReadMessage and copies its text
// into p: the full message when it is non-empty, the short message
// otherwise. It returns the number of bytes copied.
//
// On failure it returns 0 and the error; the byte count is never negative,
// as the io.Reader contract requires. A message without any text yields
// (0, nil) rather than io.EOF, because the underlying socket is still open.
//
// FIXME: this will discard data if p isn't big enough to hold the
// full message.
func (r *Reader) Read(p []byte) (int, error) {
	msg, err := r.ReadMessage()
	if err != nil {
		return 0, err
	}
	if msg == nil {
		// Guard against a (nil, nil) result instead of dereferencing nil.
		return 0, fmt.Errorf("ReadMessage returned no message")
	}

	data := msg.Full
	if data == "" {
		data = msg.Short
	}
	if data == "" {
		// strings.Reader would report io.EOF here, which stream consumers
		// interpret as "no more input".
		return 0, nil
	}
	return strings.NewReader(data).Read(p)
}
// ReadMessage blocks until one complete GELF message has been received on
// the reader's connection and returns it decoded from JSON.
//
// A datagram that starts with the chunk magic is one piece of a chunked
// message; datagrams are collected until every chunk announced in the header
// has arrived, then the pieces are concatenated in sequence order. Any other
// datagram is taken as a complete message. The payload is decompressed when
// it starts with a gzip or zlib header and is otherwise parsed as plain JSON.
//
// An error is returned when:
//   - reading from the connection fails;
//   - a chunk header is truncated, announces zero or more than 128 chunks,
//     carries a sequence number outside the announced range, or changes the
//     chunk count mid-message;
//   - a chunk of a different message id, or a non-chunked datagram, arrives
//     while a chunked message is being assembled;
//   - 128 datagrams have been read without completing the message;
//   - decompression or JSON decoding fails.
//
// The returned *Message is non-nil whenever the error is nil.
func (r *Reader) ReadMessage() (*Message, error) {
	const (
		maxChunks = 128 // upper bound on chunks per message and datagrams per call

		// Chunk header layout: magic(2) | message id(8) | seq(1) | total(1).
		idStart = 2
		idEnd   = idStart + 8
		seqAt   = idEnd
		totalAt = idEnd + 1
	)

	var (
		buf      = make([]byte, ChunkSize)
		payload  []byte   // complete (possibly compressed) message bytes
		msgID    []byte   // id of the chunked message being assembled
		chunks   [][]byte // chunk payloads indexed by sequence number
		received int      // number of distinct chunks stored so far
		size     int      // total payload bytes stored in chunks
	)

	for got := 0; got < maxChunks; got++ {
		// Always read into the full-size buffer: re-slicing it to the size
		// of the previous datagram would truncate a longer one that follows.
		n, err := r.conn.Read(buf)
		if err != nil {
			return nil, fmt.Errorf("Read: %s", err)
		}
		pkt := buf[:n]

		if !bytes.HasPrefix(pkt, magicChunked) {
			if chunks != nil {
				return nil, fmt.Errorf("out-of-band message (not chunked)")
			}
			payload = pkt
			break
		}

		if len(pkt) < chunkedHeaderLen || len(pkt) <= totalAt {
			return nil, fmt.Errorf("truncated chunk header (%d bytes)", len(pkt))
		}
		id, seq, total := pkt[idStart:idEnd], int(pkt[seqAt]), int(pkt[totalAt])

		switch {
		case chunks == nil:
			if total == 0 || total > maxChunks {
				return nil, fmt.Errorf("invalid chunk count %d", total)
			}
			// Copy the id: pkt aliases buf, which the next Read overwrites.
			msgID = append([]byte(nil), id...)
			chunks = make([][]byte, total)
		case !bytes.Equal(id, msgID):
			return nil, fmt.Errorf("out-of-band message %v (awaited %v)", id, msgID)
		case total != len(chunks):
			return nil, fmt.Errorf("chunk count changed from %d to %d", len(chunks), total)
		}

		if seq >= len(chunks) {
			return nil, fmt.Errorf("chunk sequence number %d out of range (%d chunks)", seq, len(chunks))
		}
		if chunks[seq] != nil {
			// Duplicate datagram: keep the copy we already have.
			continue
		}

		// make never returns nil, so a stored empty chunk is still
		// distinguishable from a missing one.
		data := make([]byte, len(pkt)-chunkedHeaderLen)
		copy(data, pkt[chunkedHeaderLen:])
		chunks[seq] = data
		size += len(data)
		received++
		if received == len(chunks) {
			break
		}
	}

	if chunks != nil {
		if received != len(chunks) {
			return nil, fmt.Errorf("incomplete chunked message: received %d of %d chunks", received, len(chunks))
		}
		payload = make([]byte, 0, size)
		for _, part := range chunks {
			payload = append(payload, part...)
		}
	}

	// The data we get from the wire may be compressed.
	var (
		body io.Reader
		err  error
	)
	switch {
	case bytes.HasPrefix(payload, magicGzip):
		body, err = gzip.NewReader(bytes.NewReader(payload))
	case len(payload) >= 2 && payload[0] == magicZlib[0] &&
		(int(payload[0])*256+int(payload[1]))%31 == 0:
		// A zlib stream is recognised by its first byte plus the header
		// check: the first two bytes, read big-endian, are a multiple of 31.
		body, err = zlib.NewReader(bytes.NewReader(payload))
	default:
		// Compliance with https://github.com/Graylog2/graylog2-server:
		// anything that is not gzip, zlib or chunked is treated as
		// uncompressed.
		body = bytes.NewReader(payload)
	}
	if err != nil {
		return nil, fmt.Errorf("NewReader: %s", err)
	}

	// Decode into the struct itself rather than into a pointer to the
	// pointer, so a JSON "null" payload cannot reset msg to nil.
	msg := new(Message)
	if err := json.NewDecoder(body).Decode(msg); err != nil {
		return nil, fmt.Errorf("json.Unmarshal: %s", err)
	}
	return msg, nil
}