storage: add CopierFrom and ComposerFrom

Copying is handled via a Copier object, which provides for setting
destination attributes and accessing the rewrite token.

The API's copy and rewrite operations are collapsed into
Copier.Run. It always calls rewrite. We lose no functionality or
performance in doing so, because the storage service's copy calls
rewrite.

Multiple calls to rewrite are handled automatically. The user can
provide a callback to get progress on the number of bytes transferred,
and to stop the copy.

For consistency, there is also an ObjectHandle.ComposerFrom method and a
Composer type.

ObjectHandle.CopyTo and ObjectHandle.ComposeFrom are deprecated.

This is not a breaking change.

Change-Id: I2f3fa666af47867da0478efd2d8d872674586074
Reviewed-on: https://code-review.googlesource.com/7697
Reviewed-by: Dave Day <djd@golang.org>
diff --git a/storage/copy.go b/storage/copy.go
new file mode 100644
index 0000000..c0e4041
--- /dev/null
+++ b/storage/copy.go
@@ -0,0 +1,188 @@
+// Copyright 2016 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package storage contains a Google Cloud Storage client.
+//
+// This package is experimental and may make backwards-incompatible changes.
+package storage
+
+import (
+	"errors"
+	"fmt"
+	"reflect"
+	"unicode/utf8"
+
+	"golang.org/x/net/context"
+	raw "google.golang.org/api/storage/v1"
+)
+
+// CopierFrom creates a Copier that can copy src to dst, where dst is the
+// receiver. You can immediately call Run on the returned Copier, or you can
+// configure it first by setting its exported fields.
+func (dst *ObjectHandle) CopierFrom(src *ObjectHandle) *Copier {
+	return &Copier{dst: dst, src: src}
+}
+
+// A Copier copies a source object to a destination.
+type Copier struct {
+	// ObjectAttrs are optional attributes to set on the destination object.
+	// Any attributes must be initialized before any calls on the Copier. Nil
+	// or zero-valued attributes are ignored. If any attribute is set, Run
+	// requires ContentType to be non-empty.
+	ObjectAttrs
+
+	// RewriteToken can be set before calling Run to resume a copy
+	// operation. After Run returns a non-nil error, RewriteToken will
+	// have been updated to contain the value needed to resume the copy.
+	RewriteToken string
+
+	// ProgressFunc can be used to monitor the progress of a multi-RPC copy
+	// operation. If non-nil, Run invokes it after each successful call to the
+	// underlying service (see
+	// https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite) with the
+	// bytes copied so far and the total size in bytes of the source object.
+	//
+	// ProgressFunc is intended to make copy progress available to the
+	// application. For example, the implementation of ProgressFunc may update
+	// a progress bar in the application's UI, or log the result of
+	// float64(copiedBytes)/float64(totalBytes).
+	//
+	// ProgressFunc should return quickly without blocking.
+	ProgressFunc func(copiedBytes, totalBytes uint64)
+
+	dst, src *ObjectHandle
+}
+
+// Run performs the copy. It issues rewrite calls to the service until one
+// reports that the copy is done, then returns the destination's attributes.
+func (c *Copier) Run(ctx context.Context) (*ObjectAttrs, error) {
+	// TODO(jba): add ObjectHandle.validate to do these checks.
+	if c.src.bucket == "" || c.dst.bucket == "" {
+		return nil, errors.New("storage: the source and destination bucket names must both be non-empty")
+	}
+	if c.src.object == "" || c.dst.object == "" {
+		return nil, errors.New("storage: the source and destination object names must both be non-empty")
+	}
+	if !utf8.ValidString(c.src.object) {
+		return nil, fmt.Errorf("storage: object name %q is not valid UTF-8", c.src.object)
+	}
+	if !utf8.ValidString(c.dst.object) {
+		return nil, fmt.Errorf("storage: dst name %q is not valid UTF-8", c.dst.object)
+	}
+	var rawObject *raw.Object
+	// If any attribute was set, then we make sure the name matches the destination
+	// name, and we check that ContentType is non-empty so we can provide a better
+	// error message than the service.
+	if !reflect.DeepEqual(c.ObjectAttrs, ObjectAttrs{}) {
+		c.ObjectAttrs.Name = c.dst.object
+		if c.ObjectAttrs.ContentType == "" {
+			return nil, errors.New("storage: Copier.ContentType must be non-empty")
+		}
+		rawObject = c.ObjectAttrs.toRawObject(c.dst.bucket)
+	}
+	for {
+		res, err := c.callRewrite(ctx, c.src, rawObject)
+		if err != nil {
+			return nil, err
+		}
+		if c.ProgressFunc != nil {
+			c.ProgressFunc(res.TotalBytesRewritten, res.ObjectSize)
+		}
+		if res.Done { // Finished successfully.
+			return newObject(res.Resource), nil
+		}
+	}
+}
+
+func (c *Copier) callRewrite(ctx context.Context, src *ObjectHandle, rawObj *raw.Object) (*raw.RewriteResponse, error) {
+	call := c.dst.c.raw.Objects.Rewrite(src.bucket, src.object, c.dst.bucket, c.dst.object, rawObj)
+	// Context and Projection configure call itself; their results are not needed.
+	call.Context(ctx).Projection("full")
+	if c.RewriteToken != "" {
+		call.RewriteToken(c.RewriteToken)
+	}
+	if err := applyConds("Copy destination", c.dst.conds, call); err != nil {
+		return nil, err
+	}
+	if err := applyConds("Copy source", toSourceConds(c.src.conds), call); err != nil {
+		return nil, err
+	}
+	res, err := call.Do()
+	if err != nil {
+		return nil, err
+	}
+	c.RewriteToken = res.RewriteToken // Saved so the next call can pass it on.
+	return res, nil
+}
+
+// ComposerFrom creates a Composer that can compose srcs into dst, where dst
+// is the receiver. You can immediately call Run on the returned Composer, or
+// you can configure it first by setting its ObjectAttrs.
+func (dst *ObjectHandle) ComposerFrom(srcs ...*ObjectHandle) *Composer {
+	return &Composer{dst: dst, srcs: srcs}
+}
+
+// A Composer composes source objects into a destination object.
+type Composer struct {
+	// ObjectAttrs are optional attributes to set on the destination object.
+	// Any attributes must be initialized before any calls on the Composer. Nil
+	// or zero-valued attributes are ignored; Name is replaced by dst's object name.
+	ObjectAttrs
+
+	dst  *ObjectHandle
+	srcs []*ObjectHandle
+}
+
+// Run composes the source objects into dst and returns the result's attributes.
+func (c *Composer) Run(ctx context.Context) (*ObjectAttrs, error) {
+	if c.dst.bucket == "" || c.dst.object == "" {
+		return nil, errors.New("storage: the destination bucket and object names must be non-empty")
+	}
+	if len(c.srcs) == 0 {
+		return nil, errors.New("storage: at least one source object must be specified")
+	}
+	// Attach a destination resource only if the caller set some attribute.
+	req := &raw.ComposeRequest{}
+	if !reflect.DeepEqual(c.ObjectAttrs, ObjectAttrs{}) {
+		req.Destination = c.ObjectAttrs.toRawObject(c.dst.bucket)
+		req.Destination.Name = c.dst.object
+	}
+	// Every source must be a named object in the destination's bucket.
+	for _, src := range c.srcs {
+		if src.bucket != c.dst.bucket {
+			return nil, fmt.Errorf("storage: all source objects must be in bucket %q, found %q", c.dst.bucket, src.bucket)
+		}
+		if src.object == "" {
+			return nil, errors.New("storage: all source object names must be non-empty")
+		}
+		srcObj := &raw.ComposeRequestSourceObjects{
+			Name: src.object,
+		}
+		if err := applyConds("ComposeFrom source", src.conds, composeSourceObj{srcObj}); err != nil {
+			return nil, err
+		}
+		req.SourceObjects = append(req.SourceObjects, srcObj)
+	}
+
+	call := c.dst.c.raw.Objects.Compose(c.dst.bucket, c.dst.object, req).Context(ctx)
+	if err := applyConds("ComposeFrom destination", c.dst.conds, call); err != nil {
+		return nil, err
+	}
+
+	obj, err := call.Do()
+	if err != nil {
+		return nil, err
+	}
+	return newObject(obj), nil
+}
diff --git a/storage/example_test.go b/storage/example_test.go
index 9a6bfa7..8d6060e 100644
--- a/storage/example_test.go
+++ b/storage/example_test.go
@@ -393,18 +393,74 @@
 	fmt.Println(aclRules)
 }
 
