Merge pull request #35765 from ghislainbourgeois/35613-update-go-gelf-v2-for-bugfix
Update Graylog2/go-gelf vendoring. Fixes #35613
diff --git a/vendor.conf b/vendor.conf
index 9ff5290..87f7930 100644
--- a/vendor.conf
+++ b/vendor.conf
@@ -77,7 +77,7 @@
github.com/golang/protobuf 7a211bcf3bce0e3f1d74f9894916e6f116ae83b4
# gelf logging driver deps
-github.com/Graylog2/go-gelf v2
+github.com/Graylog2/go-gelf 4143646226541087117ff2f83334ea48b3201841
github.com/fluent/fluent-logger-golang v1.3.0
# fluent-logger-golang deps
diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go b/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go
index 8f22c9a..74255ec 100644
--- a/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go
+++ b/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go
@@ -5,6 +5,7 @@
"encoding/json"
"fmt"
"net"
+ "time"
)
type TCPReader struct {
@@ -13,16 +14,21 @@
messages chan []byte
}
-func newTCPReader(addr string) (*TCPReader, chan string, error) {
+type connChannels struct {
+ drop chan string
+ confirm chan string
+}
+
+func newTCPReader(addr string) (*TCPReader, chan string, chan string, error) {
var err error
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
- return nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
+ return nil, nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
- return nil, nil, fmt.Errorf("ListenTCP: %s", err)
+ return nil, nil, nil, fmt.Errorf("ListenTCP: %s", err)
}
r := &TCPReader{
@@ -30,26 +36,61 @@
messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages
}
- signal := make(chan string, 1)
+ closeSignal := make(chan string, 1)
+ doneSignal := make(chan string, 1)
- go r.listenUntilCloseSignal(signal)
+ go r.listenUntilCloseSignal(closeSignal, doneSignal)
- return r, signal, nil
+ return r, closeSignal, doneSignal, nil
}
-func (r *TCPReader) listenUntilCloseSignal(signal chan string) {
- defer func() { signal <- "done" }()
- defer r.listener.Close()
+func (r *TCPReader) accepter(connections chan net.Conn) {
for {
conn, err := r.listener.Accept()
if err != nil {
break
}
- go handleConnection(conn, r.messages)
+ connections <- conn
+ }
+}
+
+func (r *TCPReader) listenUntilCloseSignal(closeSignal chan string, doneSignal chan string) {
+ defer func() { doneSignal <- "done" }()
+ defer r.listener.Close()
+ var conns []connChannels
+ connectionsChannel := make(chan net.Conn, 1)
+ go r.accepter(connectionsChannel)
+ for {
select {
- case sig := <-signal:
- if sig == "stop" {
- break
+ case conn := <-connectionsChannel:
+ dropSignal := make(chan string, 1)
+ dropConfirm := make(chan string, 1)
+ channels := connChannels{drop: dropSignal, confirm: dropConfirm}
+ go handleConnection(conn, r.messages, dropSignal, dropConfirm)
+ conns = append(conns, channels)
+ default:
+ }
+
+ select {
+ case sig := <-closeSignal:
+ if sig == "stop" || sig == "drop" {
+ if len(conns) >= 1 {
+ for _, s := range conns {
+ if s.drop != nil {
+ s.drop <- "drop"
+ <-s.confirm
+ conns = append(conns[:0], conns[1:]...)
+ }
+ }
+ if sig == "stop" {
+ return
+ }
+ } else if sig == "stop" {
+ closeSignal <- "stop"
+ }
+ if sig == "drop" {
+ doneSignal <- "done"
+ }
}
default:
}
@@ -60,19 +101,41 @@
return r.listener.Addr().String()
}
-func handleConnection(conn net.Conn, messages chan<- []byte) {
+func handleConnection(conn net.Conn, messages chan<- []byte, dropSignal chan string, dropConfirm chan string) {
+ defer func() { dropConfirm <- "done" }()
defer conn.Close()
reader := bufio.NewReader(conn)
var b []byte
var err error
+ drop := false
+ canDrop := false
for {
+ conn.SetDeadline(time.Now().Add(2 * time.Second))
if b, err = reader.ReadBytes(0); err != nil {
- continue
- }
- if len(b) > 0 {
+ if drop {
+ return
+ }
+ } else if len(b) > 0 {
messages <- b
+ canDrop = true
+ if drop {
+ return
+ }
+ } else if drop {
+ return
+ }
+ select {
+ case sig := <-dropSignal:
+ if sig == "drop" {
+ drop = true
+ time.Sleep(1 * time.Second)
+ if canDrop {
+ return
+ }
+ }
+ default:
}
}
}
diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go b/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go
index ab95cbc..da1390d 100644
--- a/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go
+++ b/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go
@@ -75,12 +75,17 @@
func (w *TCPWriter) writeToSocketWithReconnectAttempts(zBytes []byte) (n int, err error) {
var errConn error
+ var i int
w.mu.Lock()
- for i := 0; n <= w.MaxReconnect; i++ {
+ for i = 0; i <= w.MaxReconnect; i++ {
errConn = nil
- n, err = w.conn.Write(zBytes)
+ if w.conn != nil {
+ n, err = w.conn.Write(zBytes)
+ } else {
+ err = fmt.Errorf("Connection was nil, will attempt reconnect")
+ }
if err != nil {
time.Sleep(w.ReconnectDelay * time.Second)
w.conn, errConn = net.Dial("tcp", w.addr)
@@ -90,6 +95,9 @@
}
w.mu.Unlock()
+ if i > w.MaxReconnect {
+ return 0, fmt.Errorf("Maximum reconnection attempts was reached; giving up")
+ }
if errConn != nil {
return 0, fmt.Errorf("Write Failed: %s\nReconnection failed: %s", err, errConn)
}
diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/writer.go b/vendor/github.com/Graylog2/go-gelf/gelf/writer.go
index 93c3692..153be2c 100644
--- a/vendor/github.com/Graylog2/go-gelf/gelf/writer.go
+++ b/vendor/github.com/Graylog2/go-gelf/gelf/writer.go
@@ -27,5 +27,8 @@
// Close connection and interrupt blocked Read or Write operations
func (w *GelfWriter) Close() error {
+ if w.conn == nil {
+ return nil
+ }
return w.conn.Close()
}