Merge pull request #35765 from ghislainbourgeois/35613-update-go-gelf-v2-for-bugfix

Update Graylog2/go-gelf vendoring. Fixes #35613
diff --git a/vendor.conf b/vendor.conf
index 9ff5290..87f7930 100644
--- a/vendor.conf
+++ b/vendor.conf
@@ -77,7 +77,7 @@
 github.com/golang/protobuf 7a211bcf3bce0e3f1d74f9894916e6f116ae83b4
 
 # gelf logging driver deps
-github.com/Graylog2/go-gelf v2
+github.com/Graylog2/go-gelf 4143646226541087117ff2f83334ea48b3201841
 
 github.com/fluent/fluent-logger-golang v1.3.0
 # fluent-logger-golang deps
diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go b/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go
index 8f22c9a..74255ec 100644
--- a/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go
+++ b/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go
@@ -5,6 +5,7 @@
 	"encoding/json"
 	"fmt"
 	"net"
+	"time"
 )
 
 type TCPReader struct {
@@ -13,16 +14,21 @@
 	messages chan []byte
 }
 
-func newTCPReader(addr string) (*TCPReader, chan string, error) {
+type connChannels struct {
+	drop    chan string
+	confirm chan string
+}
+
+func newTCPReader(addr string) (*TCPReader, chan string, chan string, error) {
 	var err error
 	tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
 	if err != nil {
-		return nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
+		return nil, nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
 	}
 
 	listener, err := net.ListenTCP("tcp", tcpAddr)
 	if err != nil {
-		return nil, nil, fmt.Errorf("ListenTCP: %s", err)
+		return nil, nil, nil, fmt.Errorf("ListenTCP: %s", err)
 	}
 
 	r := &TCPReader{
@@ -30,26 +36,61 @@
 		messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages
 	}
 
-	signal := make(chan string, 1)
+	closeSignal := make(chan string, 1)
+	doneSignal := make(chan string, 1)
 
-	go r.listenUntilCloseSignal(signal)
+	go r.listenUntilCloseSignal(closeSignal, doneSignal)
 
-	return r, signal, nil
+	return r, closeSignal, doneSignal, nil
 }
 
-func (r *TCPReader) listenUntilCloseSignal(signal chan string) {
-	defer func() { signal <- "done" }()
-	defer r.listener.Close()
+func (r *TCPReader) accepter(connections chan net.Conn) {
 	for {
 		conn, err := r.listener.Accept()
 		if err != nil {
 			break
 		}
-		go handleConnection(conn, r.messages)
+		connections <- conn
+	}
+}
+
+func (r *TCPReader) listenUntilCloseSignal(closeSignal chan string, doneSignal chan string) {
+	defer func() { doneSignal <- "done" }()
+	defer r.listener.Close()
+	var conns []connChannels
+	connectionsChannel := make(chan net.Conn, 1)
+	go r.accepter(connectionsChannel)
+	for {
 		select {
-		case sig := <-signal:
-			if sig == "stop" {
-				break
+		case conn := <-connectionsChannel:
+			dropSignal := make(chan string, 1)
+			dropConfirm := make(chan string, 1)
+			channels := connChannels{drop: dropSignal, confirm: dropConfirm}
+			go handleConnection(conn, r.messages, dropSignal, dropConfirm)
+			conns = append(conns, channels)
+		default:
+		}
+
+		select {
+		case sig := <-closeSignal:
+			if sig == "stop" || sig == "drop" {
+				if len(conns) >= 1 {
+					for _, s := range conns {
+						if s.drop != nil {
+							s.drop <- "drop"
+							<-s.confirm
+							conns = append(conns[:0], conns[1:]...)
+						}
+					}
+					if sig == "stop" {
+						return
+					}
+				} else if sig == "stop" {
+					closeSignal <- "stop"
+				}
+				if sig == "drop" {
+					doneSignal <- "done"
+				}
 			}
 		default:
 		}
@@ -60,19 +101,41 @@
 	return r.listener.Addr().String()
 }
 
-func handleConnection(conn net.Conn, messages chan<- []byte) {
+func handleConnection(conn net.Conn, messages chan<- []byte, dropSignal chan string, dropConfirm chan string) {
+	defer func() { dropConfirm <- "done" }()
 	defer conn.Close()
 	reader := bufio.NewReader(conn)
 
 	var b []byte
 	var err error
+	drop := false
+	canDrop := false
 
 	for {
+		conn.SetDeadline(time.Now().Add(2 * time.Second))
 		if b, err = reader.ReadBytes(0); err != nil {
-			continue
-		}
-		if len(b) > 0 {
+			if drop {
+				return
+			}
+		} else if len(b) > 0 {
 			messages <- b
+			canDrop = true
+			if drop {
+				return
+			}
+		} else if drop {
+			return
+		}
+		select {
+		case sig := <-dropSignal:
+			if sig == "drop" {
+				drop = true
+				time.Sleep(1 * time.Second)
+				if canDrop {
+					return
+				}
+			}
+		default:
 		}
 	}
 }
diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go b/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go
index ab95cbc..da1390d 100644
--- a/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go
+++ b/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go
@@ -75,12 +75,17 @@
 
 func (w *TCPWriter) writeToSocketWithReconnectAttempts(zBytes []byte) (n int, err error) {
 	var errConn error
+	var i int
 
 	w.mu.Lock()
-	for i := 0; n <= w.MaxReconnect; i++ {
+	for i = 0; i <= w.MaxReconnect; i++ {
 		errConn = nil
 
-		n, err = w.conn.Write(zBytes)
+		if w.conn != nil {
+			n, err = w.conn.Write(zBytes)
+		} else {
+			err = fmt.Errorf("Connection was nil, will attempt reconnect")
+		}
 		if err != nil {
 			time.Sleep(w.ReconnectDelay * time.Second)
 			w.conn, errConn = net.Dial("tcp", w.addr)
@@ -90,6 +95,9 @@
 	}
 	w.mu.Unlock()
 
+	if i > w.MaxReconnect {
+		return 0, fmt.Errorf("Maximum reconnection attempts was reached; giving up")
+	}
 	if errConn != nil {
 		return 0, fmt.Errorf("Write Failed: %s\nReconnection failed: %s", err, errConn)
 	}
diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/writer.go b/vendor/github.com/Graylog2/go-gelf/gelf/writer.go
index 93c3692..153be2c 100644
--- a/vendor/github.com/Graylog2/go-gelf/gelf/writer.go
+++ b/vendor/github.com/Graylog2/go-gelf/gelf/writer.go
@@ -27,5 +27,8 @@
 
 // Close connection and interrupt blocked Read or Write operations
 func (w *GelfWriter) Close() error {
+	if w.conn == nil {
+		return nil
+	}
 	return w.conn.Close()
 }