-func ExampleObjectHandle_ComposeFrom() {
+func ExampleCopier_Run() {
+	ctx := context.Background()
+	client, err := storage.NewClient(ctx)
+	if err != nil {
+		// TODO: handle error.
+	}
+	src := client.Bucket("bucketname").Object("file1")
+	dst := client.Bucket("another-bucketname").Object("file2")
+
+	// Copy content and modify metadata.
+	copier := dst.CopierFrom(src)
+	copier.ContentType = "text/plain"
+	attrs, err := copier.Run(ctx)
+	if err != nil {
+		// TODO: Handle error, possibly resuming with copier.RewriteToken.
+	}
+	fmt.Println(attrs)
+
+	// Just copy content.
+	attrs, err = dst.CopierFrom(src).Run(ctx)
+	if err != nil {
+		// TODO: Handle error. No way to resume.
+	}
+	fmt.Println(attrs)
+}
+
+func ExampleCopier_Run_progress() {
+	// Display progress across multiple rewrite RPCs.
+	ctx := context.Background()
+	client, err := storage.NewClient(ctx)
+	if err != nil {
+		// TODO: handle error.
+	}
+	src := client.Bucket("bucketname").Object("file1")
+	dst := client.Bucket("another-bucketname").Object("file2")
+
+	copier := dst.CopierFrom(src)
+	copier.ProgressFunc = func(copiedBytes, totalBytes uint64) {
+		log.Printf("copy %.1f%% done", float64(copiedBytes)/float64(totalBytes)*100)
+	}
+	if _, err := copier.Run(ctx); err != nil {
+		// TODO: handle error.
+	}
+}
+
+func ExampleComposer_Run() {
 	ctx := context.Background()
 	client, err := storage.NewClient(ctx)
 	if err != nil {
 		// TODO: handle error.
 	}
 	bkt := client.Bucket("bucketname")
-	src1 := bkt.Object("file1")
-	src2 := bkt.Object("file2")
-	dst := bkt.Object("combo")
-	_, err = dst.ComposeFrom(ctx, []*storage.ObjectHandle{src1, src2}, nil)
+	src1 := bkt.Object("o1")
+	src2 := bkt.Object("o2")
+	dst := bkt.Object("o3")
+	// Compose and modify metadata.
+	c := dst.ComposerFrom(src1, src2)
+	c.ContentType = "text/plain"
+	attrs, err := c.Run(ctx)
 	if err != nil {
-		// TODO: handle error.
+		// TODO: Handle error.
 	}
+	fmt.Println(attrs)
+
+	// Just compose.
+	attrs, err = dst.ComposerFrom(src1, src2).Run(ctx)
+	if err != nil {
+		// TODO: Handle error.
+	}
+	fmt.Println(attrs)
 }
diff --git a/storage/integration_test.go b/storage/integration_test.go
index e30bef9..6a6065e 100644
--- a/storage/integration_test.go
+++ b/storage/integration_test.go
@@ -375,12 +375,76 @@
 	copyObj, err := bkt.Object(objName).CopyTo(ctx, bkt.Object(copyName), nil)
 	if err != nil {
 		t.Errorf("CopyTo failed with %v", err)
+	} else if !namesEqual(copyObj, bucket, copyName) {
+		t.Errorf("Copy object bucket, name: got %q.%q, want %q.%q",
+			copyObj.Bucket, copyObj.Name, bucket, copyName)
 	}
-	if copyObj.Name != copyName {
-		t.Errorf("Copy object's name = %q; want %q", copyObj.Name, copyName)
+
+	// Test copying with Copier.
+	copyObj, err = bkt.Object(copyName).CopierFrom(bkt.Object(objName)).Run(ctx)
+	if err != nil {
+		t.Errorf("Copier.Run failed with %v", err)
+	} else if !namesEqual(copyObj, bucket, copyName) {
+		t.Errorf("Copy object bucket, name: got %q.%q, want %q.%q",
+			copyObj.Bucket, copyObj.Name, bucket, copyName)
 	}
-	if copyObj.Bucket != bucket {
-		t.Errorf("Copy object's bucket = %q; want %q", copyObj.Bucket, bucket)
+
+	// Check for error setting attributes but not ContentType.
+	const (
+		contentType     = "text/html"
+		contentEncoding = "identity"
+	)
+	_, err = bkt.Object(objName).CopyTo(ctx, bkt.Object(copyName), &ObjectAttrs{
+		ContentEncoding: contentEncoding,
+	})
+	if err == nil {
+		t.Error("copy without ContentType: got nil, want error")
+	}
+
+	copier := bkt.Object(copyName).CopierFrom(bkt.Object(objName))
+	copier.ContentEncoding = contentEncoding
+	_, err = copier.Run(ctx)
+	if err == nil {
+		t.Error("copy without ContentType: got nil, want error")
+	}
+
+	// Copying with attributes.
+	copyObj, err = bkt.Object(objName).CopyTo(ctx, bkt.Object(copyName), &ObjectAttrs{
+		ContentType:     contentType,
+		ContentEncoding: contentEncoding,
+	})
+	if err != nil {
+		t.Errorf("CopyTo failed with %v", err)
+	} else {
+		if !namesEqual(copyObj, bucket, copyName) {
+			t.Errorf("Copy object bucket, name: got %q.%q, want %q.%q",
+				copyObj.Bucket, copyObj.Name, bucket, copyName)
+		}
+		if copyObj.ContentType != contentType {
+			t.Errorf("Copy ContentType: got %q, want %q", copyObj.ContentType, contentType)
+		}
+		if copyObj.ContentEncoding != contentEncoding {
+			t.Errorf("Copy ContentEncoding: got %q, want %q", copyObj.ContentEncoding, contentEncoding)
+		}
+	}
+
+	copier = bkt.Object(copyName).CopierFrom(bkt.Object(objName))
+	copier.ContentType = contentType
+	copier.ContentEncoding = contentEncoding
+	copyObj, err = copier.Run(ctx)
+	if err != nil {
+		t.Errorf("Copier.Run failed with %v", err)
+	} else {
+		if !namesEqual(copyObj, bucket, copyName) {
+			t.Errorf("Copy object bucket, name: got %q.%q, want %q.%q",
+				copyObj.Bucket, copyObj.Name, bucket, copyName)
+		}
+		if copyObj.ContentType != contentType {
+			t.Errorf("Copy ContentType: got %q, want %q", copyObj.ContentType, contentType)
+		}
+		if copyObj.ContentEncoding != contentEncoding {
+			t.Errorf("Copy ContentEncoding: got %q, want %q", copyObj.ContentEncoding, contentEncoding)
+		}
 	}
 
 	// Test UpdateAttrs.
@@ -498,9 +562,9 @@
 		compSrcs = append(compSrcs, bkt.Object(obj))
 		wantContents = append(wantContents, contents[obj]...)
 	}
-	if _, err := compDst.ComposeFrom(ctx, compSrcs, &ObjectAttrs{
-		ContentType: "text/json",
-	}); err != nil {
+	c := compDst.ComposerFrom(compSrcs...)
+	c.ContentType = "text/json"
+	if _, err := c.Run(ctx); err != nil {
 		t.Fatalf("ComposeFrom error: %v", err)
 	}
 	rc, err = compDst.NewReader(ctx)
@@ -520,6 +584,10 @@
 	}
 }
 
+func namesEqual(obj *ObjectAttrs, bucketName, objectName string) bool {
+	return obj.Bucket == bucketName && obj.Name == objectName
+}
+
 func testBucketList(t *testing.T, bkt *BucketHandle, objects []string) {
 	ctx := context.Background()
 	q := &Query{Prefix: "obj"}
diff --git a/storage/storage.go b/storage/storage.go
index f516071..3707ea9 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -404,86 +404,28 @@
 
 // CopyTo copies the object to the given dst.
 // The copied object's attributes are overwritten by attrs if non-nil.
+//
+// Deprecated: use ObjectHandle.CopierFrom instead.
 func (o *ObjectHandle) CopyTo(ctx context.Context, dst *ObjectHandle, attrs *ObjectAttrs) (*ObjectAttrs, error) {
-	// TODO(djd): move bucket/object name validation to a single helper func.
-	if o.bucket == "" || dst.bucket == "" {
-		return nil, errors.New("storage: the source and destination bucket names must both be non-empty")
-	}
-	if o.object == "" || dst.object == "" {
-		return nil, errors.New("storage: the source and destination object names must both be non-empty")
-	}
-	if !utf8.ValidString(o.object) {
-		return nil, fmt.Errorf("storage: object name %q is not valid UTF-8", o.object)
-	}
-	if !utf8.ValidString(dst.object) {
-		return nil, fmt.Errorf("storage: dst name %q is not valid UTF-8", dst.object)
-	}
-	var rawObject *raw.Object
+	c := dst.CopierFrom(o)
 	if attrs != nil {
-		attrs.Name = dst.object
-		if attrs.ContentType == "" {
-			return nil, errors.New("storage: attrs.ContentType must be non-empty")
-		}
-		rawObject = attrs.toRawObject(dst.bucket)
+		c.ObjectAttrs = *attrs
 	}
-	call := o.c.raw.Objects.Copy(o.bucket, o.object, dst.bucket, dst.object, rawObject).Projection("full").Context(ctx)
-	if err := applyConds("CopyTo destination", dst.conds, call); err != nil {
-		return nil, err
-	}
-	if err := applyConds("CopyTo source", toSourceConds(o.conds), call); err != nil {
-		return nil, err
-	}
-	obj, err := call.Do()
-	if err != nil {
-		return nil, err
-	}
-	return newObject(obj), nil
+	return c.Run(ctx)
 }
 
 // ComposeFrom concatenates the provided slice of source objects into a new
 // object whose destination is the receiver. The provided attrs, if not nil,
 // are used to set the attributes on the newly-created object. All source
 // objects must reside within the same bucket as the destination.
+//
+// Deprecated: use ObjectHandle.ComposerFrom instead.
 func (o *ObjectHandle) ComposeFrom(ctx context.Context, srcs []*ObjectHandle, attrs *ObjectAttrs) (*ObjectAttrs, error) {
-	if o.bucket == "" || o.object == "" {
-		return nil, errors.New("storage: the destination bucket and object names must be non-empty")
-	}
-	if len(srcs) == 0 {
-		return nil, errors.New("storage: at least one source object must be specified")
-	}
-
-	req := &raw.ComposeRequest{}
+	c := o.ComposerFrom(srcs...)
 	if attrs != nil {
-		req.Destination = attrs.toRawObject(o.bucket)
-		req.Destination.Name = o.object
+		c.ObjectAttrs = *attrs
 	}
-
-	for _, src := range srcs {
-		if src.bucket != o.bucket {
-			return nil, fmt.Errorf("storage: all source objects must be in bucket %q, found %q", o.bucket, src.bucket)
-		}
-		if src.object == "" {
-			return nil, errors.New("storage: all source object names must be non-empty")
-		}
-		srcObj := &raw.ComposeRequestSourceObjects{
-			Name: src.object,
-		}
-		if err := applyConds("ComposeFrom source", src.conds, composeSourceObj{srcObj}); err != nil {
-			return nil, err
-		}
-		req.SourceObjects = append(req.SourceObjects, srcObj)
-	}
-
-	call := o.c.raw.Objects.Compose(o.bucket, o.object, req).Context(ctx)
-	if err := applyConds("ComposeFrom destination", o.conds, call); err != nil {
-		return nil, err
-	}
-
-	obj, err := call.Do()
-	if err != nil {
-		return nil, err
-	}
-	return newObject(obj), nil
+	return c.Run(ctx)
 }
 
 // NewReader creates a new Reader to read the contents of the
diff --git a/storage/storage_test.go b/storage/storage_test.go
index 9c256fc..dfb84db 100644
--- a/storage/storage_test.go
+++ b/storage/storage_test.go
@@ -385,7 +385,7 @@
 			func() {
 				obj.WithConditions(IfGenerationMatch(1234)).CopyTo(ctx, dst.WithConditions(IfMetaGenerationMatch(5678)), nil)
 			},
-			"POST /storage/v1/b/buck/o/obj/copyTo/b/dstbuck/o/dst?alt=json&ifMetagenerationMatch=5678&ifSourceGenerationMatch=1234&projection=full",
+			"POST /storage/v1/b/buck/o/obj/rewriteTo/b/dstbuck/o/dst?alt=json&ifMetagenerationMatch=5678&ifSourceGenerationMatch=1234&projection=full",
 		},
 	}