[cas] Unembed cas.Client.ClientConfig. (#333)
Currently the cas.Client struct embeds the ClientConfig struct.
As a result, IDE autocompletion for cas.Client suggests many
low-level, rarely interesting fields, and Upload() is buried
among them.
Unembed ClientConfig and expose it as a named Config field instead.
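For illustration, a minimal self-contained sketch of the two layouts
(clientBefore/clientAfter are hypothetical names used only for this
comparison, and ClientConfig is reduced to a single field here):

package main

import "fmt"

// ClientConfig is simplified; the real struct carries many tuning
// knobs (FindMissingBlobs, BatchUpdateBlobs, ByteStreamWrite, ...).
type ClientConfig struct {
	IgnoreCapabilities bool
}

// clientBefore mirrors the old layout: embedding promotes every
// ClientConfig field onto the client itself.
type clientBefore struct {
	InstanceName string
	ClientConfig
}

// clientAfter mirrors the new layout: the configuration sits behind
// a named Config field, so access is explicit.
type clientAfter struct {
	InstanceName string
	Config       ClientConfig
}

func main() {
	b := clientBefore{ClientConfig: ClientConfig{IgnoreCapabilities: true}}
	a := clientAfter{Config: ClientConfig{IgnoreCapabilities: true}}
	fmt.Println(b.IgnoreCapabilities)        // promoted field, old style
	fmt.Println(a.Config.IgnoreCapabilities) // explicit access, new style
}

With the embedded form, every promoted config field competes with
Upload() in completion lists; with the named field, the knobs all sit
one level down under Config.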
diff --git a/go/pkg/cas/client.go b/go/pkg/cas/client.go
index 309ad7b..d2cc09c 100644
--- a/go/pkg/cas/client.go
+++ b/go/pkg/cas/client.go
@@ -30,8 +30,8 @@
// InstanceName is the full name of the RBE instance.
InstanceName string
- // ClientConfig is the configuration that the client was created with.
- ClientConfig
+ // Config is the configuration that the client was created with.
+ Config ClientConfig
byteStream bspb.ByteStreamClient
cas repb.ContentAddressableStorageClient
@@ -243,12 +243,12 @@
client := &Client{
InstanceName: instanceName,
- ClientConfig: config,
+ Config: config,
conn: conn,
byteStream: bspb.NewByteStreamClient(conn),
cas: repb.NewContentAddressableStorageClient(conn),
}
- if !client.IgnoreCapabilities {
+ if !client.Config.IgnoreCapabilities {
if err := client.checkCapabilities(ctx); err != nil {
return nil, errors.Wrapf(err, "checking capabilities")
}
@@ -265,18 +265,18 @@
// creating a real gRPC connection. This function exists purely to aid testing,
// and is tightly coupled with NewClientWithConfig.
func (c *Client) init() {
- c.semFindMissingBlobs = semaphore.NewWeighted(int64(c.FindMissingBlobs.Concurrency))
- c.semBatchUpdateBlobs = semaphore.NewWeighted(int64(c.BatchUpdateBlobs.Concurrency))
- c.semByteStreamWrite = semaphore.NewWeighted(int64(c.ByteStreamWrite.Concurrency))
+ c.semFindMissingBlobs = semaphore.NewWeighted(int64(c.Config.FindMissingBlobs.Concurrency))
+ c.semBatchUpdateBlobs = semaphore.NewWeighted(int64(c.Config.BatchUpdateBlobs.Concurrency))
+ c.semByteStreamWrite = semaphore.NewWeighted(int64(c.Config.ByteStreamWrite.Concurrency))
- c.semFileIO = semaphore.NewWeighted(int64(c.FSConcurrency))
+ c.semFileIO = semaphore.NewWeighted(int64(c.Config.FSConcurrency))
c.fileBufReaders.New = func() interface{} {
- return bufio.NewReaderSize(emptyReader, int(c.FileIOSize))
+ return bufio.NewReaderSize(emptyReader, int(c.Config.FileIOSize))
}
streamBufSize := 32 * 1024 // by default, send 32KiB chunks.
- if streamBufSize > c.ByteStreamWrite.MaxSizeBytes {
- streamBufSize = int(c.ByteStreamWrite.MaxSizeBytes)
+ if streamBufSize > c.Config.ByteStreamWrite.MaxSizeBytes {
+ streamBufSize = int(c.Config.ByteStreamWrite.MaxSizeBytes)
}
c.streamBufs.New = func() interface{} {
return make([]byte, streamBufSize)
@@ -295,7 +295,7 @@
}
func (c *Client) withRetries(ctx context.Context, f func(context.Context) error) error {
- return retry.WithPolicy(ctx, retry.TransientOnly, c.RetryPolicy, func() error {
+ return retry.WithPolicy(ctx, retry.TransientOnly, c.Config.RetryPolicy, func() error {
return f(ctx)
})
}
@@ -312,8 +312,8 @@
return errors.Wrapf(err, "digest function mismatch")
}
- if c.BatchUpdateBlobs.MaxSizeBytes > int(caps.CacheCapabilities.MaxBatchTotalSizeBytes) {
- c.BatchUpdateBlobs.MaxSizeBytes = int(caps.CacheCapabilities.MaxBatchTotalSizeBytes)
+ if c.Config.BatchUpdateBlobs.MaxSizeBytes > int(caps.CacheCapabilities.MaxBatchTotalSizeBytes) {
+ c.Config.BatchUpdateBlobs.MaxSizeBytes = int(caps.CacheCapabilities.MaxBatchTotalSizeBytes)
}
// TODO(nodir): check compression capabilities.
diff --git a/go/pkg/cas/upload.go b/go/pkg/cas/upload.go
index fb68e97..3e7c142 100644
--- a/go/pkg/cas/upload.go
+++ b/go/pkg/cas/upload.go
@@ -142,7 +142,7 @@
// Given that all digests are small (no more than 40 bytes), the count limit
// is the bottleneck.
// We might run into the request size limits only if we have >100K digests.
- u.checkBundler.BundleCountThreshold = u.FindMissingBlobs.MaxItems
+ u.checkBundler.BundleCountThreshold = u.Config.FindMissingBlobs.MaxItems
// Initialize batchBundler, which uploads blobs in batches.
u.batchBundler = bundler.NewBundler(&repb.BatchUpdateBlobsRequest_Request{}, func(subReq interface{}) {
@@ -153,8 +153,8 @@
})
// Limit the sum of sub-request sizes to (maxRequestSize - requestOverhead).
// Subtract 1KB to be on the safe side.
- u.batchBundler.BundleByteLimit = c.BatchUpdateBlobs.MaxSizeBytes - int(marshalledFieldSize(int64(len(c.InstanceName)))) - 1000
- u.batchBundler.BundleCountThreshold = c.BatchUpdateBlobs.MaxItems
+ u.batchBundler.BundleByteLimit = c.Config.BatchUpdateBlobs.MaxSizeBytes - int(marshalledFieldSize(int64(len(c.InstanceName)))) - 1000
+ u.batchBundler.BundleCountThreshold = c.Config.BatchUpdateBlobs.MaxItems
// Start processing input.
eg.Go(func() error {
@@ -316,7 +316,7 @@
// the network and slows down the progress.
// See also ClientConfig.LargeFileThreshold.
func (u *uploader) visitRegularFile(ctx context.Context, absPath string, info os.FileInfo) (*repb.FileNode, error) {
- isLarge := info.Size() >= u.LargeFileThreshold
+ isLarge := info.Size() >= u.Config.LargeFileThreshold
// Lock the mutex before acquiring a semaphore to avoid hogging the latter.
if isLarge {
@@ -341,7 +341,7 @@
IsExecutable: (info.Mode() & 0100) != 0,
}
- if info.Size() <= u.SmallFileThreshold {
+ if info.Size() <= u.Config.SmallFileThreshold {
// This file is small enough to buffer it entirely.
contents, err := ioutil.ReadAll(f)
if err != nil {
@@ -590,7 +590,7 @@
}
var res *repb.FindMissingBlobsResponse
- err := u.unaryRPC(ctx, &u.FindMissingBlobs, func(ctx context.Context) (err error) {
+ err := u.unaryRPC(ctx, &u.Config.FindMissingBlobs, func(ctx context.Context) (err error) {
res, err = u.cas.FindMissingBlobs(ctx, req)
return
})
@@ -648,7 +648,7 @@
InstanceName: u.InstanceName,
Requests: reqs,
}
- return u.unaryRPC(ctx, &u.BatchUpdateBlobs, func(ctx context.Context) error {
+ return u.unaryRPC(ctx, &u.Config.BatchUpdateBlobs, func(ctx context.Context) error {
res, err := u.cas.BatchUpdateBlobs(ctx, req)
if err != nil {
return err
@@ -707,7 +707,7 @@
}
rewind = true
- if u.CompressedBytestreamThreshold < 0 || item.Digest.SizeBytes < u.CompressedBytestreamThreshold {
+ if u.Config.CompressedBytestreamThreshold < 0 || item.Digest.SizeBytes < u.Config.CompressedBytestreamThreshold {
// No compression.
return u.streamFromReader(ctx, r, item.Digest, false, updateCacheStats)
}
@@ -752,7 +752,7 @@
}
func (u *uploader) streamFromReader(ctx context.Context, r io.Reader, digest *repb.Digest, compressed, updateCacheStats bool) error {
- ctx, cancel, withTimeout := withPerCallTimeout(ctx, u.ByteStreamWrite.Timeout)
+ ctx, cancel, withTimeout := withPerCallTimeout(ctx, u.Config.ByteStreamWrite.Timeout)
defer cancel()
stream, err := u.byteStream.Write(ctx)
diff --git a/go/pkg/cas/upload_test.go b/go/pkg/cas/upload_test.go
index 1283ecb..ef0c54f 100644
--- a/go/pkg/cas/upload_test.go
+++ b/go/pkg/cas/upload_test.go
@@ -230,7 +230,7 @@
var gotScheduledChecks []*uploadItem
client := &Client{
- ClientConfig: DefaultClientConfig(),
+ Config: DefaultClientConfig(),
testScheduleCheck: func(ctx context.Context, item *uploadItem) error {
mu.Lock()
defer mu.Unlock()
@@ -238,8 +238,8 @@
return nil
},
}
- client.SmallFileThreshold = 5
- client.LargeFileThreshold = 10
+ client.Config.SmallFileThreshold = 5
+ client.Config.LargeFileThreshold = 10
client.init()
_, err := client.Upload(ctx, tc.opt, inputChanFrom(tc.inputs...))
@@ -301,10 +301,10 @@
}
client := &Client{
InstanceName: "projects/p/instances/i",
- ClientConfig: DefaultClientConfig(),
+ Config: DefaultClientConfig(),
cas: cas,
}
- client.FindMissingBlobs.MaxItems = 2
+ client.Config.FindMissingBlobs.MaxItems = 2
client.init()
inputC := inputChanFrom(