| // Copyright 2016 Michal Witkowski. All Rights Reserved. |
| // See LICENSE for licensing terms. |
| |
| // gRPC Prometheus monitoring interceptors for client-side gRPC. |
| |
| package grpc_prometheus |
| |
| import ( |
| "io" |
| |
| "golang.org/x/net/context" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| ) |
| |
| // UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs. |
| func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { |
| monitor := newClientReporter(Unary, method) |
| monitor.SentMessage() |
| err := invoker(ctx, method, req, reply, cc, opts...) |
| if err != nil { |
| monitor.ReceivedMessage() |
| } |
| monitor.Handled(grpc.Code(err)) |
| return err |
| } |
| |
| // StreamServerInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs. |
| func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { |
| monitor := newClientReporter(clientStreamType(desc), method) |
| clientStream, err := streamer(ctx, desc, cc, method, opts...) |
| if err != nil { |
| monitor.Handled(grpc.Code(err)) |
| return nil, err |
| } |
| return &monitoredClientStream{clientStream, monitor}, nil |
| } |
| |
| func clientStreamType(desc *grpc.StreamDesc) grpcType { |
| if desc.ClientStreams && !desc.ServerStreams { |
| return ClientStream |
| } else if !desc.ClientStreams && desc.ServerStreams { |
| return ServerStream |
| } |
| return BidiStream |
| } |
| |
| // monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters. |
| type monitoredClientStream struct { |
| grpc.ClientStream |
| monitor *clientReporter |
| } |
| |
| func (s *monitoredClientStream) SendMsg(m interface{}) error { |
| err := s.ClientStream.SendMsg(m) |
| if err == nil { |
| s.monitor.SentMessage() |
| } |
| return err |
| } |
| |
| func (s *monitoredClientStream) RecvMsg(m interface{}) error { |
| err := s.ClientStream.RecvMsg(m) |
| if err == nil { |
| s.monitor.ReceivedMessage() |
| } else if err == io.EOF { |
| s.monitor.Handled(codes.OK) |
| } else { |
| s.monitor.Handled(grpc.Code(err)) |
| } |
| return err |
| } |