[cas] Add PathSpec.Allowlist

In the Isolate use case, we need to upload a directory partially.
Currently the cas package does not support doing this efficiently.

It is possible to upload individual files/directories, but then the
caller has to construct the partial Merkle tree themselves. For
example, to upload "/foo" with only "/foo/bar" and "/foo/baz", but not
"/foo/qux", the client has to upload bar and baz, and then calculate
the digest of /foo manually.
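
A sketch of that manual workaround, assuming the REAPI Directory and
FileNode protos and this repo's digest.NewFromMessage helper (barDigest
and bazDigest stand for the digests returned by the individual
uploads):

    dir := &repb.Directory{
        Files: []*repb.FileNode{
            {Name: "bar", Digest: barDigest},
            {Name: "baz", Digest: bazDigest},
        },
    }
    // The caller must compute the root digest themselves.
    fooDigest, err := digest.NewFromMessage(dir)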

It is possible to exclude some paths using UploadOptions.Prelude, but
this filtering is at the UploadOptions level, while we need it at the
PathSpec level. Placing a callback at the PathSpec level would make
caching of intermediate directories difficult: in Go, it is not
possible to tell whether two function pointers represent the same
function with the same captured inputs.
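
For illustration (a fragment; func values in Go are comparable only to
nil, so they cannot serve as cache keys):

    var f1, f2 func()
    fmt.Println(f1 == nil, f2 == nil) // true true: nil comparison is allowed
    // fmt.Println(f1 == f2)          // compile error: func can only be compared to nil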

Introduce PathSpec.Allowlist: a list of paths, relative to
PathSpec.Path, that must be uploaded. The client uploads them the
usual way, and then constructs a partial Merkle tree to upload the
root, as well as the intermediate directories between the root and the
allowlisted paths.
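
A minimal usage sketch (paths are illustrative; uploadInputChanFrom is
the channel helper used by the tests below):

    in := &UploadInput{
        Path:      "/foo",
        Allowlist: []string{"bar", "baz"}, // relative to Path
    }
    // Uploads bar and baz, plus a partial /foo directory that lists
    // only the allowlisted entries; /foo/qux is never visited.
    _, err := client.Upload(ctx, UploadOptions{}, uploadInputChanFrom(in))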

The allowlist does not participate in the cache key. Doing so is
possible, but it would require hashing the allowlist for each PathSpec,
which might be overkill *for now*, and it would complicate the
documentation. Instead, document that if a given PathSpec pointer was
passed to Client.Upload() earlier, then the same pointer is guaranteed
to work in UploadResult.Digest(). This is in addition to the structural
cache key, which is used to retrieve digests of files that were
uploaded implicitly.
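
For example, a sketch using the UploadInput.Digest accessor from the
diff below:

    in := &UploadInput{Path: "/foo", Allowlist: []string{"bar"}}
    if _, err := client.Upload(ctx, UploadOptions{}, uploadInputChanFrom(in)); err != nil {
        return err
    }
    rootDig, _ := in.Digest(".")   // digest of the partial tree's root
    barDig, _ := in.Digest("bar")  // digest of an allowlisted path
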
diff --git a/go/pkg/cas/upload.go b/go/pkg/cas/upload.go
index 0e9fbdb..1141aaf 100644
--- a/go/pkg/cas/upload.go
+++ b/go/pkg/cas/upload.go
@@ -48,6 +48,17 @@
 	// Must be absolute.
 	Path string
 
+	// Allowlist is a filter for files/directories under Path.
+	// If a file is not present in the Allowlist and does not reside in a
+	// directory present in the Allowlist, then the file is ignored.
+	// This is equivalent to deleting all unmatched files/dirs before
+	// uploading.
+	//
+	// Each path in the Allowlist must be relative to UploadInput.Path.
+	//
+	// Must be empty if Path points to a regular file.
+	Allowlist []string
+
 	// Exclude is a file/dir filter. If Exclude is not nil and the
 	// absolute path of a file/dir match this regexp, then the file/dir is skipped.
 	// Forward-slash-separated paths are matched against the regexp: PathExclude
@@ -57,11 +68,24 @@
 	// See ErrSkip comments for more details on semantics regarding excluding symlinks.
 	Exclude *regexp.Regexp
 
-	cleanPath string
+	cleanPath      string
+	cleanAllowlist []string
+
 	// pathInfo is result of Lstat(UploadInput.Path)
 	pathInfo os.FileInfo
 
-	digest              *repb.Digest
+	// tree maps from a file/dir path to its digest and a directory node.
+	// The path is relative to UploadInput.Path.
+	//
+	// Once digests are computed successfully, guaranteed to have key ".".
+	// If allowlist is not empty, then also has a key for each clean allowlisted
+	// path, as well as each intermediate directory between the root and an
+	// allowlisted dir.
+	//
+	// The main purpose of this field is an UploadInput-local cache that couldn't
+	// be placed in uploader.fsCache because of UploadInput-specific parameters
+	// that are hard to incorporate into the cache key, namely the allowlist.
+	tree                map[string]*digested
 	digestsComputed     chan struct{}
 	digestsComputedInit sync.Once
 	u                   *uploader
@@ -81,8 +105,13 @@
 	if in.cleanPath == "" {
 		return digest.Digest{}, errors.Errorf("Digest called too soon")
 	}
-	if relPath == "." {
-		return digest.NewFromProtoUnvalidated(in.digest), nil
+
+	relPath = filepath.Clean(relPath)
+
+	// Check if this is the root or one of the intermediate nodes in the partial
+	// Merkle tree.
+	if dig, ok := in.tree[relPath]; ok {
+		return digest.NewFromProtoUnvalidated(dig.digest), nil
 	}
 
 	absPath := filepath.Join(in.cleanPath, relPath)
@@ -120,6 +149,102 @@
 	return in.ensureDigestsComputedInited()
 }
 
+var oneDot = []string{"."}
+
+// init initializes internal fields.
+func (in *UploadInput) init(u *uploader) error {
+	in.u = u
+
+	if !filepath.IsAbs(in.Path) {
+		return errors.Errorf("%q is not absolute", in.Path)
+	}
+	in.cleanPath = filepath.Clean(in.Path)
+
+	// Do not use os.Stat() here. We want to know if it is a symlink.
+	var err error
+	if in.pathInfo, err = os.Lstat(in.cleanPath); err != nil {
+		return errors.WithStack(err)
+	}
+
+	// Process the allowlist.
+	in.tree = make(map[string]*digested, 1+len(in.Allowlist))
+	switch {
+	case len(in.Allowlist) == 0:
+		in.cleanAllowlist = oneDot
+
+	case in.pathInfo.Mode().IsRegular():
+		return errors.Errorf("the Allowlist is not supported for regular files")
+
+	default:
+		in.cleanAllowlist = make([]string, len(in.Allowlist))
+		for i, subPath := range in.Allowlist {
+			if filepath.IsAbs(subPath) {
+				return errors.Errorf("the allowlisted path %q is not relative", subPath)
+			}
+
+			cleanSubPath := filepath.Clean(subPath)
+			if cleanSubPath == ".." || strings.HasPrefix(cleanSubPath, parentDirPrefix) {
+				return errors.Errorf("the allowlisted path %q is not contained by %q", subPath, in.Path)
+			}
+			in.cleanAllowlist[i] = cleanSubPath
+		}
+	}
+	return nil
+}
+
+// partialMerkleTree ensures that every node in in.tree, except nodes already
+// contained by another node, has all of its ancestors present in the tree as
+// well. For example, if the tree contains only "foo/bar" and "foo/baz", then
+// partialMerkleTree adds "foo" and ".". The latter is the root.
+//
+// All tree keys must be clean relative paths.
+// Returns prepared *uploadItems that represent the ancestors that were added to
+// the tree.
+func (in *UploadInput) partialMerkleTree() (added []*uploadItem) {
+	// Establish parent->child edges.
+	children := map[string]map[string]struct{}{}
+	for relPath := range in.tree {
+		for relPath != "." {
+			parent := dirNameRelFast(relPath)
+			if childSet, ok := children[parent]; ok {
+				childSet[relPath] = struct{}{}
+			} else {
+				children[parent] = map[string]struct{}{relPath: {}}
+			}
+			relPath = parent
+		}
+	}
+
+	// Add the missing ancestors by traversing in post-order.
+	var dfs func(relPath string) proto.Message
+	dfs = func(relPath string) proto.Message {
+		if dig, ok := in.tree[relPath]; ok {
+			return dig.dirEntry
+		}
+
+		dir := &repb.Directory{}
+		for child := range children[relPath] {
+			addDirEntry(dir, dfs(child))
+		}
+
+		// Prepare an uploadItem.
+		absPath := joinFilePathsFast(in.cleanPath, relPath)
+		item := uploadItemFromDirMsg(absPath, dir) // normalizes the dir
+		added = append(added, item)
+
+		// Compute a directory entry for the parent.
+		node := &repb.DirectoryNode{
+			Name:   filepath.Base(absPath),
+			Digest: item.Digest,
+		}
+
+		in.tree[relPath] = &digested{dirEntry: node, digest: item.Digest}
+		return node
+	}
+	dfs(".")
+	return added
+}
+
 // TransferStats is upload/download statistics.
 type TransferStats struct {
 	CacheHits   DigestStat
@@ -308,24 +433,61 @@
 	if !filepath.IsAbs(in.Path) {
 		return errors.Errorf("%q is not absolute", in.Path)
 	}
-	in.u = u
-	in.cleanPath = filepath.Clean(in.Path)
+
+	if err := in.init(u); err != nil {
+		return errors.WithStack(err)
+	}
 
 	// Schedule a file system walk.
 	u.wgFS.Add(1)
 	u.eg.Go(func() error {
 		defer u.wgFS.Done()
-		// Do not use os.Stat() here. We want to know if it is a symlink.
-		var err error
-		if in.pathInfo, err = os.Lstat(in.cleanPath); err != nil {
+
+		// Concurrently visit each allowlisted path, and use the results to
+		// construct a partial Merkle tree. Note that we are not visiting
+		// the entire in.cleanPath, which may be much larger than the union of the
+		// allowlisted paths.
+		localEg, ctx := errgroup.WithContext(ctx)
+		var treeMu sync.Mutex
+		for _, relPath := range in.cleanAllowlist {
+			relPath := relPath
+			// Schedule a file system walk.
+			localEg.Go(func() error {
+				absPath := in.cleanPath
+				info := in.pathInfo
+				if relPath != "." {
+					absPath = joinFilePathsFast(in.cleanPath, relPath)
+					var err error
+					// TODO(nodir): cache this syscall too.
+					if info, err = os.Lstat(absPath); err != nil {
+						return errors.WithStack(err)
+					}
+				}
+
+				switch dig, err := u.visitPath(ctx, absPath, info, in.Exclude); {
+				case err != nil:
+					return errors.Wrapf(err, "%q", absPath)
+				case dig != nil:
+					treeMu.Lock()
+					in.tree[relPath] = dig
+					treeMu.Unlock()
+				}
+				return nil
+			})
+		}
+		if err := localEg.Wait(); err != nil {
 			return errors.WithStack(err)
 		}
 
-		dig, err := u.visitPath(ctx, in.cleanPath, in.pathInfo, in.Exclude)
-		if err != nil {
-			return errors.Wrapf(err, "%q", in.Path)
+		// At this point, all allowlisted paths are digested, and we only need to
+		// compute a partial Merkle tree and upload the implied ancestors.
+		for _, item := range in.partialMerkleTree() {
+			if err := u.scheduleCheck(ctx, item); err != nil {
+				return err
+			}
 		}
-		in.digest = dig.digest
+
+		// The entire tree is digested. Notify the caller.
 		close(in.ensureDigestsComputedInited())
 		return nil
 	})
@@ -973,11 +1135,17 @@
 	return item
 }
 
-const pathSep = string(filepath.Separator)
+const (
+	pathSep         = string(filepath.Separator)
+	parentDirPrefix = ".." + pathSep
+)
 
 // joinFilePathsFast is a faster version of filepath.Join because it does not
 // call filepath.Clean. Assumes arguments are clean according to filepath.Clean specs.
 func joinFilePathsFast(a, b string) string {
+	if b == "." {
+		return a
+	}
 	if strings.HasSuffix(a, pathSep) {
 		// May happen if a is the root.
 		return a + b
@@ -985,6 +1153,17 @@
 	return a + pathSep + b
 }
 
+// dirNameRelFast is a faster version of filepath.Dir because it does not call
+// filepath.Clean. Assumes the argument is clean and relative.
+// Does not work for absolute paths.
+func dirNameRelFast(relPath string) string {
+	i := strings.LastIndex(relPath, pathSep)
+	if i < 0 {
+		return "."
+	}
+	return relPath[:i]
+}
+
 func marshalledFieldSize(size int64) int64 {
 	return 1 + int64(proto.SizeVarint(uint64(size))) + size
 }
diff --git a/go/pkg/cas/upload_test.go b/go/pkg/cas/upload_test.go
index 5f00718..27d4559 100644
--- a/go/pkg/cas/upload_test.go
+++ b/go/pkg/cas/upload_test.go
@@ -39,12 +39,30 @@
 	putFile(t, filepath.Join(tmpDir, "root", "subdir", "c"), "c")
 	cItem := uploadItemFromBlob(filepath.Join(tmpDir, "root", "subdir", "c"), []byte("c"))
 
+	putFile(t, filepath.Join(tmpDir, "root", "subdir", "d"), "d")
+	dItem := uploadItemFromBlob(filepath.Join(tmpDir, "root", "subdir", "d"), []byte("d"))
+
 	subdirItem := uploadItemFromDirMsg(filepath.Join(tmpDir, "root", "subdir"), &repb.Directory{
-		Files: []*repb.FileNode{{
-			Name:   "c",
-			Digest: cItem.Digest,
-		}},
+		Files: []*repb.FileNode{
+			{
+				Name:   "c",
+				Digest: cItem.Digest,
+			},
+			{
+				Name:   "d",
+				Digest: dItem.Digest,
+			},
+		},
 	})
+	subdirWithoutDItem := uploadItemFromDirMsg(filepath.Join(tmpDir, "root", "subdir"), &repb.Directory{
+		Files: []*repb.FileNode{
+			{
+				Name:   "c",
+				Digest: cItem.Digest,
+			},
+		},
+	})
+
 	rootItem := uploadItemFromDirMsg(filepath.Join(tmpDir, "root"), &repb.Directory{
 		Files: []*repb.FileNode{
 			{Name: "a", Digest: aItem.Digest},
@@ -68,6 +86,15 @@
 			{Name: "b", Digest: bItem.Digest},
 		},
 	})
+	rootWithoutDItem := uploadItemFromDirMsg(filepath.Join(tmpDir, "root"), &repb.Directory{
+		Files: []*repb.FileNode{
+			{Name: "a", Digest: aItem.Digest},
+			{Name: "b", Digest: bItem.Digest},
+		},
+		Directories: []*repb.DirectoryNode{
+			{Name: "subdir", Digest: subdirWithoutDItem.Digest},
+		},
+	})
 
 	putFile(t, filepath.Join(tmpDir, "medium-dir", "medium"), "medium")
 	mediumItem := uploadItemFromBlob(filepath.Join(tmpDir, "medium-dir", "medium"), []byte("medium"))
@@ -129,7 +156,7 @@
 			desc:                "root",
 			inputs:              []*UploadInput{{Path: filepath.Join(tmpDir, "root")}},
 			wantDigests:         digSlice(rootItem),
-			wantScheduledChecks: []*uploadItem{rootItem, aItem, bItem, subdirItem, cItem},
+			wantScheduledChecks: []*uploadItem{rootItem, aItem, bItem, subdirItem, cItem, dItem},
 		},
 		{
 			desc:        "root-without-a-using-callback",
@@ -143,7 +170,25 @@
 					return nil
 				},
 			},
-			wantScheduledChecks: []*uploadItem{rootWithoutAItem, bItem, subdirItem, cItem},
+			wantScheduledChecks: []*uploadItem{rootWithoutAItem, bItem, subdirItem, cItem, dItem},
+		},
+		{
+			desc:                "root-without-a-using-allowlist",
+			inputs:              []*UploadInput{{Path: filepath.Join(tmpDir, "root"), Allowlist: []string{"b", "subdir"}}},
+			wantDigests:         digSlice(rootWithoutAItem),
+			wantScheduledChecks: []*uploadItem{rootWithoutAItem, bItem, subdirItem, cItem, dItem},
+		},
+		{
+			desc:                "root-without-subdir-using-allowlist",
+			inputs:              []*UploadInput{{Path: filepath.Join(tmpDir, "root"), Allowlist: []string{"a", "b"}}},
+			wantDigests:         digSlice(rootWithoutSubdirItem),
+			wantScheduledChecks: []*uploadItem{rootWithoutSubdirItem, aItem, bItem},
+		},
+		{
+			desc:                "root-without-d-using-allowlist",
+			inputs:              []*UploadInput{{Path: filepath.Join(tmpDir, "root"), Allowlist: []string{"a", "b", filepath.Join("subdir", "c")}}},
+			wantDigests:         digSlice(rootWithoutDItem),
+			wantScheduledChecks: []*uploadItem{rootWithoutDItem, aItem, bItem, subdirWithoutDItem, cItem},
 		},
 		{
 			desc: "root-without-b-using-exclude",
@@ -152,7 +197,7 @@
 				Exclude: regexp.MustCompile(`[/\\]a$`),
 			}},
 			wantDigests:         digSlice(rootWithoutAItem),
-			wantScheduledChecks: []*uploadItem{rootWithoutAItem, bItem, subdirItem, cItem},
+			wantScheduledChecks: []*uploadItem{rootWithoutAItem, bItem, subdirItem, cItem, dItem},
 		},
 		{
 			desc: "same-regular-file-is-read-only-once",
@@ -171,7 +216,7 @@
 			// OnDigest is called for each UploadItem separately.
 			wantDigests: digSlice(rootItem, rootItem),
 			// Directories are checked twice, but files are checked only once.
-			wantScheduledChecks: []*uploadItem{rootItem, rootItem, aItem, bItem, subdirItem, subdirItem, cItem},
+			wantScheduledChecks: []*uploadItem{rootItem, rootItem, aItem, bItem, subdirItem, subdirItem, cItem, dItem},
 		},
 		{
 			desc:   "root-without-subdir",
@@ -198,13 +243,13 @@
 			opt:                 UploadOptions{PreserveSymlinks: true},
 			inputs:              []*UploadInput{{Path: filepath.Join(tmpDir, "with-symlinks")}},
 			wantDigests:         digSlice(withSymlinksItemPreserved),
-			wantScheduledChecks: []*uploadItem{aItem, subdirItem, cItem, withSymlinksItemPreserved},
+			wantScheduledChecks: []*uploadItem{aItem, subdirItem, cItem, dItem, withSymlinksItemPreserved},
 		},
 		{
 			desc:                "symlinks-not-preserved",
 			inputs:              []*UploadInput{{Path: filepath.Join(tmpDir, "with-symlinks")}},
 			wantDigests:         digSlice(withSymlinksItemNotPreserved),
-			wantScheduledChecks: []*uploadItem{aItem, subdirItem, cItem, withSymlinksItemNotPreserved},
+			wantScheduledChecks: []*uploadItem{aItem, subdirItem, cItem, dItem, withSymlinksItemNotPreserved},
 		},
 		{
 			desc:    "dangling-symlinks-disallow",
@@ -299,6 +344,7 @@
 	putFile(t, filepath.Join(tmpDir, "root", "a"), "a")
 	putFile(t, filepath.Join(tmpDir, "root", "b"), "b")
 	putFile(t, filepath.Join(tmpDir, "root", "subdir", "c"), "c")
+	putFile(t, filepath.Join(tmpDir, "root", "subdir", "d"), "d")
 
 	e, cleanup := fakes.NewTestEnv(t)
 	defer cleanup()
@@ -312,35 +358,64 @@
 		t.Fatal(err)
 	}
 
-	in := &UploadInput{Path: filepath.Join(tmpDir, "root")}
-	if in.DigestsComputed() == nil {
-		t.Fatalf("DigestCopmuted() returned nil")
+	inputs := []struct {
+		input       *UploadInput
+		wantDigests map[string]digest.Digest
+	}{
+		{
+			input: &UploadInput{
+				Path:      filepath.Join(tmpDir, "root"),
+				Allowlist: []string{"a", "b", filepath.Join("subdir", "c")},
+			},
+			wantDigests: map[string]digest.Digest{
+				".":      {Hash: "9a0af914385de712675cd780ae2dcb5e17b8943dc62cf9fc6fbf8ccd6f8c940d", Size: 230},
+				"a":      {Hash: "ca978112ca1bbdcafac231b39a23dc4da786eff8147c4e72b9807785afee48bb", Size: 1},
+				"subdir": {Hash: "2d5c8ba78600fcadae65bab790bdf1f6f88278ec4abe1dc3aa7c26e60137dfc8", Size: 75},
+			},
+		},
+		{
+			input: &UploadInput{
+				Path:      filepath.Join(tmpDir, "root"),
+				Allowlist: []string{"a", "b", filepath.Join("subdir", "d")},
+			},
+			wantDigests: map[string]digest.Digest{
+				".":      {Hash: "2ab9cc3c9d504c883a66da62b57eb44fc9ca57abe05e75633b435e017920d8df", Size: 230},
+				"a":      {Hash: "ca978112ca1bbdcafac231b39a23dc4da786eff8147c4e72b9807785afee48bb", Size: 1},
+				"subdir": {Hash: "ce33c7475f9ff2f2ee501eafcb2f21825b24a63de6fbabf7fbb886d606a448b9", Size: 75},
+			},
+		},
 	}
 
-	if _, err := client.Upload(ctx, UploadOptions{}, uploadInputChanFrom(in)); err != nil {
+	uploadInputs := make([]*UploadInput, len(inputs))
+	for i, in := range inputs {
+		uploadInputs[i] = in.input
+		if in.input.DigestsComputed() == nil {
+			t.Fatalf("DigestsComputed() returned nil")
+		}
+	}
+
+	if _, err := client.Upload(ctx, UploadOptions{}, uploadInputChanFrom(uploadInputs...)); err != nil {
 		t.Fatal(err)
 	}
 
-	select {
-	case <-in.DigestsComputed():
-		// Good
-	case <-time.After(time.Second):
-		t.Errorf("Upload succeeded, but DigestsComputed() is not closed")
-	}
-
-	wantDigests := map[string]digest.Digest{
-		".":      {Hash: "9a0af914385de712675cd780ae2dcb5e17b8943dc62cf9fc6fbf8ccd6f8c940d", Size: 230},
-		"a":      {Hash: "ca978112ca1bbdcafac231b39a23dc4da786eff8147c4e72b9807785afee48bb", Size: 1},
-		"subdir": {Hash: "2d5c8ba78600fcadae65bab790bdf1f6f88278ec4abe1dc3aa7c26e60137dfc8", Size: 75},
-	}
-	for relPath, wantDig := range wantDigests {
-		gotDig, err := in.Digest(relPath)
-		if err != nil {
-			t.Error(err)
-			continue
+	for i, in := range inputs {
+		t.Logf("input %d", i)
+		select {
+		case <-in.input.DigestsComputed():
+			// Good
+		case <-time.After(time.Second):
+			t.Errorf("Upload succeeded, but DigestsComputed() is not closed")
 		}
-		if diff := cmp.Diff(wantDig, gotDig); diff != "" {
-			t.Errorf("unexpected digest for %s (-want +got):\n%s", relPath, diff)
+
+		for relPath, wantDig := range in.wantDigests {
+			gotDig, err := in.input.Digest(relPath)
+			if err != nil {
+				t.Error(err)
+				continue
+			}
+			if diff := cmp.Diff(wantDig, gotDig); diff != "" {
+				t.Errorf("unexpected digest for %s (-want +got):\n%s", relPath, diff)
+			}
 		}
 	}
 }
@@ -448,10 +523,11 @@
 }
 
 func TestStreaming(t *testing.T) {
-	// TODO(nodir): add tests for retries.
 	t.Parallel()
 	ctx := context.Background()
 
+	// TODO(nodir): add tests for retries.
+
 	e, cleanup := fakes.NewTestEnv(t)
 	defer cleanup()
 	conn, err := e.Server.NewClientConn(ctx)
@@ -505,6 +581,234 @@
 	}
 }
 
+func TestPartialMerkleTree(t *testing.T) {
+	t.Parallel()
+
+	mustDigest := func(m proto.Message) *repb.Digest {
+		d, err := digest.NewFromMessage(m)
+		if err != nil {
+			t.Fatal(err)
+		}
+		return d.ToProto()
+	}
+
+	type testCase struct {
+		tree      map[string]*digested
+		wantItems []*uploadItem
+	}
+
+	test := func(t *testing.T, tc testCase) {
+		in := &UploadInput{
+			tree:      tc.tree,
+			cleanPath: "/",
+		}
+		gotItems := in.partialMerkleTree()
+		sort.Slice(gotItems, func(i, j int) bool {
+			return gotItems[i].Title < gotItems[j].Title
+		})
+
+		if diff := cmp.Diff(tc.wantItems, gotItems, cmp.Comparer(compareUploadItems)); diff != "" {
+			t.Errorf("unexpected digests (-want +got):\n%s", diff)
+		}
+	}
+
+	t.Run("works", func(t *testing.T) {
+		barDigest := digest.NewFromBlob([]byte("bar")).ToProto()
+		bazDigest := mustDigest(&repb.Directory{})
+
+		foo := &repb.Directory{
+			Files: []*repb.FileNode{{
+				Name:   "bar",
+				Digest: barDigest,
+			}},
+			Directories: []*repb.DirectoryNode{{
+				Name:   "baz",
+				Digest: bazDigest,
+			}},
+		}
+
+		root := &repb.Directory{
+			Directories: []*repb.DirectoryNode{{
+				Name:   "foo",
+				Digest: mustDigest(foo),
+			}},
+		}
+
+		test(t, testCase{
+			tree: map[string]*digested{
+				"foo/bar": {
+					dirEntry: &repb.FileNode{
+						Name:   "bar",
+						Digest: barDigest,
+					},
+					digest: barDigest,
+				},
+				"foo/baz": {
+					dirEntry: &repb.DirectoryNode{
+						Name:   "baz",
+						Digest: bazDigest,
+					},
+					digest: bazDigest,
+				},
+			},
+			wantItems: []*uploadItem{
+				uploadItemFromDirMsg("/", root),
+				uploadItemFromDirMsg("/foo", foo),
+			},
+		})
+	})
+
+	t.Run("redundant info in the tree", func(t *testing.T) {
+		barDigest := mustDigest(&repb.Directory{})
+		barNode := &repb.DirectoryNode{
+			Name:   "bar",
+			Digest: barDigest,
+		}
+		foo := &repb.Directory{
+			Directories: []*repb.DirectoryNode{barNode},
+		}
+		root := &repb.Directory{
+			Directories: []*repb.DirectoryNode{{
+				Name:   "foo",
+				Digest: mustDigest(foo),
+			}},
+		}
+
+		test(t, testCase{
+			tree: map[string]*digested{
+				"foo/bar": {dirEntry: barNode, digest: barDigest},
+				// Redundant
+				"foo/bar/baz": {}, // content doesn't matter
+			},
+			wantItems: []*uploadItem{
+				uploadItemFromDirMsg("/", root),
+				uploadItemFromDirMsg("/foo", foo),
+			},
+		})
+	})
+
+	t.Run("nodes at different levels", func(t *testing.T) {
+		barDigest := digest.NewFromBlob([]byte("bar")).ToProto()
+		barNode := &repb.FileNode{
+			Name:   "bar",
+			Digest: barDigest,
+		}
+
+		bazDigest := digest.NewFromBlob([]byte("baz")).ToProto()
+		bazNode := &repb.FileNode{
+			Name:   "baz",
+			Digest: bazDigest,
+		}
+
+		foo := &repb.Directory{
+			Files: []*repb.FileNode{barNode},
+		}
+		root := &repb.Directory{
+			Directories: []*repb.DirectoryNode{{
+				Name:   "foo",
+				Digest: mustDigest(foo),
+			}},
+			Files: []*repb.FileNode{bazNode},
+		}
+
+		test(t, testCase{
+			tree: map[string]*digested{
+				"foo/bar": {dirEntry: barNode, digest: barDigest},
+				"baz":     {dirEntry: bazNode, digest: bazDigest},
+			},
+			wantItems: []*uploadItem{
+				uploadItemFromDirMsg("/", root),
+				uploadItemFromDirMsg("/foo", foo),
+			},
+		})
+	})
+}
+
+func TestUploadInputInit(t *testing.T) {
+	t.Parallel()
+	absPath := filepath.Join(t.TempDir(), "foo")
+	testCases := []struct {
+		desc               string
+		in                 *UploadInput
+		dir                bool
+		wantCleanAllowlist []string
+		wantErrContain     string
+	}{
+		{
+			desc: "valid",
+			in:   &UploadInput{Path: absPath},
+		},
+		{
+			desc:           "relative path",
+			in:             &UploadInput{Path: "foo"},
+			wantErrContain: "not absolute",
+		},
+		{
+			desc:           "regular file with allowlist",
+			in:             &UploadInput{Path: absPath, Allowlist: []string{"x"}},
+			wantErrContain: "the Allowlist is not supported for regular files",
+		},
+		{
+			desc:               "not clean allowlisted path",
+			in:                 &UploadInput{Path: absPath, Allowlist: []string{"bar/"}},
+			dir:                true,
+			wantCleanAllowlist: []string{"bar"},
+		},
+		{
+			desc:           "absolute allowlisted path",
+			in:             &UploadInput{Path: absPath, Allowlist: []string{"/bar"}},
+			dir:            true,
+			wantErrContain: "not relative",
+		},
+		{
+			desc:           "parent dir in allowlisted path",
+			in:             &UploadInput{Path: absPath, Allowlist: []string{"bar/../.."}},
+			dir:            true,
+			wantErrContain: "..",
+		},
+		{
+			desc:               "no allowlist",
+			in:                 &UploadInput{Path: absPath},
+			dir:                true,
+			wantCleanAllowlist: []string{"."},
+		},
+	}
+
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.desc, func(t *testing.T) {
+			tmpFilePath := absPath
+			if tc.dir {
+				tmpFilePath = filepath.Join(absPath, "bar")
+			}
+			putFile(t, tmpFilePath, "")
+			defer os.RemoveAll(absPath)
+
+			err := tc.in.init(&uploader{})
+			if tc.wantErrContain == "" {
+				if err != nil {
+					t.Error(err)
+				}
+			} else {
+				if err == nil || !strings.Contains(err.Error(), tc.wantErrContain) {
+					t.Errorf("expected err to contain %q; got %v", tc.wantErrContain, err)
+				}
+			}
+
+			if len(tc.wantCleanAllowlist) != 0 {
+				if diff := cmp.Diff(tc.wantCleanAllowlist, tc.in.cleanAllowlist); diff != "" {
+					t.Errorf("unexpected cleanAllowlist (-want +got):\n%s", diff)
+				}
+			}
+		})
+	}
+}
+
 func compareUploadItems(x, y *uploadItem) bool {
 	return x.Title == y.Title &&
 		proto.Equal(x.Digest, y.Digest) &&