blob: 74255ec3bea91958812fd6d6425bfc7c9632bc42 [file] [log] [blame]
package gelf
import (
"bufio"
"encoding/json"
"fmt"
"net"
"time"
)
type TCPReader struct {
listener *net.TCPListener
conn net.Conn
messages chan []byte
}
type connChannels struct {
drop chan string
confirm chan string
}
func newTCPReader(addr string) (*TCPReader, chan string, chan string, error) {
var err error
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
return nil, nil, nil, fmt.Errorf("ListenTCP: %s", err)
}
r := &TCPReader{
listener: listener,
messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages
}
closeSignal := make(chan string, 1)
doneSignal := make(chan string, 1)
go r.listenUntilCloseSignal(closeSignal, doneSignal)
return r, closeSignal, doneSignal, nil
}
func (r *TCPReader) accepter(connections chan net.Conn) {
for {
conn, err := r.listener.Accept()
if err != nil {
break
}
connections <- conn
}
}
func (r *TCPReader) listenUntilCloseSignal(closeSignal chan string, doneSignal chan string) {
defer func() { doneSignal <- "done" }()
defer r.listener.Close()
var conns []connChannels
connectionsChannel := make(chan net.Conn, 1)
go r.accepter(connectionsChannel)
for {
select {
case conn := <-connectionsChannel:
dropSignal := make(chan string, 1)
dropConfirm := make(chan string, 1)
channels := connChannels{drop: dropSignal, confirm: dropConfirm}
go handleConnection(conn, r.messages, dropSignal, dropConfirm)
conns = append(conns, channels)
default:
}
select {
case sig := <-closeSignal:
if sig == "stop" || sig == "drop" {
if len(conns) >= 1 {
for _, s := range conns {
if s.drop != nil {
s.drop <- "drop"
<-s.confirm
conns = append(conns[:0], conns[1:]...)
}
}
if sig == "stop" {
return
}
} else if sig == "stop" {
closeSignal <- "stop"
}
if sig == "drop" {
doneSignal <- "done"
}
}
default:
}
}
}
func (r *TCPReader) addr() string {
return r.listener.Addr().String()
}
func handleConnection(conn net.Conn, messages chan<- []byte, dropSignal chan string, dropConfirm chan string) {
defer func() { dropConfirm <- "done" }()
defer conn.Close()
reader := bufio.NewReader(conn)
var b []byte
var err error
drop := false
canDrop := false
for {
conn.SetDeadline(time.Now().Add(2 * time.Second))
if b, err = reader.ReadBytes(0); err != nil {
if drop {
return
}
} else if len(b) > 0 {
messages <- b
canDrop = true
if drop {
return
}
} else if drop {
return
}
select {
case sig := <-dropSignal:
if sig == "drop" {
drop = true
time.Sleep(1 * time.Second)
if canDrop {
return
}
}
default:
}
}
}
func (r *TCPReader) readMessage() (*Message, error) {
b := <-r.messages
var msg Message
if err := json.Unmarshal(b[:len(b)-1], &msg); err != nil {
return nil, fmt.Errorf("json.Unmarshal: %s", err)
}
return &msg, nil
}
func (r *TCPReader) Close() {
r.listener.Close()
}