package client
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"

	"github.com/bazelbuild/remote-apis-sdks/go/pkg/chunker"
	"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
	log "github.com/golang/glog"
	"github.com/pkg/errors"
	bspb "google.golang.org/genproto/googleapis/bytestream"
)
// WriteBytes uploads a byte slice with the given resource name to the CAS.
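//
// Example (a minimal sketch; the upload-style resource name follows the REAPI
// convention, and uuidStr and dg are hypothetical placeholders):
//
//	name := fmt.Sprintf("%s/uploads/%s/blobs/%s/%d", c.InstanceName, uuidStr, dg.Hash, dg.Size)
//	err := c.WriteBytes(ctx, name, data)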
func (c *Client) WriteBytes(ctx context.Context, name string, data []byte) error {
ue := uploadinfo.EntryFromBlob(data)
ch, err := chunker.New(ue, false, int(c.ChunkMaxSize))
if err != nil {
return err
}
_, err = c.writeChunked(ctx, name, ch, false, 0)
return err
}
// WriteBytesAtRemoteOffset uploads a byte slice with a given resource name to the CAS
// at an arbitrary offset, but retries still resend from the initial offset. As of now
// (2023-02-08), ByteStream.WriteRequest.FinishWrite and an arbitrary offset are supported
// for uploads with a LogStream resource name. If doNotFinalize is set to true,
// ByteStream.WriteRequest.FinishWrite will be set to false.
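//
// Example (a sketch, assuming logStreamName names an existing LogStream and
// chunk holds the bytes to append):
//
//	// Append at offset 1024 without finalizing, so more writes can follow.
//	written, err := c.WriteBytesAtRemoteOffset(ctx, logStreamName, chunk, true, 1024)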
func (c *Client) WriteBytesAtRemoteOffset(ctx context.Context, name string, data []byte, doNotFinalize bool, initialOffset int64) (int64, error) {
ue := uploadinfo.EntryFromBlob(data)
ch, err := chunker.New(ue, false, int(c.ChunkMaxSize))
if err != nil {
return 0, errors.Wrap(err, "failed to create a chunk")
}
writtenBytes, err := c.writeChunked(ctx, name, ch, doNotFinalize, initialOffset)
if err != nil {
return 0, err
}
return writtenBytes, nil
}
// writeChunked uploads chunked data with a given resource name to the CAS and
// returns the number of bytes written by the final, successful attempt.
func (c *Client) writeChunked(ctx context.Context, name string, ch *chunker.Chunker, doNotFinalize bool, initialOffset int64) (int64, error) {
var totalBytes int64
closure := func() error {
// Retry by starting the stream from the beginning.
if err := ch.Reset(); err != nil {
return errors.Wrap(err, "failed to Reset")
}
		totalBytes = 0
		// TODO(olaola): implement resumable uploads. The initialOffset passed in
		// allows writing to start at an arbitrary offset, but retries still
		// restart from initialOffset rather than resuming where they left off.
stream, err := c.Write(ctx)
if err != nil {
return err
}
for ch.HasNext() {
req := &bspb.WriteRequest{ResourceName: name}
chunk, err := ch.Next()
if err != nil {
return err
}
req.WriteOffset = chunk.Offset + initialOffset
req.Data = chunk.Data
if !ch.HasNext() && !doNotFinalize {
req.FinishWrite = true
}
err = c.CallWithTimeout(ctx, "Write", func(_ context.Context) error { return stream.Send(req) })
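			// Per gRPC client-streaming semantics, io.EOF from Send means the server
			// has already closed the stream (e.g. because the blob exists); the
			// definitive status is surfaced by CloseAndRecv below.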
if err == io.EOF {
break
}
if err != nil {
return err
}
totalBytes += int64(len(req.Data))
}
if _, err := stream.CloseAndRecv(); err != nil {
return err
}
return nil
}
err := c.Retrier.Do(ctx, closure)
return totalBytes, err
}
// ReadBytes fetches a resource's contents into a byte slice.
//
// ReadBytes panics with bytes.ErrTooLarge if an attempt is made to read a
// resource whose contents are too large to fit into a byte slice.
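//
// Example (illustrative; dg is a hypothetical digest of a blob in the CAS):
//
//	name := fmt.Sprintf("%s/blobs/%s/%d", c.InstanceName, dg.Hash, dg.Size)
//	data, err := c.ReadBytes(ctx, name)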
func (c *Client) ReadBytes(ctx context.Context, name string) ([]byte, error) {
buf := &bytes.Buffer{}
_, err := c.readStreamedRetried(ctx, name, 0, 0, buf)
return buf.Bytes(), err
}
// ReadResourceToFile fetches a resource's contents, saving it into a file.
//
// The provided resource name must be a child resource of this client's instance,
// e.g. '/blobs/abc-123/45' (NOT 'projects/foo/bar/baz').
//
// The number of bytes read is returned.
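//
// Example (hypothetical digest and output path):
//
//	n, err := c.ReadResourceToFile(ctx, "/blobs/abc-123/45", "/tmp/output.bin")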
func (c *Client) ReadResourceToFile(ctx context.Context, name, fpath string) (int64, error) {
return c.readToFile(ctx, c.InstanceName+name, fpath)
}
func (c *Client) readToFile(ctx context.Context, name string, fpath string) (int64, error) {
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, c.RegularMode)
if err != nil {
return 0, err
}
defer f.Close()
return c.readStreamedRetried(ctx, name, 0, 0, f)
}
// readStreamed reads from a bytestream and copies the result to the provided Writer, starting
// offset bytes into the stream and reading at most limit bytes (or no limit if limit==0). The
// offset must be non-negative, and an error may be returned if the offset is past the end of the
// stream. The limit must be non-negative, although offset+limit may exceed the length of the
// stream.
func (c *Client) readStreamed(ctx context.Context, name string, offset, limit int64, w io.Writer) (int64, error) {
stream, err := c.Read(ctx, &bspb.ReadRequest{
ResourceName: name,
ReadOffset: offset,
ReadLimit: limit,
})
if err != nil {
return 0, err
}
var n int64
for {
var resp *bspb.ReadResponse
err := c.CallWithTimeout(ctx, "Read", func(_ context.Context) error {
r, err := stream.Recv()
resp = r
return err
})
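		// io.EOF marks the normal end of the stream; everything received so far
		// has already been written to w.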
if err == io.EOF {
break
}
if err != nil {
return 0, err
}
log.V(3).Infof("Read: resource:%s offset:%d len(data):%d", name, offset, len(resp.Data))
nm, err := w.Write(resp.Data)
if err != nil {
			// Recreate the error with %v (not %w) so it carries no gRPC status and
			// will never be classified as retryable: a local write failure must not
			// cause the read to be retried.
			return int64(nm), fmt.Errorf("failed to write to output stream: %v", err)
}
sz := len(resp.Data)
if nm != sz {
return int64(nm), fmt.Errorf("received %d bytes but could only write %d", sz, nm)
}
n += int64(sz)
if limit > 0 {
limit -= int64(sz)
if limit <= 0 {
break
}
}
}
return n, nil
}
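// readStreamedRetried wraps readStreamed with the client's retrier. Each retry
// resumes at offset+n, where n counts the bytes already written to w, so data
// received before a transient failure is neither re-fetched nor duplicated.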
func (c *Client) readStreamedRetried(ctx context.Context, name string, offset, limit int64, w io.Writer) (int64, error) {
var n int64
	closure := func() error {
		remaining := limit
		if limit > 0 {
			// Account for bytes already written so that retries of a partially
			// completed read never deliver more than limit bytes in total.
			remaining = limit - n
			if remaining <= 0 {
				return nil
			}
		}
		m, err := c.readStreamed(ctx, name, offset+n, remaining, w)
		n += m
		return err
	}
return n, c.Retrier.Do(ctx, closure)
}