package client_test
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/fakes"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/filemetadata"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/portpicker"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
"github.com/google/go-cmp/cmp"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
bsgrpc "google.golang.org/genproto/googleapis/bytestream"
)
const (
instance = "instance"
defaultCASConcurrency = 50
reqMaxSleepDuration = 5 * time.Millisecond
)
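// TestSplitEndpoints verifies that a client configured with separate Service and CASService
// endpoints routes ByteStream reads to the CAS endpoint.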
func TestSplitEndpoints(t *testing.T) {
t.Parallel()
ctx := context.Background()
l1, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Cannot listen: %v", err)
}
port := portpicker.PickUnusedPortTB(t)
l2, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
t.Fatalf("Cannot listen: %v", err)
}
defer l1.Close()
defer l2.Close()
execServer := grpc.NewServer()
casServer := grpc.NewServer()
blob := []byte("foobar")
fake := &fakes.Reader{
Blob: blob,
Chunks: []int{6},
ExpectCompressed: false,
}
bsgrpc.RegisterByteStreamServer(casServer, fake)
go execServer.Serve(l1)
go casServer.Serve(l2)
defer casServer.Stop()
defer execServer.Stop()
c, err := client.NewClient(ctx, instance, client.DialParams{
Service: l1.Addr().String(),
CASService: l2.Addr().String(),
NoSecurity: true,
}, client.StartupCapabilities(false))
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
}
defer c.Close()
got, _, err := c.ReadBlob(ctx, digest.NewFromBlob(blob))
if err != nil {
t.Errorf("c.ReadBlob(ctx, digest) gave error %s, want nil", err)
}
if !bytes.Equal(blob, got) {
t.Errorf("c.ReadBlob(ctx, digest) gave diff: want %v, got %v", blob, got)
}
}
func TestReadEmptyBlobDoesNotCallServer(t *testing.T) {
t.Parallel()
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
got, _, err := c.ReadBlob(ctx, digest.Empty)
if err != nil {
t.Errorf("c.ReadBlob(ctx, Empty) gave error %s, want nil", err)
}
if len(got) != 0 {
t.Errorf("c.ReadBlob(ctx, Empty) gave diff: want nil, got %v", got)
}
reads := fake.BlobReads(digest.Empty)
if reads != 0 {
t.Errorf("expected no blob reads to the fake, got %v", reads)
}
}
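// TestRead exercises ReadBlob and ReadBlobRange against a fake ByteStream server with various
// chunkings, offsets, and limits, both with and without compression.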
func TestRead(t *testing.T) {
t.Parallel()
type testCase struct {
name string
fake fakes.Reader
offset int64
limit int64
compress bool
want []byte // If nil, fake.Blob is expected by default.
}
tests := []testCase{
{
name: "empty blob, 10 chunks",
fake: fakes.Reader{
Blob: []byte{},
Chunks: []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
},
},
{
name: "blob 'foobar', 1 chunk",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{6},
},
},
{
name: "blob 'foobar', 3 evenly sized chunks",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{2, 2, 2},
},
},
{
name: "blob 'foobar', 3 unequal chunks",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{1, 3, 2},
},
},
{
name: "blob 'foobar', 2 chunks with 0-sized chunk between",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{3, 0, 3},
},
},
{
name: "blob 'foobarbaz', partial read spanning multiple chunks",
fake: fakes.Reader{
Blob: []byte("foobarbaz"),
Chunks: []int{3, 0, 3, 3},
},
offset: 2,
limit: 5,
want: []byte("obarb"),
},
{
name: "blob 'foobar', partial read within chunk",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{6},
},
offset: 2,
limit: 3,
want: []byte("oba"),
},
{
name: "blob 'foobar', partial read from start",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{3, 3},
},
offset: 0,
limit: 5,
want: []byte("fooba"),
},
{
name: "blob 'foobar', partial read with no limit",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{3, 3},
},
offset: 2,
limit: 0,
want: []byte("obar"),
},
{
name: "blob 'foobar', partial read with limit extending beyond end of blob",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{3, 3},
},
offset: 2,
limit: 8,
want: []byte("obar"),
},
}
var compressionTests []testCase
for _, tc := range tests {
if tc.limit == 0 {
// Limit tests don't work well with compression, as the limit refers to the compressed bytes
// while offset, per spec, refers to uncompressed bytes.
tc.compress = true
tc.name = tc.name + "_compressed"
compressionTests = append(compressionTests, tc)
}
}
tests = append(tests, compressionTests...)
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Cannot listen: %v", err)
}
defer listener.Close()
server := grpc.NewServer()
bsgrpc.RegisterByteStreamServer(server, &tc.fake)
go server.Serve(listener)
defer server.Stop()
c, err := client.NewClient(ctx, instance, client.DialParams{
Service: listener.Addr().String(),
NoSecurity: true,
}, client.StartupCapabilities(false))
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
}
defer c.Close()
tc.fake.Validate(t)
c.CompressedBytestreamThreshold = -1
if tc.compress {
c.CompressedBytestreamThreshold = 0
}
tc.fake.ExpectCompressed = tc.compress
want := tc.want
if want == nil {
want = tc.fake.Blob
}
if tc.offset == 0 && tc.limit == 0 {
got, stats, err := c.ReadBlob(ctx, digest.NewFromBlob(want))
if err != nil {
t.Errorf("c.ReadBlob(ctx, digest) gave error %s, want nil", err)
}
if !bytes.Equal(want, got) {
t.Errorf("c.ReadBlob(ctx, digest) gave diff: want %v, got %v", want, got)
}
if int64(len(got)) != stats.LogicalMoved {
t.Errorf("c.ReadBlob(ctx, digest) = _, %v - logical bytes moved different than len of blob received", stats.LogicalMoved)
}
if tc.compress && len(tc.fake.Blob) > 0 && stats.LogicalMoved == stats.RealMoved {
t.Errorf("c.ReadBlob(ctx, digest) = %v - compression on but different real and logical bytes", stats)
}
}
got, stats, err := c.ReadBlobRange(ctx, digest.NewFromBlob(tc.fake.Blob), tc.offset, tc.limit)
if err != nil {
t.Errorf("c.ReadBlobRange(ctx, digest, %d, %d) gave error %s, want nil", tc.offset, tc.limit, err)
}
if !bytes.Equal(want, got) {
t.Errorf("c.ReadBlobRange(ctx, digest, %d, %d) gave diff: want %v, got %v", tc.offset, tc.limit, want, got)
}
if int64(len(got)) != stats.LogicalMoved {
t.Errorf("c.ReadBlobRange(ctx, digest, %d, %d) = _, %v - logical bytes moved differs from length of blob received", tc.offset, tc.limit, stats.LogicalMoved)
}
if tc.compress && len(tc.fake.Blob) > 0 && stats.LogicalMoved == stats.RealMoved {
t.Errorf("c.ReadBlobRange(ctx, digest, %d, %d) = %v - compression on but same real and logical bytes", tc.offset, tc.limit, stats)
}
})
}
}
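// TestWrite uploads blobs of several sizes through WriteBlob using a small chunk size, with the
// compression threshold forced on and off, and checks the stored bytes and returned digest.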
func TestWrite(t *testing.T) {
t.Parallel()
type testcase struct {
name string
blob []byte
cmp client.CompressedBytestreamThreshold
}
tests := []testcase{
{
name: "empty blob",
blob: []byte{},
},
{
name: "small blob",
blob: []byte("this is a pretty small blob comparatively"),
},
{
name: "5MB zero blob",
blob: make([]byte, 5*1024*1024),
},
}
var allTests []testcase
for _, tc := range tests {
for _, th := range []int{0, -1} {
t := tc
t.name += fmt.Sprintf("CompressionThreshold=%d", th)
t.cmp = client.CompressedBytestreamThreshold(th)
allTests = append(allTests, t)
}
}
for _, tc := range allTests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Cannot listen: %v", err)
}
defer listener.Close()
server := grpc.NewServer()
fake := &fakes.Writer{}
bsgrpc.RegisterByteStreamServer(server, fake)
go server.Serve(listener)
defer server.Stop()
c, err := client.NewClient(ctx, instance, client.DialParams{
Service: listener.Addr().String(),
NoSecurity: true,
}, client.StartupCapabilities(false), client.ChunkMaxSize(20)) // Use small write chunk size for tests.
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
}
defer c.Close()
fake.ExpectCompressed = int(tc.cmp) == 0
tc.cmp.Apply(c)
gotDg, err := c.WriteBlob(ctx, tc.blob)
if err != nil {
t.Errorf("c.WriteBlob(ctx, blob) gave error %s, wanted nil", err)
}
if fake.Err != nil {
t.Errorf("c.WriteBlob(ctx, blob) caused the server to return error %s (possibly unseen by c)", fake.Err)
}
if !bytes.Equal(tc.blob, fake.Buf) {
t.Errorf("c.WriteBlob(ctx, blob) had diff on blobs, want %v, got %v:", tc.blob, fake.Buf)
}
dg := digest.NewFromBlob(tc.blob)
if dg != gotDg {
t.Errorf("c.WriteBlob(ctx, blob) had diff on digest returned (want %s, got %s)", dg, gotDg)
}
})
}
}
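// TestMissingBlobs checks that MissingBlobs returns exactly the digests that are absent from the CAS.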
func TestMissingBlobs(t *testing.T) {
t.Parallel()
tests := []struct {
name string
// present is the blobs present in the CAS.
present []string
// input is the digests given to MissingBlobs.
input []digest.Digest
// want is the returned list of digests.
want []digest.Digest
}{
{
name: "none present",
present: nil,
input: []digest.Digest{
digest.NewFromBlob([]byte("foo")),
digest.NewFromBlob([]byte("bar")),
digest.NewFromBlob([]byte("baz")),
},
want: []digest.Digest{
digest.NewFromBlob([]byte("foo")),
digest.NewFromBlob([]byte("bar")),
digest.NewFromBlob([]byte("baz")),
},
},
{
name: "all present",
present: []string{"foo", "bar", "baz"},
input: []digest.Digest{
digest.NewFromBlob([]byte("foo")),
digest.NewFromBlob([]byte("bar")),
digest.NewFromBlob([]byte("baz")),
},
want: nil,
},
{
name: "some present",
present: []string{"foo", "bar"},
input: []digest.Digest{
digest.NewFromBlob([]byte("foo")),
digest.NewFromBlob([]byte("bar")),
digest.NewFromBlob([]byte("baz")),
},
want: []digest.Digest{
digest.NewFromBlob([]byte("baz")),
},
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
for _, s := range tc.present {
fake.Put([]byte(s))
}
t.Logf("CAS contains digests of %s", tc.present)
got, err := c.MissingBlobs(ctx, tc.input)
if err != nil {
t.Errorf("c.MissingBlobs(ctx, %v) gave error %s, expected nil", tc.input, err)
}
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("c.MissingBlobs(ctx, %v) gave diff (want -> got):\n%s", tc.input, diff)
}
})
}
}
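// TestUploadConcurrent runs many concurrent UploadIfMissing calls over the same blobs; with
// unified uploads enabled, each blob should be written and queried for presence exactly once.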
func TestUploadConcurrent(t *testing.T) {
t.Parallel()
blobs := make([][]byte, 50)
for i := range blobs {
blobs[i] = []byte(fmt.Sprint(i))
}
type testCase struct {
name string
// Whether to use batching.
batching client.UseBatchOps
// Whether to use background CAS ops.
unified client.UnifiedUploads
// The batch size.
maxBatchDigests client.MaxBatchDigests
// The CAS concurrency for uploading the blobs.
concurrency client.CASConcurrency
}
var tests []testCase
for _, ub := range []client.UseBatchOps{false, true} {
for _, cb := range []client.UnifiedUploads{false, true} {
for _, conc := range []client.CASConcurrency{3, 100} {
tc := testCase{
name: fmt.Sprintf("batch:%t,unified:%t,conc:%d", ub, cb, conc),
batching: ub,
unified: cb,
maxBatchDigests: client.MaxBatchDigests(9),
concurrency: conc,
}
tests = append(tests, tc)
}
}
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
fake.ReqSleepDuration = reqMaxSleepDuration
fake.ReqSleepRandomize = true
c := e.Client.GrpcClient
for _, opt := range []client.Opt{tc.batching, tc.maxBatchDigests, tc.concurrency, tc.unified} {
opt.Apply(c)
}
eg, eCtx := errgroup.WithContext(ctx)
for i := 0; i < 100; i++ {
eg.Go(func() error {
var input []*uploadinfo.Entry
for _, blob := range append(blobs, blobs...) {
input = append(input, uploadinfo.EntryFromBlob(blob))
}
if _, _, err := c.UploadIfMissing(eCtx, input...); err != nil {
return fmt.Errorf("c.UploadIfMissing(ctx, input) gave error %v, expected nil", err)
}
return nil
})
}
if err := eg.Wait(); err != nil {
t.Error(err)
}
// Verify everything was written exactly once.
for i, blob := range blobs {
dg := digest.NewFromBlob(blob)
if tc.unified {
if fake.BlobWrites(dg) != 1 {
t.Errorf("wanted 1 write for blob %v: %v, got %v", i, dg, fake.BlobWrites(dg))
}
if fake.BlobMissingReqs(dg) != 1 {
t.Errorf("wanted 1 missing request for blob %v: %v, got %v", i, dg, fake.BlobMissingReqs(dg))
}
}
}
})
}
}
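// TestUploadConcurrentBatch uploads overlapping sliding windows of blobs from several goroutines;
// with unified uploads the deduplicated digests should collapse into just two batch requests.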
func TestUploadConcurrentBatch(t *testing.T) {
t.Parallel()
blobs := make([][]byte, 100)
for i := range blobs {
blobs[i] = []byte(fmt.Sprint(i))
}
ctx := context.Background()
for _, uo := range []client.UnifiedUploads{false, true} {
uo := uo
t.Run(fmt.Sprintf("unified:%t", uo), func(t *testing.T) {
t.Parallel()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
fake.ReqSleepDuration = reqMaxSleepDuration
fake.ReqSleepRandomize = true
c := e.Client.GrpcClient
c.MaxBatchDigests = 50
client.UnifiedUploadTickDuration(500 * time.Millisecond).Apply(c)
uo.Apply(c)
eg, eCtx := errgroup.WithContext(ctx)
for i := 0; i < 10; i++ {
i := i
eg.Go(func() error {
var input []*uploadinfo.Entry
// Upload 15 digests in a sliding window.
for j := i * 10; j < i*10+15 && j < len(blobs); j++ {
input = append(input, uploadinfo.EntryFromBlob(blobs[j]))
// Twice to have the same upload in same call, in addition to between calls.
input = append(input, uploadinfo.EntryFromBlob(blobs[j]))
}
if _, _, err := c.UploadIfMissing(eCtx, input...); err != nil {
return fmt.Errorf("c.UploadIfMissing(ctx, input) gave error %v, expected nil", err)
}
return nil
})
}
if err := eg.Wait(); err != nil {
t.Error(err)
}
// Verify everything was written exactly once.
for i, blob := range blobs {
dg := digest.NewFromBlob(blob)
if c.UnifiedUploads {
if fake.BlobWrites(dg) != 1 {
t.Errorf("wanted 1 write for blob %v: %v, got %v", i, dg, fake.BlobWrites(dg))
}
if fake.BlobMissingReqs(dg) != 1 {
t.Errorf("wanted 1 missing requests for blob %v: %v, got %v", i, dg, fake.BlobMissingReqs(dg))
}
}
}
expectedReqs := 10
if c.UnifiedUploads {
// With unified uploads, the 100 distinct digests from all goroutines are coalesced into two batches.
expectedReqs = 2
}
if fake.BatchReqs() != expectedReqs {
t.Errorf("%d requests were made to BatchUpdateBlobs, wanted %v", fake.BatchReqs(), expectedReqs)
}
})
}
}
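// TestUploadCancel blocks the server on a digest and cancels the upload context, expecting
// UploadIfMissing to return a context.Canceled error without writing the blob.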
func TestUploadCancel(t *testing.T) {
t.Parallel()
ctx := context.Background()
blob := []byte{1, 2, 3}
dg := digest.NewFromBlob(blob)
for _, uo := range []client.UnifiedUploads{false, true} {
uo := uo
t.Run(fmt.Sprintf("unified:%t", uo), func(t *testing.T) {
t.Parallel()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
wait := make(chan bool)
fake.PerDigestBlockFn[dg] = func() {
<-wait
}
c := e.Client.GrpcClient
uo.Apply(c)
client.UseBatchOps(false).Apply(c)
cCtx, cancel := context.WithCancel(ctx)
eg, _ := errgroup.WithContext(cCtx)
ue := uploadinfo.EntryFromBlob(blob)
eg.Go(func() error {
if _, _, err := c.UploadIfMissing(cCtx, ue); !errors.Is(err, context.Canceled) {
return fmt.Errorf("c.UploadIfMissing(ctx, input) gave error %v, expected to wrap context.Canceled", err)
}
return nil
})
eg.Go(func() error {
time.Sleep(60 * time.Millisecond) // Enough time to trigger upload cycle.
cancel()
time.Sleep(10 * time.Millisecond)
return nil
})
if err := eg.Wait(); err != nil {
t.Error(err)
}
// Verify that nothing was written.
if fake.BlobWrites(ue.Digest) != 0 {
t.Errorf("Blob was written, expected cancellation.")
}
close(wait)
})
}
}
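// TestUploadConcurrentCancel cancels most of the concurrent UploadIfMissing calls while one
// proceeds; with unified uploads each blob should still be written exactly once.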
func TestUploadConcurrentCancel(t *testing.T) {
t.Parallel()
blobs := make([][]byte, 50)
for i := range blobs {
blobs[i] = []byte(fmt.Sprint(i))
}
var input []*uploadinfo.Entry
for _, blob := range blobs {
input = append(input, uploadinfo.EntryFromBlob(blob))
}
input = append(input, input...)
type testCase struct {
name string
// Whether to use batching.
batching client.UseBatchOps
// Whether to use background CAS ops.
unified client.UnifiedUploads
// The batch size.
maxBatchDigests client.MaxBatchDigests
// The CAS concurrency for uploading the blobs.
concurrency client.CASConcurrency
}
var tests []testCase
for _, ub := range []client.UseBatchOps{false, true} {
for _, uo := range []client.UnifiedUploads{false, true} {
for _, conc := range []client.CASConcurrency{3, 20} {
tc := testCase{
name: fmt.Sprintf("batch:%t,unified:%t,conc:%d", ub, uo, conc),
batching: ub,
unified: uo,
maxBatchDigests: client.MaxBatchDigests(9),
concurrency: conc,
}
tests = append(tests, tc)
}
}
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
fake.ReqSleepDuration = reqMaxSleepDuration
fake.ReqSleepRandomize = true
c := e.Client.GrpcClient
for _, opt := range []client.Opt{tc.batching, tc.maxBatchDigests, tc.concurrency, tc.unified} {
opt.Apply(c)
}
eg, eCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
if _, _, err := c.UploadIfMissing(eCtx, input...); err != nil {
return fmt.Errorf("c.UploadIfMissing(ctx, input) gave error %v, expected nil", err)
}
return nil
})
cCtx, cancel := context.WithCancel(eCtx)
for i := 0; i < 50; i++ {
eg.Go(func() error {
// Verify that we got a context cancellation error. Sometimes, the request can succeed, if the original thread takes a while to run.
if _, _, err := c.UploadIfMissing(cCtx, input...); err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("c.UploadIfMissing(ctx, input) gave error %+v!, expected context canceled", err)
}
return nil
})
}
eg.Go(func() error {
time.Sleep(time.Duration(20*rand.Float32()) * time.Microsecond)
cancel()
return nil
})
if err := eg.Wait(); err != nil {
t.Error(err)
}
if tc.unified {
// Verify everything was written exactly once, despite the context being canceled.
for i, blob := range blobs {
dg := digest.NewFromBlob(blob)
if fake.BlobWrites(dg) != 1 {
t.Errorf("wanted 1 write for blob %v: %v, got %v", i, dg, fake.BlobWrites(dg))
}
// It is possible to get more than one GetMissingBlobs request for a particular digest if all
// concurrent requests for it are cancelled, because then the pending upload is actually cancelled
// and deleted. This happens if e.g. the original (non-canceled) upload thread is the last to run.
if fake.BlobMissingReqs(dg) > 2 {
t.Errorf("wanted <=2 missing requests for blob %v: %v, got %v", i, dg, fake.BlobMissingReqs(dg))
}
}
}
})
}
}
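// TestUpload exercises UploadIfMissing across batching, unified uploads, and compression settings,
// verifying that already-present blobs are not re-uploaded and that missing digests and bytes
// moved are reported correctly.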
func TestUpload(t *testing.T) {
t.Parallel()
var twoThousandBlobs [][]byte
var thousandBlobs [][]byte
for i := 0; i < 2000; i++ {
var buf = new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, i)
// Write a few extra bytes so that we have > chunkSize sized blobs.
for j := 0; j < 10; j++ {
binary.Write(buf, binary.LittleEndian, 0)
}
twoThousandBlobs = append(twoThousandBlobs, buf.Bytes())
if i%2 == 0 {
thousandBlobs = append(thousandBlobs, buf.Bytes())
}
}
type testcase struct {
name string
// input is the blobs to try to store; they're converted to a file map by the test
input [][]byte
// present is the blobs already present in the CAS; they're pre-loaded into the fakes.CAS object
// and the test verifies no attempt was made to upload them.
present [][]byte
opts []client.Opt
}
tests := []testcase{
{
name: "No blobs",
input: nil,
present: nil,
},
{
name: "None present",
input: [][]byte{[]byte("foo"), []byte("bar"), []byte("baz")},
present: nil,
},
{
name: "All present",
input: [][]byte{[]byte("foo"), []byte("bar"), []byte("baz")},
present: [][]byte{[]byte("foo"), []byte("bar"), []byte("baz")},
},
{
name: "Some present",
input: [][]byte{[]byte("foo"), []byte("bar"), []byte("baz")},
present: [][]byte{[]byte("bar")},
},
{
name: "2000 blobs heavy concurrency",
input: twoThousandBlobs,
present: thousandBlobs,
opts: []client.Opt{client.CASConcurrency(500)},
},
}
var allTests []testcase
for _, tc := range tests {
for _, ub := range []client.UseBatchOps{false, true} {
for _, uo := range []client.UnifiedUploads{false, true} {
for _, cmp := range []client.CompressedBytestreamThreshold{-1, 0} {
t := tc
t.name = fmt.Sprintf("%s_UsingBatch:%t,UnifiedUploads:%t,CompressionThresh:%d", tc.name, ub, uo, cmp)
t.opts = append(t.opts, []client.Opt{ub, uo, cmp}...)
allTests = append(allTests, t)
}
}
}
}
for _, tc := range allTests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
for _, o := range tc.opts {
o.Apply(c)
}
present := make(map[digest.Digest]bool)
for _, blob := range tc.present {
fake.Put(blob)
present[digest.NewFromBlob(blob)] = true
}
var input []*uploadinfo.Entry
for _, blob := range tc.input {
input = append(input, uploadinfo.EntryFromBlob(blob))
}
missing, bMoved, err := c.UploadIfMissing(ctx, input...)
if err != nil {
t.Errorf("c.UploadIfMissing(ctx, input) gave error %v, expected nil", err)
}
missingSet := make(map[digest.Digest]struct{})
totalBytes := int64(0)
for _, dg := range missing {
missingSet[dg] = struct{}{}
totalBytes += dg.Size
}
// It's much harder to check the case where compression is on as we also have to ignore batch ops,
// so we just don't.
if int(c.CompressedBytestreamThreshold) < 0 && bMoved != totalBytes {
t.Errorf("c.UploadIfMissing(ctx, input) = %v, expected %v (reported different bytes moved and digest size despite no compression)", bMoved, totalBytes)
}
for i, ue := range input {
dg := ue.Digest
blob := tc.input[i]
if present[dg] {
if fake.BlobWrites(dg) > 0 {
t.Errorf("blob %v with digest %s was uploaded even though it was already present in the CAS", blob, dg)
}
if _, ok := missingSet[dg]; ok {
t.Errorf("Stats said that blob %v with digest %s was missing in the CAS", blob, dg)
}
continue
}
if gotBlob, ok := fake.Get(dg); !ok {
t.Errorf("blob %v with digest %s was not uploaded, expected it to be present in the CAS", blob, dg)
} else if !bytes.Equal(blob, gotBlob) {
t.Errorf("blob digest %s had diff on uploaded blob: want %v, got %v", dg, blob, gotBlob)
}
if _, ok := missingSet[dg]; !ok {
t.Errorf("Stats said that blob %v with digest %s was present in the CAS", blob, dg)
}
}
if fake.MaxConcurrency() > defaultCASConcurrency {
t.Errorf("CAS concurrency %v was higher than max %v", fake.MaxConcurrency(), defaultCASConcurrency)
}
})
}
}
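// TestWriteBlobsBatching checks how WriteBlobs splits uploads between BatchUpdateBlobs and
// ByteStream Write requests under the configured MaxBatchSize and MaxBatchDigests limits.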
func TestWriteBlobsBatching(t *testing.T) {
t.Parallel()
ctx := context.Background()
tests := []struct {
name string
sizes []int
batchReqs int
writeReqs int
}{
{
name: "single small blob",
sizes: []int{1},
batchReqs: 0,
writeReqs: 1,
},
{
name: "large and small blobs hitting max exactly",
sizes: []int{338, 338, 338, 1, 1, 1},
batchReqs: 3,
writeReqs: 0,
},
{
name: "small batches of big blobs",
sizes: []int{88, 88, 88, 88, 88, 88, 88},
batchReqs: 2,
writeReqs: 1,
},
{
name: "batch with blob that's too big",
sizes: []int{400, 88, 88, 88},
batchReqs: 1,
writeReqs: 1,
},
{
name: "many small blobs hitting max digests",
sizes: []int{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
batchReqs: 4,
writeReqs: 0,
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
c.MaxBatchSize = 500
c.MaxBatchDigests = 4
// Each batch request frame overhead is 13 bytes.
// A per-blob overhead is 74 bytes.
blobs := make(map[digest.Digest][]byte)
for i, sz := range tc.sizes {
blob := make([]byte, int(sz))
blob[0] = byte(i) // Ensure blobs are distinct
blobs[digest.NewFromBlob(blob)] = blob
}
err := c.WriteBlobs(ctx, blobs)
if err != nil {
t.Fatalf("c.WriteBlobs(ctx, inputs) gave error %s, expected nil", err)
}
for d, blob := range blobs {
if gotBlob, ok := fake.Get(d); !ok {
t.Errorf("blob with digest %s was not uploaded, expected it to be present in the CAS", d)
} else if !bytes.Equal(blob, gotBlob) {
t.Errorf("blob with digest %s had diff on uploaded blob: wanted %v, got %v", d, blob, gotBlob)
}
}
if fake.BatchReqs() != tc.batchReqs {
t.Errorf("%d requests were made to BatchUpdateBlobs, wanted %d", fake.BatchReqs(), tc.batchReqs)
}
if fake.WriteReqs() != tc.writeReqs {
t.Errorf("%d requests were made to Write, wanted %d", fake.WriteReqs(), tc.writeReqs)
}
})
}
}
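// TestFlattenActionOutputs checks that an ActionResult's files, symlinks, and tree outputs are
// flattened into a map keyed by output path.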
func TestFlattenActionOutputs(t *testing.T) {
t.Parallel()
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
fooDigest := digest.TestNew("1001", 1)
barDigest := digest.TestNew("1002", 2)
dirB := &repb.Directory{
Files: []*repb.FileNode{
{Name: "foo", Digest: fooDigest.ToProto(), IsExecutable: true},
},
}
bDigest := digest.TestNewFromMessage(dirB)
dirA := &repb.Directory{
Directories: []*repb.DirectoryNode{
{Name: "b", Digest: bDigest.ToProto()},
},
Files: []*repb.FileNode{
{Name: "bar", Digest: barDigest.ToProto()},
},
}
aDigest := digest.TestNewFromMessage(dirA)
root := &repb.Directory{
Directories: []*repb.DirectoryNode{
{Name: "a", Digest: aDigest.ToProto()},
{Name: "b", Digest: bDigest.ToProto()},
},
}
tr := &repb.Tree{
Root: root,
Children: []*repb.Directory{dirA, dirB},
}
treeBlob, err := proto.Marshal(tr)
if err != nil {
t.Errorf("failed marshalling Tree: %s", err)
}
treeA := &repb.Tree{
Root: dirA,
Children: []*repb.Directory{dirB},
}
treeABlob, err := proto.Marshal(treeA)
if err != nil {
t.Errorf("failed marshalling Tree: %s", err)
}
treeDigest := fake.Put(treeBlob)
treeADigest := fake.Put(treeABlob)
ar := &repb.ActionResult{
OutputFiles: []*repb.OutputFile{
&repb.OutputFile{Path: "foo", Digest: fooDigest.ToProto()}},
OutputFileSymlinks: []*repb.OutputSymlink{
&repb.OutputSymlink{Path: "x/bar", Target: "../dir/a/bar"}},
OutputDirectorySymlinks: []*repb.OutputSymlink{
&repb.OutputSymlink{Path: "x/a", Target: "../dir/a"}},
OutputDirectories: []*repb.OutputDirectory{
&repb.OutputDirectory{Path: "dir", TreeDigest: treeDigest.ToProto()},
&repb.OutputDirectory{Path: "dir2", TreeDigest: treeADigest.ToProto()},
},
}
outputs, err := c.FlattenActionOutputs(ctx, ar)
if err != nil {
t.Errorf("error in FlattenActionOutputs: %s", err)
}
wantOutputs := map[string]*client.TreeOutput{
"dir/a/b/foo": &client.TreeOutput{Digest: fooDigest, IsExecutable: true},
"dir/a/bar": &client.TreeOutput{Digest: barDigest},
"dir/b/foo": &client.TreeOutput{Digest: fooDigest, IsExecutable: true},
"dir2/b/foo": &client.TreeOutput{Digest: fooDigest, IsExecutable: true},
"dir2/bar": &client.TreeOutput{Digest: barDigest},
"foo": &client.TreeOutput{Digest: fooDigest},
"x/a": &client.TreeOutput{SymlinkTarget: "../dir/a"},
"x/bar": &client.TreeOutput{SymlinkTarget: "../dir/a/bar"},
}
if len(outputs) != len(wantOutputs) {
t.Errorf("FlattenActionOutputs gave wrong number of outputs: want %d, got %d", len(wantOutputs), len(outputs))
}
for path, wantOut := range wantOutputs {
got, ok := outputs[path]
if !ok {
t.Errorf("expected output %s is missing", path)
continue
}
if got.Path != path {
t.Errorf("FlattenActionOutputs keyed %s output with %s path", got.Path, path)
}
if wantOut.Digest != got.Digest {
t.Errorf("FlattenActionOutputs gave digest diff on %s: want %v, got: %v", path, wantOut.Digest, got.Digest)
}
if wantOut.IsExecutable != got.IsExecutable {
t.Errorf("FlattenActionOutputs gave IsExecutable diff on %s: want %v, got: %v", path, wantOut.IsExecutable, got.IsExecutable)
}
if wantOut.SymlinkTarget != got.SymlinkTarget {
t.Errorf("FlattenActionOutputs gave symlink target diff on %s: want %s, got: %s", path, wantOut.SymlinkTarget, got.SymlinkTarget)
}
}
}
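// TestDownloadActionOutputs downloads an ActionResult's files, symlinks, empty directories, and
// tree outputs into a working directory and verifies both the on-disk results and the file
// metadata cache.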
func TestDownloadActionOutputs(t *testing.T) {
t.Parallel()
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
cache := filemetadata.NewSingleFlightCache()
fooDigest := fake.Put([]byte("foo"))
barDigest := fake.Put([]byte("bar"))
dirB := &repb.Directory{
Files: []*repb.FileNode{
{Name: "foo", Digest: fooDigest.ToProto(), IsExecutable: true},
},
}
bDigest := digest.TestNewFromMessage(dirB)
dirA := &repb.Directory{
Directories: []*repb.DirectoryNode{
{Name: "b", Digest: bDigest.ToProto()},
{Name: "e2", Digest: digest.Empty.ToProto()},
},
Files: []*repb.FileNode{
{Name: "bar", Digest: barDigest.ToProto()},
},
}
aDigest := digest.TestNewFromMessage(dirA)
root := &repb.Directory{
Directories: []*repb.DirectoryNode{
{Name: "a", Digest: aDigest.ToProto()},
{Name: "b", Digest: bDigest.ToProto()},
{Name: "e1", Digest: digest.Empty.ToProto()},
},
}
tree := &repb.Tree{
Root: root,
Children: []*repb.Directory{dirA, dirB, &repb.Directory{}},
}
treeBlob, err := proto.Marshal(tree)
if err != nil {
t.Fatalf("failed marshalling Tree: %s", err)
}
treeA := &repb.Tree{
Root: dirA,
Children: []*repb.Directory{dirB, &repb.Directory{}},
}
treeABlob, err := proto.Marshal(treeA)
if err != nil {
t.Fatalf("failed marshalling Tree: %s", err)
}
treeDigest := fake.Put(treeBlob)
treeADigest := fake.Put(treeABlob)
ar := &repb.ActionResult{
OutputFiles: []*repb.OutputFile{
&repb.OutputFile{Path: "../foo", Digest: fooDigest.ToProto()}},
OutputFileSymlinks: []*repb.OutputSymlink{
&repb.OutputSymlink{Path: "x/bar", Target: "../dir/a/bar"}},
OutputDirectorySymlinks: []*repb.OutputSymlink{
&repb.OutputSymlink{Path: "x/a", Target: "../dir/a"}},
OutputDirectories: []*repb.OutputDirectory{
&repb.OutputDirectory{Path: "dir", TreeDigest: treeDigest.ToProto()},
&repb.OutputDirectory{Path: "dir2", TreeDigest: treeADigest.ToProto()},
},
}
execRoot, err := ioutil.TempDir("", "DownloadOuts")
if err != nil {
t.Fatalf("failed to make temp dir: %v", err)
}
wd := "wd"
if err := os.Mkdir(filepath.Join(execRoot, wd), os.ModePerm); err != nil {
t.Fatalf("failed to create working directory %v: %v", wd, err)
}
defer os.RemoveAll(execRoot)
_, err = c.DownloadActionOutputs(ctx, ar, filepath.Join(execRoot, wd), cache)
if err != nil {
t.Errorf("error in DownloadActionOutputs: %s", err)
}
wantOutputs := []struct {
path string
isExecutable bool
contents []byte
symlinkTarget string
isEmptyDirectory bool
fileDigest *digest.Digest
}{
{
path: "wd/dir/e1",
isEmptyDirectory: true,
},
{
path: "wd/dir/a/e2",
isEmptyDirectory: true,
},
{
path: "wd/dir/a/b/foo",
isExecutable: true,
contents: []byte("foo"),
fileDigest: &fooDigest,
},
{
path: "wd/dir/a/bar",
contents: []byte("bar"),
},
{
path: "wd/dir/b/foo",
isExecutable: true,
contents: []byte("foo"),
fileDigest: &fooDigest,
},
{
path: "wd/dir2/e2",
isEmptyDirectory: true,
},
{
path: "wd/dir2/b/foo",
isExecutable: true,
contents: []byte("foo"),
fileDigest: &fooDigest,
},
{
path: "wd/dir2/bar",
contents: []byte("bar"),
},
{
path: "foo",
contents: []byte("foo"),
fileDigest: &fooDigest,
},
{
path: "wd/x/a",
symlinkTarget: "../dir/a",
},
{
path: "wd/x/bar",
symlinkTarget: "../dir/a/bar",
},
}
for _, out := range wantOutputs {
path := filepath.Join(execRoot, out.path)
fi, err := os.Lstat(path)
if err != nil {
t.Errorf("expected output %s is missing", path)
continue
}
if out.fileDigest != nil {
fmd := cache.Get(path)
if fmd == nil {
t.Errorf("cache does not contain metadata for path: %v", path)
} else {
if diff := cmp.Diff(*out.fileDigest, fmd.Digest); diff != "" {
t.Errorf("invalid digeset in cache for path %v, (-want +got): %v", path, diff)
}
}
}
if out.symlinkTarget != "" {
if fi.Mode()&os.ModeSymlink == 0 {
t.Errorf("expected %s to be a symlink, got %v", path, fi.Mode())
}
target, e := os.Readlink(path)
if e != nil {
t.Errorf("expected %s to be a symlink, got error reading symlink: %v", path, err)
}
if target != out.symlinkTarget {
t.Errorf("expected %s to be a symlink to %s, got %s", path, out.symlinkTarget, target)
}
} else if out.isEmptyDirectory {
if !fi.Mode().IsDir() {
t.Errorf("expected %s to be a directory, got %s", path, fi.Mode())
}
files, err := ioutil.ReadDir(path)
if err != nil {
t.Errorf("expected %s to be a directory, got error reading directory: %v", path, err)
}
if len(files) != 0 {
t.Errorf("expected %s to be an empty directory, got contents: %v", path, files)
}
} else {
contents, err := ioutil.ReadFile(path)
if err != nil {
t.Errorf("error reading from %s: %v", path, err)
}
if !bytes.Equal(contents, out.contents) {
t.Errorf("expected %s to contain %v, got %v", path, out.contents, contents)
}
// TODO(olaola): verify the file is executable, if required.
// Doing this naively failed go test in CI.
}
}
}
func TestDownloadDirectory(t *testing.T) {
t.Parallel()
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
cache := filemetadata.NewSingleFlightCache()
fooDigest := fake.Put([]byte("foo"))
dir := &repb.Directory{
Files: []*repb.FileNode{
{Name: "foo", Digest: fooDigest.ToProto(), IsExecutable: true},
},
Directories: []*repb.DirectoryNode{
{Name: "empty", Digest: digest.Empty.ToProto()},
},
}
dirBlob, err := proto.Marshal(dir)
if err != nil {
t.Fatalf("failed marshalling Tree: %s", err)
}
fake.Put(dirBlob)
d := digest.TestNewFromMessage(dir)
execRoot := t.TempDir()
outputs, _, err := c.DownloadDirectory(ctx, d, execRoot, cache)
if err != nil {
t.Errorf("error in DownloadActionOutputs: %s", err)
}
if diff := cmp.Diff(map[string]*client.TreeOutput{
"empty": {
Digest: digest.Empty,
Path: "empty",
IsEmptyDirectory: true,
},
"foo": {
Digest: fooDigest,
Path: "foo",
IsExecutable: true,
}}, outputs); diff != "" {
t.Fatalf("DownloadDirectory() mismatch (-want +got):\n%s", diff)
}
b, err := ioutil.ReadFile(filepath.Join(execRoot, "foo"))
if err != nil {
t.Fatalf("failed to read foo: %s", err)
}
if want, got := []byte("foo"), b; !bytes.Equal(want, got) {
t.Errorf("want %s, got %s", want, got)
}
}
func TestDownloadActionOutputsErrors(t *testing.T) {
ar := &repb.ActionResult{}
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: "foo", Digest: digest.NewFromBlob([]byte("foo")).ToProto()})
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: "bar", Digest: digest.NewFromBlob([]byte("bar")).ToProto()})
execRoot, err := ioutil.TempDir("", "DownloadOuts")
if err != nil {
t.Fatalf("failed to make temp dir: %v", err)
}
defer os.RemoveAll(execRoot)
for _, ub := range []client.UseBatchOps{false, true} {
ub := ub
t.Run(fmt.Sprintf("%sUsingBatch:%t", t.Name(), ub), func(t *testing.T) {
t.Parallel()
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
c := e.Client.GrpcClient
ub.Apply(c)
_, err := c.DownloadActionOutputs(ctx, ar, execRoot, filemetadata.NewSingleFlightCache())
if err == nil || (status.Code(err) != codes.NotFound && !strings.Contains(err.Error(), "not found")) {
t.Errorf("expected 'not found' error in DownloadActionOutputs, got: %v", err)
}
})
}
}
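// TestDownloadActionOutputsBatching counts the BatchReadBlobs requests issued for downloads under
// the configured batch limits, with and without locality-aware grouping.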
func TestDownloadActionOutputsBatching(t *testing.T) {
tests := []struct {
name string
sizes []int
locality bool
batchReqs int
}{
{
name: "single small blob",
sizes: []int{1},
batchReqs: 0,
},
{
name: "large and small blobs hitting max exactly",
sizes: []int{338, 338, 338, 1, 1, 1},
batchReqs: 3,
},
{
name: "small batches of big blobs",
sizes: []int{88, 88, 88, 88, 88, 88, 88},
batchReqs: 2,
},
{
name: "batch with blob that's too big",
sizes: []int{400, 88, 88, 88},
batchReqs: 1,
},
{
name: "many small blobs hitting max digests",
sizes: []int{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
batchReqs: 4,
},
{
name: "single small blob locality",
sizes: []int{1},
locality: true,
batchReqs: 0,
},
{
name: "large and small blobs hitting max exactly locality",
sizes: []int{338, 338, 338, 1, 1, 1},
locality: true,
batchReqs: 2,
},
{
name: "small batches of big blobs locality",
sizes: []int{88, 88, 88, 88, 88, 88, 88},
locality: true,
batchReqs: 2,
},
{
name: "batch with blob that's too big locality",
sizes: []int{400, 88, 88, 88},
locality: true,
batchReqs: 1,
},
{
name: "many small blobs hitting max digests locality",
sizes: []int{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
locality: true,
batchReqs: 4,
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
c.MaxBatchSize = 500
c.MaxBatchDigests = 4
// Each batch request frame overhead is 13 bytes.
// A per-blob overhead is 74 bytes.
c.UtilizeLocality = client.UtilizeLocality(tc.locality)
var dgs []digest.Digest
blobs := make(map[digest.Digest][]byte)
ar := &repb.ActionResult{}
for i, sz := range tc.sizes {
blob := make([]byte, int(sz))
if sz > 0 {
blob[0] = byte(i) // Ensure blobs are distinct
}
dg := digest.NewFromBlob(blob)
blobs[dg] = blob
dgs = append(dgs, dg)
if sz > 0 {
// Don't seed fake with empty blob, because it should not be called.
fake.Put(blob)
}
name := fmt.Sprintf("foo_%s", dg)
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name, Digest: dg.ToProto()})
}
execRoot, err := ioutil.TempDir("", "DownloadOuts")
if err != nil {
t.Fatalf("failed to make temp dir: %v", err)
}
defer os.RemoveAll(execRoot)
_, err = c.DownloadActionOutputs(ctx, ar, execRoot, filemetadata.NewSingleFlightCache())
if err != nil {
t.Errorf("error in DownloadActionOutputs: %s", err)
}
for dg, data := range blobs {
path := filepath.Join(execRoot, fmt.Sprintf("foo_%s", dg))
contents, err := ioutil.ReadFile(path)
if err != nil {
t.Errorf("error reading from %s: %v", path, err)
}
if !bytes.Equal(contents, data) {
t.Errorf("expected %s to contain %v, got %v", path, contents, data)
}
}
if fake.BatchReqs() != tc.batchReqs {
t.Errorf("%d requests were made to BatchReadBlobs, wanted %d", fake.BatchReqs(), tc.batchReqs)
}
})
}
}
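// TestDownloadActionOutputsConcurrency runs many concurrent DownloadActionOutputs calls and checks
// that the CAS concurrency limit is respected and that unified downloads batch across actions.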
func TestDownloadActionOutputsConcurrency(t *testing.T) {
t.Parallel()
ctx := context.Background()
type testBlob struct {
digest digest.Digest
blob []byte
}
blobs := make([]*testBlob, 1000)
for i := 0; i < 1000; i++ {
blob := []byte(fmt.Sprint(i))
blobs[i] = &testBlob{
digest: digest.NewFromBlob(blob),
blob: blob,
}
}
for _, ub := range []client.UseBatchOps{false, true} {
for _, uo := range []client.UnifiedDownloads{false, true} {
ub, uo := ub, uo
t.Run(fmt.Sprintf("%sUsingBatch:%t,UnifiedDownloads:%t", t.Name(), ub, uo), func(t *testing.T) {
t.Parallel()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
fake.ReqSleepDuration = reqMaxSleepDuration
fake.ReqSleepRandomize = true
c := e.Client.GrpcClient
client.CASConcurrency(defaultCASConcurrency).Apply(c)
client.MaxBatchDigests(300).Apply(c)
ub.Apply(c)
uo.Apply(c)
for _, b := range blobs {
fake.Put(b.blob)
}
eg, eCtx := errgroup.WithContext(ctx)
for i := 0; i < 100; i++ {
i := i
eg.Go(func() error {
var input []*testBlob
ar := &repb.ActionResult{}
// Download 15 digests in a sliding window.
for j := i * 10; j < i*10+15 && j < len(blobs); j++ {
input = append(input, blobs[j])
}
for _, i := range input {
name := fmt.Sprintf("foo_%s", i.digest)
dgPb := i.digest.ToProto()
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name, Digest: dgPb})
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name + "_copy", Digest: dgPb})
}
execRoot := t.TempDir()
if _, err := c.DownloadActionOutputs(eCtx, ar, execRoot, filemetadata.NewSingleFlightCache()); err != nil {
return fmt.Errorf("error in DownloadActionOutputs: %s", err)
}
for _, i := range input {
name := filepath.Join(execRoot, fmt.Sprintf("foo_%s", i.digest))
for _, path := range []string{name, name + "_copy"} {
contents, err := ioutil.ReadFile(path)
if err != nil {
return fmt.Errorf("error reading from %s: %v", path, err)
}
if !bytes.Equal(contents, i.blob) {
return fmt.Errorf("expected %s to contain %v, got %v", path, contents, i.blob)
}
}
}
return nil
})
}
if err := eg.Wait(); err != nil {
t.Error(err)
}
if fake.MaxConcurrency() > defaultCASConcurrency {
t.Errorf("CAS concurrency %v was higher than max %v", fake.MaxConcurrency(), defaultCASConcurrency)
}
if ub {
if uo {
// Check that we batch requests from different Actions.
if fake.BatchReqs() > 50 {
t.Errorf("%d requests were made to BatchReadBlobs, wanted <= 50", fake.BatchReqs())
}
} else {
if fake.BatchReqs() != 100 {
t.Errorf("%d requests were made to BatchReadBlobs, wanted 100", fake.BatchReqs())
}
}
}
})
}
}
}
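// TestDownloadActionOutputsOneSlowRead blocks a single large ByteStream read and verifies that
// unrelated downloads still complete while it is stalled.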
func TestDownloadActionOutputsOneSlowRead(t *testing.T) {
t.Parallel()
ctx := context.Background()
type testBlob struct {
digest digest.Digest
blob []byte
}
blobs := make([]*testBlob, 20)
for i := 0; i < len(blobs); i++ {
blob := []byte(fmt.Sprint(i))
blobs[i] = &testBlob{
digest: digest.NewFromBlob(blob),
blob: blob,
}
}
problemBlob := make([]byte, 2000) // Will not be batched.
problemDg := digest.NewFromBlob(problemBlob)
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
fake.ReqSleepDuration = reqMaxSleepDuration
fake.ReqSleepRandomize = true
wait := make(chan bool)
fake.PerDigestBlockFn[problemDg] = func() {
<-wait
}
c := e.Client.GrpcClient
client.MaxBatchSize(1000).Apply(c)
for _, b := range blobs {
fake.Put(b.blob)
}
fake.Put(problemBlob)
// Start downloading the problem digest.
pg, pCtx := errgroup.WithContext(ctx)
pg.Go(func() error {
name := fmt.Sprintf("problem_%s", problemDg)
dgPb := problemDg.ToProto()
ar := &repb.ActionResult{}
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name, Digest: dgPb})
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name + "_copy", Digest: dgPb})
execRoot := t.TempDir()
if _, err := c.DownloadActionOutputs(pCtx, ar, execRoot, filemetadata.NewSingleFlightCache()); err != nil {
return fmt.Errorf("error in DownloadActionOutputs: %s", err)
}
for _, path := range []string{name, name + "_copy"} {
contents, err := ioutil.ReadFile(filepath.Join(execRoot, path))
if err != nil {
return fmt.Errorf("error reading from %s: %v", path, err)
}
if !bytes.Equal(contents, problemBlob) {
return fmt.Errorf("expected %s to contain %v, got %v", path, problemBlob, contents)
}
}
return nil
})
// Download a bunch of fast-downloading blobs.
eg, eCtx := errgroup.WithContext(ctx)
for i := 0; i < 100; i++ {
i := i
eg.Go(func() error {
var input []*testBlob
ar := &repb.ActionResult{}
// Download 15 digests in a sliding window.
for j := i * 10; j < i*10+15 && j < len(blobs); j++ {
input = append(input, blobs[j])
}
totalBytes := int64(0)
for _, i := range input {
name := fmt.Sprintf("foo_%s", i.digest)
dgPb := i.digest.ToProto()
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name, Digest: dgPb})
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name + "_copy", Digest: dgPb})
// Count only once due to dedup
totalBytes += i.digest.Size
}
execRoot := t.TempDir()
stats, err := c.DownloadActionOutputs(eCtx, ar, execRoot, filemetadata.NewSingleFlightCache())
if err != nil {
return fmt.Errorf("error in DownloadActionOutputs: %s", err)
}
if stats.LogicalMoved != stats.RealMoved {
t.Errorf("c.DownloadActionOutputs: logical (%v) and real (%v) bytes moved different despite compression off", stats.LogicalMoved, stats.RealMoved)
}
if stats.LogicalMoved != totalBytes {
t.Errorf("c.DownloadActionOutputs: logical (%v) bytes moved different from sum of digests (%v) despite downloaded", stats.LogicalMoved, stats.RealMoved)
}
for _, i := range input {
name := filepath.Join(execRoot, fmt.Sprintf("foo_%s", i.digest))
for _, path := range []string{name, name + "_copy"} {
contents, err := ioutil.ReadFile(path)
if err != nil {
return fmt.Errorf("error reading from %s: %v", path, err)
}
if !bytes.Equal(contents, i.blob) {
return fmt.Errorf("expected %s to contain %v, got %v", path, i.blob, contents)
}
}
}
return nil
})
}
if err := eg.Wait(); err != nil {
t.Error(err)
}
// Now let the problem digest download finish.
close(wait)
if err := pg.Wait(); err != nil {
t.Error(err)
}
}
func TestWriteAndReadProto(t *testing.T) {
t.Parallel()
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
fooDigest := fake.Put([]byte("foo"))
dirA := &repb.Directory{
Files: []*repb.FileNode{
{Name: "foo", Digest: fooDigest.ToProto(), IsExecutable: true},
},
}
d, err := c.WriteProto(ctx, dirA)
if err != nil {
t.Errorf("Failed writing proto: %s", err)
}
dirB := &repb.Directory{}
if _, err := c.ReadProto(ctx, d, dirB); err != nil {
t.Errorf("Failed reading proto: %s", err)
}
if !proto.Equal(dirA, dirB) {
t.Errorf("Protos not equal: %s / %s", dirA, dirB)
}
}
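// TestDownloadFiles downloads two blobs via DownloadFiles and verifies their contents and the
// reported bytes moved.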
func TestDownloadFiles(t *testing.T) {
t.Parallel()
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
fooDigest := fake.Put([]byte("foo"))
barDigest := fake.Put([]byte("bar"))
execRoot, err := ioutil.TempDir("", "DownloadOuts")
if err != nil {
t.Fatalf("failed to make temp dir: %v", err)
}
defer os.RemoveAll(execRoot)
stats, err := c.DownloadFiles(ctx, execRoot, map[digest.Digest]*client.TreeOutput{
fooDigest: {Digest: fooDigest, Path: "foo", IsExecutable: true},
barDigest: {Digest: barDigest, Path: "bar"},
})
if err != nil {
t.Errorf("Failed to run DownloadFiles: %v", err)
}
if stats.LogicalMoved != stats.RealMoved {
t.Errorf("c.DownloadFiles: logical (%v) and real (%v) bytes moved different despite compression off", stats.LogicalMoved, stats.RealMoved)
}
if stats.LogicalMoved != fooDigest.Size+barDigest.Size {
t.Errorf("c.DownloadFiles: logical (%v) bytes moved different from sum of digests (%v) despite no duplication", stats.LogicalMoved, fooDigest.Size+barDigest.Size)
}
if b, err := ioutil.ReadFile(filepath.Join(execRoot, "foo")); err != nil {
t.Errorf("failed to read file: %v", err)
} else if diff := cmp.Diff(b, []byte("foo")); diff != "" {
t.Errorf("foo mismatch (-want +got):\n%s", diff)
}
if b, err := ioutil.ReadFile(filepath.Join(execRoot, "bar")); err != nil {
t.Errorf("failed to read file: %v", err)
} else if diff := cmp.Diff(b, []byte("bar")); diff != "" {
t.Errorf("foo mismatch (-want +got):\n%s", diff)
}
}
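// TestDownloadFilesCancel cancels the download context while the server blocks on a digest and
// verifies that no blob reads are made.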
func TestDownloadFilesCancel(t *testing.T) {
t.Parallel()
for _, uo := range []client.UnifiedDownloads{false, true} {
uo := uo
t.Run(fmt.Sprintf("UnifiedDownloads:%t", uo), func(t *testing.T) {
t.Parallel()
execRoot, err := ioutil.TempDir("", strings.ReplaceAll(t.Name(), string(filepath.Separator), "_"))
if err != nil {
t.Fatalf("failed to make temp dir: %v", err)
}
defer os.RemoveAll(execRoot)
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
fooDigest := fake.Put([]byte{1, 2, 3})
wait := make(chan bool)
fake.PerDigestBlockFn[fooDigest] = func() {
<-wait
}
c := e.Client.GrpcClient
uo.Apply(c)
eg, eCtx := errgroup.WithContext(ctx)
cCtx, cancel := context.WithCancel(eCtx)
eg.Go(func() error {
if _, err := c.DownloadFiles(cCtx, execRoot, map[digest.Digest]*client.TreeOutput{
fooDigest: {Digest: fooDigest, Path: "foo", IsExecutable: true},
}); err != context.Canceled {
return fmt.Errorf("Failed to run DownloadFiles: expected context.Canceled, got %v", err)
}
return nil
})
eg.Go(func() error {
cancel()
return nil
})
if err := eg.Wait(); err != nil {
t.Error(err)
}
if fake.BlobReads(fooDigest) != 0 {
t.Errorf("Expected no reads for foo since request is cancelled, got %v.", fake.BlobReads(fooDigest))
}
close(wait)
})
}
}