blob: 78a1aac8da0e4be657be424e21724a3ce00457fd [file] [log] [blame]
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from recipe_engine import recipe_api
GSUTIL_CIPD_VERSION = "version:4.41"
class GSUtilApi(recipe_api.RecipeApi):
"""GSUtilApi provides support for GSUtil."""
def __call__(self, *args, **kwargs):
    """Returns a step to run an arbitrary gsutil command.

    Recognized keyword arguments (everything else is forwarded to the
    underlying python step):
      name (str): step name; defaults to "gsutil <subcommand>".
      infra_step (bool): whether to mark the step as an infra step
        (default True).
      metadata (dict): object metadata, emitted as repeated "-h" flags.
      options (dict): GSUtil boto-config overrides, emitted as "-o" flags.
      multithreaded (bool): pass "-m" to parallelize the operation.
    """
    assert self._gsutil_tool
    name = kwargs.pop("name", "gsutil " + args[0])
    infra_step = kwargs.pop("infra_step", True)

    cmd_prefix = []

    # Note that metadata arguments have to be passed before the command.
    metadata = kwargs.pop("metadata", [])
    if metadata:
        # Sort by key for a deterministic command line (dict order is
        # arbitrary). Equivalent to the old py2-only
        # sorted(d.iteritems(), key=lambda (k, _): k).
        for k, v in sorted(metadata.items(), key=lambda kv: kv[0]):
            field = self._get_metadata_field(k)
            param = field if v is None else "%s:%s" % (field, v)
            cmd_prefix.extend(["-h", param])

    # Copy before adding the forced key so a caller-supplied dict is
    # never mutated.
    options = dict(kwargs.pop("options", {}))
    # Never let gsutil stall on its own software-update check.
    options["software_update_check_period"] = 0
    for k, v in options.items():
        cmd_prefix.extend(["-o", "GSUtil:%s=%s" % (k, v)])

    if kwargs.pop("multithreaded", False):
        cmd_prefix.append("-m")

    return self.m.python(
        name,
        self._gsutil_tool,
        cmd_prefix + list(args),
        venv=self.resource("gsutil.vpython"),
        infra_step=infra_step,
        **kwargs
    )
@property
def _gsutil_tool(self):
    """The CIPD-provisioned gsutil tool, installed on first access."""
    package = "infra/gsutil"
    return self.m.cipd.ensure_tool(package, GSUTIL_CIPD_VERSION)
@recipe_api.non_step
def normalize(self, url):
    """Converts a Google Storage URL into its canonical gs:// form.

    Accepts gs://, storage.cloud.google.com, and storage.googleapis.com
    URLs; raises AssertionError for anything else.
    """
    gs_prefix = "gs://"
    recognized_prefixes = (
        gs_prefix,
        "https://storage.cloud.google.com/",
        "https://storage.googleapis.com/",
    )
    for known in recognized_prefixes:
        if url.startswith(known):
            # Swap the matched prefix for the canonical gs:// scheme.
            return gs_prefix + url[len(known):]
    raise AssertionError("%s cannot be normalized" % url)
@recipe_api.non_step
def join(self, *parts):
    """Joins path components into a single slash-separated GS path."""
    # Strip slashes from both ends of each component to avoid doubles.
    trimmed = [part.strip("/") for part in parts]
    return "/".join(trimmed)
def _http_url(self, bucket, dest, unauthenticated_url=False):
    """Returns a browser-viewable HTTP URL for an object in GCS.

    unauthenticated_url selects the public storage.googleapis.com host
    over the authenticated storage.cloud.google.com one.
    """
    host = (
        "https://storage.googleapis.com"
        if unauthenticated_url
        else "https://storage.cloud.google.com"
    )
    return "%s/%s/%s" % (host, bucket, self.m.url.quote(dest))
def _directory_listing_url(self, bucket, dest):
    """Returns the URL for a GCS bucket subdirectory listing in the GCP console."""
    quoted_dest = self.m.url.quote(dest)
    return (
        "https://console.cloud.google.com/storage/browser/%s/%s"
        % (bucket, quoted_dest)
    )
@staticmethod
def _get_metadata_field(name, provider_prefix=None):
    """Returns: (str) the metadata field to use with Google Storage

    The Google Storage specification for metadata can be found at:
    https://developers.google.com/storage/docs/gsutil/addlhelp/WorkingWithObjectMetadata
    """
    # Headers natively understood by Google Storage need no prefix.
    native_fields = (
        "Cache-Control",
        "Content-Disposition",
        "Content-Encoding",
        "Content-Language",
        "Content-MD5",
        "Content-Type",
    )
    # Pass through names that already carry a custom provider prefix,
    # and the natively supported headers.
    if name.lower().startswith("x-") or name in native_fields:
        return name
    # Everything else becomes a custom (provider-prefixed) header.
    return "%s-%s" % (provider_prefix or "x-goog-meta", name)
