blob: 655f6233ca0aeaecdf6031820b1277f5c69bd0c5 [file] [log] [blame]
package fluent
import (
"encoding/json"
"errors"
"fmt"
"io"
"math"
"net"
"reflect"
"strconv"
"sync"
"time"
)
const (
defaultHost = "127.0.0.1"
defaultNetwork = "tcp"
defaultSocketPath = ""
defaultPort = 24224
defaultTimeout = 3 * time.Second
defaultBufferLimit = 8 * 1024 * 1024
defaultRetryWait = 500
defaultMaxRetry = 13
defaultReconnectWaitIncreRate = 1.5
)
type Config struct {
FluentPort int `json:"fluent_port"`
FluentHost string `json:"fluent_host"`
FluentNetwork string `json:"fluent_network"`
FluentSocketPath string `json:"fluent_socket_path"`
Timeout time.Duration `json:"timeout"`
BufferLimit int `json:"buffer_limit"`
RetryWait int `json:"retry_wait"`
MaxRetry int `json:"max_retry"`
TagPrefix string `json:"tag_prefix"`
AsyncConnect bool `json:"async_connect"`
MarshalAsJSON bool `json:"marshal_as_json"`
}
type Fluent struct {
Config
mubuff sync.Mutex
pending []byte
muconn sync.Mutex
conn io.WriteCloser
reconnecting bool
}
// New creates a new Logger.
func New(config Config) (f *Fluent, err error) {
if config.FluentNetwork == "" {
config.FluentNetwork = defaultNetwork
}
if config.FluentHost == "" {
config.FluentHost = defaultHost
}
if config.FluentPort == 0 {
config.FluentPort = defaultPort
}
if config.FluentSocketPath == "" {
config.FluentSocketPath = defaultSocketPath
}
if config.Timeout == 0 {
config.Timeout = defaultTimeout
}
if config.BufferLimit == 0 {
config.BufferLimit = defaultBufferLimit
}
if config.RetryWait == 0 {
config.RetryWait = defaultRetryWait
}
if config.MaxRetry == 0 {
config.MaxRetry = defaultMaxRetry
}
if config.AsyncConnect {
f = &Fluent{Config: config, reconnecting: true}
go f.reconnect()
} else {
f = &Fluent{Config: config, reconnecting: false}
err = f.connect()
}
return
}
// Post writes the output for a logging event.
//
// Examples:
//
// // send string
// f.Post("tag_name", "data")
//
// // send map[string]
// mapStringData := map[string]string{
// "foo": "bar",
// }
// f.Post("tag_name", mapStringData)
//
// // send message with specified time
// mapStringData := map[string]string{
// "foo": "bar",
// }
// tm := time.Now()
// f.PostWithTime("tag_name", tm, mapStringData)
//
// // send struct
// structData := struct {
// Name string `msg:"name"`
// } {
// "john smith",
// }
// f.Post("tag_name", structData)
//
func (f *Fluent) Post(tag string, message interface{}) error {
timeNow := time.Now()
return f.PostWithTime(tag, timeNow, message)
}
func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) error {
if len(f.TagPrefix) > 0 {
tag = f.TagPrefix + "." + tag
}
msg := reflect.ValueOf(message)
msgtype := msg.Type()
if msgtype.Kind() == reflect.Struct {
// message should be tagged by "codec" or "msg"
kv := make(map[string]interface{})
fields := msgtype.NumField()
for i := 0; i < fields; i++ {
field := msgtype.Field(i)
name := field.Name
if n1 := field.Tag.Get("msg"); n1 != "" {
name = n1
} else if n2 := field.Tag.Get("codec"); n2 != "" {
name = n2
}
kv[name] = msg.FieldByIndex(field.Index).Interface()
}
return f.EncodeAndPostData(tag, tm, kv)
}
if msgtype.Kind() != reflect.Map {
return errors.New("fluent#PostWithTime: message must be a map")
} else if msgtype.Key().Kind() != reflect.String {
return errors.New("fluent#PostWithTime: map keys must be strings")
}
kv := make(map[string]interface{})
for _, k := range msg.MapKeys() {
kv[k.String()] = msg.MapIndex(k).Interface()
}
return f.EncodeAndPostData(tag, tm, kv)
}
func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
var data []byte
var err error
if data, err = f.EncodeData(tag, tm, message); err != nil {
return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err)
}
return f.postRawData(data)
}
// Deprecated: Use EncodeAndPostData instead
func (f *Fluent) PostRawData(data []byte) {
f.postRawData(data)
}
func (f *Fluent) postRawData(data []byte) error {
if err := f.appendBuffer(data); err != nil {
return err
}
if err := f.send(); err != nil {
f.close()
return err
}
return nil
}
// For sending forward protocol adopted JSON
type MessageChunk struct {
message Message
}
// Golang default marshaler does not support
// ["value", "value2", {"key":"value"}] style marshaling.
// So, it should write JSON marshaler by hand.
func (chunk *MessageChunk) MarshalJSON() ([]byte, error) {
data, err := json.Marshal(chunk.message.Record)
return []byte(fmt.Sprintf("[\"%s\",%d,%s,null]", chunk.message.Tag,
chunk.message.Time, data)), err
}
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data []byte, err error) {
timeUnix := tm.Unix()
if f.Config.MarshalAsJSON {
msg := Message{Tag: tag, Time: timeUnix, Record: message}
chunk := &MessageChunk{message: msg}
data, err = json.Marshal(chunk)
} else {
msg := &Message{Tag: tag, Time: timeUnix, Record: message}
data, err = msg.MarshalMsg(nil)
}
return
}
// Close closes the connection.
func (f *Fluent) Close() (err error) {
if len(f.pending) > 0 {
err = f.send()
}
f.close()
return
}
// appendBuffer appends data to buffer with lock.
func (f *Fluent) appendBuffer(data []byte) error {
f.mubuff.Lock()
defer f.mubuff.Unlock()
if len(f.pending)+len(data) > f.Config.BufferLimit {
return errors.New(fmt.Sprintf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit))
}
f.pending = append(f.pending, data...)
return nil
}
// close closes the connection.
func (f *Fluent) close() {
f.muconn.Lock()
if f.conn != nil {
f.conn.Close()
f.conn = nil
}
f.muconn.Unlock()
}
// connect establishes a new connection using the specified transport.
func (f *Fluent) connect() (err error) {
f.muconn.Lock()
defer f.muconn.Unlock()
switch f.Config.FluentNetwork {
case "tcp":
f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout)
case "unix":
f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentSocketPath, f.Config.Timeout)
default:
err = net.UnknownNetworkError(f.Config.FluentNetwork)
}
if err == nil {
f.reconnecting = false
}
return
}
func e(x, y float64) int {
return int(math.Pow(x, y))
}
func (f *Fluent) reconnect() {
for i := 0; ; i++ {
err := f.connect()
if err == nil {
f.send()
return
}
if i == f.Config.MaxRetry {
// TODO: What we can do when connection failed MaxRetry times?
panic("fluent#reconnect: failed to reconnect!")
}
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
time.Sleep(time.Duration(waitTime) * time.Millisecond)
}
}
func (f *Fluent) send() error {
f.muconn.Lock()
defer f.muconn.Unlock()
if f.conn == nil {
if f.reconnecting == false {
f.reconnecting = true
go f.reconnect()
}
return errors.New("fluent#send: can't send logs, client is reconnecting")
}
f.mubuff.Lock()
defer f.mubuff.Unlock()
var err error
if len(f.pending) > 0 {
_, err = f.conn.Write(f.pending)
if err != nil {
f.conn.Close()
f.conn = nil
} else {
f.pending = f.pending[:0]
}
}
return err
}