blob: 9521460cf5e07169697cacbe577da9ab2c76d954 [file] [log] [blame]
// Copyright 2017 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpcreplay
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"sync"
pb "cloud.google.com/go/rpcreplay/proto/rpcreplay"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
// A Recorder records RPCs for later playback.
type Recorder struct {
mu sync.Mutex
w *bufio.Writer
f *os.File
next int
err error
// BeforeFunc defines a function that can inspect and modify requests and responses
// written to the replay file. It does not modify messages sent to the service.
// It is run once before a request is written to the replay file, and once before a response
// is written to the replay file.
// The function is called with the method name and the message that triggered the callback.
// If the function returns an error, the error will be returned to the client.
// This is only executed for unary RPCs; streaming RPCs are not supported.
BeforeFunc func(string, proto.Message) error
}
// NewRecorder creates a recorder that writes to filename. The file will
// also store the initial bytes for retrieval during replay.
//
// You must call Close on the Recorder to ensure that all data is written.
func NewRecorder(filename string, initial []byte) (*Recorder, error) {
f, err := os.Create(filename)
if err != nil {
return nil, err
}
rec, err := NewRecorderWriter(f, initial)
if err != nil {
_ = f.Close()
return nil, err
}
rec.f = f
return rec, nil
}
// NewRecorderWriter creates a recorder that writes to w. The initial
// bytes will also be written to w for retrieval during replay.
//
// You must call Close on the Recorder to ensure that all data is written.
func NewRecorderWriter(w io.Writer, initial []byte) (*Recorder, error) {
bw := bufio.NewWriter(w)
if err := writeHeader(bw, initial); err != nil {
return nil, err
}
return &Recorder{w: bw, next: 1}, nil
}
// DialOptions returns the options that must be passed to grpc.Dial
// to enable recording.
func (r *Recorder) DialOptions() []grpc.DialOption {
return []grpc.DialOption{
grpc.WithUnaryInterceptor(r.interceptUnary),
grpc.WithStreamInterceptor(r.interceptStream),
}
}
// Close saves any unwritten information.
func (r *Recorder) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
if r.err != nil {
return r.err
}
err := r.w.Flush()
if r.f != nil {
if err2 := r.f.Close(); err == nil {
err = err2
}
}
return err
}
// Intercepts all unary (non-stream) RPCs.
func (r *Recorder) interceptUnary(ctx context.Context, method string, req, res interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ereq := &entry{
kind: pb.Entry_REQUEST,
method: method,
msg: message{msg: proto.Clone(req.(proto.Message))},
}
if r.BeforeFunc != nil {
if err := r.BeforeFunc(method, ereq.msg.msg); err != nil {
return err
}
}
refIndex, err := r.writeEntry(ereq)
if err != nil {
return err
}
ierr := invoker(ctx, method, req, res, cc, opts...)
eres := &entry{
kind: pb.Entry_RESPONSE,
refIndex: refIndex,
}
// If the error is not a gRPC status, then something more
// serious is wrong. More significantly, we have no way
// of serializing an arbitrary error. So just return it
// without recording the response.
if _, ok := status.FromError(ierr); !ok {
r.mu.Lock()
r.err = fmt.Errorf("saw non-status error in %s response: %w (%T)", method, ierr, ierr)
r.mu.Unlock()
return ierr
}
eres.msg.set(proto.Clone(res.(proto.Message)), ierr)
if r.BeforeFunc != nil {
if err := r.BeforeFunc(method, eres.msg.msg); err != nil {
return err
}
}
if _, err := r.writeEntry(eres); err != nil {
return err
}
return ierr
}
func (r *Recorder) writeEntry(e *entry) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()
if r.err != nil {
return 0, r.err
}
err := writeEntry(r.w, e)
if err != nil {
r.err = err
return 0, err
}
n := r.next
r.next++
return n, nil
}
func (r *Recorder) interceptStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
cstream, serr := streamer(ctx, desc, cc, method, opts...)
e := &entry{
kind: pb.Entry_CREATE_STREAM,
method: method,
}
e.msg.set(nil, serr)
refIndex, err := r.writeEntry(e)
if err != nil {
return nil, err
}
return &recClientStream{
ctx: ctx,
rec: r,
cstream: cstream,
refIndex: refIndex,
}, serr
}
// A recClientStream implements the gprc.ClientStream interface.
// It behaves exactly like the default ClientStream, but also
// records all messages sent and received.
type recClientStream struct {
ctx context.Context
rec *Recorder
cstream grpc.ClientStream
refIndex int
}
func (rcs *recClientStream) Context() context.Context { return rcs.ctx }
func (rcs *recClientStream) SendMsg(m interface{}) error {
serr := rcs.cstream.SendMsg(m)
e := &entry{
kind: pb.Entry_SEND,
refIndex: rcs.refIndex,
}
e.msg.set(m, serr)
if _, err := rcs.rec.writeEntry(e); err != nil {
return err
}
return serr
}
func (rcs *recClientStream) RecvMsg(m interface{}) error {
serr := rcs.cstream.RecvMsg(m)
e := &entry{
kind: pb.Entry_RECV,
refIndex: rcs.refIndex,
}
e.msg.set(m, serr)
if _, err := rcs.rec.writeEntry(e); err != nil {
return err
}
return serr
}
func (rcs *recClientStream) Header() (metadata.MD, error) {
// TODO(jba): record.
return rcs.cstream.Header()
}
func (rcs *recClientStream) Trailer() metadata.MD {
// TODO(jba): record.
return rcs.cstream.Trailer()
}
func (rcs *recClientStream) CloseSend() error {
// TODO(jba): record.
return rcs.cstream.CloseSend()
}
// A Replayer replays a set of RPCs saved by a Recorder.
type Replayer struct {
initial []byte // initial state
log func(format string, v ...interface{}) // for debugging
mu sync.Mutex
calls []*call
streams []*stream
// BeforeFunc defines a function that can inspect and modify requests before they
// are matched for responses from the replay file.
// The function is called with the method name and the message that triggered the callback.
// If the function returns an error, the error will be returned to the client.
// This is only executed for unary RPCs; streaming RPCs are not supported.
BeforeFunc func(string, proto.Message) error
}
// A call represents a unary RPC, with a request and response (or error).
type call struct {
method string
request proto.Message
response message
}
// A stream represents a gRPC stream, with an initial create-stream call, followed by
// zero or more sends and/or receives.
type stream struct {
method string
createIndex int
createErr error // error from create call
sends []message
recvs []message
}
// NewReplayer creates a Replayer that reads from filename.
func NewReplayer(filename string) (*Replayer, error) {
f, err := os.Open(filename)
if err != nil {
return nil, err
}
defer f.Close()
return NewReplayerReader(f)
}
// NewReplayerReader creates a Replayer that reads from r.
func NewReplayerReader(r io.Reader) (*Replayer, error) {
rep := &Replayer{
log: func(string, ...interface{}) {},
}
if err := rep.read(r); err != nil {
return nil, err
}
return rep, nil
}
// read reads the stream of recorded entries.
// It matches requests with responses, with each pair grouped
// into a call struct.
func (rep *Replayer) read(r io.Reader) error {
r = bufio.NewReader(r)
bytes, err := readHeader(r)
if err != nil {
return err
}
rep.initial = bytes
callsByIndex := map[int]*call{}
streamsByIndex := map[int]*stream{}
for i := 1; ; i++ {
e, err := readEntry(r)
if err != nil {
return err
}
if e == nil {
break
}
switch e.kind {
case pb.Entry_REQUEST:
callsByIndex[i] = &call{
method: e.method,
request: e.msg.msg,
}
case pb.Entry_RESPONSE:
call := callsByIndex[e.refIndex]
if call == nil {
return fmt.Errorf("replayer: no request for response #%d", i)
}
delete(callsByIndex, e.refIndex)
call.response = e.msg
rep.calls = append(rep.calls, call)
case pb.Entry_CREATE_STREAM:
s := &stream{method: e.method, createIndex: i}
s.createErr = e.msg.err
streamsByIndex[i] = s
rep.streams = append(rep.streams, s)
case pb.Entry_SEND:
s := streamsByIndex[e.refIndex]
if s == nil {
return fmt.Errorf("replayer: no stream for send #%d", i)
}
s.sends = append(s.sends, e.msg)
case pb.Entry_RECV:
s := streamsByIndex[e.refIndex]
if s == nil {
return fmt.Errorf("replayer: no stream for recv #%d", i)
}
s.recvs = append(s.recvs, e.msg)
default:
return fmt.Errorf("replayer: unknown kind %s", e.kind)
}
}
if len(callsByIndex) > 0 {
return fmt.Errorf("replayer: %d unmatched requests", len(callsByIndex))
}
return nil
}
// DialOptions returns the options that must be passed to grpc.Dial
// to enable replaying.
func (rep *Replayer) DialOptions() []grpc.DialOption {
return []grpc.DialOption{
// On replay, we make no RPCs, which means the connection may be closed
// before the normally async Dial completes. Making the Dial synchronous
// fixes that.
grpc.WithBlock(),
grpc.WithUnaryInterceptor(rep.interceptUnary),
grpc.WithStreamInterceptor(rep.interceptStream),
}
}
// Connection returns a fake gRPC connection suitable for replaying.
func (rep *Replayer) Connection() (*grpc.ClientConn, error) {
// We don't need an actual connection, not even a loopback one.
// But we do need something to attach gRPC interceptors to.
// So we start a local server and connect to it, then close it down.
srv := grpc.NewServer()
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, err
}
go func() {
if err := srv.Serve(l); err != nil {
panic(err) // we should never get an error because we just connect and stop
}
}()
conn, err := grpc.Dial(l.Addr().String(),
append([]grpc.DialOption{grpc.WithInsecure()}, rep.DialOptions()...)...)
if err != nil {
return nil, err
}
conn.Close()
srv.Stop()
return conn, nil
}
// Initial returns the initial state saved by the Recorder.
func (rep *Replayer) Initial() []byte { return rep.initial }
// SetLogFunc sets a function to be used for debug logging. The function
// should be safe to be called from multiple goroutines.
func (rep *Replayer) SetLogFunc(f func(format string, v ...interface{})) {
rep.log = f
}
// Close closes the Replayer.
func (rep *Replayer) Close() error {
return nil
}
func (rep *Replayer) interceptUnary(_ context.Context, method string, req, res interface{}, _ *grpc.ClientConn, _ grpc.UnaryInvoker, _ ...grpc.CallOption) error {
mreq := req.(proto.Message)
if rep.BeforeFunc != nil {
if err := rep.BeforeFunc(method, mreq); err != nil {
return err
}
}
rep.log("request %s (%s)", method, req)
call := rep.extractCall(method, mreq)
if call == nil {
return fmt.Errorf("replayer: request not found: %s", mreq)
}
rep.log("returning %v", call.response)
if call.response.err != nil {
return call.response.err
}
proto.Merge(res.(proto.Message), call.response.msg) // copy msg into res
return nil
}
func (rep *Replayer) interceptStream(ctx context.Context, _ *grpc.StreamDesc, _ *grpc.ClientConn, method string, _ grpc.Streamer, _ ...grpc.CallOption) (grpc.ClientStream, error) {
rep.log("create-stream %s", method)
return &repClientStream{ctx: ctx, rep: rep, method: method}, nil
}
type repClientStream struct {
ctx context.Context
rep *Replayer
method string
str *stream
}
func (rcs *repClientStream) Context() context.Context { return rcs.ctx }
func (rcs *repClientStream) SendMsg(req interface{}) error {
if rcs.str == nil {
if err := rcs.setStream(rcs.method, req.(proto.Message)); err != nil {
return err
}
}
if len(rcs.str.sends) == 0 {
return fmt.Errorf("replayer: no more sends for stream %s, created at index %d",
rcs.str.method, rcs.str.createIndex)
}
// TODO(jba): Do not assume that the sends happen in the same order on replay.
msg := rcs.str.sends[0]
rcs.str.sends = rcs.str.sends[1:]
return msg.err
}
func (rcs *repClientStream) setStream(method string, req proto.Message) error {
str := rcs.rep.extractStream(method, req)
if str == nil {
return fmt.Errorf("replayer: stream not found for method %s and request %v", method, req)
}
if str.createErr != nil {
return str.createErr
}
rcs.str = str
return nil
}
func (rcs *repClientStream) RecvMsg(m interface{}) error {
if rcs.str == nil {
// Receive before send; fall back to matching stream by method only.
if err := rcs.setStream(rcs.method, nil); err != nil {
return err
}
}
if len(rcs.str.recvs) == 0 {
return fmt.Errorf("replayer: no more receives for stream %s, created at index %d",
rcs.str.method, rcs.str.createIndex)
}
msg := rcs.str.recvs[0]
rcs.str.recvs = rcs.str.recvs[1:]
if msg.err != nil {
return msg.err
}
proto.Merge(m.(proto.Message), msg.msg) // copy msg into m
return nil
}
func (rcs *repClientStream) Header() (metadata.MD, error) {
log.Printf("replay: stream metadata not supported")
return nil, nil
}
func (rcs *repClientStream) Trailer() metadata.MD {
log.Printf("replay: stream metadata not supported")
return nil
}
func (rcs *repClientStream) CloseSend() error {
return nil
}
// extractCall finds the first call in the list with the same method
// and request. It returns nil if it can't find such a call.
func (rep *Replayer) extractCall(method string, req proto.Message) *call {
rep.mu.Lock()
defer rep.mu.Unlock()
for i, call := range rep.calls {
if call == nil {
continue
}
if method == call.method && proto.Equal(req, call.request) {
rep.calls[i] = nil // nil out this call so we don't reuse it
return call
}
}
return nil
}
// extractStream find the first stream in the list with the same method and the same
// first request sent. If req is nil, that means a receive occurred before a send, so
// it matches only on method.
func (rep *Replayer) extractStream(method string, req proto.Message) *stream {
rep.mu.Lock()
defer rep.mu.Unlock()
for i, stream := range rep.streams {
// Skip stream if it is nil (already extracted) or its method doesn't match.
if stream == nil || stream.method != method {
continue
}
// If there is a first request, skip stream if it has no requests or its first
// request doesn't match.
if req != nil && len(stream.sends) > 0 && !proto.Equal(req, stream.sends[0].msg) {
continue
}
rep.streams[i] = nil // nil out this stream so we don't reuse it
return stream
}
return nil
}
// Fprint reads the entries from filename and writes them to w in human-readable form.
// It is intended for debugging.
func Fprint(w io.Writer, filename string) error {
f, err := os.Open(filename)
if err != nil {
return err
}
defer f.Close()
return FprintReader(w, f)
}
// FprintReader reads the entries from r and writes them to w in human-readable form.
// It is intended for debugging.
func FprintReader(w io.Writer, r io.Reader) error {
initial, err := readHeader(r)
if err != nil {
return err
}
fmt.Fprintf(w, "initial state: %q\n", string(initial))
for i := 1; ; i++ {
e, err := readEntry(r)
if err != nil {
return err
}
if e == nil {
return nil
}
fmt.Fprintf(w, "#%d: kind: %s, method: %s, ref index: %d", i, e.kind, e.method, e.refIndex)
switch {
case e.msg.msg != nil:
fmt.Fprintf(w, ", message:\n")
b, err := prototext.Marshal(e.msg.msg)
if err != nil {
return err
}
if _, err := io.Copy(w, bytes.NewReader(b)); err != nil {
return err
}
case e.msg.err != nil:
fmt.Fprintf(w, ", error: %v\n", e.msg.err)
default:
fmt.Fprintln(w)
}
}
}
// An entry holds one gRPC action (request, response, etc.).
type entry struct {
kind pb.Entry_Kind
method string
msg message
refIndex int // index of corresponding request or create-stream
}
func (e1 *entry) equal(e2 *entry) bool {
if e1 == nil && e2 == nil {
return true
}
if e1 == nil || e2 == nil {
return false
}
return e1.kind == e2.kind &&
e1.method == e2.method &&
proto.Equal(e1.msg.msg, e2.msg.msg) &&
errEqual(e1.msg.err, e2.msg.err) &&
e1.refIndex == e2.refIndex
}
func errEqual(e1, e2 error) bool {
if e1 == e2 {
return true
}
s1, ok1 := status.FromError(e1)
s2, ok2 := status.FromError(e2)
if !ok1 || !ok2 {
return false
}
return proto.Equal(s1.Proto(), s2.Proto())
}
// message holds either a single proto.Message or an error.
type message struct {
msg proto.Message
err error
}
func (m *message) set(msg interface{}, err error) {
m.err = err
if err != io.EOF && msg != nil {
m.msg = msg.(proto.Message)
}
}
// File format:
// header
// sequence of Entry protos
//
// Header format:
// magic string
// a record containing the bytes of the initial state
const magic = "RPCReplay"
func writeHeader(w io.Writer, initial []byte) error {
if _, err := io.WriteString(w, magic); err != nil {
return err
}
return writeRecord(w, initial)
}
func readHeader(r io.Reader) ([]byte, error) {
var buf [len(magic)]byte
if _, err := io.ReadFull(r, buf[:]); err != nil {
if err == io.EOF {
err = errors.New("rpcreplay: empty replay file")
}
return nil, err
}
if string(buf[:]) != magic {
return nil, errors.New("rpcreplay: not a replay file (does not begin with magic string)")
}
bytes, err := readRecord(r)
if err == io.EOF {
err = errors.New("rpcreplay: missing initial state")
}
return bytes, err
}
func writeEntry(w io.Writer, e *entry) error {
var m proto.Message
if e.msg.err != nil && e.msg.err != io.EOF {
s, ok := status.FromError(e.msg.err)
if !ok {
return fmt.Errorf("rpcreplay: error %w is not a Status", e.msg.err)
}
m = s.Proto()
} else {
m = e.msg.msg
}
var a *anypb.Any
var err error
if m != nil {
a, err = anypb.New(m)
if err != nil {
return err
}
}
pe := &pb.Entry{
Kind: e.kind,
Method: e.method,
Message: a,
IsError: e.msg.err != nil,
RefIndex: int32(e.refIndex),
}
bytes, err := proto.Marshal(pe)
if err != nil {
return err
}
return writeRecord(w, bytes)
}
func readEntry(r io.Reader) (*entry, error) {
buf, err := readRecord(r)
if err == io.EOF {
return nil, nil
}
if err != nil {
return nil, err
}
var pe pb.Entry
if err := proto.Unmarshal(buf, &pe); err != nil {
return nil, err
}
var msg message
if pe.Message != nil {
if pe.IsError {
s := &spb.Status{}
err := anypb.UnmarshalTo(pe.Message, s, proto.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true})
if err != nil {
return nil, err
}
msg.err = status.ErrorProto(s)
} else {
m, err := anypb.UnmarshalNew(pe.Message, proto.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true})
if err != nil {
return nil, err
}
msg.msg = m
}
} else if pe.IsError {
msg.err = io.EOF
} else if pe.Kind != pb.Entry_CREATE_STREAM {
return nil, errors.New("rpcreplay: entry with nil message and false is_error")
}
return &entry{
kind: pe.Kind,
method: pe.Method,
msg: msg,
refIndex: int(pe.RefIndex),
}, nil
}
// A record consists of an unsigned 32-bit little-endian length L followed by L
// bytes.
func writeRecord(w io.Writer, data []byte) error {
if err := binary.Write(w, binary.LittleEndian, uint32(len(data))); err != nil {
return err
}
_, err := w.Write(data)
return err
}
func readRecord(r io.Reader) ([]byte, error) {
var size uint32
if err := binary.Read(r, binary.LittleEndian, &size); err != nil {
return nil, err
}
buf := make([]byte, size)
if _, err := io.ReadFull(r, buf); err != nil {
return nil, err
}
return buf, nil
}