Support max age (#1119)

The new logic added to the server does the following:
- Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
- Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
- Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
- Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection after an additional duration of keepalive.Timeout.
diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go
index 20672e4..b915a2a 100644
--- a/keepalive/keepalive.go
+++ b/keepalive/keepalive.go
@@ -50,3 +50,20 @@
 	// If true, client runs keepalive checks even with no active RPCs.
 	PermitWithoutStream bool
 }
+
+// ServerParameters is used to set keepalive and max-age parameters on the server-side.
+type ServerParameters struct {
+	// MaxConnectionIdle is a duration for the amount of time after which an idle connection would be closed by sending a GoAway.
+	// Idleness duration is measured from the most recent time the number of outstanding RPCs became zero, or from connection establishment.
+	MaxConnectionIdle time.Duration
+	// MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway.
+	// A random jitter of +/-10% will be added to MaxConnectionAge to spread out connection storms.
+	MaxConnectionAge time.Duration
+	// MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
+	MaxConnectionAgeGrace time.Duration
+	// Time is the duration after which, if the server doesn't see any activity, it pings the client to see if the transport is still alive (default 2 hours).
+	Time time.Duration
+	// Timeout is the duration the server waits after having pinged for a keepalive check; if no activity is seen even after that,
+	// the connection is closed (default 20 seconds).
+	Timeout time.Duration
+}
diff --git a/server.go b/server.go
index 157f35e..b19a3c4 100644
--- a/server.go
+++ b/server.go
@@ -53,6 +53,7 @@
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/internal"
+	"google.golang.org/grpc/keepalive"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/stats"
 	"google.golang.org/grpc/tap"
@@ -117,6 +118,7 @@
 	maxConcurrentStreams uint32
 	useHandlerImpl       bool // use http.Handler-based server
 	unknownStreamDesc    *StreamDesc
+	keepaliveParams      keepalive.ServerParameters
 }
 
 var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
@@ -124,6 +126,13 @@
 // A ServerOption sets options.
 type ServerOption func(*options)
 
+// KeepaliveParams returns a ServerOption that sets the keepalive and max-age parameters handed to each HTTP/2 server transport.
+func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
+	return func(o *options) {
+		o.keepaliveParams = kp
+	}
+}
+
 // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
 func CustomCodec(codec Codec) ServerOption {
 	return func(o *options) {
@@ -465,10 +474,11 @@
 // transport.NewServerTransport).
 func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
 	config := &transport.ServerConfig{
-		MaxStreams:   s.opts.maxConcurrentStreams,
-		AuthInfo:     authInfo,
-		InTapHandle:  s.opts.inTapHandle,
-		StatsHandler: s.opts.statsHandler,
+		MaxStreams:      s.opts.maxConcurrentStreams,
+		AuthInfo:        authInfo,
+		InTapHandle:     s.opts.inTapHandle,
+		StatsHandler:    s.opts.statsHandler,
+		KeepaliveParams: s.opts.keepaliveParams,
 	}
 	st, err := transport.NewServerTransport("http2", c, config)
 	if err != nil {
diff --git a/transport/control.go b/transport/control.go
index 0edbe53..64d22f8 100644
--- a/transport/control.go
+++ b/transport/control.go
@@ -46,12 +46,17 @@
 	// The default value of flow control window size in HTTP2 spec.
 	defaultWindowSize = 65535
 	// The initial window size for flow control.
-	initialWindowSize       = defaultWindowSize      // for an RPC
-	initialConnWindowSize   = defaultWindowSize * 16 // for a connection
-	infinity                = time.Duration(math.MaxInt64)
-	defaultKeepaliveTime    = infinity
-	defaultKeepaliveTimeout = time.Duration(20 * time.Second)
-	defaultMaxStreamsClient = 100
+	initialWindowSize             = defaultWindowSize      // for an RPC
+	initialConnWindowSize         = defaultWindowSize * 16 // for a connection
+	infinity                      = time.Duration(math.MaxInt64)
+	defaultClientKeepaliveTime    = infinity
+	defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
+	defaultMaxStreamsClient       = 100
+	defaultMaxConnectionIdle      = infinity
+	defaultMaxConnectionAge       = infinity
+	defaultMaxConnectionAgeGrace  = infinity
+	defaultServerKeepaliveTime    = time.Duration(2 * time.Hour)
+	defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
 )
 
 // The following defines various control items which could flow through
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 627a590..e68c9fd 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -194,10 +194,10 @@
 	kp := opts.KeepaliveParams
 	// Validate keepalive parameters.
 	if kp.Time == 0 {
-		kp.Time = defaultKeepaliveTime
+		kp.Time = defaultClientKeepaliveTime
 	}
 	if kp.Timeout == 0 {
-		kp.Timeout = defaultKeepaliveTimeout
+		kp.Timeout = defaultClientKeepaliveTimeout
 	}
 	var buf bytes.Buffer
 	t := &http2Client{
diff --git a/transport/http2_server.go b/transport/http2_server.go
index f5c590f..e810d19 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -38,9 +38,12 @@
 	"errors"
 	"io"
 	"math"
+	"math/rand"
 	"net"
 	"strconv"
 	"sync"
+	"sync/atomic"
+	"time"
 
 	"golang.org/x/net/context"
 	"golang.org/x/net/http2"
@@ -48,6 +51,7 @@
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/keepalive"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/peer"
 	"google.golang.org/grpc/stats"
@@ -90,11 +94,22 @@
 
 	stats stats.Handler
 
+	// Flag to keep track of reading activity on transport.
+	// 1 is true and 0 is false.
+	activity uint32 // Accessed atomically.
+	// Keepalive and max-age parameters for the server.
+	kp keepalive.ServerParameters
+
 	mu            sync.Mutex // guard the following
 	state         transportState
 	activeStreams map[uint32]*Stream
 	// the per-stream outbound flow control window size set by the peer.
 	streamSendQuota uint32
+	// idle is the time instant when the connection went idle.
+	// This is either the beginning of the connection or when the number of
+	// RPCs goes down to 0.
+	// When the connection is busy, this value is set to the zero time.
+	idle time.Time
 }
 
 // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -128,6 +143,24 @@
 			return nil, connectionErrorf(true, err, "transport: %v", err)
 		}
 	}
