package client
import (
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"sync"
"time"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/contextmd"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/filemetadata"
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
log "github.com/golang/glog"
"github.com/klauspost/compress/zstd"
syncpool "github.com/mostynb/zstdpool-syncpool"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
// DownloadFiles downloads the output files under outDir.
// It returns the number of logical and real bytes downloaded, which may differ from the sum of
// the file sizes due to deduplication and compression.
func (c *Client) DownloadFiles(ctx context.Context, outDir string, outputs map[digest.Digest]*TreeOutput) (*MovedBytesMetadata, error) {
stats := &MovedBytesMetadata{}
if !c.UnifiedDownloads {
return c.downloadNonUnified(ctx, outDir, outputs)
}
count := len(outputs)
if count == 0 {
return stats, nil
}
meta, err := contextmd.ExtractMetadata(ctx)
if err != nil {
return stats, err
}
wait := make(chan *downloadResponse, count)
for dg, out := range outputs {
r := &downloadRequest{
digest: dg,
context: ctx,
outDir: outDir,
output: out,
meta: meta,
wait: wait,
}
select {
case <-ctx.Done():
contextmd.Infof(ctx, log.Level(2), "Download canceled")
return stats, ctx.Err()
case c.casDownloadRequests <- r:
continue
}
}
// Wait for all downloads to finish.
for count > 0 {
select {
case <-ctx.Done():
contextmd.Infof(ctx, log.Level(2), "Download canceled")
return stats, ctx.Err()
case resp := <-wait:
if resp.err != nil {
return stats, resp.err
}
stats.addFrom(resp.stats)
count--
}
}
return stats, nil
}
// DownloadOutputs downloads the specified outputs to outDir and records their metadata in cache.
// It returns the number of logical and real bytes downloaded, which may differ from the sum of
// the file sizes due to deduplication and compression.
func (c *Client) DownloadOutputs(ctx context.Context, outs map[string]*TreeOutput, outDir string, cache filemetadata.Cache) (*MovedBytesMetadata, error) {
var symlinks, copies []*TreeOutput
downloads := make(map[digest.Digest]*TreeOutput)
fullStats := &MovedBytesMetadata{}
for _, out := range outs {
path := filepath.Join(outDir, out.Path)
if out.IsEmptyDirectory {
if err := os.MkdirAll(path, c.DirMode); err != nil {
return fullStats, err
}
continue
}
if err := os.MkdirAll(filepath.Dir(path), c.DirMode); err != nil {
return fullStats, err
}
// We create the symbolic links after all regular downloads are finished, because dangling
// links will not work.
if out.SymlinkTarget != "" {
symlinks = append(symlinks, out)
continue
}
if _, ok := downloads[out.Digest]; ok {
copies = append(copies, out)
// All copies are effectively cached
fullStats.Requested += out.Digest.Size
fullStats.Cached += out.Digest.Size
} else {
downloads[out.Digest] = out
}
}
stats, err := c.DownloadFiles(ctx, outDir, downloads)
fullStats.addFrom(stats)
if err != nil {
return fullStats, err
}
for _, output := range downloads {
path := output.Path
md := &filemetadata.Metadata{
Digest: output.Digest,
IsExecutable: output.IsExecutable,
}
absPath := path
if !filepath.IsAbs(absPath) {
absPath = filepath.Join(outDir, absPath)
}
if err := cache.Update(absPath, md); err != nil {
return fullStats, err
}
}
for _, out := range copies {
perm := c.RegularMode
if out.IsExecutable {
perm = c.ExecutableMode
}
src := downloads[out.Digest]
if src.IsEmptyDirectory {
return fullStats, fmt.Errorf("unexpected empty directory: %s", src.Path)
}
if err := copyFile(outDir, outDir, src.Path, out.Path, perm); err != nil {
return fullStats, err
}
}
for _, out := range symlinks {
if err := os.Symlink(out.SymlinkTarget, filepath.Join(outDir, out.Path)); err != nil {
return fullStats, err
}
}
return fullStats, nil
}
// DownloadDirectory downloads the entire directory tree rooted at the given digest into outDir.
// It returns the number of logical and real bytes downloaded, which may differ from the sum of
// the file sizes due to deduplication and compression.
func (c *Client) DownloadDirectory(ctx context.Context, d digest.Digest, outDir string, cache filemetadata.Cache) (map[string]*TreeOutput, *MovedBytesMetadata, error) {
dir := &repb.Directory{}
stats := &MovedBytesMetadata{}
protoStats, err := c.ReadProto(ctx, d, dir)
stats.addFrom(protoStats)
if err != nil {
return nil, stats, fmt.Errorf("digest %v cannot be mapped to a directory proto: %v", d, err)
}
dirs, err := c.GetDirectoryTree(ctx, d.ToProto())
if err != nil {
return nil, stats, err
}
outputs, err := c.FlattenTree(&repb.Tree{
Root: dir,
Children: dirs,
}, "")
if err != nil {
return nil, stats, err
}
outStats, err := c.DownloadOutputs(ctx, outputs, outDir, cache)
stats.addFrom(outStats)
return outputs, stats, err
}
// zstdDecoder is a shared instance that should only be used in stateless mode, i.e. only by calling DecodeAll()
var zstdDecoder, _ = zstd.NewReader(nil)
// BatchDownloadBlobs downloads a number of blobs from the CAS to memory. They must collectively be below the
// maximum total size for a batch read, which is about 4 MB (see MaxBatchSize). Digests must be
// computed in advance by the caller. In case multiple errors occur during the blob read, the
// last error will be returned.
func (c *Client) BatchDownloadBlobs(ctx context.Context, dgs []digest.Digest) (map[digest.Digest][]byte, error) {
if len(dgs) > int(c.MaxBatchDigests) {
return nil, fmt.Errorf("batch read of %d total blobs exceeds maximum of %d", len(dgs), c.MaxBatchDigests)
}
req := &repb.BatchReadBlobsRequest{InstanceName: c.InstanceName}
if c.batchCompression {
req.AcceptableCompressors = []repb.Compressor_Value{repb.Compressor_ZSTD}
}
var sz int64
foundEmpty := false
for _, dg := range dgs {
if dg.Size == 0 {
foundEmpty = true
continue
}
sz += int64(dg.Size)
req.Digests = append(req.Digests, dg.ToProto())
}
if sz > int64(c.MaxBatchSize) {
return nil, fmt.Errorf("batch read of %d total bytes exceeds maximum of %d", sz, c.MaxBatchSize)
}
res := make(map[digest.Digest][]byte)
if foundEmpty {
res[digest.Empty] = nil
}
opts := c.RPCOpts()
closure := func() error {
var resp *repb.BatchReadBlobsResponse
err := c.CallWithTimeout(ctx, "BatchReadBlobs", func(ctx context.Context) (e error) {
resp, e = c.cas.BatchReadBlobs(ctx, req, opts...)
return e
})
if err != nil {
return err
}
numErrs, errDg, errMsg := 0, &repb.Digest{}, ""
var failedDgs []*repb.Digest
var retriableError error
allRetriable := true
for _, r := range resp.Responses {
st := status.FromProto(r.Status)
if st.Code() != codes.OK {
e := st.Err()
if c.Retrier.ShouldRetry(e) {
failedDgs = append(failedDgs, r.Digest)
retriableError = e
} else {
allRetriable = false
}
numErrs++
errDg = r.Digest
errMsg = r.Status.Message
} else {
switch r.Compressor {
case repb.Compressor_IDENTITY:
// do nothing
case repb.Compressor_ZSTD:
b, err := zstdDecoder.DecodeAll(r.Data, nil)
if err != nil {
// Count decompression failures as non-retriable errors so the batch does not
// silently report success while this blob is missing from the result map.
numErrs++
allRetriable = false
errDg = r.Digest
errMsg = err.Error()
continue
}
r.Data = b
default:
numErrs++
allRetriable = false
errDg = r.Digest
errMsg = fmt.Sprintf("blob returned with unsupported compressor %s", r.Compressor)
continue
}
res[digest.NewFromProtoUnvalidated(r.Digest)] = r.Data
}
}
req.Digests = failedDgs
if numErrs > 0 {
if allRetriable {
return retriableError // Retriable errors only, retry the failed digests.
}
return fmt.Errorf("downloading blobs as part of a batch resulted in %d failures, including blob %s: %s", numErrs, errDg, errMsg)
}
return nil
}
return res, c.Retrier.Do(ctx, closure)
}
// ReadBlob fetches a blob from the CAS into a byte slice.
// It returns the blob contents along with stats describing the logical size and the number of
// bytes moved over the wire.
func (c *Client) ReadBlob(ctx context.Context, d digest.Digest) ([]byte, *MovedBytesMetadata, error) {
return c.readBlob(ctx, d, 0, 0)
}
// ReadBlobRange fetches a partial blob from the CAS into a byte slice, starting from offset bytes
// and including at most limit bytes (or no limit if limit==0). The offset must be non-negative and
// no greater than the size of the entire blob. The limit must not be negative, but offset+limit may
// be greater than the size of the entire blob.
func (c *Client) ReadBlobRange(ctx context.Context, d digest.Digest, offset, limit int64) ([]byte, *MovedBytesMetadata, error) {
return c.readBlob(ctx, d, offset, limit)
}
// ReadBlobToFile fetches the blob with the provided digest from the CAS and saves it to fpath.
// It returns stats for the number of bytes read.
func (c *Client) ReadBlobToFile(ctx context.Context, d digest.Digest, fpath string) (*MovedBytesMetadata, error) {
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, c.RegularMode)
if err != nil {
return nil, err
}
defer f.Close()
return c.readBlobStreamed(ctx, d, 0, 0, f)
}
// ReadProto reads a blob from the CAS and unmarshals it into the given message.
// Returns the size of the proto and the amount of bytes moved through the wire.
func (c *Client) ReadProto(ctx context.Context, d digest.Digest, msg proto.Message) (*MovedBytesMetadata, error) {
bytes, stats, err := c.ReadBlob(ctx, d)
if err != nil {
return stats, err
}
return stats, proto.Unmarshal(bytes, msg)
}
// readBlob fetches a blob (or a range of it) from the CAS into a byte slice and returns the
// contents along with stats for the bytes moved over the wire.
func (c *Client) readBlob(ctx context.Context, dg digest.Digest, offset, limit int64) ([]byte, *MovedBytesMetadata, error) {
// int might be 32-bit, in which case we could have a blob whose size is representable in int64
// but not int32, and thus can't fit in a slice. We can check for this by casting and seeing if
// the result is negative, since 32 bits is big enough to wrap all out-of-range values of int64
// to negative numbers. If int is 64 bits, the cast is a no-op and so the condition will always fail.
if int(dg.Size) < 0 {
return nil, nil, fmt.Errorf("digest size %d is too big to fit in a byte slice", dg.Size)
}
if offset > dg.Size {
return nil, nil, fmt.Errorf("offset %d out of range for a blob of size %d", offset, dg.Size)
}
if offset < 0 {
return nil, nil, fmt.Errorf("offset %d may not be negative", offset)
}
if limit < 0 {
return nil, nil, fmt.Errorf("limit %d may not be negative", limit)
}
sz := dg.Size - offset
if limit > 0 && limit < sz {
sz = limit
}
// Pad size so bytes.Buffer does not reallocate.
buf := bytes.NewBuffer(make([]byte, 0, sz+bytes.MinRead))
stats, err := c.readBlobStreamed(ctx, dg, offset, limit, buf)
return buf.Bytes(), stats, err
}
func (c *Client) readBlobStreamed(ctx context.Context, d digest.Digest, offset, limit int64, w io.Writer) (*MovedBytesMetadata, error) {
stats := &MovedBytesMetadata{}
stats.Requested = d.Size
if d.Size == 0 {
// Do not download empty blobs.
return stats, nil
}
sz := d.Size - offset
if limit > 0 && limit < sz {
sz = limit
}
wt := newWriteTracker(w)
defer func() { stats.LogicalMoved = wt.n }()
closure := func() (err error) {
name, wc, done, e := c.maybeCompressReadBlob(d, wt)
if e != nil {
return e
}
defer func() {
errC := wc.Close()
errD := <-done
close(done)
if err == nil && errC != nil {
err = errC
} else if errC != nil {
log.Errorf("Failed to close writer: %v", errC)
}
if err == nil && errD != nil {
err = errD
} else if errD != nil {
log.Errorf("Failed to finalize writing blob: %v", errD)
}
}()
wireBytes, err := c.readStreamed(ctx, name, offset+wt.n, limit, wc)
stats.RealMoved += wireBytes
if err != nil {
return err
}
return nil
}
// Only retry on transient backend issues.
if err := c.Retrier.Do(ctx, closure); err != nil {
return stats, err
}
if wt.n != sz {
return stats, fmt.Errorf("partial read of digest %s returned %d bytes, expected %d bytes", d, wt.n, sz)
}
// Verify the digest only for complete reads, since we can't reliably calculate the hash without the full blob.
if d.Size == sz {
// Signal for writeTracker to take the digest of the data.
if err := wt.Close(); err != nil {
return stats, err
}
// Wait for the digest to be ready.
if err := <-wt.ready; err != nil {
return stats, err
}
close(wt.ready)
if wt.dg != d {
return stats, fmt.Errorf("calculated digest %s != expected digest %s", wt.dg, d)
}
}
return stats, nil
}
// GetDirectoryTree returns the entire directory tree rooted at the given digest (which must target
// a Directory stored in the CAS).
func (c *Client) GetDirectoryTree(ctx context.Context, d *repb.Digest) (result []*repb.Directory, err error) {
if digest.NewFromProtoUnvalidated(d).IsEmpty() {
return []*repb.Directory{&repb.Directory{}}, nil
}
pageTok := ""
result = []*repb.Directory{}
closure := func(ctx context.Context) error {
stream, err := c.GetTree(ctx, &repb.GetTreeRequest{
InstanceName: c.InstanceName,
RootDigest: d,
PageToken: pageTok,
})
if err != nil {
return err
}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
pageTok = resp.NextPageToken
result = append(result, resp.Directories...)
}
return nil
}
if err := c.Retrier.Do(ctx, func() error { return c.CallWithTimeout(ctx, "GetTree", closure) }); err != nil {
return nil, err
}
return result, nil
}
// FlattenActionOutputs collects and flattens all the outputs of an action.
// It downloads the output directory metadata, if required, but not the leaf file blobs.
func (c *Client) FlattenActionOutputs(ctx context.Context, ar *repb.ActionResult) (map[string]*TreeOutput, error) {
outs := make(map[string]*TreeOutput)
for _, file := range ar.OutputFiles {
outs[file.Path] = &TreeOutput{
Path: file.Path,
Digest: digest.NewFromProtoUnvalidated(file.Digest),
IsExecutable: file.IsExecutable,
}
}
for _, sm := range ar.OutputFileSymlinks {
outs[sm.Path] = &TreeOutput{
Path: sm.Path,
SymlinkTarget: sm.Target,
}
}
for _, sm := range ar.OutputDirectorySymlinks {
outs[sm.Path] = &TreeOutput{
Path: sm.Path,
SymlinkTarget: sm.Target,
}
}
for _, dir := range ar.OutputDirectories {
t := &repb.Tree{}
if _, err := c.ReadProto(ctx, digest.NewFromProtoUnvalidated(dir.TreeDigest), t); err != nil {
return nil, err
}
dirouts, err := c.FlattenTree(t, dir.Path)
if err != nil {
return nil, err
}
for _, out := range dirouts {
outs[out.Path] = out
}
}
return outs, nil
}
// DownloadActionOutputs downloads the output files and directories in the given action result.
// It returns the number of logical and real bytes downloaded, which may differ from the sum of
// the file sizes due to deduplication and compression.
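//
// A minimal usage sketch (illustrative assumptions, not part of this file: res is an
// ActionResult previously fetched from the action cache, c is a configured Client, and
// filemetadata.NewSingleFlightCache is used as the metadata cache):
//
//	cache := filemetadata.NewSingleFlightCache()
//	stats, err := c.DownloadActionOutputs(ctx, res, "/tmp/out", cache)
//	if err != nil {
//		return err
//	}
//	log.Infof("downloaded %d real bytes (%d logical)", stats.RealMoved, stats.LogicalMoved)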
func (c *Client) DownloadActionOutputs(ctx context.Context, resPb *repb.ActionResult, outDir string, cache filemetadata.Cache) (*MovedBytesMetadata, error) {
outs, err := c.FlattenActionOutputs(ctx, resPb)
if err != nil {
return nil, err
}
// Remove the existing output directories before downloading.
for _, dir := range resPb.OutputDirectories {
if err := os.RemoveAll(filepath.Join(outDir, dir.Path)); err != nil {
return nil, err
}
}
return c.DownloadOutputs(ctx, outs, outDir, cache)
}
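// decoderInit guards the one-time construction of decoders, a pool of zstd decoders shared by
// compressed blob reads.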
var decoderInit sync.Once
var decoders *sync.Pool
// NewCompressedWriteBuffer returns an io.WriteCloser that accepts zstd-compressed bytes and
// writes the decompressed contents to w. The returned channel reports the final decoding error
// (or nil) once all data has been flushed to w.
func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error) {
// Our bytestream abstraction uses a Writer so that the bytestream interface can "write"
// the data upstream. However, the zstd library only exposes a reader-based interface.
// Instead of writing a different bytestream version that returns a reader, we pipe the
// written data into the decoder.
r, nw := io.Pipe()
decoderInit.Do(func() {
decoders = syncpool.NewDecoderPool(zstd.WithDecoderConcurrency(1))
})
decdIntf := decoders.Get()
decoderW, ok := decdIntf.(*syncpool.DecoderWrapper)
if !ok || decoderW == nil {
return nil, nil, fmt.Errorf("failed creating new decoder")
}
if err := decoderW.Reset(r); err != nil {
return nil, nil, err
}
done := make(chan error)
go func() {
// WriteTo will block until the reader - or, in this case, the pipe writer - is
// closed, so we have to run the decoder in a separate goroutine. As such, we also
// need a way to signal the caller that the decoding has finished, which may happen
// some time after the last Write call.
_, err := decoderW.WriteTo(w)
if err != nil {
// Because WriteTo returned early, writes to the pipe still have to go
// somewhere or they will block execution.
io.Copy(io.Discard, r)
}
// DecoderWrapper.Close moves the decoder back to the Pool.
decoderW.Close()
done <- err
}()
return nw, done, nil
}
// writerTracker is a middleware placed in front of a read caller's underlying writer.
// Since cas.go is responsible for verifying the data it downloads, and re-opening files on
// disk to check them later in the call stack would be costly, we duplicate the writes through
// a digest calculator and track how much data was written.
type writerTracker struct {
w io.Writer
pw *io.PipeWriter
dg digest.Digest
// Tracked independently of the digest as we might want to retry
// on partial reads.
n int64
ready chan error
}
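// newWriteTracker returns a writerTracker that forwards writes to w while teeing them into a
// background digest computation; the digest is stored in wt.dg and any error is delivered on
// wt.ready once Close is called.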
func newWriteTracker(w io.Writer) *writerTracker {
pr, pw := io.Pipe()
wt := &writerTracker{
pw: pw,
w: w,
ready: make(chan error, 1),
n: 0,
}
go func() {
var err error
wt.dg, err = digest.NewFromReader(pr)
wt.ready <- err
}()
return wt
}
func (wt *writerTracker) Write(p []byte) (int, error) {
// Any error on this write will be reflected on the
// pipe reader end when trying to calculate the digest.
// Additionally, if we are not downloading the entire
// blob, we can't even verify the digest to begin with.
// So we can ignore errors on this pipewriter.
wt.pw.Write(p)
n, err := wt.w.Write(p)
wt.n += int64(n)
return n, err
}
// Close closes the pipe - which triggers the end of the
// digest creation.
func (wt *writerTracker) Close() error {
return wt.pw.Close()
}
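// downloadRequest is a single output download queued on the unified download channel.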
type downloadRequest struct {
digest digest.Digest
outDir string
// TODO(olaola): use channels for cancellations instead of embedding download context.
context context.Context
output *TreeOutput
meta *contextmd.Metadata
wait chan<- *downloadResponse
}
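// downloadResponse carries the download stats or error for one request back to DownloadFiles.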
type downloadResponse struct {
stats *MovedBytesMetadata
err error
}
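// downloadProcessor buffers unified download requests and flushes them via download either when
// the buffer reaches UnifiedDownloadBufferSize or on the next UnifiedDownloadTickDuration tick.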
func (c *Client) downloadProcessor(ctx context.Context) {
var buffer []*downloadRequest
ticker := time.NewTicker(time.Duration(c.UnifiedDownloadTickDuration))
for {
select {
case ch, ok := <-c.casDownloadRequests:
if !ok {
// Client is exiting. Notify remaining downloads to prevent deadlocks.
ticker.Stop()
if buffer != nil {
for _, r := range buffer {
r.wait <- &downloadResponse{err: context.Canceled}
}
}
return
}
buffer = append(buffer, ch)
if len(buffer) >= int(c.UnifiedDownloadBufferSize) {
c.download(ctx, buffer)
buffer = nil
}
case <-ticker.C:
if buffer != nil {
c.download(ctx, buffer)
buffer = nil
}
}
}
}
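// download drains a buffer of unified download requests: it groups them by digest, splits the
// digests into batches, and downloads each batch on its own goroutine, bounded by the
// casDownloaders semaphore.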
func (c *Client) download(ctx context.Context, data []*downloadRequest) {
// It is possible for the same digest to be requested for multiple output locations.
// In that case we download it once and copy it to the other locations.
reqs := make(map[digest.Digest][]*downloadRequest)
var metas []*contextmd.Metadata
for _, r := range data {
rs := reqs[r.digest]
rs = append(rs, r)
reqs[r.digest] = rs
metas = append(metas, r.meta)
}
var dgs []digest.Digest
if bool(c.useBatchOps) && bool(c.UtilizeLocality) {
paths := make([]*TreeOutput, 0, len(data))
for _, r := range data {
paths = append(paths, r.output)
}
// Sort by path to utilize disk locality when writing files.
sort.Slice(paths, func(i, j int) bool {
return paths[i].Path < paths[j].Path
})
for _, path := range paths {
dgs = append(dgs, path.Digest)
}
} else {
for dg := range reqs {
dgs = append(dgs, dg)
}
}
unifiedMeta := contextmd.MergeMetadata(metas...)
var err error
if unifiedMeta.ActionID != "" {
ctx, err = contextmd.WithMetadata(ctx, unifiedMeta)
}
if err != nil {
afterDownload(dgs, reqs, map[digest.Digest]*MovedBytesMetadata{}, err)
return
}
contextmd.Infof(ctx, log.Level(2), "%d digests to download (%d reqs)", len(dgs), len(reqs))
var batches [][]digest.Digest
if c.useBatchOps {
batches = c.makeBatches(ctx, dgs, !bool(c.UtilizeLocality))
} else {
contextmd.Infof(ctx, log.Level(2), "Downloading them individually")
for i := range dgs {
contextmd.Infof(ctx, log.Level(3), "Creating single batch of blob %s", dgs[i])
batches = append(batches, dgs[i:i+1])
}
}
for i, batch := range batches {
i, batch := i, batch // https://golang.org/doc/faq#closures_and_goroutines
go func() {
if c.casDownloaders.Acquire(ctx, 1) == nil {
defer c.casDownloaders.Release(1)
}
if i%logInterval == 0 {
contextmd.Infof(ctx, log.Level(2), "%d batches left to download", len(batches)-i)
}
if len(batch) > 1 {
c.downloadBatch(ctx, batch, reqs)
} else {
rs := reqs[batch[0]]
downloadCtx := ctx
if len(rs) == 1 {
// We have only one download request for this digest.
// Download on the same context as the issuing request, to support proper cancellation.
downloadCtx = rs[0].context
}
c.downloadSingle(downloadCtx, batch[0], reqs)
}
}()
}
}
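// downloadBatch fetches a batch of blobs via BatchDownloadBlobs and writes each blob to every
// output path that requested it, reporting stats back to the waiting requests.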
func (c *Client) downloadBatch(ctx context.Context, batch []digest.Digest, reqs map[digest.Digest][]*downloadRequest) {
contextmd.Infof(ctx, log.Level(3), "Downloading batch of %d files", len(batch))
bchMap, err := c.BatchDownloadBlobs(ctx, batch)
if err != nil {
afterDownload(batch, reqs, map[digest.Digest]*MovedBytesMetadata{}, err)
return
}
for _, dg := range batch {
stats := &MovedBytesMetadata{
Requested: dg.Size,
LogicalMoved: dg.Size,
// There's no compression for batch requests, and there's no such thing as "partial" data for
// a blob since they're all inlined in the response.
RealMoved: dg.Size,
}
data := bchMap[dg]
for i, r := range reqs[dg] {
perm := c.RegularMode
if r.output.IsExecutable {
perm = c.ExecutableMode
}
// Report the real stats only to the first request for this digest to prevent double
// accounting; subsequent requests see the bytes as already cached.
r.wait <- &downloadResponse{
stats: stats,
err: os.WriteFile(filepath.Join(r.outDir, r.output.Path), data, perm),
}
if i == 0 {
// Prevent races by not writing to the original stats.
newStats := &MovedBytesMetadata{}
newStats.Requested = stats.Requested
newStats.Cached = stats.LogicalMoved
newStats.RealMoved = 0
newStats.LogicalMoved = 0
stats = newStats
}
}
}
}
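// downloadSingle streams one blob to the first output path that requested it and then copies
// the file to any other requested paths; results are reported to the waiters via afterDownload.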
func (c *Client) downloadSingle(ctx context.Context, dg digest.Digest, reqs map[digest.Digest][]*downloadRequest) (err error) {
// The lock is released when all file copies are finished.
// We cannot release the lock after each individual file copy, because
// the caller might move the file, and we don't have the contents in memory.
bytesMoved := map[digest.Digest]*MovedBytesMetadata{}
defer func() { afterDownload([]digest.Digest{dg}, reqs, bytesMoved, err) }()
rs := reqs[dg]
if len(rs) < 1 {
return fmt.Errorf("Failed precondition: cannot find %v in reqs map", dg)
}
r := rs[0]
rs = rs[1:]
path := filepath.Join(r.outDir, r.output.Path)
contextmd.Infof(ctx, log.Level(3), "Downloading single file with digest %s to %s", r.output.Digest, path)
stats, err := c.ReadBlobToFile(ctx, r.output.Digest, path)
if err != nil {
return err
}
bytesMoved[r.output.Digest] = stats
if r.output.IsExecutable {
if err := os.Chmod(path, c.ExecutableMode); err != nil {
return err
}
}
for _, cp := range rs {
perm := c.RegularMode
if cp.output.IsExecutable {
perm = c.ExecutableMode
}
if err := copyFile(r.outDir, cp.outDir, r.output.Path, cp.output.Path, perm); err != nil {
return err
}
}
return err
}
// This is a legacy function used only when UnifiedDownloads=false.
// It will be removed when UnifiedDownloads=true is stable.
// Returns the number of logical and real bytes downloaded, which may differ from the sum of
// the file sizes due to compression.
func (c *Client) downloadNonUnified(ctx context.Context, outDir string, outputs map[digest.Digest]*TreeOutput) (*MovedBytesMetadata, error) {
var dgs []digest.Digest
// statsMu protects fullStats across goroutines.
statsMu := sync.Mutex{}
fullStats := &MovedBytesMetadata{}
if bool(c.useBatchOps) && bool(c.UtilizeLocality) {
paths := make([]*TreeOutput, 0, len(outputs))
for _, output := range outputs {
paths = append(paths, output)
}
// Sort by path to utilize disk locality when writing files.
sort.Slice(paths, func(i, j int) bool {
return paths[i].Path < paths[j].Path
})
for _, path := range paths {
dgs = append(dgs, path.Digest)
fullStats.Requested += path.Digest.Size
}
} else {
for dg := range outputs {
dgs = append(dgs, dg)
fullStats.Requested += dg.Size
}
}
contextmd.Infof(ctx, log.Level(2), "%d items to download", len(dgs))
var batches [][]digest.Digest
if c.useBatchOps {
batches = c.makeBatches(ctx, dgs, !bool(c.UtilizeLocality))
} else {
contextmd.Infof(ctx, log.Level(2), "Downloading them individually")
for i := range dgs {
contextmd.Infof(ctx, log.Level(3), "Creating single batch of blob %s", dgs[i])
batches = append(batches, dgs[i:i+1])
}
}
eg, eCtx := errgroup.WithContext(ctx)
for i, batch := range batches {
i, batch := i, batch // https://golang.org/doc/faq#closures_and_goroutines
eg.Go(func() error {
if err := c.casDownloaders.Acquire(eCtx, 1); err != nil {
return err
}
defer c.casDownloaders.Release(1)
if i%logInterval == 0 {
contextmd.Infof(ctx, log.Level(2), "%d batches left to download", len(batches)-i)
}
if len(batch) > 1 {
contextmd.Infof(ctx, log.Level(3), "Downloading batch of %d files", len(batch))
bchMap, err := c.BatchDownloadBlobs(eCtx, batch)
for _, dg := range batch {
data := bchMap[dg]
out := outputs[dg]
perm := c.RegularMode
if out.IsExecutable {
perm = c.ExecutableMode
}
if err := os.WriteFile(filepath.Join(outDir, out.Path), data, perm); err != nil {
return err
}
statsMu.Lock()
fullStats.LogicalMoved += int64(len(data))
fullStats.RealMoved += int64(len(data))
statsMu.Unlock()
}
if err != nil {
return err
}
} else {
out := outputs[batch[0]]
path := filepath.Join(outDir, out.Path)
contextmd.Infof(ctx, log.Level(3), "Downloading single file with digest %s to %s", out.Digest, path)
stats, err := c.ReadBlobToFile(ctx, out.Digest, path)
if err != nil {
return err
}
statsMu.Lock()
fullStats.addFrom(stats)
statsMu.Unlock()
if out.IsExecutable {
if err := os.Chmod(path, c.ExecutableMode); err != nil {
return err
}
}
}
if eCtx.Err() != nil {
return eCtx.Err()
}
return nil
})
}
contextmd.Infof(ctx, log.Level(3), "Waiting for remaining jobs")
err := eg.Wait()
contextmd.Infof(ctx, log.Level(3), "Done")
return fullStats, err
}
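// afterDownload reports the download result (stats or error) for every digest in batch to all
// requests waiting on it, charging the moved bytes only to the first request per digest to
// avoid double accounting.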
func afterDownload(batch []digest.Digest, reqs map[digest.Digest][]*downloadRequest, bytesMoved map[digest.Digest]*MovedBytesMetadata, err error) {
if err != nil {
log.Errorf("Error downloading %v: %v", batch[0], err)
}
for _, dg := range batch {
rs, ok := reqs[dg]
if !ok {
log.Errorf("Precondition failed: download request not found in input %v.", dg)
}
stats, ok := bytesMoved[dg]
if !ok {
log.Errorf("Internal tool error - no matching bytesMoved entry for digest %v", dg)
continue
}
// If no real bytes were moved, it likely means there was an error downloading them.
for i, r := range rs {
// bytesMoved will be zero for error cases.
// We only report it to the first client to prevent double accounting.
r.wait <- &downloadResponse{stats: stats, err: err}
if i == 0 {
// Prevent races by not writing to the original stats.
newStats := &MovedBytesMetadata{}
newStats.Requested = stats.Requested
newStats.Cached = stats.LogicalMoved
newStats.RealMoved = 0
newStats.LogicalMoved = 0
stats = newStats
}
}
}
}
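// writeDummyCloser adds a no-op Close to an io.Writer so uncompressed reads can satisfy the
// io.WriteCloser contract expected by readBlobStreamed.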
type writeDummyCloser struct {
io.Writer
}
func (w *writeDummyCloser) Close() error { return nil }
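// resourceNameRead returns the ByteStream resource name for reading an uncompressed blob, in
// the form {instance_name}/blobs/{hash}/{size}.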
func (c *Client) resourceNameRead(hash string, sizeBytes int64) string {
return fmt.Sprintf("%s/blobs/%s/%d", c.InstanceName, hash, sizeBytes)
}
// TODO(rubensf): Converge compressor to proto in https://github.com/bazelbuild/remote-apis/pull/168 once
// that gets merged in.
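// resourceNameCompressedRead returns the ByteStream resource name for reading a zstd-compressed
// blob, in the form {instance_name}/compressed-blobs/zstd/{hash}/{size}.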
func (c *Client) resourceNameCompressedRead(hash string, sizeBytes int64) string {
return fmt.Sprintf("%s/compressed-blobs/zstd/%s/%d", c.InstanceName, hash, sizeBytes)
}
// maybeCompressReadBlob decides, based on the client configuration and the blob size, whether
// the blob should be read compressed. It returns the appropriate resource name, a writer for
// the (possibly compressed) stream, and a channel that reports when decoding has finished.
func (c *Client) maybeCompressReadBlob(d digest.Digest, w io.Writer) (string, io.WriteCloser, chan error, error) {
if !c.shouldCompress(d.Size) {
// If we aren't compressing the data, there's nothing to wait on.
dummyDone := make(chan error, 1)
dummyDone <- nil
return c.resourceNameRead(d.Hash, d.Size), &writeDummyCloser{w}, dummyDone, nil
}
cw, done, err := NewCompressedWriteBuffer(w)
if err != nil {
return "", nil, nil, err
}
return c.resourceNameCompressedRead(d.Hash, d.Size), cw, done, nil
}