| package session |
| |
| import ( |
| "context" |
| "net" |
| "sync/atomic" |
| "time" |
| |
| "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" |
| opentracing "github.com/opentracing/opentracing-go" |
| "github.com/pkg/errors" |
| "github.com/sirupsen/logrus" |
| "golang.org/x/net/http2" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/health/grpc_health_v1" |
| ) |
| |
| func serve(ctx context.Context, grpcServer *grpc.Server, conn net.Conn) { |
| go func() { |
| <-ctx.Done() |
| conn.Close() |
| }() |
| logrus.Debugf("serving grpc connection") |
| (&http2.Server{}).ServeConn(conn, &http2.ServeConnOpts{Handler: grpcServer}) |
| } |
| |
| func grpcClientConn(ctx context.Context, conn net.Conn) (context.Context, *grpc.ClientConn, error) { |
| var dialCount int64 |
| dialer := grpc.WithDialer(func(addr string, d time.Duration) (net.Conn, error) { |
| if c := atomic.AddInt64(&dialCount, 1); c > 1 { |
| return nil, errors.Errorf("only one connection allowed") |
| } |
| return conn, nil |
| }) |
| |
| dialOpts := []grpc.DialOption{ |
| dialer, |
| grpc.WithInsecure(), |
| } |
| |
| if span := opentracing.SpanFromContext(ctx); span != nil { |
| tracer := span.Tracer() |
| dialOpts = append(dialOpts, |
| grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer, traceFilter())), |
| grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer, traceFilter())), |
| ) |
| } |
| |
| cc, err := grpc.DialContext(ctx, "", dialOpts...) |
| if err != nil { |
| return nil, nil, errors.Wrap(err, "failed to create grpc client") |
| } |
| |
| ctx, cancel := context.WithCancel(ctx) |
| go monitorHealth(ctx, cc, cancel) |
| |
| return ctx, cc, nil |
| } |
| |
| func monitorHealth(ctx context.Context, cc *grpc.ClientConn, cancelConn func()) { |
| defer cancelConn() |
| defer cc.Close() |
| |
| ticker := time.NewTicker(1 * time.Second) |
| defer ticker.Stop() |
| healthClient := grpc_health_v1.NewHealthClient(cc) |
| |
| for { |
| select { |
| case <-ctx.Done(): |
| return |
| case <-ticker.C: |
| ctx, cancel := context.WithTimeout(ctx, 10*time.Second) |
| _, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) |
| cancel() |
| if err != nil { |
| return |
| } |
| } |
| } |
| } |