blob: a8d795c236691db6946db79288d77d4c31338361 [file] [log] [blame]
package client_test
import (
repb ""
bsgrpc ""
// Shared settings for the tests in this file.
const (
// instance is the instance name handed to client.NewClient.
instance = "instance"
// defaultCASConcurrency is the ceiling TestUpload asserts on fake.MaxConcurrency().
defaultCASConcurrency = 50
// reqMaxSleepDuration is assigned to the fake CAS's ReqSleepDuration in the
// concurrent upload tests.
reqMaxSleepDuration = 5 * time.Millisecond
func TestSplitEndpoints(t *testing.T) {
ctx := context.Background()
l1, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Cannot listen: %v", err)
port := portpicker.PickUnusedPortTB(t)
l2, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
t.Fatalf("Cannot listen: %v", err)
defer l1.Close()
defer l2.Close()
execServer := grpc.NewServer()
casServer := grpc.NewServer()
blob := []byte("foobar")
fake := &fakes.Reader{
Blob: blob,
Chunks: []int{6},
ExpectCompressed: false,
bsgrpc.RegisterByteStreamServer(casServer, fake)
go execServer.Serve(l1)
go casServer.Serve(l2)
defer casServer.Stop()
defer execServer.Stop()
c, err := client.NewClient(ctx, instance, client.DialParams{
Service: l1.Addr().String(),
CASService: l2.Addr().String(),
NoSecurity: true,
}, client.StartupCapabilities(false))
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
defer c.Close()
got, _, err := c.ReadBlob(ctx, digest.NewFromBlob(blob))
if err != nil {
t.Errorf("c.ReadBlob(ctx, digest) gave error %s, want nil", err)
if !bytes.Equal(blob, got) {
t.Errorf("c.ReadBlob(ctx, digest) gave diff: want %v, got %v", blob, got)
func TestReadEmptyBlobDoesNotCallServer(t *testing.T) {
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
got, _, err := c.ReadBlob(ctx, digest.Empty)
if err != nil {
t.Errorf("c.ReadBlob(ctx, Empty) gave error %s, want nil", err)
if len(got) != 0 {
t.Errorf("c.ReadBlob(ctx, Empty) gave diff: want nil, got %v", got)
reads := fake.BlobReads(digest.Empty)
if reads != 0 {
t.Errorf("expected no blob reads to the fake, got %v", reads)
// TestRead drives a table of fakes.Reader configurations (a blob plus the chunk
// sizes the fake serves it in) through the client. Each case is read with
// ReadBlobRange using the case's offset and limit; cases starting at offset 0
// with a limit larger than the blob are also read with ReadBlob. Returned bytes
// and the LogicalMoved/RealMoved stats are checked.
// NOTE(review): this listing has lost its closing delimiters and some tokens;
// the notes below mark the spots that need confirming against the real file.
func TestRead(t *testing.T) {
type testCase struct {
name string
fake fakes.Reader
offset int64
limit int64
compress bool
want []byte // If nil, fake.blob is expected by default.
tests := []testCase{
name: "empty blob, 10 chunks",
fake: fakes.Reader{
Blob: []byte{},
Chunks: []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
name: "blob 'foobar', 1 chunk",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{6},
name: "blob 'foobar', 3 evenly sized chunks",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{2, 2, 2},
name: "blob 'foobar', 3 unequal chunks",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{1, 3, 2},
name: "blob 'foobar', 2 chunks with 0-sized chunk between",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{3, 0, 3},
name: "blob 'foobarbaz', partial read spanning multiple chunks",
fake: fakes.Reader{
Blob: []byte("foobarbaz"),
Chunks: []int{3, 0, 3, 3},
offset: 2,
limit: 5,
want: []byte("obarb"),
name: "blob 'foobar', partial read within chunk",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{6},
offset: 2,
limit: 3,
want: []byte("oba"),
name: "blob 'foobar', partial read from start",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{3, 3},
offset: 0,
limit: 5,
want: []byte("fooba"),
name: "blob 'foobar', partial read with no limit",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{3, 3},
offset: 2,
limit: 0,
want: []byte("obar"),
name: "blob 'foobar', partial read with limit extending beyond end of blob",
fake: fakes.Reader{
Blob: []byte("foobar"),
Chunks: []int{3, 3},
offset: 2,
limit: 8,
want: []byte("obar"),
// Derive a compressed variant of every case that has no limit and append
// those variants to the table.
var compressionTests []testCase
for _, tc := range tests {
if tc.limit == 0 {
// Limit tests don't work well with compression, as the limit refers to the compressed bytes
// while offset, per spec, refers to uncompressed bytes.
// NOTE(review): the next statement is garbled in this listing; presumably it
// sets tc.compress and appends "_compressed" to the case name — confirm.
tc.compress = true = + "_compressed"
compressionTests = append(compressionTests, tc)
tests = append(tests, compressionTests...)
for _, tc := range tests {
// Per-iteration copy so the subtest closure does not share the loop variable.
tc := tc
// NOTE(review): the subtest name argument is missing here — presumably the case name; confirm.
t.Run(, func(t *testing.T) {
ctx := context.Background()
// Each subtest gets its own ByteStream server backed by the case's fake.
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Cannot listen: %v", err)
defer listener.Close()
server := grpc.NewServer()
bsgrpc.RegisterByteStreamServer(server, &tc.fake)
go server.Serve(listener)
defer server.Stop()
c, err := client.NewClient(ctx, instance, client.DialParams{
Service: listener.Addr().String(),
NoSecurity: true,
}, client.StartupCapabilities(false))
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
defer c.Close()
// Threshold is -1 by default and 0 for compressed cases; the fake is told
// which of the two to expect.
c.CompressedBytestreamThreshold = -1
if tc.compress {
c.CompressedBytestreamThreshold = 0
tc.fake.ExpectCompressed = tc.compress
// A nil want means the whole blob is expected back.
want := tc.want
if want == nil {
want = tc.fake.Blob
// Cases that start at offset 0 with a limit beyond the blob length are also
// exercised through ReadBlob.
if tc.offset == 0 && tc.limit > int64(len(tc.fake.Blob)) {
got, stats, err := c.ReadBlob(ctx, digest.NewFromBlob(want))
if err != nil {
t.Errorf("c.ReadBlob(ctx, digest) gave error %s, want nil", err)
if !bytes.Equal(want, got) {
t.Errorf("c.ReadBlob(ctx, digest) gave diff: want %v, got %v", want, got)
if int64(len(got)) != stats.LogicalMoved {
t.Errorf("c.ReadBlob(ctx, digest) = _, %v - logical bytes moved different than len of blob received", stats.LogicalMoved)
// NOTE(review): this fires when the two counters are equal, but the message
// below says "different"; the ReadBlobRange twin further down says "same".
if tc.compress && len(tc.fake.Blob) > 0 && stats.LogicalMoved == stats.RealMoved {
t.Errorf("c.ReadBlob(ctx, digest) = %v - compression on but different real and logical bytes", stats)
// Ranged read using the case's offset and limit.
got, stats, err := c.ReadBlobRange(ctx, digest.NewFromBlob(tc.fake.Blob), tc.offset, tc.limit)
if err != nil {
t.Errorf("c.ReadBlobRange(ctx, digest, %d, %d) gave error %s, want nil", tc.offset, tc.limit, err)
if !bytes.Equal(want, got) {
t.Errorf("c.ReadBlobRange(ctx, digest, %d, %d) gave diff: want %v, got %v", tc.offset, tc.limit, want, got)
// NOTE(review): the two messages below name c.ReadBlob although the call
// under test here is c.ReadBlobRange.
if int64(len(got)) != stats.LogicalMoved {
t.Errorf("c.ReadBlob(ctx, digest) = _, %v - logical bytes moved different than len of blob received", stats.LogicalMoved)
if tc.compress && len(tc.fake.Blob) > 0 && stats.LogicalMoved == stats.RealMoved {
t.Errorf("c.ReadBlob(ctx, digest) = %v - compression on but same real and logical bytes", stats)
// TestWrite uploads blobs of several sizes with WriteBlob against a
// fakes.Writer, once per compression threshold (0 and -1), and checks the
// bytes the fake buffered, the fake's recorded error, and the returned digest.
// NOTE(review): this listing has lost its closing delimiters and some tokens;
// see the notes below.
func TestWrite(t *testing.T) {
type testcase struct {
name string
blob []byte
cmp client.CompressedBytestreamThreshold
tests := []testcase{
name: "empty blob",
blob: []byte{},
name: "small blob",
blob: []byte("this is a pretty small blob comparatively"),
name: "5MB zero blob",
blob: make([]byte, 5*1024*1024),
// Expand every base case into one variant per compression threshold.
var allTests []testcase
for _, tc := range tests {
for _, th := range []int{0, -1} {
// NOTE(review): the next statement is garbled in this listing; presumably it
// copies tc into a local and appends the threshold to its name — confirm.
// The local `t` shadows the *testing.T parameter inside this loop.
t := tc += fmt.Sprintf("CompressionThreshold=%d", th)
t.cmp = client.CompressedBytestreamThreshold(th)
allTests = append(allTests, t)
for _, tc := range allTests {
// Per-iteration copy so the subtest closure does not share the loop variable.
tc := tc
// NOTE(review): the subtest name argument is missing here — presumably the case name; confirm.
t.Run(, func(t *testing.T) {
ctx := context.Background()
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Cannot listen: %v", err)
defer listener.Close()
server := grpc.NewServer()
fake := &fakes.Writer{}
bsgrpc.RegisterByteStreamServer(server, fake)
go server.Serve(listener)
defer server.Stop()
c, err := client.NewClient(ctx, instance, client.DialParams{
Service: listener.Addr().String(),
NoSecurity: true,
}, client.StartupCapabilities(false), client.ChunkMaxSize(20)) // Use small write chunk size for tests.
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
defer c.Close()
// The fake expects compressed writes exactly when the threshold is 0.
// NOTE(review): no statement applying tc.cmp to the client is visible in
// this listing — confirm it exists in the original.
fake.ExpectCompressed = int(tc.cmp) == 0
gotDg, err := c.WriteBlob(ctx, tc.blob)
if err != nil {
t.Errorf("c.WriteBlob(ctx, blob) gave error %s, wanted nil", err)
// fake.Err is checked separately from the client's returned error.
if fake.Err != nil {
t.Errorf("c.WriteBlob(ctx, blob) caused the server to return error %s (possibly unseen by c)", fake.Err)
if !bytes.Equal(tc.blob, fake.Buf) {
t.Errorf("c.WriteBlob(ctx, blob) had diff on blobs, want %v, got %v:", tc.blob, fake.Buf)
dg := digest.NewFromBlob(tc.blob)
if dg != gotDg {
t.Errorf("c.WriteBlob(ctx, blob) had diff on digest returned (want %s, got %s)", dg, gotDg)
// TestMissingBlobs calls MissingBlobs with a table of digest lists and compares
// the returned digests against the expected list using cmp.Diff.
// NOTE(review): the elements of the input/want digest slices and the body of
// the `present` loop are not visible in this listing — confirm against the
// original file.
func TestMissingBlobs(t *testing.T) {
tests := []struct {
name string
// present is the blobs present in the CAS.
present []string
// input is the digests given to MissingBlobs.
input []digest.Digest
// want is the returned list of digests.
want []digest.Digest
name: "none present",
present: nil,
input: []digest.Digest{
want: []digest.Digest{
name: "all present",
present: []string{"foo", "bar", "baz"},
input: []digest.Digest{
want: nil,
name: "some present",
present: []string{"foo", "bar"},
input: []digest.Digest{
want: []digest.Digest{
for _, tc := range tests {
// Per-iteration copy so the subtest closure does not share the loop variable.
tc := tc
// NOTE(review): the subtest name argument is missing here — presumably the case name; confirm.
t.Run(, func(t *testing.T) {
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
// NOTE(review): this loop's body is missing; presumably it stores each
// string in the fake CAS — confirm.
for _, s := range tc.present {
t.Logf("CAS contains digests of %s", tc.present)
got, err := c.MissingBlobs(ctx, tc.input)
if err != nil {
t.Errorf("c.MissingBlobs(ctx, %v) gave error %s, expected nil", tc.input, err)
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("c.MissingBlobs(ctx, %v) gave diff (want -> got):\n%s", tc.input, diff)
// TestUploadConcurrent launches 100 goroutines that each call UploadIfMissing
// with the same 50 blobs (each listed twice), across every combination of
// batching, unified uploads and two concurrency levels. For unified uploads it
// then checks the fake CAS's per-digest write and missing-request counts.
// NOTE(review): closing delimiters and some statements are missing from this
// listing; see the notes below.
func TestUploadConcurrent(t *testing.T) {
// 50 distinct blobs: the decimal strings "0".."49".
blobs := make([][]byte, 50)
for i := range blobs {
blobs[i] = []byte(fmt.Sprint(i))
type testCase struct {
name string
// Whether to use batching.
batching client.UseBatchOps
// Whether to use background CAS ops.
unified client.UnifiedUploads
// The batch size.
maxBatchDigests client.MaxBatchDigests
// The CAS concurrency for uploading the blobs.
concurrency client.CASConcurrency
// Build the cross product of the option values.
var tests []testCase
for _, ub := range []client.UseBatchOps{false, true} {
for _, cb := range []client.UnifiedUploads{false, true} {
for _, conc := range []client.CASConcurrency{3, 100} {
tc := testCase{
name: fmt.Sprintf("batch:%t,unified:%t,conc:%d", ub, cb, conc),
batching: ub,
unified: cb,
maxBatchDigests: client.MaxBatchDigests(9),
concurrency: conc,
tests = append(tests, tc)
for _, tc := range tests {
// Per-iteration copy so the subtest closure does not share the loop variable.
tc := tc
// NOTE(review): the subtest name argument is missing here — presumably the case name; confirm.
t.Run(, func(t *testing.T) {
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
// Configure the fake's request sleep settings.
fake.ReqSleepDuration = reqMaxSleepDuration
fake.ReqSleepRandomize = true
c := e.Client.GrpcClient
// NOTE(review): this loop's body is missing; presumably each option is
// applied to c — confirm.
for _, opt := range []client.Opt{tc.batching, tc.maxBatchDigests, tc.concurrency, tc.unified} {
eg, eCtx := errgroup.WithContext(ctx)
for i := 0; i < 100; i++ {
eg.Go(func() error {
var input []*uploadinfo.Entry
// Every blob appears twice in a single call.
for _, blob := range append(blobs, blobs...) {
input = append(input, uploadinfo.EntryFromBlob(blob))
if _, _, err := c.UploadIfMissing(eCtx, input...); err != nil {
return fmt.Errorf("c.UploadIfMissing(ctx, input) gave error %v, expected nil", err)
return nil
// NOTE(review): the body of this error check is missing — presumably it
// reports err on t; confirm.
if err := eg.Wait(); err != nil {
// Verify everything was written exactly once.
for i, blob := range blobs {
dg := digest.NewFromBlob(blob)
if tc.unified {
if fake.BlobWrites(dg) != 1 {
t.Errorf("wanted 1 write for blob %v: %v, got %v", i, dg, fake.BlobWrites(dg))
if fake.BlobMissingReqs(dg) != 1 {
t.Errorf("wanted 1 missing request for blob %v: %v, got %v", i, dg, fake.BlobMissingReqs(dg))
// TestUploadConcurrentBatch runs 10 goroutines that each upload an overlapping
// sliding window of up to 15 of 100 blobs (each listed twice), then checks the
// fake CAS's per-digest counters and the number of BatchUpdateBlobs requests.
// NOTE(review): closing delimiters and some statements are missing from this
// listing; see the notes below.
func TestUploadConcurrentBatch(t *testing.T) {
// 100 distinct blobs: the decimal strings "0".."99".
blobs := make([][]byte, 100)
for i := range blobs {
blobs[i] = []byte(fmt.Sprint(i))
ctx := context.Background()
for _, uo := range []client.UnifiedUploads{false, true} {
// Per-iteration copy so the subtest closure does not share the loop variable.
uo := uo
t.Run(fmt.Sprintf("unified:%t", uo), func(t *testing.T) {
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
fake.ReqSleepDuration = reqMaxSleepDuration
fake.ReqSleepRandomize = true
c := e.Client.GrpcClient
c.MaxBatchDigests = 50
// NOTE(review): no statement applying uo to the client is visible in this
// listing, although c.UnifiedUploads is read below — confirm.
client.UnifiedUploadTickDuration(500 * time.Millisecond).Apply(c)
eg, eCtx := errgroup.WithContext(ctx)
for i := 0; i < 10; i++ {
// Per-iteration copy for the goroutine closure.
i := i
eg.Go(func() error {
var input []*uploadinfo.Entry
// Upload 15 digests in a sliding window.
for j := i * 10; j < i*10+15 && j < len(blobs); j++ {
input = append(input, uploadinfo.EntryFromBlob(blobs[j]))
// Twice to have the same upload in same call, in addition to between calls.
input = append(input, uploadinfo.EntryFromBlob(blobs[j]))
if _, _, err := c.UploadIfMissing(eCtx, input...); err != nil {
return fmt.Errorf("c.UploadIfMissing(ctx, input) gave error %v, expected nil", err)
return nil
// NOTE(review): the body of this error check is missing — presumably it
// reports err on t; confirm.
if err := eg.Wait(); err != nil {
// Verify everything was written exactly once.
for i, blob := range blobs {
dg := digest.NewFromBlob(blob)
if c.UnifiedUploads {
if fake.BlobWrites(dg) != 1 {
t.Errorf("wanted 1 write for blob %v: %v, got %v", i, dg, fake.BlobWrites(dg))
if fake.BlobMissingReqs(dg) != 1 {
t.Errorf("wanted 1 missing requests for blob %v: %v, got %v", i, dg, fake.BlobMissingReqs(dg))
// One batch request per goroutine by default.
expectedReqs := 10
if c.UnifiedUploads {
// All the 100 digests will be batched into two batches, together.
expectedReqs = 2
if fake.BatchReqs() != expectedReqs {
t.Errorf("%d requests were made to BatchUpdateBlobs, wanted %v", fake.BatchReqs(), expectedReqs)
// TestUploadCancel starts an upload of a single blob under a cancellable
// context, expects UploadIfMissing to return an error wrapping
// context.Canceled, and then checks that the fake CAS recorded no write for
// the blob. It runs once per UnifiedUploads setting.
// NOTE(review): closing delimiters and several statements are missing from
// this listing (the per-digest block function's body and every use of `wait`
// and `cancel`); see the notes below.
func TestUploadCancel(t *testing.T) {
ctx := context.Background()
blob := []byte{1, 2, 3}
dg := digest.NewFromBlob(blob)
for _, uo := range []client.UnifiedUploads{false, true} {
// Per-iteration copy so the subtest closure does not share the loop variable.
uo := uo
t.Run(fmt.Sprintf("unified:%t", uo), func(t *testing.T) {
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
wait := make(chan bool)
// NOTE(review): the body of this function is missing; presumably it blocks
// on `wait` so the request for dg stalls until released — confirm.
fake.PerDigestBlockFn[dg] = func() {
c := e.Client.GrpcClient
cCtx, cancel := context.WithCancel(ctx)
eg, _ := errgroup.WithContext(cCtx)
ue := uploadinfo.EntryFromBlob(blob)
// First goroutine: the upload, which must fail with context.Canceled.
eg.Go(func() error {
if _, _, err := c.UploadIfMissing(cCtx, ue); !errors.Is(err, context.Canceled) {
return fmt.Errorf("c.UploadIfMissing(ctx, input) gave error %v, expected to wrap context.Canceled", err)
return nil
// Second goroutine: waits, then (presumably) cancels and releases the fake.
// NOTE(review): the cancel()/close(wait) calls are not visible between or
// after the sleeps — confirm.
eg.Go(func() error {
time.Sleep(60 * time.Millisecond) // Enough time to trigger upload cycle.
time.Sleep(10 * time.Millisecond)
return nil
// NOTE(review): the body of this error check is missing — presumably it
// reports err on t; confirm.
if err := eg.Wait(); err != nil {
// Verify that nothing was written.
if fake.BlobWrites(ue.Digest) != 0 {
t.Errorf("Blob was written, expected cancellation.")
// TestUploadConcurrentCancel runs one upload under a context that is not
// cancelled alongside 50 uploads of the same input under a cancellable child
// context, across every combination of batching, unified uploads and two
// concurrency levels. The cancellable uploads may return nil or an error
// wrapping context.Canceled. For unified uploads it then checks the fake CAS's
// per-digest write and missing-request counts.
// NOTE(review): closing delimiters and some statements are missing from this
// listing; see the notes below.
func TestUploadConcurrentCancel(t *testing.T) {
// 50 distinct blobs: the decimal strings "0".."49".
blobs := make([][]byte, 50)
for i := range blobs {
blobs[i] = []byte(fmt.Sprint(i))
var input []*uploadinfo.Entry
for _, blob := range blobs {
input = append(input, uploadinfo.EntryFromBlob(blob))
// Duplicate the entries so every blob appears twice in the input.
input = append(input, input...)
type testCase struct {
name string
// Whether to use batching.
batching client.UseBatchOps
// Whether to use background CAS ops.
unified client.UnifiedUploads
// The batch size.
maxBatchDigests client.MaxBatchDigests
// The CAS concurrency for uploading the blobs.
concurrency client.CASConcurrency
// Build the cross product of the option values.
var tests []testCase
for _, ub := range []client.UseBatchOps{false, true} {
for _, uo := range []client.UnifiedUploads{false, true} {
for _, conc := range []client.CASConcurrency{3, 20} {
tc := testCase{
name: fmt.Sprintf("batch:%t,unified:%t,conc:%d", ub, uo, conc),
batching: ub,
unified: uo,
maxBatchDigests: client.MaxBatchDigests(9),
concurrency: conc,
tests = append(tests, tc)
for _, tc := range tests {
// Per-iteration copy so the subtest closure does not share the loop variable.
tc := tc
// NOTE(review): the subtest name argument is missing here — presumably the case name; confirm.
t.Run(, func(t *testing.T) {
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
fake.ReqSleepDuration = reqMaxSleepDuration
fake.ReqSleepRandomize = true
c := e.Client.GrpcClient
// NOTE(review): this loop's body is missing; presumably each option is
// applied to c — confirm.
for _, opt := range []client.Opt{tc.batching, tc.maxBatchDigests, tc.concurrency, tc.unified} {
eg, eCtx := errgroup.WithContext(ctx)
// The upload under eCtx is expected to succeed.
eg.Go(func() error {
if _, _, err := c.UploadIfMissing(eCtx, input...); err != nil {
return fmt.Errorf("c.UploadIfMissing(ctx, input) gave error %v, expected nil", err)
return nil
// 50 further uploads of the same input share a cancellable child context.
cCtx, cancel := context.WithCancel(eCtx)
for i := 0; i < 50; i++ {
eg.Go(func() error {
// Verify that we got a context cancellation error. Sometimes, the request can succeed, if the original thread takes a while to run.
if _, _, err := c.UploadIfMissing(cCtx, input...); err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("c.UploadIfMissing(ctx, input) gave error %+v!, expected context canceled", err)
return nil
// NOTE(review): the cancel() call is not visible in this goroutine —
// presumably it follows the sleep; confirm.
eg.Go(func() error {
time.Sleep(time.Duration(20*rand.Float32()) * time.Microsecond)
return nil
// NOTE(review): the body of this error check is missing — presumably it
// reports err on t; confirm.
if err := eg.Wait(); err != nil {
if tc.unified {
// Verify everything was written exactly once, despite the context being canceled.
for i, blob := range blobs {
dg := digest.NewFromBlob(blob)
if fake.BlobWrites(dg) != 1 {
t.Errorf("wanted 1 write for blob %v: %v, got %v", i, dg, fake.BlobWrites(dg))
// It is possible to get more than 1 GetMissingBlobs requests if all concurrent requests for a particular
// digest get cancelled, because then this upload gets actually cancelled and deleted.
// This will happen if e.g. the original (non-canceled) upload thread is the last to run.
if fake.BlobMissingReqs(dg) > 2 {
t.Errorf("wanted <=2 missing requests for blob %v: %v, got %v", i, dg, fake.BlobMissingReqs(dg))
// TestUpload calls UploadIfMissing over a table of input/present blob sets,
// expanded across batching, unified-upload and compression-threshold options.
// It checks the returned missing digests and bytes moved, that blobs listed as
// present were not written, that the remaining blobs are retrievable from the
// fake CAS with matching contents, and that the fake's observed concurrency
// did not exceed defaultCASConcurrency.
// NOTE(review): closing delimiters and some tokens/statements are missing from
// this listing; see the notes below.
func TestUpload(t *testing.T) {
// Build 2000 distinct blobs; every even-indexed one also goes into thousandBlobs.
var twoThousandBlobs [][]byte
var thousandBlobs [][]byte
for i := 0; i < 2000; i++ {
var buf = new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, i)
// Write a few extra bytes so that we have > chunkSize sized blobs.
for j := 0; j < 10; j++ {
binary.Write(buf, binary.LittleEndian, 0)
twoThousandBlobs = append(twoThousandBlobs, buf.Bytes())
if i%2 == 0 {
thousandBlobs = append(thousandBlobs, buf.Bytes())
type testcase struct {
name string
// input is the blobs to try to store; they're converted to a file map by the test
input [][]byte
// present is the blobs already present in the CAS; they're pre-loaded into the fakes.CAS object
// and the test verifies no attempt was made to upload them.
present [][]byte
opts []client.Opt
tests := []testcase{
name: "No blobs",
input: nil,
present: nil,
name: "None present",
input: [][]byte{[]byte("foo"), []byte("bar"), []byte("baz")},
present: nil,
name: "All present",
input: [][]byte{[]byte("foo"), []byte("bar"), []byte("baz")},
present: [][]byte{[]byte("foo"), []byte("bar"), []byte("baz")},
name: "Some present",
input: [][]byte{[]byte("foo"), []byte("bar"), []byte("baz")},
present: [][]byte{[]byte("bar")},
name: "2000 blobs heavy concurrency",
input: twoThousandBlobs,
present: thousandBlobs,
opts: []client.Opt{client.CASConcurrency(500)},
// Expand each base case across every option combination.
var allTests []testcase
for _, tc := range tests {
for _, ub := range []client.UseBatchOps{false, true} {
for _, uo := range []client.UnifiedUploads{false, true} {
for _, cmp := range []client.CompressedBytestreamThreshold{-1, 0} {
// NOTE(review): the next statement is garbled in this listing; presumably it
// copies tc into a local and builds its name from the options — confirm.
// The local `t` shadows the *testing.T parameter inside this loop.
t := tc = fmt.Sprintf("%s_UsingBatch:%t,UnifiedUploads:%t,CompressionThresh:%d",, ub, uo, cmp)
t.opts = append(t.opts, []client.Opt{ub, uo, cmp}...)
allTests = append(allTests, t)
for _, tc := range allTests {
// Per-iteration copy so the subtest closure does not share the loop variable.
tc := tc
// NOTE(review): the subtest name argument is missing here — presumably the case name; confirm.
t.Run(, func(t *testing.T) {
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
// NOTE(review): this loop's body is missing; presumably each option is
// applied to c — confirm.
for _, o := range tc.opts {
// Set of digests expected to be in the CAS already.
// NOTE(review): no statement storing the present blobs in the fake is
// visible inside this loop — confirm.
present := make(map[digest.Digest]bool)
for _, blob := range tc.present {
present[digest.NewFromBlob(blob)] = true
var input []*uploadinfo.Entry
for _, blob := range tc.input {
input = append(input, uploadinfo.EntryFromBlob(blob))
missing, bMoved, err := c.UploadIfMissing(ctx, input...)
if err != nil {
t.Errorf("c.UploadIfMissing(ctx, input) gave error %v, expected nil", err)
// Index the reported-missing digests and total their sizes.
missingSet := make(map[digest.Digest]struct{})
totalBytes := int64(0)
for _, dg := range missing {
missingSet[dg] = struct{}{}
totalBytes += dg.Size
// It's much harder to check the case where compression is on as we also have to ignore batch ops,
// so we just don't.
if int(c.CompressedBytestreamThreshold) < 0 && bMoved != totalBytes {
t.Errorf("c.UploadIfMissing(ctx, input) = %v, expected %v (reported different bytes moved and digest size despite no compression)", bMoved, totalBytes)
// Per-blob checks: the branch depends on whether the blob was listed as present.
// NOTE(review): the lost delimiters hide where the present[dg] branch ends;
// presumably the fake.Get checks below apply only to blobs not listed as
// present — confirm.
for i, ue := range input {
dg := ue.Digest
blob := tc.input[i]
if present[dg] {
if fake.BlobWrites(dg) > 0 {
t.Errorf("blob %v with digest %s was uploaded even though it was already present in the CAS", blob, dg)
if _, ok := missingSet[dg]; ok {
t.Errorf("Stats said that blob %v with digest %s was missing in the CAS", blob, dg)
if gotBlob, ok := fake.Get(dg); !ok {
t.Errorf("blob %v with digest %s was not uploaded, expected it to be present in the CAS", blob, dg)
} else if !bytes.Equal(blob, gotBlob) {
t.Errorf("blob digest %s had diff on uploaded blob: want %v, got %v", dg, blob, gotBlob)
if _, ok := missingSet[dg]; !ok {
t.Errorf("Stats said that blob %v with digest %s was present in the CAS", blob, dg)
// The fake's observed concurrency must stay within defaultCASConcurrency.
if fake.MaxConcurrency() > defaultCASConcurrency {
t.Errorf("CAS concurrency %v was higher than max %v", fake.MaxConcurrency(), defaultCASConcurrency)
// TestWriteBlobsBatching writes sets of blobs of given sizes with WriteBlobs,
// using a client limited to MaxBatchSize 500 and MaxBatchDigests 4, and checks
// that every blob landed in the fake CAS and that the number of
// BatchUpdateBlobs and Write requests matches the table.
// NOTE(review): the table literal's delimiters and the subtest name argument
// are missing from this listing.
func TestWriteBlobsBatching(t *testing.T) {
ctx := context.Background()
tests := []struct {
name string
// sizes is the byte length of each blob to write.
sizes []int
// batchReqs is the expected number of BatchUpdateBlobs requests.
batchReqs int
// writeReqs is the expected number of Write requests.
writeReqs int
name: "single small blob",
sizes: []int{1},
batchReqs: 0,
writeReqs: 1,
name: "large and small blobs hitting max exactly",
sizes: []int{338, 338, 338, 1, 1, 1},
batchReqs: 3,
writeReqs: 0,
name: "small batches of big blobs",
sizes: []int{88, 88, 88, 88, 88, 88, 88},
batchReqs: 2,
writeReqs: 1,
name: "batch with blob that's too big",
sizes: []int{400, 88, 88, 88},
batchReqs: 1,
writeReqs: 1,
name: "many small blobs hitting max digests",
sizes: []int{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
batchReqs: 4,
writeReqs: 0,
for _, tc := range tests {
// Per-iteration copy so the subtest closure does not share the loop variable.
tc := tc
// NOTE(review): the subtest name argument is missing here — presumably the case name; confirm.
t.Run(, func(t *testing.T) {
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
c.MaxBatchSize = 500
c.MaxBatchDigests = 4
// Each batch request frame overhead is 13 bytes.
// A per-blob overhead is 74 bytes.
// Build zero-filled blobs of the requested sizes, keyed by digest.
blobs := make(map[digest.Digest][]byte)
for i, sz := range tc.sizes {
blob := make([]byte, int(sz))
blob[0] = byte(i) // Ensure blobs are distinct
blobs[digest.NewFromBlob(blob)] = blob
err := c.WriteBlobs(ctx, blobs)
if err != nil {
t.Fatalf("c.WriteBlobs(ctx, inputs) gave error %s, expected nil", err)
// Every blob must be retrievable from the fake with identical contents.
for d, blob := range blobs {
if gotBlob, ok := fake.Get(d); !ok {
t.Errorf("blob with digest %s was not uploaded, expected it to be present in the CAS", d)
} else if !bytes.Equal(blob, gotBlob) {
t.Errorf("blob with digest %s had diff on uploaded blob: wanted %v, got %v", d, blob, gotBlob)
if fake.BatchReqs() != tc.batchReqs {
t.Errorf("%d requests were made to BatchUpdateBlobs, wanted %d", fake.BatchReqs(), tc.batchReqs)
if fake.WriteReqs() != tc.writeReqs {
t.Errorf("%d requests were made to Write, wanted %d", fake.WriteReqs(), tc.writeReqs)
func TestFlattenActionOutputs(t *testing.T) {
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
fooDigest := digest.TestNew("1001", 1)
barDigest := digest.TestNew("1002", 2)
dirB := &repb.Directory{
Files: []*repb.FileNode{
{Name: "foo", Digest: fooDigest.ToProto(), IsExecutable: true},
bDigest := digest.TestNewFromMessage(dirB)
dirA := &repb.Directory{
Directories: []*repb.DirectoryNode{
{Name: "b", Digest: bDigest.ToProto()},
Files: []*repb.FileNode{
{Name: "bar", Digest: barDigest.ToProto()},
aDigest := digest.TestNewFromMessage(dirA)
root := &repb.Directory{
Directories: []*repb.DirectoryNode{
{Name: "a", Digest: aDigest.ToProto()},
{Name: "b", Digest: bDigest.ToProto()},
tr := &repb.Tree{
Root: root,
Children: []*repb.Directory{dirA, dirB},
treeBlob, err := proto.Marshal(tr)
if err != nil {
t.Errorf("failed marshalling Tree: %s", err)
treeA := &repb.Tree{
Root: dirA,
Children: []*repb.Directory{dirB},
treeABlob, err := proto.Marshal(treeA)
if err != nil {
t.Errorf("failed marshalling Tree: %s", err)
treeDigest := fake.Put(treeBlob)
treeADigest := fake.Put(treeABlob)
ar := &repb.ActionResult{
OutputFiles: []*repb.OutputFile{
&repb.OutputFile{Path: "foo", Digest: fooDigest.ToProto()}},
OutputFileSymlinks: []*repb.OutputSymlink{
&repb.OutputSymlink{Path: "x/bar", Target: "../dir/a/bar"}},
OutputDirectorySymlinks: []*repb.OutputSymlink{
&repb.OutputSymlink{Path: "x/a", Target: "../dir/a"}},
OutputDirectories: []*repb.OutputDirectory{
&repb.OutputDirectory{Path: "dir", TreeDigest: treeDigest.ToProto()},
&repb.OutputDirectory{Path: "dir2", TreeDigest: treeADigest.ToProto()},
outputs, err := c.FlattenActionOutputs(ctx, ar)
if err != nil {
t.Errorf("error in FlattenActionOutputs: %s", err)
wantOutputs := map[string]*client.TreeOutput{
"dir/a/b/foo": &client.TreeOutput{Digest: fooDigest, IsExecutable: true},
"dir/a/bar": &client.TreeOutput{Digest: barDigest},
"dir/b/foo": &client.TreeOutput{Digest: fooDigest, IsExecutable: true},
"dir2/b/foo": &client.TreeOutput{Digest: fooDigest, IsExecutable: true},
"dir2/bar": &client.TreeOutput{Digest: barDigest},
"foo": &client.TreeOutput{Digest: fooDigest},
"x/a": &client.TreeOutput{SymlinkTarget: "../dir/a"},
"x/bar": &client.TreeOutput{SymlinkTarget: "../dir/a/bar"},
if len(outputs) != len(wantOutputs) {
t.Errorf("FlattenActionOutputs gave wrong number of outputs: want %d, got %d", len(wantOutputs), len(outputs))
for path, wantOut := range wantOutputs {
got, ok := outputs[path]
if !ok {
t.Errorf("expected output %s is missing", path)
if got.Path != path {
t.Errorf("FlattenActionOutputs keyed %s output with %s path", got.Path, path)
if wantOut.Digest != got.Digest {
t.Errorf("FlattenActionOutputs gave digest diff on %s: want %v, got: %v", path, wantOut.Digest, got.Digest)
if wantOut.IsExecutable != got.IsExecutable {
t.Errorf("FlattenActionOutputs gave IsExecutable diff on %s: want %v, got: %v", path, wantOut.IsExecutable, got.IsExecutable)
if wantOut.SymlinkTarget != got.SymlinkTarget {
t.Errorf("FlattenActionOutputs gave symlink target diff on %s: want %s, got: %s", path, wantOut.SymlinkTarget, got.SymlinkTarget)
func TestDownloadActionOutputs(t *testing.T) {
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
cache := filemetadata.NewSingleFlightCache()
fooDigest := fake.Put([]byte("foo"))
barDigest := fake.Put([]byte("bar"))
dirB := &repb.Directory{
Files: []*repb.FileNode{
{Name: "foo", Digest: fooDigest.ToProto(), IsExecutable: true},
bDigest := digest.TestNewFromMessage(dirB)
dirA := &repb.Directory{
Directories: []*repb.DirectoryNode{
{Name: "b", Digest: bDigest.ToProto()},
{Name: "e2", Digest: digest.Empty.ToProto()},
Files: []*repb.FileNode{
{Name: "bar", Digest: barDigest.ToProto()},
aDigest := digest.TestNewFromMessage(dirA)
root := &repb.Directory{
Directories: []*repb.DirectoryNode{
{Name: "a", Digest: aDigest.ToProto()},
{Name: "b", Digest: bDigest.ToProto()},
{Name: "e1", Digest: digest.Empty.ToProto()},
tree := &repb.Tree{
Root: root,
Children: []*repb.Directory{dirA, dirB, &repb.Directory{}},
treeBlob, err := proto.Marshal(tree)
if err != nil {
t.Fatalf("failed marshalling Tree: %s", err)
treeA := &repb.Tree{
Root: dirA,
Children: []*repb.Directory{dirB, &repb.Directory{}},
treeABlob, err := proto.Marshal(treeA)
if err != nil {
t.Fatalf("failed marshalling Tree: %s", err)
treeDigest := fake.Put(treeBlob)
treeADigest := fake.Put(treeABlob)
ar := &repb.ActionResult{
OutputFiles: []*repb.OutputFile{
&repb.OutputFile{Path: "foo", Digest: fooDigest.ToProto()}},
OutputFileSymlinks: []*repb.OutputSymlink{
&repb.OutputSymlink{Path: "x/bar", Target: "../dir/a/bar"}},
OutputDirectorySymlinks: []*repb.OutputSymlink{
&repb.OutputSymlink{Path: "x/a", Target: "../dir/a"}},
OutputDirectories: []*repb.OutputDirectory{
&repb.OutputDirectory{Path: "dir", TreeDigest: treeDigest.ToProto()},
&repb.OutputDirectory{Path: "dir2", TreeDigest: treeADigest.ToProto()},
execRoot, err := ioutil.TempDir("", "DownloadOuts")
if err != nil {
t.Fatalf("failed to make temp dir: %v", err)
defer os.RemoveAll(execRoot)
_, err = c.DownloadActionOutputs(ctx, ar, execRoot, cache)
if err != nil {
t.Errorf("error in DownloadActionOutputs: %s", err)
wantOutputs := []struct {
path string
isExecutable bool
contents []byte
symlinkTarget string
isEmptyDirectory bool
fileDigest *digest.Digest
path: "dir/e1",
isEmptyDirectory: true,
path: "dir/a/e2",
isEmptyDirectory: true,
path: "dir/a/b/foo",
isExecutable: true,
contents: []byte("foo"),
fileDigest: &fooDigest,
path: "dir/a/bar",
contents: []byte("bar"),
path: "dir/b/foo",
isExecutable: true,
contents: []byte("foo"),
fileDigest: &fooDigest,
path: "dir2/e2",
isEmptyDirectory: true,
path: "dir2/b/foo",
isExecutable: true,
contents: []byte("foo"),
fileDigest: &fooDigest,
path: "dir2/bar",
contents: []byte("bar"),
path: "foo",
contents: []byte("foo"),
fileDigest: &fooDigest,
path: "x/a",
symlinkTarget: "../dir/a",
path: "x/bar",
symlinkTarget: "../dir/a/bar",
for _, out := range wantOutputs {
path := filepath.Join(execRoot, out.path)
fi, err := os.Lstat(path)
if err != nil {
t.Errorf("expected output %s is missing", path)
if out.fileDigest != nil {
fmd := cache.Get(path)
if fmd == nil {
t.Errorf("cache does not contain metadata for path: %v", path)
} else {
if diff := cmp.Diff(*out.fileDigest, fmd.Digest); diff != "" {
t.Errorf("invalid digeset in cache for path %v, (-want +got): %v", path, diff)
if out.symlinkTarget != "" {
if fi.Mode()&os.ModeSymlink == 0 {
t.Errorf("expected %s to be a symlink, got %v", path, fi.Mode())
target, e := os.Readlink(path)
if e != nil {
t.Errorf("expected %s to be a symlink, got error reading symlink: %v", path, err)
if target != out.symlinkTarget {
t.Errorf("expected %s to be a symlink to %s, got %s", path, out.symlinkTarget, target)
} else if out.isEmptyDirectory {
if !fi.Mode().IsDir() {
t.Errorf("expected %s to be a directory, got %s", path, fi.Mode())
files, err := ioutil.ReadDir(path)
if err != nil {
t.Errorf("expected %s to be a directory, got error reading directory: %v", path, err)
if len(files) != 0 {
t.Errorf("expected %s to be an empty directory, got contents: %v", path, files)
} else {
contents, err := ioutil.ReadFile(path)
if err != nil {
t.Errorf("error reading from %s: %v", path, err)
if !bytes.Equal(contents, out.contents) {
t.Errorf("expected %s to contain %v, got %v", path, out.contents, contents)
// TODO(olaola): verify the file is executable, if required.
// Doing this naively failed go test in CI.
// TestDownloadDirectory downloads a directory containing one executable file
// and one empty subdirectory into a temporary exec root, compares the returned
// path -> TreeOutput map against the expected entries, and checks the contents
// of the downloaded file.
// NOTE(review): closing delimiters are missing from this listing, and dirBlob
// is never used in the visible code — presumably a statement storing it in the
// fake CAS was lost; confirm against the original file.
func TestDownloadDirectory(t *testing.T) {
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
cache := filemetadata.NewSingleFlightCache()
fooDigest := fake.Put([]byte("foo"))
dir := &repb.Directory{
Files: []*repb.FileNode{
{Name: "foo", Digest: fooDigest.ToProto(), IsExecutable: true},
Directories: []*repb.DirectoryNode{
{Name: "empty", Digest: digest.Empty.ToProto()},
dirBlob, err := proto.Marshal(dir)
if err != nil {
t.Fatalf("failed marshalling Tree: %s", err)
d := digest.TestNewFromMessage(dir)
execRoot := t.TempDir()
outputs, _, err := c.DownloadDirectory(ctx, d, execRoot, cache)
// NOTE(review): the message below names DownloadActionOutputs although the
// call under test is DownloadDirectory.
if err != nil {
t.Errorf("error in DownloadActionOutputs: %s", err)
// NOTE(review): cmp.Diff(x, y) labels x with "-" and y with "+"; with outputs
// passed first, the "(-want +got)" label in the failure message is reversed.
if diff := cmp.Diff(outputs, map[string]*client.TreeOutput{
"empty": {
Digest: digest.Empty,
Path: "empty",
IsEmptyDirectory: true,
"foo": {
Digest: fooDigest,
Path: "foo",
IsExecutable: true,
}}); diff != "" {
t.Fatalf("DownloadDirectory() mismatch (-want +got):\n%s", diff)
// The downloaded file must contain the original bytes.
b, err := ioutil.ReadFile(filepath.Join(execRoot, "foo"))
if err != nil {
t.Fatalf("failed to read foo: %s", err)
if want, got := []byte("foo"), b; !bytes.Equal(want, got) {
t.Errorf("want %s, got %s", want, got)
// TestDownloadActionOutputsErrors asks DownloadActionOutputs to fetch two
// output files that this test never stores in the fake CAS and expects a
// "not found" style error, once per UseBatchOps setting.
// NOTE(review): closing delimiters are missing from this listing, and no
// statement applying ub to the client is visible — confirm against the
// original file.
func TestDownloadActionOutputsErrors(t *testing.T) {
ar := &repb.ActionResult{}
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: "foo", Digest: digest.NewFromBlob([]byte("foo")).ToProto()})
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: "bar", Digest: digest.NewFromBlob([]byte("bar")).ToProto()})
execRoot, err := ioutil.TempDir("", "DownloadOuts")
if err != nil {
t.Fatalf("failed to make temp dir: %v", err)
defer os.RemoveAll(execRoot)
for _, ub := range []client.UseBatchOps{false, true} {
// Per-iteration copy so the subtest closure does not share the loop variable.
ub := ub
t.Run(fmt.Sprintf("%sUsingBatch:%t", t.Name(), ub), func(t *testing.T) {
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
c := e.Client.GrpcClient
_, err := c.DownloadActionOutputs(ctx, ar, execRoot, filemetadata.NewSingleFlightCache())
// Accept either a NotFound status code or an error whose text contains "not found".
// NOTE(review): if err is nil, the first operand is true and err.Error()
// panics on the nil error — a nil guard looks necessary here.
if status.Code(err) != codes.NotFound && !strings.Contains(err.Error(), "not found") {
t.Errorf("expected 'not found' error in DownloadActionOutputs, got: %v", err)
func TestDownloadActionOutputsBatching(t *testing.T) {
tests := []struct {
name string
sizes []int
locality bool
batchReqs int
name: "single small blob",
sizes: []int{1},
batchReqs: 0,
name: "large and small blobs hitting max exactly",
sizes: []int{338, 338, 338, 1, 1, 1},
batchReqs: 3,
name: "small batches of big blobs",
sizes: []int{88, 88, 88, 88, 88, 88, 88},
batchReqs: 2,
name: "batch with blob that's too big",
sizes: []int{400, 88, 88, 88},
batchReqs: 1,
name: "many small blobs hitting max digests",
sizes: []int{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
batchReqs: 4,
name: "single small blob locality",
sizes: []int{1},
locality: true,
batchReqs: 0,
name: "large and small blobs hitting max exactly locality",
sizes: []int{338, 338, 338, 1, 1, 1},
locality: true,
batchReqs: 2,
name: "small batches of big blobs locality",
sizes: []int{88, 88, 88, 88, 88, 88, 88},
locality: true,
batchReqs: 2,
name: "batch with blob that's too big locality",
sizes: []int{400, 88, 88, 88},
locality: true,
batchReqs: 1,
name: "many small blobs hitting max digests locality",
sizes: []int{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
locality: true,
batchReqs: 4,
for _, tc := range tests {
tc := tc
t.Run(, func(t *testing.T) {
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
c.MaxBatchSize = 500
c.MaxBatchDigests = 4
// Each batch request frame overhead is 13 bytes.
// A per-blob overhead is 74 bytes.
c.UtilizeLocality = client.UtilizeLocality(tc.locality)
var dgs []digest.Digest
blobs := make(map[digest.Digest][]byte)
ar := &repb.ActionResult{}
for i, sz := range tc.sizes {
blob := make([]byte, int(sz))
if sz > 0 {
blob[0] = byte(i) // Ensure blobs are distinct
dg := digest.NewFromBlob(blob)
blobs[dg] = blob
dgs = append(dgs, dg)
if sz > 0 {
// Don't seed fake with empty blob, because it should not be called.
name := fmt.Sprintf("foo_%s", dg)
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name, Digest: dg.ToProto()})
execRoot, err := ioutil.TempDir("", "DownloadOuts")
if err != nil {
t.Fatalf("failed to make temp dir: %v", err)
defer os.RemoveAll(execRoot)
_, err = c.DownloadActionOutputs(ctx, ar, execRoot, filemetadata.NewSingleFlightCache())
if err != nil {
t.Errorf("error in DownloadActionOutputs: %s", err)
for dg, data := range blobs {
path := filepath.Join(execRoot, fmt.Sprintf("foo_%s", dg))
contents, err := ioutil.ReadFile(path)
if err != nil {
t.Errorf("error reading from %s: %v", path, err)
if !bytes.Equal(contents, data) {
t.Errorf("expected %s to contain %v, got %v", path, contents, data)
if fake.BatchReqs() != tc.batchReqs {
t.Errorf("%d requests were made to BatchReadBlobs, wanted %d", fake.BatchReqs(), tc.batchReqs)
func TestDownloadActionOutputsConcurrency(t *testing.T) {
ctx := context.Background()
type testBlob struct {
digest digest.Digest
blob []byte
blobs := make([]*testBlob, 1000)
for i := 0; i < 1000; i++ {
blob := []byte(fmt.Sprint(i))
blobs[i] = &testBlob{
digest: digest.NewFromBlob(blob),
blob: blob,
for _, ub := range []client.UseBatchOps{false, true} {
for _, uo := range []client.UnifiedDownloads{false, true} {
ub, uo := ub, uo
t.Run(fmt.Sprintf("%sUsingBatch:%t,UnifiedDownloads:%t", t.Name(), ub, uo), func(t *testing.T) {
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
fake.ReqSleepDuration = reqMaxSleepDuration
fake.ReqSleepRandomize = true
c := e.Client.GrpcClient
for _, b := range blobs {
eg, eCtx := errgroup.WithContext(ctx)
for i := 0; i < 100; i++ {
i := i
eg.Go(func() error {
var input []*testBlob
ar := &repb.ActionResult{}
// Download 15 digests in a sliding window.
for j := i * 10; j < i*10+15 && j < len(blobs); j++ {
input = append(input, blobs[j])
for _, i := range input {
name := fmt.Sprintf("foo_%s", i.digest)
dgPb := i.digest.ToProto()
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name, Digest: dgPb})
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name + "_copy", Digest: dgPb})
execRoot := t.TempDir()
if _, err := c.DownloadActionOutputs(eCtx, ar, execRoot, filemetadata.NewSingleFlightCache()); err != nil {
return fmt.Errorf("error in DownloadActionOutputs: %s", err)
for _, i := range input {
name := filepath.Join(execRoot, fmt.Sprintf("foo_%s", i.digest))
for _, path := range []string{name, name + "_copy"} {
contents, err := ioutil.ReadFile(path)
if err != nil {
return fmt.Errorf("error reading from %s: %v", path, err)
if !bytes.Equal(contents, i.blob) {
return fmt.Errorf("expected %s to contain %v, got %v", path, contents, i.blob)
return nil
if err := eg.Wait(); err != nil {
if fake.MaxConcurrency() > defaultCASConcurrency {
t.Errorf("CAS concurrency %v was higher than max %v", fake.MaxConcurrency(), defaultCASConcurrency)
if ub {
if uo {
// Check that we batch requests from different Actions.
if fake.BatchReqs() > 50 {
t.Errorf("%d requests were made to BatchReadBlobs, wanted <= 50", fake.BatchReqs())
} else {
if fake.BatchReqs() != 100 {
t.Errorf("%d requests were made to BatchReadBlobs, wanted 100", fake.BatchReqs())
func TestDownloadActionOutputsOneSlowRead(t *testing.T) {
ctx := context.Background()
type testBlob struct {
digest digest.Digest
blob []byte
blobs := make([]*testBlob, 20)
for i := 0; i < len(blobs); i++ {
blob := []byte(fmt.Sprint(i))
blobs[i] = &testBlob{
digest: digest.NewFromBlob(blob),
blob: blob,
problemBlob := make([]byte, 2000) // Will not be batched.
problemDg := digest.NewFromBlob(problemBlob)
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
fake.ReqSleepDuration = reqMaxSleepDuration
fake.ReqSleepRandomize = true
wait := make(chan bool)
fake.PerDigestBlockFn[problemDg] = func() {
c := e.Client.GrpcClient
for _, b := range blobs {
// Start downloading the problem digest.
pg, pCtx := errgroup.WithContext(ctx)
pg.Go(func() error {
name := fmt.Sprintf("problem_%s", problemDg)
dgPb := problemDg.ToProto()
ar := &repb.ActionResult{}
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name, Digest: dgPb})
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name + "_copy", Digest: dgPb})
execRoot := t.TempDir()
if _, err := c.DownloadActionOutputs(pCtx, ar, execRoot, filemetadata.NewSingleFlightCache()); err != nil {
return fmt.Errorf("error in DownloadActionOutputs: %s", err)
for _, path := range []string{name, name + "_copy"} {
contents, err := ioutil.ReadFile(filepath.Join(execRoot, path))
if err != nil {
return fmt.Errorf("error reading from %s: %v", path, err)
if !bytes.Equal(contents, problemBlob) {
return fmt.Errorf("expected %s to contain %v, got %v", path, problemBlob, contents)
return nil
// Download a bunch of fast-downloading blobs.
eg, eCtx := errgroup.WithContext(ctx)
for i := 0; i < 100; i++ {
i := i
eg.Go(func() error {
var input []*testBlob
ar := &repb.ActionResult{}
// Download 15 digests in a sliding window.
for j := i * 10; j < i*10+15 && j < len(blobs); j++ {
input = append(input, blobs[j])
totalBytes := int64(0)
for _, i := range input {
name := fmt.Sprintf("foo_%s", i.digest)
dgPb := i.digest.ToProto()
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name, Digest: dgPb})
ar.OutputFiles = append(ar.OutputFiles, &repb.OutputFile{Path: name + "_copy", Digest: dgPb})
// Count only once due to dedup
totalBytes += i.digest.Size
execRoot := t.TempDir()
stats, err := c.DownloadActionOutputs(eCtx, ar, execRoot, filemetadata.NewSingleFlightCache())
if err != nil {
return fmt.Errorf("error in DownloadActionOutputs: %s", err)
if stats.LogicalMoved != stats.RealMoved {
t.Errorf("c.DownloadActionOutputs: logical (%v) and real (%v) bytes moved different despite compression off", stats.LogicalMoved, stats.RealMoved)
if stats.LogicalMoved != totalBytes {
t.Errorf("c.DownloadActionOutputs: logical (%v) bytes moved different from sum of digests (%v) despite downloaded", stats.LogicalMoved, stats.RealMoved)
for _, i := range input {
name := filepath.Join(execRoot, fmt.Sprintf("foo_%s", i.digest))
for _, path := range []string{name, name + "_copy"} {
contents, err := ioutil.ReadFile(path)
if err != nil {
return fmt.Errorf("error reading from %s: %v", path, err)
if !bytes.Equal(contents, i.blob) {
return fmt.Errorf("expected %s to contain %v, got %v", path, i.blob, contents)
return nil
if err := eg.Wait(); err != nil {
// Now let the problem digest download finish.
if err := pg.Wait(); err != nil {
func TestWriteAndReadProto(t *testing.T) {
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
fooDigest := fake.Put([]byte("foo"))
dirA := &repb.Directory{
Files: []*repb.FileNode{
{Name: "foo", Digest: fooDigest.ToProto(), IsExecutable: true},
d, err := c.WriteProto(ctx, dirA)
if err != nil {
t.Errorf("Failed writing proto: %s", err)
dirB := &repb.Directory{}
if _, err := c.ReadProto(ctx, d, dirB); err != nil {
t.Errorf("Failed reading proto: %s", err)
if !proto.Equal(dirA, dirB) {
t.Errorf("Protos not equal: %s / %s", dirA, dirB)
func TestDownloadFiles(t *testing.T) {
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
c := e.Client.GrpcClient
fooDigest := fake.Put([]byte("foo"))
barDigest := fake.Put([]byte("bar"))
execRoot, err := ioutil.TempDir("", "DownloadOuts")
if err != nil {
t.Fatalf("failed to make temp dir: %v", err)
defer os.RemoveAll(execRoot)
stats, err := c.DownloadFiles(ctx, execRoot, map[digest.Digest]*client.TreeOutput{
fooDigest: {Digest: fooDigest, Path: "foo", IsExecutable: true},
barDigest: {Digest: barDigest, Path: "bar"},
if err != nil {
t.Errorf("Failed to run DownloadFiles: %v", err)
if stats.LogicalMoved != stats.RealMoved {
t.Errorf("c.DownloadFiles: logical (%v) and real (%v) bytes moved different despite compression off", stats.LogicalMoved, stats.RealMoved)
if stats.LogicalMoved != fooDg.Size+barDigest.Size {
t.Errorf("c.DownloadFiles: logical (%v) bytes moved different from sum of digests (%v) despite no duplication", stats.LogicalMoved, fooDg.Size+barDigest.Size)
if b, err := ioutil.ReadFile(filepath.Join(execRoot, "foo")); err != nil {
t.Errorf("failed to read file: %v", err)
} else if diff := cmp.Diff(b, []byte("foo")); diff != "" {
t.Errorf("foo mismatch (-want +got):\n%s", diff)
if b, err := ioutil.ReadFile(filepath.Join(execRoot, "bar")); err != nil {
t.Errorf("failed to read file: %v", err)
} else if diff := cmp.Diff(b, []byte("bar")); diff != "" {
t.Errorf("foo mismatch (-want +got):\n%s", diff)
// TestDownloadFilesCancel runs one subtest per UnifiedDownloads value. Each
// subtest stores a single blob in the fake CAS, calls DownloadFiles with a
// cancellable context, and expects the call to return context.Canceled with no
// blob reads recorded by the fake for that digest.
// NOTE(review): in the lines visible here, uo is used only in the subtest name
// and neither wait nor cancel is used after being declared; the bodies of the
// block function and of the second goroutine are not visible either — confirm
// against the full file before changing this test.
func TestDownloadFilesCancel(t *testing.T) {
for _, uo := range []client.UnifiedDownloads{false, true} {
uo := uo
t.Run(fmt.Sprintf("UnifiedDownloads:%t", uo), func(t *testing.T) {
// The subtest name may contain path separators, which are replaced so it
// can be used as a temp-dir name pattern.
execRoot, err := ioutil.TempDir("", strings.ReplaceAll(t.Name(), string(filepath.Separator), "_"))
if err != nil {
t.Fatalf("failed to make temp dir: %v", err)
defer os.RemoveAll(execRoot)
ctx := context.Background()
e, cleanup := fakes.NewTestEnv(t)
defer cleanup()
fake := e.Server.CAS
fooDigest := fake.Put([]byte{1, 2, 3})
// Channel presumably used to hold back reads of fooDigest — TODO confirm.
wait := make(chan bool)
// Hook registered on the fake CAS for fooDigest.
fake.PerDigestBlockFn[fooDigest] = func() {
c := e.Client.GrpcClient
eg, eCtx := errgroup.WithContext(ctx)
// cCtx is the context handed to DownloadFiles; cancel is its cancel function.
cCtx, cancel := context.WithCancel(eCtx)
// First goroutine: run the download and require context.Canceled.
eg.Go(func() error {
if _, err := c.DownloadFiles(cCtx, execRoot, map[digest.Digest]*client.TreeOutput{
fooDigest: {Digest: fooDigest, Path: "foo", IsExecutable: true},
}); err != context.Canceled {
return fmt.Errorf("Failed to run DownloadFiles: expected context.Canceled, got %v", err)
return nil
// Second goroutine: presumably triggers the cancellation — TODO confirm.
eg.Go(func() error {
return nil
if err := eg.Wait(); err != nil {
// No read of fooDigest should have been recorded by the fake.
if fake.BlobReads(fooDigest) != 0 {
t.Errorf("Expected no reads for foo since request is cancelled, got %v.", fake.BlobReads(fooDigest))