transport: fix a race that could lead to memory leaks (#2765)
* When a RST_STREAM is received by the server transport, a cleanupStream
item is placed into controlbuf no matter what.
* Updates comments.
* Replaces getCleanupStream with inline struct initialization.
diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go
index 7fbbf59..435092e 100644
--- a/internal/transport/http2_server.go
+++ b/internal/transport/http2_server.go
@@ -437,7 +437,7 @@
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
- t.closeStream(s, true, se.Code, nil, false)
+ t.closeStream(s, true, se.Code, false)
} else {
t.controlBuf.put(&cleanupStream{
streamID: se.StreamID,
@@ -579,7 +579,7 @@
}
if size > 0 {
if err := s.fc.onData(size); err != nil {
- t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
+ t.closeStream(s, true, http2.ErrCodeFlowControl, false)
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
@@ -604,11 +604,18 @@
}
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
- s, ok := t.getStream(f)
- if !ok {
+ // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
+ if s, ok := t.getStream(f); ok {
+ t.closeStream(s, false, 0, false)
return
}
- t.closeStream(s, false, 0, nil, false)
+ // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
+ t.controlBuf.put(&cleanupStream{
+ streamID: f.Header().StreamID,
+ rst: false,
+ rstCode: 0,
+ onWrite: func() {},
+ })
}
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
@@ -772,7 +779,7 @@
if err != nil {
return err
}
- t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
+ t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
if t.stats != nil {
@@ -836,12 +843,12 @@
if err != nil {
return err
}
- t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
+ t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
// Send a RST_STREAM after the trailers if the client has not already half-closed.
rst := s.getState() == streamActive
- t.closeStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
+ t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
if t.stats != nil {
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
@@ -1012,16 +1019,24 @@
}
// deleteStream deletes the stream s from transport's active streams.
-func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
- t.mu.Lock()
- if _, ok := t.activeStreams[s.id]; !ok {
- t.mu.Unlock()
- return
+func (t *http2Server) deleteStream(s *Stream, eosReceived bool) (oldState streamState) {
+ oldState = s.swapState(streamDone)
+ if oldState == streamDone {
+ // If the stream was already done, return.
+ return oldState
}
- delete(t.activeStreams, s.id)
- if len(t.activeStreams) == 0 {
- t.idle = time.Now()
+ // In case stream sending and receiving are invoked in separate
+ // goroutines (e.g., bi-directional streaming), cancel needs to be
+ // called to interrupt the potential blocking on other goroutines.
+ s.cancel()
+
+ t.mu.Lock()
+ if _, ok := t.activeStreams[s.id]; ok {
+ delete(t.activeStreams, s.id)
+ if len(t.activeStreams) == 0 {
+ t.idle = time.Now()
+ }
}
t.mu.Unlock()
@@ -1032,55 +1047,38 @@
atomic.AddInt64(&t.czData.streamsFailed, 1)
}
}
+
+ return oldState
}
-// closeStream clears the footprint of a stream when the stream is not needed
-// any more.
-func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
- // Mark the stream as done
- oldState := s.swapState(streamDone)
+// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
+func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
+ oldState := t.deleteStream(s, eosReceived)
+ // If the stream is already closed, then don't put trailing header to controlbuf.
+ if oldState == streamDone {
+ return
+ }
- // In case stream sending and receiving are invoked in separate
- // goroutines (e.g., bi-directional streaming), cancel needs to be
- // called to interrupt the potential blocking on other goroutines.
- s.cancel()
-
- // Deletes the stream from active streams
- t.deleteStream(s, eosReceived)
-
- cleanup := &cleanupStream{
+ hdr.cleanup = &cleanupStream{
streamID: s.id,
rst: rst,
rstCode: rstCode,
onWrite: func() {},
}
-
- // No trailer. Puts cleanupFrame into transport's control buffer.
- if hdr == nil {
- t.controlBuf.put(cleanup)
- return
- }
-
- // We do the check here, because of the following scenario:
- // 1. closeStream is called first with a trailer. A trailer item with a piggybacked cleanup item
- // is put to control buffer.
- // 2. Loopy writer is waiting on a stream quota. It will never get it because client errored at
- // some point. So loopy can't act on trailer
- // 3. Client sends a RST_STREAM due to the error. Then closeStream is called without a trailer as
- // the result of the received RST_STREAM.
- // If we do this check at the beginning of the closeStream, then we won't put a cleanup item in
- // response to received RST_STREAM into the control buffer and outStream in loopy writer will
- // never get cleaned up.
-
- // If the stream is already done, don't send the trailer.
- if oldState == streamDone {
- return
- }
-
- hdr.cleanup = cleanup
t.controlBuf.put(hdr)
}
+// closeStream clears the footprint of a stream when the stream is not needed any more.
+func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
+ t.deleteStream(s, eosReceived)
+ t.controlBuf.put(&cleanupStream{
+ streamID: s.id,
+ rst: rst,
+ rstCode: rstCode,
+ onWrite: func() {},
+ })
+}
+
func (t *http2Server) RemoteAddr() net.Addr {
return t.remoteAddr
}