+	kp := config.KeepaliveParams
+	if kp.MaxConnectionIdle == 0 {
+		kp.MaxConnectionIdle = defaultMaxConnectionIdle
+	}
+	if kp.MaxConnectionAge == 0 {
+		kp.MaxConnectionAge = defaultMaxConnectionAge
+	}
+	// Add a jitter to MaxConnectionAge.
+	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
+	if kp.MaxConnectionAgeGrace == 0 {
+		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
+	}
+	if kp.Time == 0 {
+		kp.Time = defaultServerKeepaliveTime
+	}
+	if kp.Timeout == 0 {
+		kp.Timeout = defaultServerKeepaliveTimeout
+	}
 	var buf bytes.Buffer
 	t := &http2Server{
 		ctx:             context.Background(),
@@ -149,6 +182,8 @@
 		activeStreams:   make(map[uint32]*Stream),
 		streamSendQuota: defaultWindowSize,
 		stats:           config.StatsHandler,
+		kp:              kp,
+		idle:            time.Now(),
 	}
 	if t.stats != nil {
 		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
@@ -159,6 +194,7 @@
 		t.stats.HandleConn(t.ctx, connBegin)
 	}
 	go t.controller()
+	go t.keepalive()
 	t.writableChan <- 0
 	return t, nil
 }
@@ -248,6 +284,9 @@
 	t.maxStreamID = s.id
 	s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
 	t.activeStreams[s.id] = s
+	if len(t.activeStreams) == 1 {
+		t.idle = time.Time{}
+	}
 	t.mu.Unlock()
 	s.windowHandler = func(n int) {
 		t.updateWindow(s, uint32(n))
@@ -295,6 +334,7 @@
 		t.Close()
 		return
 	}