def upload(
    self,
    bucket,
    src,
    dst,
    link_name="gsutil.upload",
    unauthenticated_url=False,
    recursive=False,
    no_clobber=False,
    gzip_exts=(),
    **kwargs
):
    """Uploads a local file or directory to a GCS bucket.

    Args:
      bucket (str): destination GCS bucket name.
      src: local file or directory to upload.
      dst (str): destination object path inside the bucket.
      link_name (str): presentation link name; falsy to omit the link.
      unauthenticated_url (bool): link to the public URL instead of the
        authenticated one.
      recursive (bool): pass -r to copy directories recursively.
      no_clobber (bool): pass -n to skip objects that already exist.
      gzip_exts (seq[str]): file extensions to upload with gzip
        transport encoding (gsutil cp -j).

    Returns:
      The upload step.
    """
    args = ["cp"]
    if recursive:
        args.append("-r")
    if no_clobber:
        args.append("-n")
    if gzip_exts:
        # gsutil's -j flag takes a single comma-separated extension
        # list. The old list concatenation crashed (TypeError) when
        # callers passed a tuple — the default's type — and would have
        # fed any extension past the first to gsutil as a positional
        # source path.
        args.extend(["-j", ",".join(gzip_exts)])
    args.extend([src, "gs://%s/%s" % (bucket, dst)])
    step = self(*args, **kwargs)
    if link_name:
        link_url = self._http_url(
            bucket, dst, unauthenticated_url=unauthenticated_url
        )
        step.presentation.links[link_name] = link_url
    return step
def rsync(
    self,
    bucket,
    src,
    dst,
    link_name="gsutil.rsync",
    recursive=True,
    no_clobber=False,
    gzip_exts=(),
    **kwargs
):
    """Synchronizes a local directory with a GCS bucket path.

    Args:
      bucket (str): destination GCS bucket name.
      src: local source directory.
      dst (str): destination path inside the bucket.
      link_name (str): presentation link name; falsy to omit the link.
      recursive (bool): pass -r to sync directory trees (default True).
      no_clobber (bool): pass -u to skip files already existing in dst
        with a later timestamp.
      gzip_exts (seq[str]): file extensions to upload with gzip
        transport encoding (gsutil rsync -j).

    Returns:
      The rsync step.
    """
    args = ["rsync"]
    if recursive:
        args.append("-r")
    if no_clobber:
        # This will skip files already existing in dst with a later
        # timestamp.
        args.append("-u")
    if gzip_exts:
        # gsutil's -j flag takes a single comma-separated extension
        # list. The old list concatenation crashed (TypeError) when
        # callers passed a tuple — the default's type — and would have
        # fed any extension past the first to gsutil as a positional
        # argument.
        args.extend(["-j", ",".join(gzip_exts)])
    args.extend([src, "gs://%s/%s" % (bucket, dst)])
    step = self(*args, **kwargs)
    if link_name:
        link_url = self._directory_listing_url(bucket, dst)
        step.presentation.links[link_name] = link_url
    return step
def copy(
    self,
    src_bucket,
    src,
    dst_bucket,
    dst,
    link_name="gsutil.copy",
    unauthenticated_url=False,
    recursive=False,
    **kwargs
):
    """Copies an object (or tree, with recursive=True) between GCS paths.

    Adds a presentation link to the destination unless link_name is
    falsy.
    """
    source_url = "gs://%s/%s" % (src_bucket, src)
    dest_url = "gs://%s/%s" % (dst_bucket, dst)
    cmd = ["cp", "-r"] if recursive else ["cp"]
    cmd.extend([source_url, dest_url])
    step = self(*cmd, **kwargs)
    if link_name:
        step.presentation.links[link_name] = self._http_url(
            dst_bucket, dst, unauthenticated_url=unauthenticated_url
        )
    return step
def download(self, src_bucket, src, dest, recursive=False, **kwargs):
    """Downloads gcs bucket file to local disk.

    Args:
      src_bucket (str): gcs bucket name.
      src (str): gcs file or path name.
      dest (str): local file path root to copy to.
      recursive (bool): bool to indicate to copy recursively.

    Returns:
      builder step.
    """
    cmd = ["cp"]
    if recursive:
        cmd.append("-r")
    cmd.append("gs://%s/%s" % (src_bucket, src))
    cmd.append(dest)
    return self(*cmd, **kwargs)