merge master
diff --git a/clientconn.go b/clientconn.go
index e01b138..8232dfd 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -333,7 +333,14 @@
}
cc.mkp = cc.dopts.copts.KeepaliveParams
- grpcUA := "grpc-go/" + Version
+ if cc.dopts.copts.Dialer == nil {
+ cc.dopts.copts.Dialer = newProxyDialer(
+ func(ctx context.Context, addr string) (net.Conn, error) {
+ return dialContext(ctx, "tcp", addr)
+ },
+ )
+ }
+
if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {
diff --git a/examples/gotutorial.md b/examples/gotutorial.md
index 6770b52..a86b5b6 100644
--- a/examples/gotutorial.md
+++ b/examples/gotutorial.md
@@ -1,4 +1,4 @@
-#gRPC Basics: Go
+# gRPC Basics: Go
This tutorial provides a basic Go programmer's introduction to working with gRPC. By walking through this example you'll learn how to:
diff --git a/go16.go b/go16.go
new file mode 100644
index 0000000..b61c57e
--- /dev/null
+++ b/go16.go
@@ -0,0 +1,56 @@
+// +build go1.6,!go1.7
+
+/*
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package grpc
+
+import (
+ "fmt"
+ "net"
+ "net/http"
+
+ "golang.org/x/net/context"
+)
+
+// dialContext dials address on the given network. The ctx.Done() channel is
+// installed as the Dialer's Cancel channel, so the dial is tied to ctx.
+func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
+	d := net.Dialer{Cancel: ctx.Done()}
+	return d.Dial(network, address)
+}
+
+// sendHTTPRequest writes req to conn in wire format. Before writing, it sets
+// req.Cancel to ctx.Done(), which modifies the caller's request in place.
+//
+// A write failure is returned wrapped with a "failed to write the HTTP
+// request" prefix.
+// NOTE(review): doHTTPConnectHandshake wraps the returned error with the same
+// prefix, so the text is duplicated in this build - confirm this is intended.
+func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error {
+	req.Cancel = ctx.Done()
+	err := req.Write(conn)
+	if err == nil {
+		return nil
+	}
+	return fmt.Errorf("failed to write the HTTP request: %v", err)
+}
diff --git a/go17.go b/go17.go
new file mode 100644
index 0000000..844f0e1
--- /dev/null
+++ b/go17.go
@@ -0,0 +1,55 @@
+// +build go1.7
+
+/*
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package grpc
+
+import (
+ "net"
+ "net/http"
+
+ "golang.org/x/net/context"
+)
+
+// dialContext dials address on the given network using a zero-value
+// net.Dialer, passing ctx through to DialContext.
+func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
+	var d net.Dialer
+	return d.DialContext(ctx, network, address)
+}
+
+// sendHTTPRequest writes a copy of req that carries ctx to conn in wire
+// format. The caller's req is left untouched, and any write error is returned
+// as-is.
+func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error {
+	return req.WithContext(ctx).Write(conn)
+}
diff --git a/proxy.go b/proxy.go
new file mode 100644
index 0000000..10188dc
--- /dev/null
+++ b/proxy.go
@@ -0,0 +1,145 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package grpc
+
+import (
+ "bufio"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/http/httputil"
+ "net/url"
+
+ "golang.org/x/net/context"
+)
+
+var (
+ // errDisabled is the sentinel returned by mapAddress when the proxy lookup
+ // yields no proxy URL for the address. newProxyDialer reacts to it by
+ // dialing the address directly and skipping the CONNECT handshake.
+ errDisabled = errors.New("proxy is disabled for the address")
+ // httpProxyFromEnvironment is the proxy lookup used by mapAddress. It is a
+ // variable rather than a direct call so that tests can replace it.
+ httpProxyFromEnvironment = http.ProxyFromEnvironment
+)
+
+// mapAddress returns the host of the proxy to use for reaching address. The
+// decision is delegated to httpProxyFromEnvironment, which is asked about an
+// https URL whose host is address.
+//
+// It returns errDisabled when the lookup reports no proxy, and the lookup's
+// own error when the lookup fails. ctx is currently unused.
+func mapAddress(ctx context.Context, address string) (string, error) {
+	target := &url.URL{Scheme: "https", Host: address}
+	// The local is not called "url" so that it does not shadow the package.
+	proxyURL, err := httpProxyFromEnvironment(&http.Request{URL: target})
+	switch {
+	case err != nil:
+		return "", err
+	case proxyURL == nil:
+		return "", errDisabled
+	}
+	return proxyURL.Host, nil
+}
+
+// bufConn is a net.Conn whose reads are served from r instead of from the
+// embedded connection.
+//
+// http.ReadResponse needs a bufio.Reader, and that reader may pull more bytes
+// off the connection than the response itself needed. Those extra bytes sit in
+// the reader's buffer; reading through r afterwards makes sure they are not
+// lost. All other net.Conn methods go straight to the embedded Conn.
+type bufConn struct {
+	net.Conn
+	r io.Reader
+}
+
+// Read reads from the wrapped reader rather than from the raw connection.
+func (bc *bufConn) Read(p []byte) (int, error) {
+	return bc.r.Read(p)
+}
+
+// doHTTPConnectHandshake performs an HTTP CONNECT exchange for addr over conn,
+// which is expected to be a connection to the proxy.
+//
+// On a 200 response it returns conn wrapped in a bufConn, so that bytes
+// already buffered while reading the response stay readable. On any failure it
+// closes conn and returns a non-nil error; for a non-200 status the error
+// carries a dump of the response (or just the status if dumping fails).
+func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, addr string) (_ net.Conn, err error) {
+	// err is the named result, so this sees whatever error is being returned.
+	defer func() {
+		if err != nil {
+			conn.Close()
+		}
+	}()
+
+	connectReq := &http.Request{
+		Method: http.MethodConnect,
+		URL:    &url.URL{Host: addr},
+		Header: map[string][]string{"User-Agent": {grpcUA}},
+	}
+	if werr := sendHTTPRequest(ctx, connectReq, conn); werr != nil {
+		return nil, fmt.Errorf("failed to write the HTTP request: %v", werr)
+	}
+
+	br := bufio.NewReader(conn)
+	resp, rerr := http.ReadResponse(br, connectReq)
+	if rerr != nil {
+		return nil, fmt.Errorf("reading server HTTP response: %v", rerr)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode == http.StatusOK {
+		// Hand br along with conn: it may already hold bytes past the response.
+		return &bufConn{Conn: conn, r: br}, nil
+	}
+
+	dump, derr := httputil.DumpResponse(resp, true)
+	if derr != nil {
+		return nil, fmt.Errorf("failed to do connect handshake, status code: %s", resp.Status)
+	}
+	return nil, fmt.Errorf("failed to do connect handshake, response: %q", dump)
+}
+
+// newProxyDialer wraps dialer with proxy support.
+//
+// The returned dialer asks mapAddress whether addr should go through a proxy.
+// If so, it dials the proxy with dialer, performs the HTTP CONNECT handshake
+// for addr and returns the resulting connection. If the proxy is disabled for
+// addr, it dials addr directly. Any other lookup error is returned unchanged.
+func newProxyDialer(dialer func(context.Context, string) (net.Conn, error)) func(context.Context, string) (net.Conn, error) {
+	return func(ctx context.Context, addr string) (net.Conn, error) {
+		proxyAddr, err := mapAddress(ctx, addr)
+		switch err {
+		case nil:
+			// A proxy applies; continue below.
+		case errDisabled:
+			// No proxy for this address: plain dial, no handshake.
+			return dialer(ctx, addr)
+		default:
+			return nil, err
+		}
+
+		conn, err := dialer(ctx, proxyAddr)
+		if err != nil {
+			return conn, err
+		}
+		return doHTTPConnectHandshake(ctx, conn, addr)
+	}
+}
diff --git a/proxy_test.go b/proxy_test.go
new file mode 100644
index 0000000..846b396
--- /dev/null
+++ b/proxy_test.go
@@ -0,0 +1,192 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package grpc
+
+import (
+ "bufio"
+ "io"
+ "net"
+ "net/http"
+ "net/url"
+ "testing"
+ "time"
+
+ "golang.org/x/net/context"
+)
+
+const (
+ // envTestAddr is the target address that the fake proxy lookup in
+ // TestMapAddressEnv recognizes.
+ envTestAddr = "1.2.3.4:8080"
+ // envProxyAddr is the proxy host that the fake lookup returns for
+ // envTestAddr.
+ envProxyAddr = "2.3.4.5:7687"
+)
+
+// overwrite replaces httpProxyFromEnvironment with hpfe and returns a function
+// that puts the previous value back. Typical use: defer overwrite(f)().
+func overwrite(hpfe func(req *http.Request) (*url.URL, error)) func() {
+	prev := httpProxyFromEnvironment
+	httpProxyFromEnvironment = hpfe
+	return func() { httpProxyFromEnvironment = prev }
+}
+
+// TestMapAddressEnv checks that mapAddress returns the proxy host chosen by
+// the (stubbed) environment lookup.
+func TestMapAddressEnv(t *testing.T) {
+	// Stub lookup: only envTestAddr is routed through envProxyAddr; every
+	// other host gets no proxy.
+	fakeLookup := func(req *http.Request) (*url.URL, error) {
+		if req.URL.Host != envTestAddr {
+			return nil, nil
+		}
+		return &url.URL{Scheme: "https", Host: envProxyAddr}, nil
+	}
+	restore := overwrite(fakeLookup)
+	defer restore()
+
+	// envTestAddr should be mapped to the proxy by the stubbed lookup.
+	got, err := mapAddress(context.Background(), envTestAddr)
+	if err != nil {
+		t.Error(err)
+	}
+	if want := envProxyAddr; got != want {
+		t.Errorf("want %v, got %v", want, got)
+	}
+}
+
+// proxyServer is a minimal HTTP CONNECT proxy for tests. Its run method
+// serves a single accepted connection.
+type proxyServer struct {
+ // t receives the failures reported by run.
+ t *testing.T
+ // lis is the listener run accepts its one client connection on.
+ lis net.Listener
+ // in is the connection accepted from the client; set by run.
+ in net.Conn
+ // out is the connection dialed to the CONNECT target; set by run.
+ out net.Conn
+}
+
+// run accepts one client connection, expects an HTTP CONNECT request that
+// carries the grpcUA user agent, dials the requested host and then relays
+// bytes in both directions between the two connections. Failures are reported
+// through p.t.
+func (p *proxyServer) run() {
+ in, err := p.lis.Accept()
+ if err != nil {
+ // NOTE(review): presumably the listener was closed by stop; nothing to serve.
+ return
+ }
+ p.in = in
+
+ req, err := http.ReadRequest(bufio.NewReader(in))
+ if err != nil {
+ p.t.Errorf("failed to read CONNECT req: %v", err)
+ return
+ }
+ // Reject anything that is not a CONNECT sent with the gRPC user agent.
+ if req.Method != http.MethodConnect || req.UserAgent() != grpcUA {
+ resp := http.Response{StatusCode: http.StatusMethodNotAllowed}
+ resp.Write(p.in)
+ p.in.Close()
+ p.t.Errorf("get wrong CONNECT req: %+v", req)
+ return
+ }
+
+ // Connect to the host named in the CONNECT request.
+ out, err := net.Dial("tcp", req.URL.Host)
+ if err != nil {
+ p.t.Errorf("failed to dial to server: %v", err)
+ return
+ }
+ // Tell the client the tunnel is established, then start relaying.
+ resp := http.Response{StatusCode: http.StatusOK, Proto: "HTTP/1.0"}
+ resp.Write(p.in)
+ p.out = out
+ // One goroutine per direction; each ends when its copy returns.
+ go io.Copy(p.in, p.out)
+ go io.Copy(p.out, p.in)
+}
+
+// stop closes the listener and whichever of the client-side and target-side
+// connections have been set.
+func (p *proxyServer) stop() {
+	p.lis.Close()
+	for _, c := range []net.Conn{p.in, p.out} {
+		if c != nil {
+			c.Close()
+		}
+	}
+}
+
+// TestHTTPConnect dials a backend listener through a local proxyServer using
+// the dialer returned by newProxyDialer, sends a short message over the
+// tunnel and checks that the backend receives exactly that message.
+func TestHTTPConnect(t *testing.T) {
+	plis, err := net.Listen("tcp", ":0")
+	if err != nil {
+		t.Fatalf("failed to listen: %v", err)
+	}
+	p := &proxyServer{t: t, lis: plis}
+	go p.run()
+	defer p.stop()
+
+	blis, err := net.Listen("tcp", ":0")
+	if err != nil {
+		t.Fatalf("failed to listen: %v", err)
+	}
+
+	msg := []byte{4, 3, 5, 2}
+	recvBuf := make([]byte, len(msg))
+	done := make(chan struct{})
+	go func() {
+		// Always signal completion, including on failure, so the test body
+		// cannot block forever on <-done.
+		defer close(done)
+		// Only one connection is expected; release the listener once Accept
+		// has returned.
+		defer blis.Close()
+		in, err := blis.Accept()
+		if err != nil {
+			t.Errorf("failed to accept: %v", err)
+			return
+		}
+		defer in.Close()
+		// A single Read may return fewer bytes than were sent; read the
+		// whole message.
+		if _, err := io.ReadFull(in, recvBuf); err != nil {
+			t.Errorf("failed to read msg: %v", err)
+		}
+	}()
+
+	// Route every address through the test proxy; restored on return.
+	hpfe := func(req *http.Request) (*url.URL, error) {
+		return &url.URL{Host: plis.Addr().String()}, nil
+	}
+	defer overwrite(hpfe)()
+
+	// Dial the backend address; the proxy dialer connects to the proxy and
+	// performs the CONNECT handshake.
+	dialer := newProxyDialer(func(ctx context.Context, addr string) (net.Conn, error) {
+		if deadline, ok := ctx.Deadline(); ok {
+			return net.DialTimeout("tcp", addr, deadline.Sub(time.Now()))
+		}
+		return net.Dial("tcp", addr)
+	})
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+	c, err := dialer(ctx, blis.Addr().String())
+	if err != nil {
+		t.Fatalf("http connect Dial failed: %v", err)
+	}
+	defer c.Close()
+
+	// Send msg on the connection.
+	if _, err := c.Write(msg); err != nil {
+		t.Fatalf("failed to write msg: %v", err)
+	}
+	<-done
+
+	// Check received msg.
+	if string(recvBuf) != string(msg) {
+		t.Fatalf("received msg: %v, want %v", recvBuf, msg)
+	}
+}
diff --git a/rpc_util.go b/rpc_util.go
index e45e93b..f921a55 100644
--- a/rpc_util.go
+++ b/rpc_util.go
@@ -482,3 +482,5 @@
}
return b
}
+
+// grpcUA is the user agent token for this library: "grpc-go/" followed by
+// Version. It is sent as the User-Agent of the proxy CONNECT request and is
+// appended to any user agent configured on the ClientConn.
+const grpcUA = "grpc-go/" + Version
diff --git a/stream.go b/stream.go
index 1b83fc4..44ee2f2 100644
--- a/stream.go
+++ b/stream.go
@@ -268,6 +268,9 @@
select {
case <-t.Error():
// Incur transport error, simply exit.
+ case <-cc.ctx.Done():
+ cs.finish(ErrClientConnClosing)
+ cs.closeTransportStream(ErrClientConnClosing)
case <-s.Done():
// TODO: The trace of the RPC is terminated here when there is no pending
// I/O, which is probably not the optimal solution.
diff --git a/test/end2end_test.go b/test/end2end_test.go
index 3504423..c85c33d 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -1021,6 +1021,41 @@
awaitNewConnLogOutput()
}
+// TestClientConnCloseAfterGoAwayWithActiveStream runs the scenario in every
+// test environment except "handler-tls", which is skipped.
+func TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
+	defer leakCheck(t)()
+	for _, e := range listTestEnv() {
+		if e.name != "handler-tls" {
+			testClientConnCloseAfterGoAwayWithActiveStream(t, e)
+		}
+	}
+}
+
+// testClientConnCloseAfterGoAwayWithActiveStream opens a full-duplex stream,
+// starts a graceful server stop in the background, closes the ClientConn one
+// second later, and fails unless GracefulStop returns within a further second.
+func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
+	te := newTest(t, e)
+	te.startServer(&testServer{security: e.security})
+	defer te.tearDown()
+	cc := te.clientConn()
+	tc := testpb.NewTestServiceClient(cc)
+
+	// The stream is deliberately left open.
+	_, err := tc.FullDuplexCall(context.Background())
+	if err != nil {
+		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
+	}
+
+	stopped := make(chan struct{})
+	go func() {
+		te.srv.GracefulStop()
+		close(stopped)
+	}()
+	// NOTE(review): presumably this gives the server's GoAway time to reach
+	// the client before the ClientConn is closed - confirm.
+	time.Sleep(time.Second)
+	cc.Close()
+
+	select {
+	case <-stopped:
+	case <-time.After(time.Second):
+		t.Fatalf("Test timed-out.")
+	}
+}
+
func TestFailFast(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {