merge master
diff --git a/.travis.yml b/.travis.yml
index b3577c7..9032f8d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,19 +1,20 @@
language: go
go:
- - 1.6.3
- - 1.7
- - 1.8
+ - 1.6.x
+ - 1.7.x
+ - 1.8.x
go_import_path: google.golang.org/grpc
before_install:
- - go get github.com/golang/lint/golint
+ - if [[ $TRAVIS_GO_VERSION = 1.8* ]]; then go get -u github.com/golang/lint/golint honnef.co/go/tools/cmd/staticcheck; fi
- go get -u golang.org/x/tools/cmd/goimports github.com/axw/gocov/gocov github.com/mattn/goveralls golang.org/x/tools/cmd/cover
script:
- '! gofmt -s -d -l . 2>&1 | read'
- '! goimports -l . | read'
- - '! golint ./... | grep -vE "(_mock|_string|\.pb)\.go:"'
- - '! go tool vet -all . 2>&1 | grep -vE "constant [0-9]+ not a string in call to Errorf" | grep -vF .pb.go:' # https://github.com/golang/protobuf/issues/214
+ - 'if [[ $TRAVIS_GO_VERSION = 1.8* ]]; then ! golint ./... | grep -vE "(_mock|_string|\.pb)\.go:"; fi'
+ - 'if [[ $TRAVIS_GO_VERSION = 1.8* ]]; then ! go tool vet -all . 2>&1 | grep -vF .pb.go:; fi' # https://github.com/golang/protobuf/issues/214
- make test testrace
+ - 'if [[ $TRAVIS_GO_VERSION = 1.8* ]]; then staticcheck -ignore google.golang.org/grpc/transport/transport_test.go:SA2002 ./...; fi' # TODO(menghanl): fix these
diff --git a/README.md b/README.md
index ae0236f..c1e43c7 100644
--- a/README.md
+++ b/README.md
@@ -26,9 +26,13 @@
-------------
See [API documentation](https://godoc.org/google.golang.org/grpc) for package and API descriptions and find examples in the [examples directory](examples/).
+Performance
+-----------
+See the current benchmarks for some of the languages supported in [this dashboard](https://performance-dot-grpc-testing.appspot.com/explore?dashboard=5652536396611584&widget=490377658&container=1286539696).
+
Status
------
-GA
+General Availability [Google Cloud Platform Launch Stages](https://cloud.google.com/terms/launch-stages).
FAQ
---
diff --git a/benchmark/benchmark_test.go b/benchmark/benchmark_test.go
index 8fe3fa1..b0c3030 100644
--- a/benchmark/benchmark_test.go
+++ b/benchmark/benchmark_test.go
@@ -73,7 +73,7 @@
streamCaller(stream)
}
- ch := make(chan int, maxConcurrentCalls*4)
+ ch := make(chan struct{}, maxConcurrentCalls*4)
var (
mu sync.Mutex
wg sync.WaitGroup
@@ -82,11 +82,11 @@
// Distribute the b.N calls over maxConcurrentCalls workers.
for i := 0; i < maxConcurrentCalls; i++ {
+ stream, err := tc.StreamingCall(context.Background())
+ if err != nil {
+ b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
+ }
go func() {
- stream, err := tc.StreamingCall(context.Background())
- if err != nil {
- b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
- }
for range ch {
start := time.Now()
streamCaller(stream)
@@ -100,7 +100,7 @@
}
b.StartTimer()
for i := 0; i < b.N; i++ {
- ch <- i
+ ch <- struct{}{}
}
b.StopTimer()
close(ch)
diff --git a/clientconn_test.go b/clientconn_test.go
index 2db470e..fdb261a 100644
--- a/clientconn_test.go
+++ b/clientconn_test.go
@@ -295,17 +295,20 @@
func TestNonblockingDialWithEmptyBalancer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
- dialDone := make(chan struct{})
+ defer cancel()
+ dialDone := make(chan error)
go func() {
- conn, err := DialContext(ctx, "Non-Existent.Server:80", WithInsecure(), WithBalancer(newEmptyBalancer()))
- if err != nil {
- t.Fatalf("unexpected error dialing connection: %v", err)
- }
- conn.Close()
- close(dialDone)
+ dialDone <- func() error {
+ conn, err := DialContext(ctx, "Non-Existent.Server:80", WithInsecure(), WithBalancer(newEmptyBalancer()))
+ if err != nil {
+ return err
+ }
+ return conn.Close()
+ }()
}()
- <-dialDone
- cancel()
+ if err := <-dialDone; err != nil {
+ t.Fatalf("unexpected error dialing connection: %s", err)
+ }
}
func TestClientUpdatesParamsAfterGoAway(t *testing.T) {
diff --git a/credentials/oauth/oauth.go b/credentials/oauth/oauth.go
index 25393cc..126bc78 100644
--- a/credentials/oauth/oauth.go
+++ b/credentials/oauth/oauth.go
@@ -37,6 +37,7 @@
import (
"fmt"
"io/ioutil"
+ "sync"
"golang.org/x/net/context"
"golang.org/x/oauth2"
@@ -132,20 +133,27 @@
// serviceAccount represents PerRPCCredentials via JWT signing key.
type serviceAccount struct {
+ mu sync.Mutex
config *jwt.Config
+ t *oauth2.Token
}
-func (s serviceAccount) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
- token, err := s.config.TokenSource(ctx).Token()
- if err != nil {
- return nil, err
+func (s *serviceAccount) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if !s.t.Valid() {
+ var err error
+ s.t, err = s.config.TokenSource(ctx).Token()
+ if err != nil {
+ return nil, err
+ }
}
return map[string]string{
- "authorization": token.TokenType + " " + token.AccessToken,
+ "authorization": s.t.TokenType + " " + s.t.AccessToken,
}, nil
}
-func (s serviceAccount) RequireTransportSecurity() bool {
+func (s *serviceAccount) RequireTransportSecurity() bool {
return true
}
@@ -156,7 +164,7 @@
if err != nil {
return nil, err
}
- return serviceAccount{config: config}, nil
+ return &serviceAccount{config: config}, nil
}
// NewServiceAccountFromFile constructs the PerRPCCredentials using the JSON key file
diff --git a/rpc_util.go b/rpc_util.go
index 7da0da0..4a4ca40 100644
--- a/rpc_util.go
+++ b/rpc_util.go
@@ -161,6 +161,14 @@
after(*callInfo)
}
+// EmptyCallOption does not alter the Call configuration.
+// It can be embedded in another structure to carry satellite data for use
+// by interceptors.
+type EmptyCallOption struct{}
+
+func (EmptyCallOption) before(*callInfo) error { return nil }
+func (EmptyCallOption) after(*callInfo) {}
+
type beforeCall func(c *callInfo) error
func (o beforeCall) before(c *callInfo) error { return o(c) }
diff --git a/rpc_util_test.go b/rpc_util_test.go
index b2b85c7..d832b12 100644
--- a/rpc_util_test.go
+++ b/rpc_util_test.go
@@ -47,6 +47,8 @@
"google.golang.org/grpc/transport"
)
+var _ CallOption = EmptyCallOption{} // ensure EmptyCallOption implements the interface
+
func TestSimpleParsing(t *testing.T) {
bigMsg := bytes.Repeat([]byte{'x'}, 1<<24)
for _, test := range []struct {
diff --git a/server.go b/server.go
index 4ae3372..5807dae 100644
--- a/server.go
+++ b/server.go
@@ -712,139 +712,137 @@
stream.SetSendCompress(s.opts.cp.Type())
}
p := &parser{r: stream}
- for { // TODO: delete
- pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
+ pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
+ if err == io.EOF {
+ // The entire stream is done (for unary RPC only).
+ return err
+ }
+ if err == io.ErrUnexpectedEOF {
+ err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
+ }
+ if err != nil {
+ if st, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, st); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+ } else {
+ switch st := err.(type) {
+ case transport.ConnectionError:
+ // Nothing to do here.
+ case transport.StreamError:
+ if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+ default:
+ panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
+ }
+ }
+ return err
+ }
+
+ if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
+ if st, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, st); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+ return err
+ }
+ if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+
+ // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
+ }
+ var inPayload *stats.InPayload
+ if sh != nil {
+ inPayload = &stats.InPayload{
+ RecvTime: time.Now(),
+ }
+ }
+ df := func(v interface{}) error {
+ if inPayload != nil {
+ inPayload.WireLength = len(req)
+ }
+ if pf == compressionMade {
+ var err error
+ req, err = s.opts.dc.Do(bytes.NewReader(req))
+ if err != nil {
+ return Errorf(codes.Internal, err.Error())
+ }
+ }
+ if len(req) > s.opts.maxReceiveMessageSize {
+ // TODO: Revisit the error code. Currently keep it consistent with
+ // java implementation.
+ return status.Errorf(codes.ResourceExhausted, "Received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
+ }
+ if err := s.opts.codec.Unmarshal(req, v); err != nil {
+ return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
+ }
+ if inPayload != nil {
+ inPayload.Payload = v
+ inPayload.Data = req
+ inPayload.Length = len(req)
+ sh.HandleRPC(stream.Context(), inPayload)
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
+ }
+ return nil
+ }
+ reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
+ if appErr != nil {
+ appStatus, ok := status.FromError(appErr)
+ if !ok {
+ // Convert appErr if it is not a grpc status error.
+ appErr = status.Error(convertCode(appErr), appErr.Error())
+ appStatus, _ = status.FromError(appErr)
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
+ trInfo.tr.SetError()
+ }
+ if e := t.WriteStatus(stream, appStatus); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
+ }
+ return appErr
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(stringer("OK"), false)
+ }
+ opts := &transport.Options{
+ Last: true,
+ Delay: false,
+ }
+ if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
}
- if err == io.ErrUnexpectedEOF {
- err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
- }
- if err != nil {
- if st, ok := status.FromError(err); ok {
- if e := t.WriteStatus(stream, st); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- } else {
- switch st := err.(type) {
- case transport.ConnectionError:
- // Nothing to do here.
- case transport.StreamError:
- if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- default:
- panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
- }
- }
- return err
- }
-
- if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
- if st, ok := status.FromError(err); ok {
- if e := t.WriteStatus(stream, st); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- return err
- }
- if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
-
- // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
- }
- var inPayload *stats.InPayload
- if sh != nil {
- inPayload = &stats.InPayload{
- RecvTime: time.Now(),
- }
- }
- df := func(v interface{}) error {
- if inPayload != nil {
- inPayload.WireLength = len(req)
- }
- if pf == compressionMade {
- var err error
- req, err = s.opts.dc.Do(bytes.NewReader(req))
- if err != nil {
- return Errorf(codes.Internal, err.Error())
- }
- }
- if len(req) > s.opts.maxReceiveMessageSize {
- // TODO: Revisit the error code. Currently keep it consistent with
- // java implementation.
- return status.Errorf(codes.ResourceExhausted, "Received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
- }
- if err := s.opts.codec.Unmarshal(req, v); err != nil {
- return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
- }
- if inPayload != nil {
- inPayload.Payload = v
- inPayload.Data = req
- inPayload.Length = len(req)
- sh.HandleRPC(stream.Context(), inPayload)
- }
- if trInfo != nil {
- trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
- }
- return nil
- }
- reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
- if appErr != nil {
- appStatus, ok := status.FromError(appErr)
- if !ok {
- // Convert appErr if it is not a grpc status error.
- appErr = status.Error(convertCode(appErr), appErr.Error())
- appStatus, _ = status.FromError(appErr)
- }
- if trInfo != nil {
- trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
- trInfo.tr.SetError()
- }
- if e := t.WriteStatus(stream, appStatus); e != nil {
+ if s, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, s); e != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
}
- return appErr
- }
- if trInfo != nil {
- trInfo.tr.LazyLog(stringer("OK"), false)
- }
- opts := &transport.Options{
- Last: true,
- Delay: false,
- }
- if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
- if err == io.EOF {
- // The entire stream is done (for unary RPC only).
- return err
- }
- if s, ok := status.FromError(err); ok {
- if e := t.WriteStatus(stream, s); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
+ } else {
+ switch st := err.(type) {
+ case transport.ConnectionError:
+ // Nothing to do here.
+ case transport.StreamError:
+ if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
- } else {
- switch st := err.(type) {
- case transport.ConnectionError:
- // Nothing to do here.
- case transport.StreamError:
- if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- default:
- panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
- }
+ default:
+ panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
}
- return err
}
- if trInfo != nil {
- trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
- }
- // TODO: Should we be logging if writing status failed here, like above?
- // Should the logging be in WriteStatus? Should we ignore the WriteStatus
- // error or allow the stats handler to see it?
- return t.WriteStatus(stream, status.New(codes.OK, ""))
+ return err
}
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
+ }
+ // TODO: Should we be logging if writing status failed here, like above?
+ // Should the logging be in WriteStatus? Should we ignore the WriteStatus
+ // error or allow the stats handler to see it?
+ return t.WriteStatus(stream, status.New(codes.OK, ""))
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
diff --git a/test/end2end_test.go b/test/end2end_test.go
index 241cc90..5f72c79 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -3030,32 +3030,35 @@
if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
}
- var index int
- for index < len(reqSizes) {
- respParam := []*testpb.ResponseParameters{
- {
- Size: proto.Int32(int32(respSizes[index])),
- },
- }
+ err = func() error {
+ for index := 0; index < len(reqSizes); index++ {
+ respParam := []*testpb.ResponseParameters{
+ {
+ Size: proto.Int32(int32(respSizes[index])),
+ },
+ }
- payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
- if err != nil {
- t.Fatal(err)
- }
+ payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
+ if err != nil {
+ return err
+ }
- req := &testpb.StreamingOutputCallRequest{
- ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
- ResponseParameters: respParam,
- Payload: payload,
+ req := &testpb.StreamingOutputCallRequest{
+ ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
+ ResponseParameters: respParam,
+ Payload: payload,
+ }
+ if err := stream.Send(req); err != nil {
+ return fmt.Errorf("%v.Send(%v) = %v, want <nil>", stream, req, err)
+ }
}
- if err := stream.Send(req); err != nil {
- t.Errorf("%v.Send(%v) = %v, want <nil>", stream, req, err)
- return
- }
- index++
- }
+ return nil
+ }()
// Tell the server we're done sending args.
stream.CloseSend()
+ if err != nil {
+ t.Error(err)
+ }
}()
for {
if _, err := stream.Recv(); err != nil {
@@ -3451,7 +3454,8 @@
defer wg.Done()
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 314)
if err != nil {
- t.Fatal(err)
+ t.Error(err)
+ return
}
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),