Support max age(#1119)
The new logic added to the server does the following:
- Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
- Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
- Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
- Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-resposive connection after an additional duration of keepalive.Timeout.
diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go
index 20672e4..b915a2a 100644
--- a/keepalive/keepalive.go
+++ b/keepalive/keepalive.go
@@ -50,3 +50,20 @@
// If true, client runs keepalive checks even with no active RPCs.
PermitWithoutStream bool
}
+
+// ServerParameters is used to set keepalive and max-age parameters on the server-side.
+type ServerParameters struct {
+ // MaxConnectionIdle is a duration for the amount of time after which an idle connection would be closed by sending a GoAway.
+ // Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment.
+ MaxConnectionIdle time.Duration
+ // MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway.
+ // A random jitter of +/-10% will be added to MaxConnectionAge to spread out connection storms.
+ MaxConnectionAge time.Duration
+ // MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
+ MaxConnectionAgeGrace time.Duration
+ // After a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive.
+ Time time.Duration
+ // After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that
+ // the connection is closed.
+ Timeout time.Duration
+}
diff --git a/server.go b/server.go
index 157f35e..b19a3c4 100644
--- a/server.go
+++ b/server.go
@@ -53,6 +53,7 @@
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
+ "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
@@ -117,6 +118,7 @@
maxConcurrentStreams uint32
useHandlerImpl bool // use http.Handler-based server
unknownStreamDesc *StreamDesc
+ keepaliveParams keepalive.ServerParameters
}
var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
@@ -124,6 +126,13 @@
// A ServerOption sets options.
type ServerOption func(*options)
+// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
+func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
+ return func(o *options) {
+ o.keepaliveParams = kp
+ }
+}
+
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
func CustomCodec(codec Codec) ServerOption {
return func(o *options) {
@@ -465,10 +474,11 @@
// transport.NewServerTransport).
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
config := &transport.ServerConfig{
- MaxStreams: s.opts.maxConcurrentStreams,
- AuthInfo: authInfo,
- InTapHandle: s.opts.inTapHandle,
- StatsHandler: s.opts.statsHandler,
+ MaxStreams: s.opts.maxConcurrentStreams,
+ AuthInfo: authInfo,
+ InTapHandle: s.opts.inTapHandle,
+ StatsHandler: s.opts.statsHandler,
+ KeepaliveParams: s.opts.keepaliveParams,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
diff --git a/transport/control.go b/transport/control.go
index 0edbe53..64d22f8 100644
--- a/transport/control.go
+++ b/transport/control.go
@@ -46,12 +46,17 @@
// The default value of flow control window size in HTTP2 spec.
defaultWindowSize = 65535
// The initial window size for flow control.
- initialWindowSize = defaultWindowSize // for an RPC
- initialConnWindowSize = defaultWindowSize * 16 // for a connection
- infinity = time.Duration(math.MaxInt64)
- defaultKeepaliveTime = infinity
- defaultKeepaliveTimeout = time.Duration(20 * time.Second)
- defaultMaxStreamsClient = 100
+ initialWindowSize = defaultWindowSize // for an RPC
+ initialConnWindowSize = defaultWindowSize * 16 // for a connection
+ infinity = time.Duration(math.MaxInt64)
+ defaultClientKeepaliveTime = infinity
+ defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
+ defaultMaxStreamsClient = 100
+ defaultMaxConnectionIdle = infinity
+ defaultMaxConnectionAge = infinity
+ defaultMaxConnectionAgeGrace = infinity
+ defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
+ defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
)
// The following defines various control items which could flow through
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 627a590..e68c9fd 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -194,10 +194,10 @@
kp := opts.KeepaliveParams
// Validate keepalive parameters.
if kp.Time == 0 {
- kp.Time = defaultKeepaliveTime
+ kp.Time = defaultClientKeepaliveTime
}
if kp.Timeout == 0 {
- kp.Timeout = defaultKeepaliveTimeout
+ kp.Timeout = defaultClientKeepaliveTimeout
}
var buf bytes.Buffer
t := &http2Client{
diff --git a/transport/http2_server.go b/transport/http2_server.go
index f5c590f..e810d19 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -38,9 +38,12 @@
"errors"
"io"
"math"
+ "math/rand"
"net"
"strconv"
"sync"
+ "sync/atomic"
+ "time"
"golang.org/x/net/context"
"golang.org/x/net/http2"
@@ -48,6 +51,7 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
@@ -90,11 +94,22 @@
stats stats.Handler
+ // Flag to keep track of reading activity on transport.
+ // 1 is true and 0 is false.
+ activity uint32 // Accessed atomically.
+ // Keepalive and max-age parameters for the server.
+ kp keepalive.ServerParameters
+
mu sync.Mutex // guard the following
state transportState
activeStreams map[uint32]*Stream
// the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32
+ // idle is the time instant when the connection went idle.
+ // This is either the begining of the connection or when the number of
+ // RPCs go down to 0.
+ // When the connection is busy, this value is set to 0.
+ idle time.Time
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -128,6 +143,24 @@
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
+ kp := config.KeepaliveParams
+ if kp.MaxConnectionIdle == 0 {
+ kp.MaxConnectionIdle = defaultMaxConnectionIdle
+ }
+ if kp.MaxConnectionAge == 0 {
+ kp.MaxConnectionAge = defaultMaxConnectionAge
+ }
+ // Add a jitter to MaxConnectionAge.
+ kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
+ if kp.MaxConnectionAgeGrace == 0 {
+ kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
+ }
+ if kp.Time == 0 {
+ kp.Time = defaultServerKeepaliveTime
+ }
+ if kp.Timeout == 0 {
+ kp.Timeout = defaultServerKeepaliveTimeout
+ }
var buf bytes.Buffer
t := &http2Server{
ctx: context.Background(),
@@ -149,6 +182,8 @@
activeStreams: make(map[uint32]*Stream),
streamSendQuota: defaultWindowSize,
stats: config.StatsHandler,
+ kp: kp,
+ idle: time.Now(),
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
@@ -159,6 +194,7 @@
t.stats.HandleConn(t.ctx, connBegin)
}
go t.controller()
+ go t.keepalive()
t.writableChan <- 0
return t, nil
}
@@ -248,6 +284,9 @@
t.maxStreamID = s.id
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
t.activeStreams[s.id] = s
+ if len(t.activeStreams) == 1 {
+ t.idle = time.Time{}
+ }
t.mu.Unlock()
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
@@ -295,6 +334,7 @@
t.Close()
return
}
+ atomic.StoreUint32(&t.activity, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
@@ -305,6 +345,7 @@
for {
frame, err := t.framer.readFrame()
+ atomic.StoreUint32(&t.activity, 1)
if err != nil {
if se, ok := err.(http2.StreamError); ok {
t.mu.Lock()
@@ -735,6 +776,91 @@
}
}
+// keepalive running in a separate goroutine does the following:
+// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
+// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
+// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
+// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-resposive connection
+// after an additional duration of keepalive.Timeout.
+func (t *http2Server) keepalive() {
+ p := &ping{}
+ var pingSent bool
+ maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
+ maxAge := time.NewTimer(t.kp.MaxConnectionAge)
+ keepalive := time.NewTimer(t.kp.Time)
+ // NOTE: All exit paths of this function should reset their
+ // respecitve timers. A failure to do so will cause the
+ // following clean-up to deadlock and eventually leak.
+ defer func() {
+ if !maxIdle.Stop() {
+ <-maxIdle.C
+ }
+ if !maxAge.Stop() {
+ <-maxAge.C
+ }
+ if !keepalive.Stop() {
+ <-keepalive.C
+ }
+ }()
+ for {
+ select {
+ case <-maxIdle.C:
+ t.mu.Lock()
+ idle := t.idle
+ if idle.IsZero() { // The connection is non-idle.
+ t.mu.Unlock()
+ maxIdle.Reset(t.kp.MaxConnectionIdle)
+ continue
+ }
+ val := t.kp.MaxConnectionIdle - time.Since(idle)
+ if val <= 0 {
+ // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
+ // Gracefully close the connection.
+ t.state = draining
+ t.mu.Unlock()
+ t.Drain()
+ // Reseting the timer so that the clean-up doesn't deadlock.
+ maxIdle.Reset(infinity)
+ return
+ }
+ t.mu.Unlock()
+ maxIdle.Reset(val)
+ case <-maxAge.C:
+ t.mu.Lock()
+ t.state = draining
+ t.mu.Unlock()
+ t.Drain()
+ maxAge.Reset(t.kp.MaxConnectionAgeGrace)
+ select {
+ case <-maxAge.C:
+ // Close the connection after grace period.
+ t.Close()
+ // Reseting the timer so that the clean-up doesn't deadlock.
+ maxAge.Reset(infinity)
+ case <-t.shutdownChan:
+ }
+ return
+ case <-keepalive.C:
+ if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
+ pingSent = false
+ keepalive.Reset(t.kp.Time)
+ continue
+ }
+ if pingSent {
+ t.Close()
+ // Reseting the timer so that the clean-up doesn't deadlock.
+ keepalive.Reset(infinity)
+ return
+ }
+ pingSent = true
+ t.controlBuf.put(p)
+ keepalive.Reset(t.kp.Timeout)
+ case <-t.shutdownChan:
+ return
+ }
+ }
+}
+
// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Server) controller() {
@@ -816,6 +942,9 @@
func (t *http2Server) closeStream(s *Stream) {
t.mu.Lock()
delete(t.activeStreams, s.id)
+ if len(t.activeStreams) == 0 {
+ t.idle = time.Now()
+ }
if t.state == draining && len(t.activeStreams) == 0 {
defer t.Close()
}
@@ -845,3 +974,15 @@
func (t *http2Server) Drain() {
t.controlBuf.put(&goAway{})
}
+
+var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
+
+func getJitter(v time.Duration) time.Duration {
+ if v == infinity {
+ return 0
+ }
+ // Generate a jitter between +/- 10% of the value.
+ r := int64(v / 10)
+ j := rgen.Int63n(2*r) - r
+ return time.Duration(j)
+}
diff --git a/transport/transport.go b/transport/transport.go
index beb0a52..51d71e3 100644
--- a/transport/transport.go
+++ b/transport/transport.go
@@ -365,10 +365,11 @@
// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
- MaxStreams uint32
- AuthInfo credentials.AuthInfo
- InTapHandle tap.ServerInHandle
- StatsHandler stats.Handler
+ MaxStreams uint32
+ AuthInfo credentials.AuthInfo
+ InTapHandle tap.ServerInHandle
+ StatsHandler stats.Handler
+ KeepaliveParams keepalive.ServerParameters
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
diff --git a/transport/transport_test.go b/transport/transport_test.go
index 382675d..6203878 100644
--- a/transport/transport_test.go
+++ b/transport/transport_test.go
@@ -156,7 +156,7 @@
}
// start starts server. Other goroutines should block on s.readyChan for further operations.
-func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
+func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hType) {
var err error
if port == 0 {
s.lis, err = net.Listen("tcp", "localhost:0")
@@ -180,10 +180,7 @@
if err != nil {
return
}
- config := &ServerConfig{
- MaxStreams: maxStreams,
- }
- transport, err := NewServerTransport("http2", conn, config)
+ transport, err := NewServerTransport("http2", conn, serverConfig)
if err != nil {
return
}
@@ -252,12 +249,12 @@
}
func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
- return setUpWithOptions(t, port, maxStreams, ht, ConnectOptions{})
+ return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{})
}
-func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts ConnectOptions) (*server, ClientTransport) {
+func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, ClientTransport) {
server := &server{startedErr: make(chan error, 1)}
- go server.start(t, port, maxStreams, ht)
+ go server.start(t, port, serverConfig, ht)
server.wait(t, 2*time.Second)
addr := "localhost:" + server.port
var (
@@ -302,6 +299,136 @@
return tr
}
+// TestMaxConnectionIdle tests that a server will send GoAway to a idle client.
+// An idle client is one who doesn't make any RPC calls for a duration of
+// MaxConnectionIdle time.
+func TestMaxConnectionIdle(t *testing.T) {
+ serverConfig := &ServerConfig{
+ KeepaliveParams: keepalive.ServerParameters{
+ MaxConnectionIdle: 2 * time.Second,
+ },
+ }
+ server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ defer server.stop()
+ defer client.Close()
+ stream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+ if err != nil {
+ t.Fatalf("Client failed to create RPC request: %v", err)
+ }
+ stream.mu.Lock()
+ stream.rstStream = true
+ stream.mu.Unlock()
+ client.CloseStream(stream, nil)
+ // wait for server to see that closed stream and max-age logic to send goaway after no new RPCs are mode
+ timeout := time.NewTimer(time.Second * 4)
+ select {
+ case <-client.GoAway():
+ case <-timeout.C:
+ t.Fatalf("Test timed out, expected a GoAway from the server.")
+ }
+}
+
+// TestMaxConenctionIdleNegative tests that a server will not send GoAway to a non-idle(busy) client.
+func TestMaxConnectionIdleNegative(t *testing.T) {
+ serverConfig := &ServerConfig{
+ KeepaliveParams: keepalive.ServerParameters{
+ MaxConnectionIdle: 2 * time.Second,
+ },
+ }
+ server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ defer server.stop()
+ defer client.Close()
+ _, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+ if err != nil {
+ t.Fatalf("Client failed to create RPC request: %v", err)
+ }
+ timeout := time.NewTimer(time.Second * 4)
+ select {
+ case <-client.GoAway():
+ t.Fatalf("A non-idle client received a GoAway.")
+ case <-timeout.C:
+ }
+
+}
+
+// TestMaxConnectionAge tests that a server will send GoAway after a duration of MaxConnectionAge.
+func TestMaxConnectionAge(t *testing.T) {
+ serverConfig := &ServerConfig{
+ KeepaliveParams: keepalive.ServerParameters{
+ MaxConnectionAge: 2 * time.Second,
+ },
+ }
+ server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ defer server.stop()
+ defer client.Close()
+ _, err := client.NewStream(context.Background(), &CallHdr{})
+ if err != nil {
+ t.Fatalf("Client failed to create stream: %v", err)
+ }
+ // Wait for max-age logic to send GoAway.
+ timeout := time.NewTimer(4 * time.Second)
+ select {
+ case <-client.GoAway():
+ case <-timeout.C:
+ t.Fatalf("Test timer out, expected a GoAway from the server.")
+ }
+}
+
+// TestKeepaliveServer tests that a server closes conneciton with a client that doesn't respond to keepalive pings.
+func TestKeepaliveServer(t *testing.T) {
+ serverConfig := &ServerConfig{
+ KeepaliveParams: keepalive.ServerParameters{
+ Time: 2 * time.Second,
+ Timeout: 1 * time.Second,
+ },
+ }
+ server, c := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ defer server.stop()
+ defer c.Close()
+ client, err := net.Dial("tcp", server.lis.Addr().String())
+ if err != nil {
+ t.Fatalf("Failed to dial: %v", err)
+ }
+ defer client.Close()
+ // Set read deadline on client conn so that it doesn't block forever in errorsome cases.
+ client.SetReadDeadline(time.Now().Add(10 * time.Second))
+ // Wait for keepalive logic to close the connection.
+ time.Sleep(4 * time.Second)
+ b := make([]byte, 24)
+ for {
+ _, err = client.Read(b)
+ if err == nil {
+ continue
+ }
+ if err != io.EOF {
+ t.Fatalf("client.Read(_) = _,%v, want io.EOF", err)
+ }
+ break
+ }
+}
+
+// TestKeepaliveServerNegative tests that a server doesn't close connection with a client that responds to keepalive pings.
+func TestKeepaliveServerNegative(t *testing.T) {
+ serverConfig := &ServerConfig{
+ KeepaliveParams: keepalive.ServerParameters{
+ Time: 2 * time.Second,
+ Timeout: 1 * time.Second,
+ },
+ }
+ server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ defer server.stop()
+ defer client.Close()
+ // Give keepalive logic some time by sleeping.
+ time.Sleep(4 * time.Second)
+ // Assert that client is still active.
+ clientTr := client.(*http2Client)
+ clientTr.mu.Lock()
+ defer clientTr.mu.Unlock()
+ if clientTr.state != reachable {
+ t.Fatalf("Test failed: Expected server-client connection to be healthy.")
+ }
+}
+
func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
@@ -362,7 +489,7 @@
}
defer conn.Close()
// Create a stream.
- _, err := tr.NewStream(context.Background(), &CallHdr{})
+ _, err := tr.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Failed to create a new stream: %v", err)
}
@@ -378,7 +505,7 @@
}
func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
- s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
+ s, tr := setUpWithOptions(t, 0, &ServerConfig{MaxStreams: math.MaxUint32}, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Run keepalive even with no RPCs.