rpcreplay: file format and I/O

Recording writes a file that replaying will read. This CL describes
the file format and implements reading and writing. I use a proto
for Entry so the files can be accessed from other languages.

The messageOrError type may seem superfluous, but it will turn
out to be useful to support streaming.

Change-Id: I9d2a8892d7c213aba8f0692c56ccce717d6a0d92
Reviewed-on: https://code-review.googlesource.com/13750
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Sarah Adams <shadams@google.com>
diff --git a/internal/rpcreplay/Makefile b/internal/rpcreplay/Makefile
new file mode 100644
index 0000000..f41293f
--- /dev/null
+++ b/internal/rpcreplay/Makefile
@@ -0,0 +1,32 @@
+# Copyright 2017 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Makefile for building Go files from protos.
+
+# Change these to match your environment.
+PROTOC=$(HOME)/bin/protoc
+PROTOC_GO_PLUGIN_DIR=$(GOPATH)/bin
+PROTOBUF_REPO=$(HOME)/git-repos/protobuf
+
+gen-protos:  sync-protobuf
+	for d in proto/*; do \
+	    PATH=$(PATH):$(PROTOC_GO_PLUGIN_DIR) \
+	    $(PROTOC) --go_out=plugins=grpc:$$d \
+	    -I $$d -I $(PROTOBUF_REPO)/src $$d/*.proto; \
+	done
+
+
+sync-protobuf:
+	cd $(PROTOBUF_REPO); git pull
+
diff --git a/internal/rpcreplay/proto/rpcreplay/rpcreplay.pb.go b/internal/rpcreplay/proto/rpcreplay/rpcreplay.pb.go
new file mode 100644
index 0000000..8e76a39
--- /dev/null
+++ b/internal/rpcreplay/proto/rpcreplay/rpcreplay.pb.go
@@ -0,0 +1,170 @@
+// Code generated by protoc-gen-go.
+// source: rpcreplay.proto
+// DO NOT EDIT!
+
+/*
+Package rpcreplay is a generated protocol buffer package.
+
+It is generated from these files:
+	rpcreplay.proto
+
+It has these top-level messages:
+	Entry
+*/
+package rpcreplay
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+import google_protobuf "github.com/golang/protobuf/ptypes/any"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type Entry_Kind int32
+
+const (
+	Entry_TYPE_UNSPECIFIED Entry_Kind = 0
+	// A unary request.
+	// method: the full name of the method
+	// message: the request proto
+	// is_error: false
+	// ref_index: 0
+	Entry_REQUEST Entry_Kind = 1
+	// A unary response.
+	// method: the full name of the method
+	// message:
+	//   if is_error: a google.rpc.Status proto
+	//   else:        the response proto
+	// ref_index: index in the sequence of Entries of matching request (1-based)
+	Entry_RESPONSE Entry_Kind = 2
+	// A method that creates a stream.
+	// method: the full name of the method
+	// message:
+	//   if is_error: a google.rpc.Status proto
+	//   else:        nil
+	// ref_index: 0
+	Entry_CREATE_STREAM Entry_Kind = 3
+	// A call to Send on the client returned by a stream-creating method.
+	// method: unset
+	// message: the proto being sent
+	// is_error: false
+	// ref_index: index of matching CREATE_STREAM entry (1-based)
+	Entry_SEND Entry_Kind = 4
+	// A call to Recv on the client returned by a stream-creating method.
+	// method: unset
+	// message:
+	//   if is_error: a google.rpc.Status proto, or nil on EOF
+	//   else:        the received message
+	// ref_index: index of matching CREATE_STREAM entry
+	Entry_RECV Entry_Kind = 5
+)
+
+var Entry_Kind_name = map[int32]string{
+	0: "TYPE_UNSPECIFIED",
+	1: "REQUEST",
+	2: "RESPONSE",
+	3: "CREATE_STREAM",
+	4: "SEND",
+	5: "RECV",
+}
+var Entry_Kind_value = map[string]int32{
+	"TYPE_UNSPECIFIED": 0,
+	"REQUEST":          1,
+	"RESPONSE":         2,
+	"CREATE_STREAM":    3,
+	"SEND":             4,
+	"RECV":             5,
+}
+
+func (x Entry_Kind) String() string {
+	return proto.EnumName(Entry_Kind_name, int32(x))
+}
+func (Entry_Kind) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 0} }
+
+// An Entry represents a single RPC activity, typically a request or response.
+type Entry struct {
+	Kind     Entry_Kind           `protobuf:"varint,1,opt,name=kind,enum=rpcreplay.Entry_Kind" json:"kind,omitempty"`
+	Method   string               `protobuf:"bytes,2,opt,name=method" json:"method,omitempty"`
+	Message  *google_protobuf.Any `protobuf:"bytes,3,opt,name=message" json:"message,omitempty"`
+	IsError  bool                 `protobuf:"varint,4,opt,name=is_error,json=isError" json:"is_error,omitempty"`
+	RefIndex int32                `protobuf:"varint,5,opt,name=ref_index,json=refIndex" json:"ref_index,omitempty"`
+}
+
+func (m *Entry) Reset()                    { *m = Entry{} }
+func (m *Entry) String() string            { return proto.CompactTextString(m) }
+func (*Entry) ProtoMessage()               {}
+func (*Entry) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
+
+func (m *Entry) GetKind() Entry_Kind {
+	if m != nil {
+		return m.Kind
+	}
+	return Entry_TYPE_UNSPECIFIED
+}
+
+func (m *Entry) GetMethod() string {
+	if m != nil {
+		return m.Method
+	}
+	return ""
+}
+
+func (m *Entry) GetMessage() *google_protobuf.Any {
+	if m != nil {
+		return m.Message
+	}
+	return nil
+}
+
+func (m *Entry) GetIsError() bool {
+	if m != nil {
+		return m.IsError
+	}
+	return false
+}
+
+func (m *Entry) GetRefIndex() int32 {
+	if m != nil {
+		return m.RefIndex
+	}
+	return 0
+}
+
+func init() {
+	proto.RegisterType((*Entry)(nil), "rpcreplay.Entry")
+	proto.RegisterEnum("rpcreplay.Entry_Kind", Entry_Kind_name, Entry_Kind_value)
+}
+
+func init() { proto.RegisterFile("rpcreplay.proto", fileDescriptor0) }
+
+var fileDescriptor0 = []byte{
+	// 289 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x44, 0x8e, 0xdf, 0x4e, 0xc2, 0x30,
+	0x14, 0xc6, 0x2d, 0x6c, 0x30, 0x0e, 0xfe, 0xa9, 0x0d, 0x9a, 0xa1, 0x37, 0x0b, 0x57, 0xf3, 0xa6,
+	0x24, 0xf8, 0x04, 0x04, 0x8e, 0x09, 0x31, 0x22, 0xb6, 0xc3, 0xc4, 0x1b, 0x17, 0x70, 0x05, 0x17,
+	0xa1, 0x25, 0xdd, 0x4c, 0xdc, 0x6b, 0xf8, 0xc4, 0x66, 0x13, 0xf4, 0xae, 0xbf, 0x7e, 0xbf, 0x9c,
+	0xef, 0x83, 0x33, 0xbb, 0x7b, 0xb3, 0x6a, 0xb7, 0x59, 0x14, 0x7c, 0x67, 0x4d, 0x6e, 0x58, 0xeb,
+	0xef, 0xe3, 0xaa, 0xbb, 0x36, 0x66, 0xbd, 0x51, 0xfd, 0x2a, 0x58, 0x7e, 0xae, 0xfa, 0x0b, 0xbd,
+	0xb7, 0x7a, 0xdf, 0x35, 0x70, 0x51, 0xe7, 0xb6, 0x60, 0x37, 0xe0, 0x7c, 0xa4, 0x3a, 0xf1, 0x49,
+	0x40, 0xc2, 0xd3, 0xc1, 0x05, 0xff, 0xbf, 0x57, 0xe5, 0xfc, 0x3e, 0xd5, 0x89, 0xa8, 0x14, 0x76,
+	0x09, 0x8d, 0xad, 0xca, 0xdf, 0x4d, 0xe2, 0xd7, 0x02, 0x12, 0xb6, 0xc4, 0x9e, 0x18, 0x87, 0xe6,
+	0x56, 0x65, 0xd9, 0x62, 0xad, 0xfc, 0x7a, 0x40, 0xc2, 0xf6, 0xa0, 0xc3, 0x7f, 0x9b, 0xf9, 0xa1,
+	0x99, 0x0f, 0x75, 0x21, 0x0e, 0x12, 0xeb, 0x82, 0x97, 0x66, 0xb1, 0xb2, 0xd6, 0x58, 0xdf, 0x09,
+	0x48, 0xe8, 0x89, 0x66, 0x9a, 0x61, 0x89, 0xec, 0x1a, 0x5a, 0x56, 0xad, 0xe2, 0x54, 0x27, 0xea,
+	0xcb, 0x77, 0x03, 0x12, 0xba, 0xc2, 0xb3, 0x6a, 0x35, 0x29, 0xb9, 0xf7, 0x0a, 0x4e, 0xb9, 0x86,
+	0x75, 0x80, 0x46, 0x2f, 0x33, 0x8c, 0xe7, 0x53, 0x39, 0xc3, 0xd1, 0xe4, 0x6e, 0x82, 0x63, 0x7a,
+	0xc4, 0xda, 0xd0, 0x14, 0xf8, 0x34, 0x47, 0x19, 0x51, 0xc2, 0x8e, 0xc1, 0x13, 0x28, 0x67, 0x8f,
+	0x53, 0x89, 0xb4, 0xc6, 0xce, 0xe1, 0x64, 0x24, 0x70, 0x18, 0x61, 0x2c, 0x23, 0x81, 0xc3, 0x07,
+	0x5a, 0x67, 0x1e, 0x38, 0x12, 0xa7, 0x63, 0xea, 0x94, 0x2f, 0x81, 0xa3, 0x67, 0xea, 0x2e, 0x1b,
+	0xd5, 0xdc, 0xdb, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe7, 0x9b, 0x9d, 0x4f, 0x54, 0x01, 0x00,
+	0x00,
+}
diff --git a/internal/rpcreplay/proto/rpcreplay/rpcreplay.proto b/internal/rpcreplay/proto/rpcreplay/rpcreplay.proto
new file mode 100644
index 0000000..8475f33
--- /dev/null
+++ b/internal/rpcreplay/proto/rpcreplay/rpcreplay.proto
@@ -0,0 +1,71 @@
+// Copyright 2017 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package rpcreplay;
+
+import "google/protobuf/any.proto";
+
+// An Entry represents a single RPC activity, typically a request or response.
+message Entry {
+  enum Kind {
+    TYPE_UNSPECIFIED = 0;
+
+    // A unary request.
+    // method: the full name of the method
+    // message: the request proto
+    // is_error: false
+    // ref_index: 0
+    REQUEST = 1;
+
+    // A unary response.
+    // method: the full name of the method
+    // message:
+    //   if is_error: a google.rpc.Status proto
+    //   else:        the response proto
+    // ref_index: index in the sequence of Entries of matching request (1-based)
+    RESPONSE = 2;
+
+    // A method that creates a stream.
+    // method: the full name of the method
+    // message:
+    //   if is_error: a google.rpc.Status proto
+    //   else:        nil
+    // ref_index: 0
+    CREATE_STREAM = 3;
+
+    // A call to Send on the client returned by a stream-creating method.
+    // method: unset
+    // message: the proto being sent
+    // is_error: false
+    // ref_index: index of matching CREATE_STREAM entry (1-based)
+    SEND = 4;   // message sent on stream
+
+    // A call to Recv on the client returned by a stream-creating method.
+    // method: unset
+    // message:
+    //   if is_error: a google.rpc.Status proto, or nil on EOF
+    //   else:        the received message
+    // ref_index: index of matching CREATE_STREAM entry
+    RECV = 5;   // message received from stream
+  }
+
+  Kind kind = 1;
+  string method = 2;                // method name
+  google.protobuf.Any message = 3;  // request, response or error status
+  bool is_error = 4;                // was response an error?
+  int32 ref_index = 5;              // for RESPONSE, index of matching request;
+                                    // for SEND/RECV, index of CREATE_STREAM
+}
diff --git a/internal/rpcreplay/rpcreplay.go b/internal/rpcreplay/rpcreplay.go
new file mode 100644
index 0000000..36b9241
--- /dev/null
+++ b/internal/rpcreplay/rpcreplay.go
@@ -0,0 +1,204 @@
+// Copyright 2017 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rpcreplay
+
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+
+	"google.golang.org/grpc/status"
+
+	pb "cloud.google.com/go/internal/rpcreplay/proto/rpcreplay"
+	"github.com/golang/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/any"
+	spb "google.golang.org/genproto/googleapis/rpc/status"
+)
+
+// An entry holds one gRPC action (request, response, etc.).
+type entry struct {
+	kind     pb.Entry_Kind
+	method   string
+	msg      message
+	refIndex int // index of corresponding request or create-stream
+}
+
+func (e1 *entry) equal(e2 *entry) bool {
+	if e1 == nil && e2 == nil {
+		return true
+	}
+	if e1 == nil || e2 == nil {
+		return false
+	}
+	return e1.kind == e2.kind &&
+		e1.method == e2.method &&
+		proto.Equal(e1.msg.msg, e2.msg.msg) &&
+		errEqual(e1.msg.err, e2.msg.err) &&
+		e1.refIndex == e2.refIndex
+}
+
+func errEqual(e1, e2 error) bool {
+	if e1 == e2 {
+		return true
+	}
+	s1, ok1 := status.FromError(e1)
+	s2, ok2 := status.FromError(e2)
+	if !ok1 || !ok2 {
+		return false
+	}
+	return proto.Equal(s1.Proto(), s2.Proto())
+}
+
+// message holds either a single proto.Message or an error.
+type message struct {
+	msg proto.Message
+	err error
+}
+
+func (m *message) set(msg interface{}, err error) {
+	if msg != nil {
+		m.msg = msg.(proto.Message)
+	}
+	m.err = err
+}
+
+// File format:
+//   header
+//   sequence of Entry protos
+//
+// Header format:
+//   magic string
+//   a record containing the bytes of the initial state
+
+const magic = "RPCReplay"
+
+func writeHeader(w io.Writer, initial []byte) error {
+	if _, err := io.WriteString(w, magic); err != nil {
+		return err
+	}
+	return writeRecord(w, initial)
+}
+
+func readHeader(r io.Reader) ([]byte, error) {
+	var buf [len(magic)]byte
+	if _, err := io.ReadFull(r, buf[:]); err != nil {
+		if err == io.EOF {
+			err = errors.New("rpcreplay: empty replay file")
+		}
+		return nil, err
+	}
+	if string(buf[:]) != magic {
+		return nil, errors.New("rpcreplay: not a replay file (does not begin with magic string)")
+	}
+	bytes, err := readRecord(r)
+	if err == io.EOF {
+		err = errors.New("rpcreplay: missing initial state")
+	}
+	return bytes, err
+}
+
+func writeEntry(w io.Writer, e *entry) error {
+	var m proto.Message
+	if e.msg.err != nil && e.msg.err != io.EOF {
+		s, ok := status.FromError(e.msg.err)
+		if !ok {
+			return fmt.Errorf("rpcreplay: error %v is not a Status", e.msg.err)
+		}
+		m = s.Proto()
+	} else {
+		m = e.msg.msg
+	}
+	var a *any.Any
+	var err error
+	if m != nil {
+		a, err = ptypes.MarshalAny(m)
+		if err != nil {
+			return err
+		}
+	}
+	pe := &pb.Entry{
+		Kind:     e.kind,
+		Method:   e.method,
+		Message:  a,
+		IsError:  e.msg.err != nil,
+		RefIndex: int32(e.refIndex),
+	}
+	bytes, err := proto.Marshal(pe)
+	if err != nil {
+		return err
+	}
+	return writeRecord(w, bytes)
+}
+
+func readEntry(r io.Reader) (*entry, error) {
+	buf, err := readRecord(r)
+	if err == io.EOF {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, err
+	}
+	var pe pb.Entry
+	if err := proto.Unmarshal(buf, &pe); err != nil {
+		return nil, err
+	}
+	var msg message
+	if pe.Message != nil {
+		var any ptypes.DynamicAny
+		if err := ptypes.UnmarshalAny(pe.Message, &any); err != nil {
+			return nil, err
+		}
+		if pe.IsError {
+			msg.err = status.ErrorProto(any.Message.(*spb.Status))
+		} else {
+			msg.msg = any.Message
+		}
+	} else if pe.IsError {
+		msg.err = io.EOF
+	} else {
+		return nil, errors.New("rpcreplay: entry with nil message and false is_error")
+	}
+	return &entry{
+		kind:     pe.Kind,
+		method:   pe.Method,
+		msg:      msg,
+		refIndex: int(pe.RefIndex),
+	}, nil
+}
+
+// A record consists of an unsigned 32-bit little-endian length L followed by L
+// bytes.
+
+func writeRecord(w io.Writer, data []byte) error {
+	if err := binary.Write(w, binary.LittleEndian, uint32(len(data))); err != nil {
+		return err
+	}
+	_, err := w.Write(data)
+	return err
+}
+
+func readRecord(r io.Reader) ([]byte, error) {
+	var size uint32
+	if err := binary.Read(r, binary.LittleEndian, &size); err != nil {
+		return nil, err
+	}
+	buf := make([]byte, size)
+	if _, err := io.ReadFull(r, buf); err != nil {
+		return nil, err
+	}
+	return buf, nil
+}
diff --git a/internal/rpcreplay/rpcreplay_test.go b/internal/rpcreplay/rpcreplay_test.go
new file mode 100644
index 0000000..30e64f5
--- /dev/null
+++ b/internal/rpcreplay/rpcreplay_test.go
@@ -0,0 +1,98 @@
+// Copyright 2017 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rpcreplay
+
+import (
+	"bytes"
+	"io"
+	"reflect"
+	"testing"
+
+	rpb "cloud.google.com/go/internal/rpcreplay/proto/rpcreplay"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+func TestRecordIO(t *testing.T) {
+	buf := &bytes.Buffer{}
+	want := []byte{1, 2, 3}
+	if err := writeRecord(buf, want); err != nil {
+		t.Fatal(err)
+	}
+	got, err := readRecord(buf)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !bytes.Equal(got, want) {
+		t.Errorf("got %v, want %v", got, want)
+	}
+}
+
+func TestHeaderIO(t *testing.T) {
+	buf := &bytes.Buffer{}
+	want := []byte{1, 2, 3}
+	if err := writeHeader(buf, want); err != nil {
+		t.Fatal(err)
+	}
+	got, err := readHeader(buf)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !reflect.DeepEqual(got, want) {
+		t.Errorf("got %v, want %v", got, want)
+	}
+
+	// readHeader errors
+	for _, contents := range []string{"", "badmagic", "gRPCReplay"} {
+		if _, err := readHeader(bytes.NewBufferString(contents)); err == nil {
+			t.Errorf("%q: got nil, want error", contents)
+		}
+	}
+}
+
+func TestEntryIO(t *testing.T) {
+	for i, want := range []*entry{
+		{
+			kind:     rpb.Entry_REQUEST,
+			method:   "method",
+			msg:      message{msg: &rpb.Entry{}},
+			refIndex: 7,
+		},
+		{
+			kind:     rpb.Entry_RESPONSE,
+			method:   "method",
+			msg:      message{err: status.Error(codes.NotFound, "not found")},
+			refIndex: 8,
+		},
+		{
+			kind:     rpb.Entry_RECV,
+			method:   "method",
+			msg:      message{err: io.EOF},
+			refIndex: 3,
+		},
+	} {
+		buf := &bytes.Buffer{}
+		if err := writeEntry(buf, want); err != nil {
+			t.Fatal(err)
+		}
+		got, err := readEntry(buf)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if !got.equal(want) {
+			t.Errorf("#%d: got %v, want %v", i, got, want)
+		}
+	}
+}