[rsproxy] Add upload request read/write tests

resultstore.go: provide writeUploadRequests() as the natural
complement of readUploadRequests().

resultstore_test.go: test read/writeUploadRequests equivalence
Factor our a reusable test environment containing the server,
client, and fake connection.

Bug: 369980343
Change-Id: I01d5b3c3e8f2574805578ddb0e58e0c68629a00e
Reviewed-on: https://fuchsia-review.googlesource.com/c/rsclient/+/1348640
Reviewed-by: Jay Zhuang <jayzhuang@google.com>
Commit-Queue: David Fang <fangism@google.com>
diff --git a/MODULE.bazel b/MODULE.bazel
index b05f8a7..1ec036d 100644
--- a/MODULE.bazel
+++ b/MODULE.bazel
@@ -80,6 +80,7 @@
     "com_github_bazelbuild_remote_apis",
     "com_github_bazelbuild_remote_apis_sdks",
     "com_github_golang_glog",
+    "com_github_google_go_cmp",
     "com_github_google_uuid",
     "com_github_googleapis_googleapis",
     "org_golang_google_genproto",
diff --git a/go.mod b/go.mod
index faad09b..c4a905e 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@
 	github.com/bazelbuild/remote-apis v0.0.0-20230411132548-35aee1c4a425
 	github.com/bazelbuild/remote-apis-sdks v0.0.0-20250110170550-8bf84d3488e5
 	github.com/golang/glog v1.2.3
+	github.com/google/go-cmp v0.6.0
 	github.com/google/uuid v1.6.0
 	golang.org/x/mod v0.22.0
 	golang.org/x/oauth2 v0.24.0
diff --git a/internal/pkg/rsproxy/BUILD.bazel b/internal/pkg/rsproxy/BUILD.bazel
index 0b9ebbc..203e159 100644
--- a/internal/pkg/rsproxy/BUILD.bazel
+++ b/internal/pkg/rsproxy/BUILD.bazel
@@ -45,12 +45,14 @@
     deps = [
         "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:execution",
         "@com_github_bazelbuild_remote_apis_sdks//go/pkg/cas",
+        "@com_github_google_go_cmp//cmp",
         "@org_golang_google_genproto//googleapis/devtools/resultstore/v2:resultstore",
         "@org_golang_google_grpc//:grpc",
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//credentials/insecure",
         "@org_golang_google_grpc//status",
         "@org_golang_google_grpc//test/bufconn",
+        "@org_golang_google_protobuf//testing/protocmp",
         "@org_golang_google_protobuf//types/known/emptypb",
     ],
 )
diff --git a/internal/pkg/rsproxy/resultstore.go b/internal/pkg/rsproxy/resultstore.go
index 500181c..68c5b33 100644
--- a/internal/pkg/rsproxy/resultstore.go
+++ b/internal/pkg/rsproxy/resultstore.go
@@ -682,3 +682,21 @@
 
 	return readUploadRequests(reqsfile, reqs)
 }