+	atomic.StoreUint32(&t.activity, 1)
 	sf, ok := frame.(*http2.SettingsFrame)
 	if !ok {
 		grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
@@ -305,6 +345,7 @@
 
 	for {
 		frame, err := t.framer.readFrame()
+		atomic.StoreUint32(&t.activity, 1)
 		if err != nil {
 			if se, ok := err.(http2.StreamError); ok {
 				t.mu.Lock()
@@ -735,6 +776,91 @@
 	}
 }
 
+// keepalive running in a separate goroutine does the following:
+// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
+// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
+// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
+// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
+// after an additional duration of keepalive.Timeout.
+func (t *http2Server) keepalive() {
+	p := &ping{}
+	var pingSent bool
+	maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
+	maxAge := time.NewTimer(t.kp.MaxConnectionAge)
+	keepalive := time.NewTimer(t.kp.Time)
+	// NOTE: All exit paths of this function should reset their
+	// respective timers. A failure to do so will cause the
+	// following clean-up to deadlock and eventually leak.
+	defer func() {
+		if !maxIdle.Stop() {
+			<-maxIdle.C
+		}
+		if !maxAge.Stop() {
+			<-maxAge.C
+		}
+		if !keepalive.Stop() {
+			<-keepalive.C
+		}
+	}()
+	for {
+		select {
+		case <-maxIdle.C:
+			t.mu.Lock()
+			idle := t.idle
+			if idle.IsZero() { // The connection is non-idle.
+				t.mu.Unlock()
+				maxIdle.Reset(t.kp.MaxConnectionIdle)
+				continue
+			}
+			val := t.kp.MaxConnectionIdle - time.Since(idle) // Remaining idle allowance.
+			if val <= 0 {
+				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
+				// Gracefully close the connection.
+				t.state = draining
+				t.mu.Unlock()
+				t.Drain()
+				// Resetting the timer so that the clean-up doesn't deadlock.
+				maxIdle.Reset(infinity)
+				return
+			}
+			t.mu.Unlock()
+			maxIdle.Reset(val) // Check again once the remaining idle allowance has elapsed.
+		case <-maxAge.C:
+			t.mu.Lock()
+			t.state = draining
+			t.mu.Unlock()
+			t.Drain() // Queue a GoAway to start the graceful close.
+			maxAge.Reset(t.kp.MaxConnectionAgeGrace) // Reuse the timer for the grace period.
+			select {
+			case <-maxAge.C:
+				// Close the connection after grace period.
+				t.Close()
+				// Resetting the timer so that the clean-up doesn't deadlock.
+				maxAge.Reset(infinity)
+			case <-t.shutdownChan:
+			}
+			return
+		case <-keepalive.C:
+			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { // Activity was seen since the last check; clear the flag.
+				pingSent = false
+				keepalive.Reset(t.kp.Time)
+				continue
+			}
+			if pingSent { // Still no activity after the ping and the Timeout wait.
+				t.Close()
+				// Resetting the timer so that the clean-up doesn't deadlock.
+				keepalive.Reset(infinity)
+				return
+			}
+			pingSent = true
+			t.controlBuf.put(p) // Queue a keepalive ping.
+			keepalive.Reset(t.kp.Timeout) // Wait up to Timeout for activity after the ping.
+		case <-t.shutdownChan:
+			return
+		}
+	}
+}
+
 // controller running in a separate goroutine takes charge of sending control
 // frames (e.g., window update, reset stream, setting, etc.) to the server.
 func (t *http2Server) controller() {
@@ -816,6 +942,9 @@
 func (t *http2Server) closeStream(s *Stream) {
 	t.mu.Lock()
 	delete(t.activeStreams, s.id)
+	if len(t.activeStreams) == 0 {
+		t.idle = time.Now()
+	}
 	if t.state == draining && len(t.activeStreams) == 0 {
 		defer t.Close()
 	}
@@ -845,3 +974,30 @@
 func (t *http2Server) Drain() {
 	t.controlBuf.put(&goAway{})
 }
+
+// rgen is the random source used to compute the MaxConnectionAge jitter.
+// A *rand.Rand created from rand.NewSource is not safe for concurrent use,
+// so every use of rgen must hold rgenMu.
+var (
+	rgenMu sync.Mutex
+	rgen   = rand.New(rand.NewSource(time.Now().UnixNano()))
+)
+
+// getJitter returns a random duration in the range [-v/10, v/10) to be added
+// to v. It returns 0 when v is infinity, and also when v is negative or too
+// small (under 10ns) for a jitter range to exist.
+func getJitter(v time.Duration) time.Duration {
+	if v == infinity {
+		return 0
+	}
+	// Generate a jitter between +/- 10% of the value.
+	r := int64(v / 10)
+	if r <= 0 {
+		// rand.Int63n panics on a non-positive bound, so skip the jitter.
+		return 0
+	}
+	rgenMu.Lock()
+	j := rgen.Int63n(2*r) - r
+	rgenMu.Unlock()
+	return time.Duration(j)
+}
diff --git a/transport/transport.go b/transport/transport.go
index beb0a52..51d71e3 100644
--- a/transport/transport.go
+++ b/transport/transport.go
@@ -365,10 +365,11 @@
 
 // ServerConfig consists of all the configurations to establish a server transport.
 type ServerConfig struct {
-	MaxStreams   uint32
-	AuthInfo     credentials.AuthInfo
-	InTapHandle  tap.ServerInHandle
-	StatsHandler stats.Handler
+	MaxStreams      uint32
+	AuthInfo        credentials.AuthInfo
+	InTapHandle     tap.ServerInHandle
+	StatsHandler    stats.Handler
+	KeepaliveParams keepalive.ServerParameters
 }
 
 // NewServerTransport creates a ServerTransport with conn or non-nil error
diff --git a/transport/transport_test.go b/transport/transport_test.go
index 382675d..6203878 100644
--- a/transport/transport_test.go
+++ b/transport/transport_test.go
@@ -156,7 +156,7 @@
 }
 
 // start starts server. Other goroutines should block on s.readyChan for further operations.
-func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
+func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hType) {
 	var err error
 	if port == 0 {
 		s.lis, err = net.Listen("tcp", "localhost:0")
@@ -180,10 +180,7 @@
 		if err != nil {
 			return
 		}
-		config := &ServerConfig{
-			MaxStreams: maxStreams,
-		}
-		transport, err := NewServerTransport("http2", conn, config)
+		transport, err := NewServerTransport("http2", conn, serverConfig)
 		if err != nil {
 			return
 		}
@@ -252,12 +249,12 @@
 }
 
 func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
-	return setUpWithOptions(t, port, maxStreams, ht, ConnectOptions{})
+	return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{})
 }
 
-func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts ConnectOptions) (*server, ClientTransport) {
+func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, ClientTransport) {
 	server := &server{startedErr: make(chan error, 1)}
-	go server.start(t, port, maxStreams, ht)
+	go server.start(t, port, serverConfig, ht)
 	server.wait(t, 2*time.Second)
 	addr := "localhost:" + server.port
 	var (
@@ -302,6 +299,136 @@
 	return tr
 }
 
+// TestMaxConnectionIdle tests that a server will send GoAway to an idle client.
+// An idle client is one who doesn't make any RPC calls for a duration of
+// MaxConnectionIdle time.
+func TestMaxConnectionIdle(t *testing.T) {
+	serverConfig := &ServerConfig{
+		KeepaliveParams: keepalive.ServerParameters{
+			MaxConnectionIdle: 2 * time.Second,
+		},
+	}
+	server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	defer server.stop()
+	defer client.Close()
+	stream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+	if err != nil {
+		t.Fatalf("Client failed to create RPC request: %v", err)
+	}
+	stream.mu.Lock()
+	stream.rstStream = true
+	stream.mu.Unlock()
+	client.CloseStream(stream, nil)
+	// Wait for the server to see the closed stream and for the max-idle logic to send a GoAway once no new RPCs are made.
+	timeout := time.NewTimer(time.Second * 4)
+	select {
+	case <-client.GoAway():
+	case <-timeout.C:
+		t.Fatalf("Test timed out, expected a GoAway from the server.")
+	}
+}
+
+// TestMaxConnectionIdleNegative tests that a server will not send GoAway to a non-idle (busy) client.
+func TestMaxConnectionIdleNegative(t *testing.T) {
+	serverConfig := &ServerConfig{
+		KeepaliveParams: keepalive.ServerParameters{
+			MaxConnectionIdle: 2 * time.Second,
+		},
+	}
+	server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	defer server.stop()
+	defer client.Close()
+	_, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+	if err != nil {
+		t.Fatalf("Client failed to create RPC request: %v", err)
+	}
+	timeout := time.NewTimer(time.Second * 4) // Twice MaxConnectionIdle.
+	select {
+	case <-client.GoAway():
+		t.Fatalf("A non-idle client received a GoAway.")
+	case <-timeout.C:
+	}
+
+}
+
+// TestMaxConnectionAge tests that a server will send GoAway after a duration of MaxConnectionAge.
+func TestMaxConnectionAge(t *testing.T) {
+	serverConfig := &ServerConfig{
+		KeepaliveParams: keepalive.ServerParameters{
+			MaxConnectionAge: 2 * time.Second,
+		},
+	}
+	server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	defer server.stop()
+	defer client.Close()
+	_, err := client.NewStream(context.Background(), &CallHdr{})
+	if err != nil {
+		t.Fatalf("Client failed to create stream: %v", err)
+	}
+	// Wait for max-age logic to send GoAway; 4s leaves room for the +/-10% jitter on the 2s MaxConnectionAge.
+	timeout := time.NewTimer(4 * time.Second)
+	select {
+	case <-client.GoAway():
+	case <-timeout.C:
+		t.Fatalf("Test timed out, expected a GoAway from the server.")
+	}
+}
+
+// TestKeepaliveServer tests that a server closes the connection with a client that doesn't respond to keepalive pings.
+func TestKeepaliveServer(t *testing.T) {
+	serverConfig := &ServerConfig{
+		KeepaliveParams: keepalive.ServerParameters{
+			Time:    2 * time.Second,
+			Timeout: 1 * time.Second,
+		},
+	}
+	server, c := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	defer server.stop()
+	defer c.Close()
+	client, err := net.Dial("tcp", server.lis.Addr().String()) // Raw TCP client that never writes, so it cannot answer pings.
+	if err != nil {
+		t.Fatalf("Failed to dial: %v", err)
+	}
+	defer client.Close()
+	// Set read deadline on client conn so that it doesn't block forever in erroneous cases.
+	client.SetReadDeadline(time.Now().Add(10 * time.Second))
+	// Wait for keepalive logic to close the connection.
+	time.Sleep(4 * time.Second)
+	b := make([]byte, 24)
+	for {
+		_, err = client.Read(b)
+		if err == nil {
+			continue // Skip over whatever the server wrote before closing.
+		}
+		if err != io.EOF {
+			t.Fatalf("client.Read(_) = _,%v, want io.EOF", err)
+		}
+		break
+	}
+}
+
+// TestKeepaliveServerNegative tests that a server doesn't close connection with a client that responds to keepalive pings.
+func TestKeepaliveServerNegative(t *testing.T) {
+	serverConfig := &ServerConfig{
+		KeepaliveParams: keepalive.ServerParameters{
+			Time:    2 * time.Second,
+			Timeout: 1 * time.Second,
+		},
+	}
+	server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	defer server.stop()
+	defer client.Close()
+	// Sleep past Time+Timeout (3s) so that the keepalive logic has had the chance to close an unresponsive connection.
+	time.Sleep(4 * time.Second)
+	// Assert that client is still active.
+	clientTr := client.(*http2Client)
+	clientTr.mu.Lock()
+	defer clientTr.mu.Unlock()
+	if clientTr.state != reachable {
+		t.Fatalf("Test failed: Expected server-client connection to be healthy.")
+	}
+}
+
 func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
 	done := make(chan net.Conn, 1)
 	tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
@@ -362,7 +489,7 @@
 	}
 	defer conn.Close()
 	// Create a stream.
-	_, err := tr.NewStream(context.Background(), &CallHdr{})
+	_, err := tr.NewStream(context.Background(), &CallHdr{Flush: true})
 	if err != nil {
 		t.Fatalf("Failed to create a new stream: %v", err)
 	}
@@ -378,7 +505,7 @@
 }
 
 func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
-	s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
+	s, tr := setUpWithOptions(t, 0, &ServerConfig{MaxStreams: math.MaxUint32}, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
 		Time:                2 * time.Second, // Keepalive time = 2 sec.
 		Timeout:             1 * time.Second, // Keepalive timeout = 1 sec.
 		PermitWithoutStream: true,            // Run keepalive even with no RPCs.