[rsproxy] Add upload request read/write tests
resultstore.go: provide writeUploadRequests() as the natural
complement of readUploadRequests().
resultstore_test.go: test read/writeUploadRequests equivalence
Factor our a reusable test environment containing the server,
client, and fake connection.
Bug: 369980343
Change-Id: I01d5b3c3e8f2574805578ddb0e58e0c68629a00e
Reviewed-on: https://fuchsia-review.googlesource.com/c/rsclient/+/1348640
Reviewed-by: Jay Zhuang <jayzhuang@google.com>
Commit-Queue: David Fang <fangism@google.com>
diff --git a/MODULE.bazel b/MODULE.bazel
index b05f8a7..1ec036d 100644
--- a/MODULE.bazel
+++ b/MODULE.bazel
@@ -80,6 +80,7 @@
"com_github_bazelbuild_remote_apis",
"com_github_bazelbuild_remote_apis_sdks",
"com_github_golang_glog",
+ "com_github_google_go_cmp",
"com_github_google_uuid",
"com_github_googleapis_googleapis",
"org_golang_google_genproto",
diff --git a/go.mod b/go.mod
index faad09b..c4a905e 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@
github.com/bazelbuild/remote-apis v0.0.0-20230411132548-35aee1c4a425
github.com/bazelbuild/remote-apis-sdks v0.0.0-20250110170550-8bf84d3488e5
github.com/golang/glog v1.2.3
+ github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
golang.org/x/mod v0.22.0
golang.org/x/oauth2 v0.24.0
diff --git a/internal/pkg/rsproxy/BUILD.bazel b/internal/pkg/rsproxy/BUILD.bazel
index 0b9ebbc..203e159 100644
--- a/internal/pkg/rsproxy/BUILD.bazel
+++ b/internal/pkg/rsproxy/BUILD.bazel
@@ -45,12 +45,14 @@
deps = [
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:execution",
"@com_github_bazelbuild_remote_apis_sdks//go/pkg/cas",
+ "@com_github_google_go_cmp//cmp",
"@org_golang_google_genproto//googleapis/devtools/resultstore/v2:resultstore",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//status",
"@org_golang_google_grpc//test/bufconn",
+ "@org_golang_google_protobuf//testing/protocmp",
"@org_golang_google_protobuf//types/known/emptypb",
],
)
diff --git a/internal/pkg/rsproxy/resultstore.go b/internal/pkg/rsproxy/resultstore.go
index 500181c..68c5b33 100644
--- a/internal/pkg/rsproxy/resultstore.go
+++ b/internal/pkg/rsproxy/resultstore.go
@@ -682,3 +682,21 @@
return readUploadRequests(reqsfile, reqs)
}
+
+// The opposite of readUploadRequests, primarily used for testing.
+// The length-encoding/decoding must match.
+func writeUploadRequests(output io.Writer, reqs <-chan *rspb.UploadRequest) error {
+ for req := range reqs {
+ wireBytes, err := proto.Marshal(req)
+ if err != nil {
+ return err
+ }
+ if err := binary.Write(output, binary.LittleEndian, uint32(len(wireBytes))); err != nil {
+ return err
+ }
+ if _, err := output.Write(wireBytes); err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/internal/pkg/rsproxy/resultstore_test.go b/internal/pkg/rsproxy/resultstore_test.go
index c405f2b..03fde30 100644
--- a/internal/pkg/rsproxy/resultstore_test.go
+++ b/internal/pkg/rsproxy/resultstore_test.go
@@ -1,25 +1,26 @@
package rsproxy
import (
- "bufio"
+ "bytes"
"context"
+ "io"
"net"
- "strings"
+ "sync"
"testing"
+ "github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"
-
- emptypb "google.golang.org/protobuf/types/known/emptypb"
+ "google.golang.org/protobuf/testing/protocmp"
recas "github.com/bazelbuild/remote-apis-sdks/go/pkg/cas"
-
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
rsgrpc "google.golang.org/genproto/googleapis/devtools/resultstore/v2"
rspb "google.golang.org/genproto/googleapis/devtools/resultstore/v2"
+ emptypb "google.golang.org/protobuf/types/known/emptypb"
)
const listenerBufSize = 1024 * 1024
@@ -125,11 +126,18 @@
return &rspb.UploadMetadata{}, nil
}
-func TestNewUploaderNoop(t *testing.T) {
- ctx := context.Background()
- // TODO: refactor into a common TestEnv
+// testEnv is a structure containing a server and client, with
+// gRPC connection to a fake listener. Construct with newTestEnv().
+type testEnv struct {
+ listener *bufconn.Listener
+ conn *grpc.ClientConn
+ server *grpc.Server
+ uploader *ResultStoreUploader
+ inputReader io.Reader
+}
+
+func newTestEnv(ctx context.Context, t *testing.T) (*testEnv, error) {
lis := bufconn.Listen(listenerBufSize) // in-memory
- defer lis.Close()
conn, err := grpc.DialContext(ctx, "bufnet",
grpc.WithContextDialer(func(c context.Context, s string) (net.Conn, error) {
@@ -138,31 +146,199 @@
grpc.WithTransportCredentials(insecure.NewCredentials()), // For testing
)
if err != nil {
- t.Fatalf("Failed to dial in-memory buffer listener: %v", err)
+ return nil, err
}
- defer conn.Close()
s := grpc.NewServer()
rsgrpc.RegisterResultStoreUploadServer(s, &fakeResultStoreServer{})
- defer s.Stop()
u := NewResultStoreUploader(ctx, conn)
- defer u.Close()
u.blobUploader = func(req *repb.BatchUpdateBlobsRequest_Request) error {
+ // TODO: record requests
return nil
}
u.fileUploader = func(input *recas.UploadInput) error {
+ // TODO: record requests
return nil
}
- strReader := strings.NewReader("") // empty requests
- bufReader := bufio.NewReader(strReader)
+ // caller should u.ReceiveUploadRequests(...) to source requests
- // connect input buffer to ResultStoreUploader
- u.ReceiveUploadRequests(bufReader)
+ e := &testEnv{
+ listener: lis,
+ conn: conn,
+ server: s,
+ uploader: u,
+ }
- if err := u.Run(ctx); err != nil {
- t.Errorf("Run() expected no error, but got: %v", err)
+ t.Cleanup(func() {
+ e.Shutdown(t)
+ })
+ return e, nil
+}
+
+func (e *testEnv) Shutdown(t *testing.T) {
+ if err := e.uploader.Close(); err != nil {
+ t.Errorf("Error closing uploader: %v", err)
+ }
+ e.server.GracefulStop()
+ if err := e.conn.Close(); err != nil {
+ t.Errorf("Error closing connection: %v", err)
+ }
+ if err := e.listener.Close(); err != nil {
+ t.Errorf("Error closing listener: %v", err)
+ }
+}
+
+func TestResultStoreUploaderRun(t *testing.T) {
+ ctx := context.Background()
+
+ tests := []struct {
+ name string
+ inputReqs []*rspb.UploadRequest
+ // TODO: wantedRPCCalls: ...
+ }{
+ {
+ name: "no-op",
+ inputReqs: []*rspb.UploadRequest{},
+ },
+ }
+
+ for _, test := range tests {
+ tenv, err := newTestEnv(ctx, t)
+
+ if err != nil {
+ t.Fatalf("Test \"%s\": failed to create test environment: %v", test.name, err)
+ }
+
+ // configure upload request input source
+ wireData, err := uploadRequestsToBytes(test.inputReqs)
+ if err != nil {
+ t.Fatalf("Test \"%s\": error serializing upload requests: %v", test.name, err)
+ }
+
+ u := tenv.uploader
+ buf := bytes.NewBuffer(wireData)
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ u.ReceiveUploadRequests(buf)
+ }()
+
+ if err := u.Run(ctx); err != nil {
+ t.Errorf("Run() expected no error, but got: %v", err)
+ }
+
+ // TODO: compare against expected RPC calls
+
+ wg.Wait()
+ }
+}
+
+func uploadRequestsToBytes(reqs []*rspb.UploadRequest) ([]byte, error) {
+ reqChan := make(chan *rspb.UploadRequest)
+
+ // serialize requests to a buffer
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for _, req := range reqs {
+ reqChan <- req
+ }
+ close(reqChan)
+ }()
+ defer wg.Wait()
+
+ var buf bytes.Buffer
+ if err := writeUploadRequests(&buf, reqChan); err != nil {
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
+
+func bytesToUploadRequests(data []byte) ([]*rspb.UploadRequest, error) {
+ gotReqs := []*rspb.UploadRequest{}
+ reqChan := make(chan *rspb.UploadRequest)
+
+ buf := bytes.NewBuffer(data)
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ readUploadRequests(buf, reqChan)
+ }()
+ defer wg.Wait()
+
+ for req := range reqChan {
+ gotReqs = append(gotReqs, req)
+ }
+
+ return gotReqs, nil
+}
+
+func TestUploadRequestsReadWrite(t *testing.T) {
+ tests := []struct {
+ name string
+ requests []*rspb.UploadRequest
+ }{
+ {
+ name: "empty-requests",
+ requests: []*rspb.UploadRequest{},
+ },
+ {
+ name: "one-blank-request",
+ requests: []*rspb.UploadRequest{
+ &rspb.UploadRequest{},
+ },
+ },
+ {
+ name: "create-invocation-only",
+ requests: []*rspb.UploadRequest{
+ &rspb.UploadRequest{
+ UploadOperation: rspb.UploadRequest_CREATE,
+ Resource: &rspb.UploadRequest_Invocation{
+ Invocation: &rspb.Invocation{},
+ },
+ },
+ },
+ },
+ {
+ name: "create-and-finalize-invocation",
+ requests: []*rspb.UploadRequest{
+ &rspb.UploadRequest{
+ UploadOperation: rspb.UploadRequest_CREATE,
+ Resource: &rspb.UploadRequest_Invocation{
+ Invocation: &rspb.Invocation{ /* doesn't matter */ },
+ },
+ },
+ &rspb.UploadRequest{
+ UploadOperation: rspb.UploadRequest_FINALIZE,
+ Resource: &rspb.UploadRequest_Invocation{
+ Invocation: &rspb.Invocation{ /* doesn't matter */ },
+ },
+ },
+ },
+ },
+ }
+ for _, test := range tests {
+ wireData, err := uploadRequestsToBytes(test.requests)
+
+ if err != nil {
+ t.Fatalf("For test \"%s\", failed to writeUploadRequests: %v", test.name, err)
+ }
+
+ gotReqs, err := bytesToUploadRequests(wireData)
+ if err != nil {
+ t.Fatalf("For test \"%s\", failed to readUploadRequests: %v", test.name, err)
+ }
+
+ // compare requests against original
+ if diff := cmp.Diff(test.requests, gotReqs, protocmp.Transform()); diff != "" {
+ t.Errorf("For test \"%s\", found unexpected differences in UploadRequests (-want +got):\n%s", test.name, diff)
+ }
}
}