+
+// The opposite of readUploadRequests, primarily used for testing.
+// The length-encoding/decoding must match.
+func writeUploadRequests(output io.Writer, reqs <-chan *rspb.UploadRequest) error {
+	for req := range reqs {
+		wireBytes, err := proto.Marshal(req)
+		if err != nil {
+			return err
+		}
+		if err := binary.Write(output, binary.LittleEndian, uint32(len(wireBytes))); err != nil {
+			return err
+		}
+		if _, err := output.Write(wireBytes); err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/internal/pkg/rsproxy/resultstore_test.go b/internal/pkg/rsproxy/resultstore_test.go
index c405f2b..03fde30 100644
--- a/internal/pkg/rsproxy/resultstore_test.go
+++ b/internal/pkg/rsproxy/resultstore_test.go
@@ -1,25 +1,26 @@
 package rsproxy
 
 import (
-	"bufio"
+	"bytes"
 	"context"
+	"io"
 	"net"
-	"strings"
+	"sync"
 	"testing"
 
+	"github.com/google/go-cmp/cmp"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/status"
 	"google.golang.org/grpc/test/bufconn"
-
-	emptypb "google.golang.org/protobuf/types/known/emptypb"
+	"google.golang.org/protobuf/testing/protocmp"
 
 	recas "github.com/bazelbuild/remote-apis-sdks/go/pkg/cas"
-
 	repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
 	rsgrpc "google.golang.org/genproto/googleapis/devtools/resultstore/v2"
 	rspb "google.golang.org/genproto/googleapis/devtools/resultstore/v2"
+	emptypb "google.golang.org/protobuf/types/known/emptypb"
 )
 
 const listenerBufSize = 1024 * 1024
@@ -125,11 +126,18 @@
 	return &rspb.UploadMetadata{}, nil
 }
 
-func TestNewUploaderNoop(t *testing.T) {
-	ctx := context.Background()
-	// TODO: refactor into a common TestEnv
+// testEnv is a structure containing a server and client, with
+// gRPC connection to a fake listener.  Construct with newTestEnv().
+type testEnv struct {
+	listener    *bufconn.Listener
+	conn        *grpc.ClientConn
+	server      *grpc.Server
+	uploader    *ResultStoreUploader
+	inputReader io.Reader
+}
+
+func newTestEnv(ctx context.Context, t *testing.T) (*testEnv, error) {
 	lis := bufconn.Listen(listenerBufSize) // in-memory
-	defer lis.Close()
 
 	conn, err := grpc.DialContext(ctx, "bufnet",
 		grpc.WithContextDialer(func(c context.Context, s string) (net.Conn, error) {
@@ -138,31 +146,199 @@
 		grpc.WithTransportCredentials(insecure.NewCredentials()), // For testing
 	)
 	if err != nil {
-		t.Fatalf("Failed to dial in-memory buffer listener: %v", err)
+		return nil, err
 	}
-	defer conn.Close()
 
 	s := grpc.NewServer()
 	rsgrpc.RegisterResultStoreUploadServer(s, &fakeResultStoreServer{})
-	defer s.Stop()
 
 	u := NewResultStoreUploader(ctx, conn)
-	defer u.Close()
 
 	u.blobUploader = func(req *repb.BatchUpdateBlobsRequest_Request) error {
+		// TODO: record requests
 		return nil
 	}
 	u.fileUploader = func(input *recas.UploadInput) error {
+		// TODO: record requests
 		return nil
 	}
 
-	strReader := strings.NewReader("") // empty requests
-	bufReader := bufio.NewReader(strReader)
+	// caller should u.ReceiveUploadRequests(...) to source requests
 
-	// connect input buffer to ResultStoreUploader
-	u.ReceiveUploadRequests(bufReader)
+	e := &testEnv{
+		listener: lis,
+		conn:     conn,
+		server:   s,
+		uploader: u,
+	}
 
-	if err := u.Run(ctx); err != nil {
-		t.Errorf("Run() expected no error, but got: %v", err)
+	t.Cleanup(func() {
+		e.Shutdown(t)
+	})
+	return e, nil
+}
+
+func (e *testEnv) Shutdown(t *testing.T) {
+	if err := e.uploader.Close(); err != nil {
+		t.Errorf("Error closing uploader: %v", err)
+	}
+	e.server.GracefulStop()
+	if err := e.conn.Close(); err != nil {
+		t.Errorf("Error closing connection: %v", err)
+	}
+	if err := e.listener.Close(); err != nil {
+		t.Errorf("Error closing listener: %v", err)
+	}
+}
+
+func TestResultStoreUploaderRun(t *testing.T) {
+	ctx := context.Background()
+
+	tests := []struct {
+		name      string
+		inputReqs []*rspb.UploadRequest
+		// TODO: wantedRPCCalls: ...
+	}{
+		{
+			name:      "no-op",
+			inputReqs: []*rspb.UploadRequest{},
+		},
+	}
+
+	for _, test := range tests {
+		tenv, err := newTestEnv(ctx, t)
+
+		if err != nil {
+			t.Fatalf("Test \"%s\": failed to create test environment: %v", test.name, err)
+		}
+
+		// configure upload request input source
+		wireData, err := uploadRequestsToBytes(test.inputReqs)
+		if err != nil {
+			t.Fatalf("Test \"%s\": error serializing upload requests: %v", test.name, err)
+		}
+
+		u := tenv.uploader
+		buf := bytes.NewBuffer(wireData)
+
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			u.ReceiveUploadRequests(buf)
+		}()
+
+		if err := u.Run(ctx); err != nil {
+			t.Errorf("Run() expected no error, but got: %v", err)
+		}
+
+		// TODO: compare against expected RPC calls
+
+		wg.Wait()
+	}
+}
+
+func uploadRequestsToBytes(reqs []*rspb.UploadRequest) ([]byte, error) {
+	reqChan := make(chan *rspb.UploadRequest)
+
+	// serialize requests to a buffer
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for _, req := range reqs {
+			reqChan <- req
+		}
+		close(reqChan)
+	}()
+	defer wg.Wait()
+
+	var buf bytes.Buffer
+	if err := writeUploadRequests(&buf, reqChan); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+func bytesToUploadRequests(data []byte) ([]*rspb.UploadRequest, error) {
+	gotReqs := []*rspb.UploadRequest{}
+	reqChan := make(chan *rspb.UploadRequest)
+
+	buf := bytes.NewBuffer(data)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		readUploadRequests(buf, reqChan)
+	}()
+	defer wg.Wait()
+
+	for req := range reqChan {
+		gotReqs = append(gotReqs, req)
+	}
+
+	return gotReqs, nil
+}
+
+func TestUploadRequestsReadWrite(t *testing.T) {
+	tests := []struct {
+		name     string
+		requests []*rspb.UploadRequest
+	}{
+		{
+			name:     "empty-requests",
+			requests: []*rspb.UploadRequest{},
+		},
+		{
+			name: "one-blank-request",
+			requests: []*rspb.UploadRequest{
+				&rspb.UploadRequest{},
+			},
+		},
+		{
+			name: "create-invocation-only",
+			requests: []*rspb.UploadRequest{
+				&rspb.UploadRequest{
+					UploadOperation: rspb.UploadRequest_CREATE,
+					Resource: &rspb.UploadRequest_Invocation{
+						Invocation: &rspb.Invocation{},
+					},
+				},
+			},
+		},
+		{
+			name: "create-and-finalize-invocation",
+			requests: []*rspb.UploadRequest{
+				&rspb.UploadRequest{
+					UploadOperation: rspb.UploadRequest_CREATE,
+					Resource: &rspb.UploadRequest_Invocation{
+						Invocation: &rspb.Invocation{ /* doesn't matter */ },
+					},
+				},
+				&rspb.UploadRequest{
+					UploadOperation: rspb.UploadRequest_FINALIZE,
+					Resource: &rspb.UploadRequest_Invocation{
+						Invocation: &rspb.Invocation{ /* doesn't matter */ },
+					},
+				},
+			},
+		},
+	}
+	for _, test := range tests {
+		wireData, err := uploadRequestsToBytes(test.requests)
+
+		if err != nil {
+			t.Fatalf("For test \"%s\", failed to writeUploadRequests: %v", test.name, err)
+		}
+
+		gotReqs, err := bytesToUploadRequests(wireData)
+		if err != nil {
+			t.Fatalf("For test \"%s\", failed to readUploadRequests: %v", test.name, err)
+		}
+
+		// compare requests against original
+		if diff := cmp.Diff(test.requests, gotReqs, protocmp.Transform()); diff != "" {
+			t.Errorf("For test \"%s\", found unexpected differences in UploadRequests (-want +got):\n%s", test.name, diff)
+		}
 	}
 }