blob: 37b77bb539c40e0e23a02e28776551d459a07e98 [file] [log] [blame]
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// This file contains tests related to the following proposals:
// https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md
// https://github.com/grpc/proposal/blob/master/A9-server-side-conn-mgt.md
// https://github.com/grpc/proposal/blob/master/A18-tcp-user-timeout.md
package transport
import (
"context"
"io"
"net"
"testing"
"time"
"golang.org/x/net/http2"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/keepalive"
)
// TestMaxConnectionIdle tests that a server will send GoAway to an idle
// client. An idle client is one who doesn't make any RPC calls for a duration
// of MaxConnectionIdle time.
func (s) TestMaxConnectionIdle(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
MaxConnectionIdle: 2 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
server.stop()
cancel()
}()
stream, err := client.NewStream(context.Background(), &CallHdr{})
if err != nil {
t.Fatalf("client.NewStream() failed: %v", err)
}
client.CloseStream(stream, io.EOF)
// Wait for the server's MaxConnectionIdle timeout to kick in, and for it
// to send a GoAway.
timeout := time.NewTimer(time.Second * 4)
select {
case <-client.Error():
if !timeout.Stop() {
<-timeout.C
}
if reason := client.GetGoAwayReason(); reason != GoAwayNoReason {
t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayNoReason)
}
case <-timeout.C:
t.Fatalf("MaxConnectionIdle timeout expired, expected a GoAway from the server.")
}
}
// TestMaxConenctionIdleBusyClient tests that a server will not send GoAway to
// a busy client.
func (s) TestMaxConnectionIdleBusyClient(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
MaxConnectionIdle: 2 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
server.stop()
cancel()
}()
_, err := client.NewStream(context.Background(), &CallHdr{})
if err != nil {
t.Fatalf("client.NewStream() failed: %v", err)
}
// Wait for double the MaxConnectionIdle time to make sure the server does
// not send a GoAway, as the client has an open stream.
timeout := time.NewTimer(time.Second * 4)
select {
case <-client.GoAway():
if !timeout.Stop() {
<-timeout.C
}
t.Fatalf("A non-idle client received a GoAway.")
case <-timeout.C:
}
}
// TestMaxConnectionAge tests that a server will send GoAway after a duration
// of MaxConnectionAge.
func (s) TestMaxConnectionAge(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
MaxConnectionAge: 1 * time.Second,
MaxConnectionAgeGrace: 1 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
server.stop()
cancel()
}()
_, err := client.NewStream(context.Background(), &CallHdr{})
if err != nil {
t.Fatalf("client.NewStream() failed: %v", err)
}
// Wait for the server's MaxConnectionAge timeout to kick in, and for it
// to send a GoAway.
timeout := time.NewTimer(4 * time.Second)
select {
case <-client.Error():
if !timeout.Stop() {
<-timeout.C
}
if reason := client.GetGoAwayReason(); reason != GoAwayNoReason {
t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayNoReason)
}
case <-timeout.C:
t.Fatalf("MaxConnectionAge timeout expired, expected a GoAway from the server.")
}
}
const (
defaultWriteBufSize = 32 * 1024
defaultReadBufSize = 32 * 1024
)
// TestKeepaliveServerClosesUnresponsiveClient tests that a server closes
// the connection with a client that doesn't respond to keepalive pings.
//
// This test creates a regular net.Conn connection to the server and sends the
// clientPreface and the initial Settings frame, and then remains unresponsive.
func (s) TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
server.stop()
cancel()
}()
addr := server.addr()
conn, err := net.Dial("tcp", addr)
if err != nil {
t.Fatalf("net.Dial(tcp, %v) failed: %v", addr, err)
}
defer conn.Close()
if n, err := conn.Write(clientPreface); err != nil || n != len(clientPreface) {
t.Fatalf("conn.Write(clientPreface) failed: n=%v, err=%v", n, err)
}
framer := newFramer(conn, defaultWriteBufSize, defaultReadBufSize, 0)
if err := framer.fr.WriteSettings(http2.Setting{}); err != nil {
t.Fatal("framer.WriteSettings(http2.Setting{}) failed:", err)
}
framer.writer.Flush()
// We read from the net.Conn till we get an error, which is expected when
// the server closes the connection as part of the keepalive logic.
errCh := make(chan error)
go func() {
b := make([]byte, 24)
for {
if _, err = conn.Read(b); err != nil {
errCh <- err
return
}
}
}()
// Server waits for KeepaliveParams.Time seconds before sending out a ping,
// and then waits for KeepaliveParams.Timeout for a ping ack.
timeout := time.NewTimer(4 * time.Second)
select {
case err := <-errCh:
if err != io.EOF {
t.Fatalf("client.Read(_) = _,%v, want io.EOF", err)
}
case <-timeout.C:
t.Fatalf("keepalive timeout expired, server should have closed the connection.")
}
}
// TestKeepaliveServerWithResponsiveClient tests that a server doesn't close
// the connection with a client that responds to keepalive pings.
func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
server.stop()
cancel()
}()
// Give keepalive logic some time by sleeping.
time.Sleep(4 * time.Second)
// Make sure the client transport is healthy.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil {
t.Fatalf("client.NewStream() failed: %v", err)
}
}
// TestKeepaliveClientClosesUnresponsiveServer creates a server which does not
// respond to keepalive pings, and makes sure that the client closes the
// transport once the keepalive logic kicks in. Here, we set the
// `PermitWithoutStream` parameter to true which ensures that the keepalive
// logic is running even without any active streams.
func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
client, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
PermitWithoutStream: true,
}}, connCh)
defer cancel()
defer client.Close()
conn, ok := <-connCh
if !ok {
t.Fatalf("Server didn't return connection object")
}
defer conn.Close()
// Sleep for keepalive to close the connection.
time.Sleep(4 * time.Second)
// Make sure the client transport is not healthy.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err == nil {
t.Fatal("client.NewStream() should have failed, but succeeded")
}
}
// TestKeepaliveClientOpenWithUnresponsiveServer creates a server which does
// not respond to keepalive pings, and makes sure that the client does not
// close the transport. Here, we do not set the `PermitWithoutStream` parameter
// to true which ensures that the keepalive logic is turned off without any
// active streams, and therefore the transport stays open.
func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
client, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
}}, connCh)
defer cancel()
defer client.Close()
conn, ok := <-connCh
if !ok {
t.Fatalf("Server didn't return connection object")
}
defer conn.Close()
// Give keepalive some time.
time.Sleep(4 * time.Second)
// Make sure the client transport is healthy.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil {
t.Fatalf("client.NewStream() failed: %v", err)
}
}
// TestKeepaliveClientClosesWithActiveStreams creates a server which does not
// respond to keepalive pings, and makes sure that the client closes the
// transport even when there is an active stream.
func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
connCh := make(chan net.Conn, 1)
client, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
}}, connCh)
defer cancel()
defer client.Close()
conn, ok := <-connCh
if !ok {
t.Fatalf("Server didn't return connection object")
}
defer conn.Close()
// Create a stream, but send no data on it.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil {
t.Fatalf("client.NewStream() failed: %v", err)
}
// Give keepalive some time.
time.Sleep(4 * time.Second)
// Make sure the client transport is not healthy.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err == nil {
t.Fatal("client.NewStream() should have failed, but succeeded")
}
}
// TestKeepaliveClientStaysHealthyWithResponsiveServer creates a server which
// responds to keepalive pings, and makes sure than a client transport stays
// healthy without any active streams.
func (s) TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, normal, ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
PermitWithoutStream: true,
}})
defer func() {
client.Close()
server.stop()
cancel()
}()
// Give keepalive some time.
time.Sleep(4 * time.Second)
// Make sure the client transport is healthy.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil {
t.Fatalf("client.NewStream() failed: %v", err)
}
}
// TestKeepaliveClientFrequency creates a server which expects at most 1 client
// ping for every 1.2 seconds, while the client is configured to send a ping
// every 1 second. So, this configuration should end up with the client
// transport being closed. But we had a bug wherein the client was sending one
// ping every [Time+Timeout] instead of every [Time] period, and this test
// explicitly makes sure the fix works and the client sends a ping every [Time]
// period.
func (s) TestKeepaliveClientFrequency(t *testing.T) {
serverConfig := &ServerConfig{
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 1200 * time.Millisecond, // 1.2 seconds
PermitWithoutStream: true,
},
}
clientOptions := ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 2 * time.Second,
PermitWithoutStream: true,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
server.stop()
cancel()
}()
timeout := time.NewTimer(6 * time.Second)
select {
case <-client.Error():
if !timeout.Stop() {
<-timeout.C
}
if reason := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayTooManyPings)
}
case <-timeout.C:
t.Fatalf("client transport still healthy; expected GoAway from the server.")
}
// Make sure the client transport is not healthy.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err == nil {
t.Fatal("client.NewStream() should have failed, but succeeded")
}
}
// TestKeepaliveServerEnforcementWithAbusiveClientNoRPC verifies that the
// server closes a client transport when it sends too many keepalive pings
// (when there are no active streams), based on the configured
// EnforcementPolicy.
func (s) TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
serverConfig := &ServerConfig{
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 2 * time.Second,
},
}
clientOptions := ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: 50 * time.Millisecond,
Timeout: 1 * time.Second,
PermitWithoutStream: true,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
server.stop()
cancel()
}()
timeout := time.NewTimer(4 * time.Second)
select {
case <-client.Error():
if !timeout.Stop() {
<-timeout.C
}
if reason := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayTooManyPings)
}
case <-timeout.C:
t.Fatalf("client transport still healthy; expected GoAway from the server.")
}
// Make sure the client transport is not healthy.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err == nil {
t.Fatal("client.NewStream() should have failed, but succeeded")
}
}
// TestKeepaliveServerEnforcementWithAbusiveClientWithRPC verifies that the
// server closes a client transport when it sends too many keepalive pings
// (even when there is an active stream), based on the configured
// EnforcementPolicy.
func (s) TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
serverConfig := &ServerConfig{
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 2 * time.Second,
},
}
clientOptions := ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: 50 * time.Millisecond,
Timeout: 1 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer func() {
client.Close()
server.stop()
cancel()
}()
if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil {
t.Fatalf("client.NewStream() failed: %v", err)
}
timeout := time.NewTimer(4 * time.Second)
select {
case <-client.Error():
if !timeout.Stop() {
<-timeout.C
}
if reason := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayTooManyPings)
}
case <-timeout.C:
t.Fatalf("client transport still healthy; expected GoAway from the server.")
}
// Make sure the client transport is not healthy.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err == nil {
t.Fatal("client.NewStream() should have failed, but succeeded")
}
}
// TestKeepaliveServerEnforcementWithObeyingClientNoRPC verifies that the
// server does not close a client transport (with no active streams) which
// sends keepalive pings in accordance to the configured keepalive
// EnforcementPolicy.
func (s) TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
serverConfig := &ServerConfig{
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 100 * time.Millisecond,
PermitWithoutStream: true,
},
}
clientOptions := ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: 101 * time.Millisecond,
Timeout: 1 * time.Second,
PermitWithoutStream: true,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
server.stop()
cancel()
}()
// Give keepalive enough time.
time.Sleep(3 * time.Second)
// Make sure the client transport is healthy.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil {
t.Fatalf("client.NewStream() failed: %v", err)
}
}
// TestKeepaliveServerEnforcementWithObeyingClientWithRPC verifies that the
// server does not close a client transport (with active streams) which
// sends keepalive pings in accordance to the configured keepalive
// EnforcementPolicy.
func (s) TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
serverConfig := &ServerConfig{
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 100 * time.Millisecond,
},
}
clientOptions := ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: 101 * time.Millisecond,
Timeout: 1 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer func() {
client.Close()
server.stop()
cancel()
}()
if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil {
t.Fatalf("client.NewStream() failed: %v", err)
}
// Give keepalive enough time.
time.Sleep(3 * time.Second)
// Make sure the client transport is healthy.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil {
t.Fatalf("client.NewStream() failed: %v", err)
}
}
// TestKeepaliveServerEnforcementWithDormantKeepaliveOnClient verifies that the
// server does not closes a client transport, which has been configured to send
// more pings than allowed by the server's EnforcementPolicy. This client
// transport does not have any active streams and `PermitWithoutStream` is set
// to false. This should ensure that the keepalive functionality on the client
// side enters a dormant state.
func (s) TestKeepaliveServerEnforcementWithDormantKeepaliveOnClient(t *testing.T) {
serverConfig := &ServerConfig{
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 2 * time.Second,
},
}
clientOptions := ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: 50 * time.Millisecond,
Timeout: 1 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
server.stop()
cancel()
}()
// No active streams on the client. Give keepalive enough time.
time.Sleep(5 * time.Second)
// Make sure the client transport is healthy.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil {
t.Fatalf("client.NewStream() failed: %v", err)
}
}
// TestTCPUserTimeout tests that the TCP_USER_TIMEOUT socket option is set to
// the keepalive timeout, as detailed in proposal A18.
func (s) TestTCPUserTimeout(t *testing.T) {
tests := []struct {
time time.Duration
timeout time.Duration
wantTimeout time.Duration
}{
{
10 * time.Second,
10 * time.Second,
10 * 1000 * time.Millisecond,
},
{
0,
0,
0,
},
}
for _, tt := range tests {
server, client, cancel := setUpWithOptions(
t,
0,
&ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
Time: tt.timeout,
Timeout: tt.timeout,
},
},
normal,
ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: tt.time,
Timeout: tt.timeout,
},
},
)
defer func() {
client.Close()
server.stop()
cancel()
}()
stream, err := client.NewStream(context.Background(), &CallHdr{})
if err != nil {
t.Fatalf("client.NewStream() failed: %v", err)
}
client.CloseStream(stream, io.EOF)
opt, err := syscall.GetTCPUserTimeout(client.conn)
if err != nil {
t.Fatalf("syscall.GetTCPUserTimeout() failed: %v", err)
}
if opt < 0 {
t.Skipf("skipping test on unsupported environment")
}
if gotTimeout := time.Duration(opt) * time.Millisecond; gotTimeout != tt.wantTimeout {
t.Fatalf("syscall.GetTCPUserTimeout() = %d, want %d", gotTimeout, tt.wantTimeout)
}
}
}