// Source blob: fc75ba4bb5364c9ef311989b0f684f4ca95f625a
package cas
import (
repb ""
bspb ""
// ErrFilteredSymlinkTarget is returned when a symlink's target was filtered out
// via UploadInput.PathExclude or ErrSkip, while the symlink itself wasn't.
// It is returned wrapped with the symlink path and target, so callers should
// compare with errors.Is rather than ==.
var ErrFilteredSymlinkTarget = errors.New("symlink's target was filtered out")
// zstdEncoders is a pool of ZStd encoders.
// Clients of this pool must call Close() on the encoder after using the
// encoder.
var zstdEncoders = sync.Pool{
New: func() interface{} {
enc, _ := zstd.NewWriter(nil)
return enc
// UploadInput is one of inputs to Client.Upload function.
// It can be either a reference to a file/dir (see Path) or it can be an
// in-memory blob (see Content).
type UploadInput struct {
	// Path to the file or a directory to upload.
	// If empty, the Content is uploaded instead.
	//
	// Must be absolute or relative to CWD.
	Path string

	// Contents to upload.
	// Ignored if Path is not empty.
	Content []byte

	// PathExclude is a file/dir filter. If PathExclude is not nil and the
	// absolute path of a file/dir matches this regexp, then the file/dir is
	// skipped.
	// If the Path is a directory, then the filter is evaluated against each file
	// in the subtree.
	// See ErrSkip comments for more details on semantics regarding excluding
	// symlinks.
	// This field has no effect if Path is empty.
	PathExclude *regexp.Regexp
}
// TransferStats is upload/download statistics.
type TransferStats struct {
	CacheHits   DigestStat // digests found to be already present on the server
	CacheMisses DigestStat // digests found to be missing on the server

	Streamed DigestStat // streamed transfers
	Batched  DigestStat // batched transfers
}

// DigestStat is aggregated statistics over a set of digests.
type DigestStat struct {
	Digests int64 // number of unique digests
	Bytes   int64 // total sum of digest sizes

	// TODO(nodir): add something like TransferBytes, i.e. how much was actually transferred
}
// UploadOptions is optional configuration for Upload function.
// The default options are the zero value of this struct.
type UploadOptions struct {
	// PreserveSymlinks specifies whether to preserve symlinks or convert them
	// to regular files.
	PreserveSymlinks bool

	// AllowDanglingSymlinks specifies whether to upload dangling links or halt
	// the upload with an error.
	//
	// This field is ignored if PreserveSymlinks is false, which is the default.
	AllowDanglingSymlinks bool

	// Callback is called for each file/dir to be uploaded.
	// If it returns an error which is ErrSkip according to errors.Is, then the
	// file/dir is not processed.
	// If it returns another error, then the upload is halted with that error.
	//
	// Callback might be called multiple times for the same file if different
	// UploadInputs directly/indirectly refer to the same file, but with different
	// PathExclude.
	//
	// Callback is called from different goroutines.
	Callback func(absPath string, mode os.FileMode) error
}
// ErrSkip, when returned by UploadOptions.Callback, means the file/dir must
// not be uploaded.
//
// Note that if UploadOptions.PreserveSymlinks is true and the ErrSkip is
// returned for a symlink target, but not the symlink itself, then it may
// result in a dangling symlink.
var ErrSkip = errors.New("skip file")
// Upload uploads all inputs. It exits when inputC is closed or ctx is canceled.
func (c *Client) Upload(ctx context.Context, opt UploadOptions, inputC <-chan *UploadInput) (stats *TransferStats, err error) {
eg, ctx := errgroup.WithContext(ctx)
// Do not exit until all sub-goroutines exit, to prevent goroutine leaks.
defer eg.Wait()
u := &uploader{
Client: c,
UploadOptions: opt,
eg: eg,
// Initialize checkBundler, which checks if a blob is present on the server.
var wgChecks sync.WaitGroup
u.checkBundler = bundler.NewBundler(&uploadItem{}, func(items interface{}) {
// Handle errors and context cancelation via errgroup.
eg.Go(func() error {
defer wgChecks.Done()
return u.check(ctx, items.([]*uploadItem))
// Given that all digests are small (no more than 40 bytes), the count limit
// is the bottleneck.
// We might run into the request size limits only if we have >100K digests.
u.checkBundler.BundleCountThreshold = u.FindMissingBlobs.MaxItems
// Initialize batchBundler, which uploads blobs in batches.
u.batchBundler = bundler.NewBundler(&repb.BatchUpdateBlobsRequest_Request{}, func(subReq interface{}) {
// Handle errors and context cancelation via errgroup.
eg.Go(func() error {
return u.uploadBatch(ctx, subReq.([]*repb.BatchUpdateBlobsRequest_Request))
// Limit the sum of sub-request sizes to (maxRequestSize - requestOverhead).
// Subtract 1KB to be on the safe side.
u.batchBundler.BundleByteLimit = c.BatchUpdateBlobs.MaxSizeBytes - int(marshalledFieldSize(int64(len(c.InstanceName)))) - 1000
u.batchBundler.BundleCountThreshold = c.BatchUpdateBlobs.MaxItems
// Start processing input.
eg.Go(func() error {
// Before exiting this main goroutine, ensure all the work has been completed.
// Just waiting for isn't enough because some work may be temporarily
// in a bundler.
defer func() {
u.checkBundler.Flush() // only after FS walk is done.
wgChecks.Wait() // only after checkBundler is flushed
u.batchBundler.Flush() // only after wgChecks is done.
for {
select {
case <-ctx.Done():
return ctx.Err()
case in, ok := <-inputC:
if !ok {
return nil
if err := u.startProcessing(ctx, in); err != nil {
return err
return &u.stats, eg.Wait()
// uploader implements a concurrent multi-stage pipeline to read blobs from the
// file system, check their presence on the server and then upload if necessary.
// Common blobs are deduplicated.
// is used to schedule work, while concurrency of individual
// expensive operations is controlled via separate semaphores.
// Special care is taken for large files: they are read sequentially, opened
// only once per file, and read with large IO size.
// Note: uploader shouldn't store semaphores/locks that protect global
// resources, such as file system. They should be stored in the Client instead.
type uploader struct {
eg *errgroup.Group
stats TransferStats
// wgFS is used to wait for all FS walking to finish.
wgFS sync.WaitGroup
// fsCache contains already-processed files.
fsCache cache.SingleFlight
// checkBundler bundles digests that need to be checked for presence on the
// server.
checkBundler *bundler.Bundler
seenDigests sync.Map // TODO: consider making it more global
// batchBundler bundles blobs that can be uploaded using UploadBlobs RPC.
batchBundler *bundler.Bundler
// startProcessing adds the item to the appropriate stage depending on its type.
func (u *uploader) startProcessing(ctx context.Context, in *UploadInput) error {
if in.Path == "" {
// Simple case: the blob is in memory.
return u.scheduleCheck(ctx, uploadItemFromBlob("", in.Content))
// Schedule a file system walk.
u.wgFS.Add(1) error {
defer u.wgFS.Done()
// Compute the absolute path only once per directory tree.
absPath, err := filepath.Abs(in.Path)
if err != nil {
return errors.Wrapf(err, "failed to get absolute path of %q", in.Path)
// Do not use os.Stat() here. We want to know if it is a symlink.
info, err := os.Lstat(absPath)
if err != nil {
return errors.WithStack(err)
_, err = u.visitPath(ctx, absPath, info, in.PathExclude)
return errors.Wrapf(err, "%q", absPath)
return nil
// visitPath visits the file/dir depending on its type (regular, dir, symlink).
// Visits each file only once.
// If the file should be skipped, then returns (nil, nil).
func (u *uploader) visitPath(ctx context.Context, absPath string, info os.FileInfo, pathExclude *regexp.Regexp) (dirEntry proto.Message, err error) {
// First, check if the file passes all filters.
if pathExclude != nil && pathExclude.MatchString(absPath) {
return nil, nil
// Call the callback only after checking the pathExclude.
if u.Callback != nil {
switch err := u.Callback(absPath, info.Mode()); {
case errors.Is(err, ErrSkip):
return nil, nil
case err != nil:
return nil, err
// Make a cache key.
type cacheKeyType struct {
AbsPath string
ExcludeRegexp string
cacheKey := cacheKeyType{
AbsPath: absPath,
if pathExclude != nil {
cacheKey.ExcludeRegexp = pathExclude.String()
node, err := u.fsCache.LoadOrStore(cacheKey, func() (interface{}, error) {
switch {
case info.Mode()&os.ModeSymlink == os.ModeSymlink:
return u.visitSymlink(ctx, absPath, pathExclude)
case info.Mode().IsDir():
return u.visitDir(ctx, absPath, pathExclude)
case info.Mode().IsRegular():
return u.visitRegularFile(ctx, absPath, info)
return nil, fmt.Errorf("unexpected file mode %s", info.Mode())
if err != nil {
return nil, err
return node.(proto.Message), nil
// visitRegularFile computes the hash of a regular file and schedules a presence
// check.
// It distinguishes three categories of file sizes:
// - small: small files are buffered in memory entirely, thus read only once.
// They are treated same as UploadInput with Contents and without Path.
// See also ClientConfig.SmallFileThreshold.
// - medium: the hash is computed, the file is closed and a presence check is
// scheduled.
// - large: the hash is computed, the file is rewinded without closing and
// streamed via ByteStream.
// If the file is already present on the server, the ByteStream preempts
// the stream with EOF and WriteResponse.CommittedSize == Digest.Size.
// Rewinding helps locality: there is no delay between reading the file for
// the first and the second times.
// Only one large file is processed at a time because most GCE disks are
// network disks. Reading many large files concurrently appears to saturate
// the network and slows down the progress.
// See also ClientConfig.LargeFileThreshold.
func (u *uploader) visitRegularFile(ctx context.Context, absPath string, info os.FileInfo) (*repb.FileNode, error) {
isLarge := info.Size() >= u.LargeFileThreshold
// Lock the mutex before acquiring a semaphore to avoid hogging the latter.
if isLarge {
// Read only one large file at a time.
defer u.muLargeFile.Unlock()
if err := u.semFileIO.Acquire(ctx, 1); err != nil {
return nil, err
defer u.semFileIO.Release(1)
f, err := u.openFileSource(absPath)
if err != nil {
return nil, err
defer f.Close()
ret := &repb.FileNode{
Name: info.Name(),
IsExecutable: (info.Mode() & 0100) != 0,
if info.Size() <= u.SmallFileThreshold {
// This file is small enough to buffer it entirely.
contents, err := ioutil.ReadAll(f)
if err != nil {
return nil, err
item := uploadItemFromBlob(absPath, contents)
ret.Digest = item.Digest
return ret, u.scheduleCheck(ctx, item)
// It is a medium or large file.
// Compute the hash.
dig, err := digest.NewFromReader(f)
if err != nil {
return nil, errors.Wrapf(err, "failed to compute hash")
ret.Digest = dig.ToProto()
item := &uploadItem{
Title: absPath,
Digest: ret.Digest,
if isLarge {
// Large files are special: locality is important - we want to re-read the
// file ASAP.
// Also we are not going to use BatchUploads anyway, so we can take
// advantage of ByteStream's built-in presence check.
item.Open = func() (uploadSource, error) {
return f, f.SeekStart(0)
return ret,, item, true)
// Schedule a check and close the file (in defer).
// item.Open will reopen the file.
item.Open = func() (uploadSource, error) {
return u.openFileSource(absPath)
return ret, u.scheduleCheck(ctx, item)
func (u *uploader) openFileSource(absPath string) (uploadSource, error) {
f, err := os.Open(absPath)
if err != nil {
return nil, err
return newFileSource(f, &u.fileBufReaders), nil
// visitDir reads a directory and its descendants. The function blocks until
// each descendant is visited, but the visitation happens concurrently, using
func (u *uploader) visitDir(ctx context.Context, absPath string, pathExclude *regexp.Regexp) (*repb.DirectoryNode, error) {
var mu sync.Mutex
dir := &repb.Directory{}
var subErr error
var wgChildren sync.WaitGroup
// This sub-function exist to avoid holding the semaphore while waiting for
// children.
err := func() error {
if err := u.semFileIO.Acquire(ctx, 1); err != nil {
return err
defer u.semFileIO.Release(1)
f, err := os.Open(absPath)
if err != nil {
return err
defer f.Close()
// Check the context, since file IO functions don't.
for ctx.Err() == nil {
infos, err := f.Readdir(128)
if err == io.EOF {
if err != nil {
return err
for _, info := range infos {
info := info
absChild := joinFilePathsFast(absPath, info.Name())
u.wgFS.Add(1) error {
defer wgChildren.Done()
defer u.wgFS.Done()
node, err := u.visitPath(ctx, absChild, info, pathExclude)
defer mu.Unlock()
if err != nil {
subErr = err
return err
switch node := node.(type) {
case *repb.FileNode:
dir.Files = append(dir.Files, node)
case *repb.DirectoryNode:
dir.Directories = append(dir.Directories, node)
case *repb.SymlinkNode:
dir.Symlinks = append(dir.Symlinks, node)
case nil:
// This file should be ignored.
// This condition is impossible because all functions in this file
// return one of the three types above.
panic(fmt.Sprintf("unexpected node type %T", node))
return nil
return nil
if err != nil {
return nil, err
if subErr != nil {
return nil, errors.Wrapf(subErr, "failed to read the directory %q entirely", absPath)
item := uploadItemFromDirMsg(absPath, dir)
if err := u.scheduleCheck(ctx, item); err != nil {
return nil, err
return &repb.DirectoryNode{
Name: filepath.Base(absPath),
Digest: item.Digest,
}, nil
// visitSymlink converts a symlink to a directory node and schedules visitation
// of the target file.
// If u.PreserveSymlinks is true, then returns a SymlinkNode, otherwise
// returns the directory node of the target file.
func (u *uploader) visitSymlink(ctx context.Context, absPath string, pathExclude *regexp.Regexp) (proto.Message, error) {
target, err := os.Readlink(absPath)
if err != nil {
return nil, errors.Wrapf(err, "os.ReadLink")
// Determine absolute and relative paths of the target.
var absTarget, relTarget string
symlinkDir := filepath.Dir(absPath)
target = filepath.Clean(target) // target may end with slash
if filepath.IsAbs(target) {
absTarget = target
if relTarget, err = filepath.Rel(symlinkDir, absTarget); err != nil {
return nil, err
} else {
relTarget = target
// Note: we can't use joinFilePathsFast here because relTarget may start
// with "../".
absTarget = filepath.Join(symlinkDir, relTarget)
symlinkNode := &repb.SymlinkNode{
Name: filepath.Base(absPath),
Target: filepath.ToSlash(relTarget),
targetInfo, err := os.Lstat(absTarget)
switch {
case os.IsNotExist(err) && u.PreserveSymlinks && u.AllowDanglingSymlinks:
// Special case for preserved dangling links.
return symlinkNode, nil
case err != nil:
return nil, errors.WithStack(err)
switch targetNode, err := u.visitPath(ctx, absTarget, targetInfo, pathExclude); {
case err != nil:
return nil, err
case !u.PreserveSymlinks:
return targetNode, nil
case targetNode == nil && !u.AllowDanglingSymlinks:
// The target got skipped via Callback or PathExclude,
// resulting in a dangling symlink, which is not allowed.
return nil, errors.Wrapf(ErrFilteredSymlinkTarget, "path: %q, target: %q", absPath, target)
// Note: even though we throw away targetNode, it was still important to visit the target.
return symlinkNode, nil
// uploadItem is a blob to potentially upload.
type uploadItem struct {
Title string
Digest *repb.Digest
Open func() (uploadSource, error)
func (item *uploadItem) ReadAll() ([]byte, error) {
r, err := item.Open()
if err != nil {
return nil, err
defer r.Close()
return ioutil.ReadAll(r)
// scheduleCheck schedules a blob presence check on the server. If it fails,
// then the blob is uploaded.
func (u *uploader) scheduleCheck(ctx context.Context, item *uploadItem) error {
if u.testScheduleCheck != nil {
return u.testScheduleCheck(ctx, item)
// Do not check the same digest twice.
cacheKey := digest.NewFromProtoUnvalidated(item.Digest)
if _, ok := u.seenDigests.LoadOrStore(cacheKey, struct{}{}); ok {
return nil
return u.checkBundler.AddWait(ctx, item, 0)
// check checks which items are present on the server, and schedules upload for
// the missing ones.
func (u *uploader) check(ctx context.Context, items []*uploadItem) error {
if err := u.semFindMissingBlobs.Acquire(ctx, 1); err != nil {
return err
req := &repb.FindMissingBlobsRequest{
InstanceName: u.InstanceName,
BlobDigests: make([]*repb.Digest, len(items)),
byDigest := make(map[digest.Digest]*uploadItem, len(items))
totalBytes := int64(0)
for i, item := range items {
req.BlobDigests[i] = item.Digest
byDigest[digest.NewFromProtoUnvalidated(item.Digest)] = item
totalBytes += item.Digest.SizeBytes
var res *repb.FindMissingBlobsResponse
err := u.unaryRPC(ctx, &u.FindMissingBlobs, func(ctx context.Context) (err error) {
res, err = u.cas.FindMissingBlobs(ctx, req)
if err != nil {
return err
missingBytes := int64(0)
for _, d := range res.MissingBlobDigests {
missingBytes += d.SizeBytes
item := byDigest[digest.NewFromProtoUnvalidated(d)]
if err := u.scheduleUpload(ctx, item); err != nil {
return errors.Wrapf(err, "%q", item.Title)
atomic.AddInt64(&u.stats.CacheMisses.Digests, int64(len(res.MissingBlobDigests)))
atomic.AddInt64(&u.stats.CacheMisses.Bytes, missingBytes)
atomic.AddInt64(&u.stats.CacheHits.Digests, int64(len(items)-len(res.MissingBlobDigests)))
atomic.AddInt64(&u.stats.CacheHits.Bytes, totalBytes-missingBytes)
return nil
func (u *uploader) scheduleUpload(ctx context.Context, item *uploadItem) error {
// Check if this blob can be uploaded in a batch.
if marshalledRequestSize(item.Digest) > int64(u.batchBundler.BundleByteLimit) {
// There is no way this blob can fit in a batch request. error {
return errors.Wrap(, item, false), item.Title)
return nil
// Since this blob is small enough, just read it entirely.
contents, err := item.ReadAll()
if err != nil {
return errors.Wrapf(err, "failed to read the item")
req := &repb.BatchUpdateBlobsRequest_Request{Digest: item.Digest, Data: contents}
return u.batchBundler.AddWait(ctx, req, proto.Size(req))
// uploadBatch uploads blobs in using BatchUpdateBlobs RPC.
func (u *uploader) uploadBatch(ctx context.Context, reqs []*repb.BatchUpdateBlobsRequest_Request) error {
if err := u.semBatchUpdateBlobs.Acquire(ctx, 1); err != nil {
return err
defer u.semBatchUpdateBlobs.Release(1)
reqMap := make(map[digest.Digest]*repb.BatchUpdateBlobsRequest_Request, len(reqs))
for _, r := range reqs {
reqMap[digest.NewFromProtoUnvalidated(r.Digest)] = r
req := &repb.BatchUpdateBlobsRequest{
InstanceName: u.InstanceName,
Requests: reqs,
return u.unaryRPC(ctx, &u.BatchUpdateBlobs, func(ctx context.Context) error {
res, err := u.cas.BatchUpdateBlobs(ctx, req)
if err != nil {
return err
bytesTransferred := int64(0)
digestsTransferred := int64(0)
var retriableErr error
req.Requests = req.Requests[:0] // reset for the next attempt
for _, r := range res.Responses {
if err := status.FromProto(r.Status).Err(); err != nil {
if !retry.TransientOnly(err) {
return err
// This error is retriable. Save it to return later, and
// save the failed sub-request for the next attempt.
retriableErr = err
req.Requests = append(req.Requests, reqMap[digest.NewFromProtoUnvalidated(r.Digest)])
bytesTransferred += r.Digest.SizeBytes
atomic.AddInt64(&u.stats.Batched.Bytes, bytesTransferred)
atomic.AddInt64(&u.stats.Batched.Digests, digestsTransferred)
return retriableErr
// stream uploads the item using ByteStream service.
// If the blob is already uploaded, then the function returns quickly and
// without an error.
func (u *uploader) stream(ctx context.Context, item *uploadItem, updateCacheStats bool) error {
if err := u.semByteStreamWrite.Acquire(ctx, 1); err != nil {
return err
defer u.semByteStreamWrite.Release(1)
// Open the item.
r, err := item.Open()
if err != nil {
return err
defer r.Close()
// TODO(nodir): implement per-RPC timeouts. No nice way to do it.
rewind := false
return u.withRetries(ctx, func(ctx context.Context) error {
// TODO(nodir): add support for resumable uploads.
// Do not rewind if this is the first attempt.
if rewind {
if err := r.SeekStart(0); err != nil {
return err
rewind = true
if u.CompressedBytestreamThreshold < 0 || item.Digest.SizeBytes < u.CompressedBytestreamThreshold {
// No compression.
return u.streamFromReader(ctx, r, item.Digest, false, updateCacheStats)
// Compress using an in-memory pipe. This is mostly to accomodate the fact
// that zstd package expects a writer.
// Note that using io.Pipe() means we buffer only bytes that were not uploaded yet.
pr, pw := io.Pipe()
enc := zstdEncoders.Get().(*zstd.Encoder)
defer func() {
// Read from disk and make RPCs concurrently.
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
switch _, err := enc.ReadFrom(r); {
case err == io.ErrClosedPipe:
// The other goroutine exited before we finished encoding.
// Might be a cache hit or context cancelation.
// In any case, the other goroutine has the actual error, so return nil
// here.
return nil
case err != nil:
return errors.Wrapf(err, "failed to read the file/blob")
if err := enc.Close(); err != nil {
return errors.Wrapf(err, "failed to close the zstd encoder")
return pw.Close()
eg.Go(func() error {
defer pr.Close()
return u.streamFromReader(ctx, pr, item.Digest, true, updateCacheStats)
return eg.Wait()
func (u *uploader) streamFromReader(ctx context.Context, r io.Reader, digest *repb.Digest, compressed, updateCacheStats bool) error {
stream, err := u.byteStream.Write(ctx)
if err != nil {
return err
defer stream.CloseSend()
req := &bspb.WriteRequest{}
if compressed {
req.ResourceName = fmt.Sprintf("%s/uploads/%s/compressed-blobs/zstd/%s/%d", u.InstanceName, uuid.New(), digest.Hash, digest.SizeBytes)
} else {
req.ResourceName = fmt.Sprintf("%s/uploads/%s/blobs/%s/%d", u.InstanceName, uuid.New(), digest.Hash, digest.SizeBytes)
buf := u.streamBufs.Get().([]byte)
defer u.streamBufs.Put(buf)
for {
// Before reading, check if the context if canceled.
if ctx.Err() != nil {
return ctx.Err()
// Read the next chunk from the pipe.
// Use ReadFull to ensure we aren't sending tiny blobs over RPC.
n, err := io.ReadFull(r, buf)
switch {
case err == io.EOF || err == io.ErrUnexpectedEOF:
req.FinishWrite = true
case err != nil:
return err
req.Data = buf[:n] // must limit by `:n` in ErrUnexpectedEOF case
// Send the chunk.
switch err = stream.Send(req); {
case err == io.EOF:
// The server closed the stream.
// Most likely the file is already uploaded, see the CommittedSize check below.
break chunkLoop
case err != nil:
return err
case req.FinishWrite:
break chunkLoop
// Prepare the next request.
req.ResourceName = "" // send the resource name only in the first request
req.WriteOffset += int64(len(req.Data))
// Finalize the request.
switch res, err := stream.CloseAndRecv(); {
case err != nil:
return err
case res.CommittedSize != digest.SizeBytes:
return fmt.Errorf("unexpected commitSize: got %d, want %d", res.CommittedSize, digest.SizeBytes)
// Update stats.
cacheHit := !req.FinishWrite
if !cacheHit {
atomic.AddInt64(&u.stats.Streamed.Bytes, digest.SizeBytes)
atomic.AddInt64(&u.stats.Streamed.Digests, 1)
if updateCacheStats {
st := &u.stats.CacheMisses
if cacheHit {
st = &u.stats.CacheHits
atomic.AddInt64(&st.Bytes, digest.SizeBytes)
atomic.AddInt64(&st.Digests, 1)
return nil
// uploadItemFromDirMsg creates an upload item for a directory.
// Sorts directory entries.
func uploadItemFromDirMsg(title string, dir *repb.Directory) *uploadItem {
// Normalize the dir before marshaling, for determinism.
sort.Slice(dir.Files, func(i, j int) bool {
return dir.Files[i].Name < dir.Files[j].Name
sort.Slice(dir.Directories, func(i, j int) bool {
return dir.Directories[i].Name < dir.Directories[j].Name
sort.Slice(dir.Symlinks, func(i, j int) bool {
return dir.Symlinks[i].Name < dir.Symlinks[j].Name
blob, err := proto.Marshal(dir)
if err != nil {
panic(err) // impossible
return uploadItemFromBlob(title, blob)
func uploadItemFromBlob(title string, blob []byte) *uploadItem {
item := &uploadItem{
Title: title,
Digest: digest.NewFromBlob(blob).ToProto(),
Open: func() (uploadSource, error) {
return newByteSliceSource(blob), nil
if item.Title == "" {
item.Title = fmt.Sprintf("digest %s/%d", item.Digest.Hash, item.Digest.SizeBytes)
return item
// pathSep is the OS-specific path separator, as a string.
const pathSep = string(filepath.Separator)

// joinFilePathsFast is a faster version of filepath.Join because it does not
// call filepath.Clean.
// Consequently the result is not normalized: callers must pass an already
// clean a and a b without "." or ".." elements.
func joinFilePathsFast(a, b string) string {
	return a + pathSep + b
}
func marshalledFieldSize(size int64) int64 {
return 1 + int64(proto.SizeVarint(uint64(size))) + size
func marshalledRequestSize(d *repb.Digest) int64 {
// An additional BatchUpdateBlobsRequest_Request includes the Digest and data fields,
// as well as the message itself. Every field has a 1-byte size tag, followed by
// the varint field size for variable-sized fields (digest hash and data).
// Note that the BatchReadBlobsResponse_Response field is similar, but includes
// and additional Status proto which can theoretically be unlimited in size.
// We do not account for it here, relying on the Client setting a large (100MB)
// limit for incoming messages.
digestSize := marshalledFieldSize(int64(len(d.Hash)))
if d.SizeBytes > 0 {
digestSize += 1 + int64(proto.SizeVarint(uint64(d.SizeBytes)))
reqSize := marshalledFieldSize(digestSize)
if d.SizeBytes > 0 {
reqSize += marshalledFieldSize(int64(d.SizeBytes))
return marshalledFieldSize(reqSize)