| package gelf |
| |
| import ( |
| "bufio" |
| "encoding/json" |
| "fmt" |
| "net" |
| "time" |
| ) |
| |
| type TCPReader struct { |
| listener *net.TCPListener |
| conn net.Conn |
| messages chan []byte |
| } |
| |
| type connChannels struct { |
| drop chan string |
| confirm chan string |
| } |
| |
| func newTCPReader(addr string) (*TCPReader, chan string, chan string, error) { |
| var err error |
| tcpAddr, err := net.ResolveTCPAddr("tcp", addr) |
| if err != nil { |
| return nil, nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err) |
| } |
| |
| listener, err := net.ListenTCP("tcp", tcpAddr) |
| if err != nil { |
| return nil, nil, nil, fmt.Errorf("ListenTCP: %s", err) |
| } |
| |
| r := &TCPReader{ |
| listener: listener, |
| messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages |
| } |
| |
| closeSignal := make(chan string, 1) |
| doneSignal := make(chan string, 1) |
| |
| go r.listenUntilCloseSignal(closeSignal, doneSignal) |
| |
| return r, closeSignal, doneSignal, nil |
| } |
| |
| func (r *TCPReader) accepter(connections chan net.Conn) { |
| for { |
| conn, err := r.listener.Accept() |
| if err != nil { |
| break |
| } |
| connections <- conn |
| } |
| } |
| |
| func (r *TCPReader) listenUntilCloseSignal(closeSignal chan string, doneSignal chan string) { |
| defer func() { doneSignal <- "done" }() |
| defer r.listener.Close() |
| var conns []connChannels |
| connectionsChannel := make(chan net.Conn, 1) |
| go r.accepter(connectionsChannel) |
| for { |
| select { |
| case conn := <-connectionsChannel: |
| dropSignal := make(chan string, 1) |
| dropConfirm := make(chan string, 1) |
| channels := connChannels{drop: dropSignal, confirm: dropConfirm} |
| go handleConnection(conn, r.messages, dropSignal, dropConfirm) |
| conns = append(conns, channels) |
| default: |
| } |
| |
| select { |
| case sig := <-closeSignal: |
| if sig == "stop" || sig == "drop" { |
| if len(conns) >= 1 { |
| for _, s := range conns { |
| if s.drop != nil { |
| s.drop <- "drop" |
| <-s.confirm |
| conns = append(conns[:0], conns[1:]...) |
| } |
| } |
| if sig == "stop" { |
| return |
| } |
| } else if sig == "stop" { |
| closeSignal <- "stop" |
| } |
| if sig == "drop" { |
| doneSignal <- "done" |
| } |
| } |
| default: |
| } |
| } |
| } |
| |
| func (r *TCPReader) addr() string { |
| return r.listener.Addr().String() |
| } |
| |
| func handleConnection(conn net.Conn, messages chan<- []byte, dropSignal chan string, dropConfirm chan string) { |
| defer func() { dropConfirm <- "done" }() |
| defer conn.Close() |
| reader := bufio.NewReader(conn) |
| |
| var b []byte |
| var err error |
| drop := false |
| canDrop := false |
| |
| for { |
| conn.SetDeadline(time.Now().Add(2 * time.Second)) |
| if b, err = reader.ReadBytes(0); err != nil { |
| if drop { |
| return |
| } |
| } else if len(b) > 0 { |
| messages <- b |
| canDrop = true |
| if drop { |
| return |
| } |
| } else if drop { |
| return |
| } |
| select { |
| case sig := <-dropSignal: |
| if sig == "drop" { |
| drop = true |
| time.Sleep(1 * time.Second) |
| if canDrop { |
| return |
| } |
| } |
| default: |
| } |
| } |
| } |
| |
| func (r *TCPReader) readMessage() (*Message, error) { |
| b := <-r.messages |
| |
| var msg Message |
| if err := json.Unmarshal(b[:len(b)-1], &msg); err != nil { |
| return nil, fmt.Errorf("json.Unmarshal: %s", err) |
| } |
| |
| return &msg, nil |
| } |
| |
| func (r *TCPReader) Close() { |
| r.listener.Close() |
| } |