blob: a435231658206fb13ca4aa481b759efd79b326db [file] [log] [blame]
# Copyright 2020 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
Utilities for requesting information for a Gerrit server via HTTPS.
https://gerrit-review.googlesource.com/Documentation/rest-api.html
"""
import base64
import http.client
import http.cookiejar as cookielib
import io
import json
import logging
import os
import re
import subprocess
import sys
import urllib
import urllib.error
import urllib.parse
import urllib.request
from typing import Tuple, Dict, Optional, List, Any, FrozenSet, Set, cast
import util
LOGGER = logging.getLogger()
# Maximum number of times to retry a failing HTTP request.
_MAX_HTTP_RETRIES = 5
# Controls the transport protocol used to communicate with Gerrit.
# This is parameterized primarily to enable GerritTestCase.
GERRIT_PROTOCOL = 'https'
def read_file(path: str) -> str:
"""Read the contents of the given file as a string."""
with open(path, 'rb') as f:
return f.read().decode('utf-8', errors='surrogateescape')
class GerritError(Exception):
"""Exception class for errors commuicating with the gerrit-on-borg service."""
def __init__(self, http_status: int, message: str):
self.http_status = http_status
self.message = '(%d) %s' % (self.http_status, message)
super().__init__(self.message)
def _QueryString(params, first_param=None):
"""Encodes query parameters in the key:val[+key:val...] format specified here:
https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#list-changes
"""
q = [urllib.parse.quote(first_param)] if first_param else []
q.extend(['%s:%s' % (key, val) for key, val in params])
return '+'.join(q)
class Authenticator:
"""Authenticator implementation that uses ".gitcookies" for token."""
def __init__(self) -> None:
# Credentials will be loaded lazily on first use. This ensures Authenticator
# get() can always construct an authenticator, even if something is broken.
# This allows 'creds-check' to proceed to actually checking creds later,
# rigorously (instead of blowing up with a cryptic error if they are wrong).
self._gitcookies: Optional[Dict[str, Tuple[str, str]]] = None
@property
def gitcookies(self) -> Dict[str, Tuple[str, str]]:
if self._gitcookies is None:
self._gitcookies = self._get_gitcookies()
return self._gitcookies
@classmethod
def get_new_password_url(cls, host: str) -> str:
"""Generate a URL to instructions for setting up a ".gitcookies" entry."""
assert not host.startswith('http')
# Assume *.googlesource.com pattern.
parts = host.split('.')
if not parts[0].endswith('-review'):
parts[0] += '-review'
return 'https://%s/new-password' % ('.'.join(parts))
@classmethod
def get_new_password_message(cls, host: str) -> str:
if host is None:
return ('Git host for Gerrit upload is unknown. Check your remote '
'and the branch your branch is tracking. This tool assumes '
'that you are using a git server at *.googlesource.com.')
url = cls.get_new_password_url(host)
return 'You can (re)generate your credentials by visiting %s' % url
@classmethod
def get_gitcookies_path(cls) -> str:
# Read from the environment.
env_path = os.getenv('GIT_COOKIES_PATH')
if env_path is not None:
return env_path
# Attempt to read from git's config.
try:
path = subprocess.check_output(
['git', 'config', '--path', 'http.cookiefile'])
return path.decode('utf-8', 'ignore').strip()
except subprocess.CalledProcessError:
# Guess a default.
return os.path.expanduser(os.path.join('~', '.gitcookies'))
@classmethod
def _get_gitcookies(cls) -> Dict[str, Tuple[str, str]]:
# Read the cookies file.
path = cls.get_gitcookies_path()
if not os.path.exists(path):
return {}
try:
f = read_file(path)
except IOError:
return {}
# Parse each line.
gitcookies: Dict[str, Tuple[str, str]] = {}
for line in f.splitlines():
try:
fields = line.strip().split('\t')
if line.strip().startswith('#') or len(fields) != 7:
continue
domain, xpath, key, value = fields[0], fields[2], fields[5], fields[6]
if xpath == '/' and key == 'o':
if value.startswith('git-'):
login, secret_token = value.split('=', 1)
gitcookies[domain] = (login, secret_token)
else:
gitcookies[domain] = ('', value)
except (IndexError, ValueError, TypeError) as exc:
LOGGER.warning(exc)
return gitcookies
def _get_auth_for_host(self, host: str) -> Optional[Tuple[str, str]]:
for domain, creds in self.gitcookies.items():
if cookielib.domain_match(host, domain): # type: ignore
return creds
return None
def get_auth_header(self, host: str) -> Optional[str]:
a = self._get_auth_for_host(host)
if a:
if a[0]:
secret = base64.b64encode(('%s:%s' % (a[0], a[1])).encode('utf-8'))
return 'Basic %s' % secret.decode('utf-8')
else:
return 'Bearer %s' % a[1]
return None
def get_auth_email(self, host: str) -> Optional[str]:
"""Best effort parsing of email to be used for auth for the given host."""
a = self._get_auth_for_host(host)
if not a:
return None
login = a[0]
# login typically looks like 'git-xxx.example.com'
if not login.startswith('git-') or '.' not in login:
return None
username, domain = login[len('git-'):].split('.', 1)
return '%s@%s' % (username, domain)
def EnsureAuthenticated(host: str) -> None:
"""Attempt to determine if we are authenticated with Gerrit server."""
# See if we have an authentication header available for the given host.
auth = Authenticator()
if auth.get_auth_header(host):
return
# If not, print instructions and quit.
print(
'Could not find credentials for host "%(host)s".\n'
'\n'
'Credentials are read from %(filename)s.\n'
'\n'
'%(instructions)s' % {
'host': host,
'filename': auth.get_gitcookies_path(),
'instructions': auth.get_new_password_message(host),
})
sys.exit(1)
class GerritHttpRequest:
"""A HTTP request to a URL that can be issued multiple times."""
def __init__(self, url: str, data: bytes = None,
headers: Optional[Dict[str, str]] = None, method: str = 'GET'):
self.url: str = url
self.data: Optional[bytes] = data
self.headers: Dict[str, str] = headers or {}
self.method: str = method
@property
def host(self) -> str:
return urllib.parse.urlparse(self.url).netloc
def execute(self) -> Tuple[http.client.HTTPResponse, bytes]:
request = urllib.request.Request(self.url, data=self.data,
headers=self.headers, method=self.method)
try:
response = urllib.request.urlopen(request)
return (response, response.read())
except urllib.error.HTTPError as e:
return (cast(http.client.HTTPResponse, e), e.read())
def _SendGerritHttpRequest(
host: str,
path: str,
reqtype: str = 'GET',
headers: Optional[Dict[str, str]] = None,
body: Any = None,
accept_statuses: FrozenSet[int] = frozenset([200]),
) -> io.StringIO:
"""Send a request to the given Gerrit host.
Args:
host: Gerrit host to connect to.
path: Path to send request to.
reqtype: HTTP request type (or, "HTTP verb").
body: JSON-encodable object to send.
accept_statuses: Treat any of these statuses as success. Default: [200]
Common additions include 204, 400, and 404.
Returns: A string buffer containing the connection's reply.
"""
headers = headers or {}
bare_host = host.partition(':')[0]
# Set authentication header if available.
a = Authenticator().get_auth_header(bare_host)
if a:
headers.setdefault('Authorization', a)
# If we have an authentication header, use an authenticated URL path.
#
# From Gerrit docs: "Users (and programs) can authenticate with HTTP
# passwords by prefixing the endpoint URL with /a/. For example to
# authenticate to /projects/, request the URL /a/projects/. Gerrit will use
# HTTP basic authentication with the HTTP password from the user’s account
# settings page".
url = path
if not url.startswith('/'):
url = '/' + url
if 'Authorization' in headers and not url.startswith('/a/'):
url = '/a%s' % url
body_bytes: Optional[bytes] = None
if body:
body_bytes = json.dumps(body, sort_keys=True).encode('utf-8')
headers.setdefault('Content-Type', 'application/json')
# Create a request
request = GerritHttpRequest(
urllib.parse.urljoin('%s://%s' % (GERRIT_PROTOCOL, host), url),
data=body_bytes,
headers=headers,
method=reqtype)
# Send the request, retrying if there are transient errors.
backoff = util.ExponentialBackoff(
util.Clock(), min_poll_seconds=10., max_poll_seconds=600.)
attempts = 0
while True:
attempts += 1
# Attempt to perform the fetch.
response, contents_bytes = request.execute()
contents = contents_bytes.decode('utf-8', 'replace')
# If we have a valid status, return the contents.
if response.status in accept_statuses:
return io.StringIO(contents)
# If the error looks transient, retry.
#
# We treat the following errors as transient:
# * Internal server errors (>= 500)
# * Errors caused by conflicts (409 "conflict").
# * Quota issues (429 "too many requests")
if ((response.status >= 500 or response.status in [409, 429]) and
attempts < _MAX_HTTP_RETRIES):
LOGGER.warn('A transient error occurred while querying %s (%s): %s',
request.url, request.method, response.msg)
backoff.wait()
continue
LOGGER.debug('got response %d for %s %s', response.status, request.method,
request.url)
# If we got a 400 error ("bad request"), that may indicate bad authentication.
#
# Add some more context.
if response.status == 400:
raise GerritError(
response.status, 'HTTP Error: %s: %s.\n\n'
'This may indicate a bad request (likely caused by a bug) '
'or that authentication failed (Check your ".gitcookies" file.)' %
(response.msg, contents.strip()))
# Otherwise, throw a generic error.
raise GerritError(response.status,
'HTTP Error: %s: %s' % (response.msg, contents.strip()))
def _SendGerritJsonRequest(
host: str,
path: str,
reqtype: str = 'GET',
headers: Optional[Dict[str, str]] = None,
body: Any = None,
accept_statuses: FrozenSet[int] = frozenset([200]),
) -> Optional[Any]:
"""Send a request to Gerrit, expecting a JSON response."""
result = _SendGerritHttpRequest(
host, path, reqtype, headers, body, accept_statuses)
# The first line of the response should always be: )]}'
s = result.readline()
if s and s.rstrip() != ")]}'":
raise GerritError(200, 'Unexpected json output: %s' % s)
# Read the rest of the response.
s = result.read()
if not s:
return None
return json.loads(s)
def QueryChanges(
host: str,
params: List[Tuple[str, str]],
first_param: Optional[Any] = None,
limit: Optional[int] = None,
o_params: Optional[List[Any]] = None,
start: Optional[int] = None
) -> List[Any]:
"""
Queries a gerrit-on-borg server for changes matching query terms.
Args:
params: A list of key:value pairs for search parameters, as documented
here (e.g. ('is', 'owner') for a parameter 'is:owner'):
https://gerrit-review.googlesource.com/Documentation/user-search.html#search-operators
first_param: A change identifier
limit: Maximum number of results to return.
start: how many changes to skip (starting with the most recent)
o_params: A list of additional output specifiers, as documented here:
https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#list-changes
Returns:
A list of json-decoded query results.
"""
# Note that no attempt is made to escape special characters; YMMV.
if not params and not first_param:
raise RuntimeError('QueryChanges requires search parameters')
path = 'changes/?q=%s' % _QueryString(params, first_param)
if start:
path = '%s&start=%s' % (path, start)
if limit:
path = '%s&n=%d' % (path, limit)
if o_params:
path = '%s&%s' % (path, '&'.join(['o=%s' % p for p in o_params]))
try:
response = _SendGerritJsonRequest(host, path)
except GerritError as e:
if e.http_status == 404:
# Not found.
return []
raise
if response is None:
raise GerritError(200, 'No response from Gerrit.')
return response
def GetGerritFetchUrl(host):
"""Given a Gerrit host name returns URL of a Gerrit instance to fetch from."""
return '%s://%s/' % (GERRIT_PROTOCOL, host)
def GetCodeReviewTbrScore(host, project):
"""Given a Gerrit host name and project, return the Code-Review score for TBR."""
conn = CreateHttpConn(
host, '/projects/%s' % urllib.parse.quote(project, ''))
project = ReadHttpJsonResponse(conn)
if ('labels' not in project
or 'Code-Review' not in project['labels']
or 'values' not in project['labels']['Code-Review']):
return 1
return max([int(x) for x in project['labels']['Code-Review']['values']])
def GetChangePageUrl(host, change_number):
"""Given a Gerrit host name and change number, returns change page URL."""
return '%s://%s/#/c/%d/' % (GERRIT_PROTOCOL, host, change_number)
def GetChangeUrl(host, change):
"""Given a Gerrit host name and change ID, returns a URL for the change."""
return '%s://%s/a/changes/%s' % (GERRIT_PROTOCOL, host, change)
def GetChange(host, change):
"""Queries a Gerrit server for information about a single change."""
path = 'changes/%s' % change
return _SendGerritJsonRequest(host, path)
def GetChangeDetail(host, change, o_params=None):
"""Queries a Gerrit server for extended information about a single change."""
path = 'changes/%s/detail' % change
if o_params:
path += '?%s' % '&'.join(['o=%s' % p for p in o_params])
return _SendGerritJsonRequest(host, path)
def GetChangeCommit(host, change, revision='current'):
"""Query a Gerrit server for a revision associated with a change."""
path = 'changes/%s/revisions/%s/commit?links' % (change, revision)
return _SendGerritJsonRequest(host, path)
def GetChangeCurrentRevision(host, change):
"""Get information about the latest revision for a given change."""
return QueryChanges(host, [], change, o_params=('CURRENT_REVISION',))
def GetChangeRevisions(host, change):
"""Gets information about all revisions associated with a change."""
return QueryChanges(host, [], change, o_params=('ALL_REVISIONS',))
def GetChangeReview(host, change, revision=None):
"""Gets the current review information for a change."""
if not revision:
jmsg = GetChangeRevisions(host, change)
if not jmsg:
return None
elif len(jmsg) > 1:
raise GerritError(200, 'Multiple changes found for ChangeId %s.' % change)
revision = jmsg[0]['current_revision']
path = 'changes/%s/revisions/%s/review'
return _SendGerritJsonRequest(host, path)
def GetChangeComments(host, change):
"""Get the line- and file-level comments on a change."""
path = 'changes/%s/comments' % change
return _SendGerritJsonRequest(host, path)
def GetRelatedChanges(host: str, change: str, revision: str = 'current') -> Any:
"""Gets information about changes related to a given change."""
path = 'changes/%s/revisions/%s/related' % (change, revision)
return _SendGerritJsonRequest(host, path)
def SubmitChange(host, change, wait_for_merge=True):
"""Submits a Gerrit change via Gerrit."""
path = 'changes/%s/submit' % change
body = {'wait_for_merge': wait_for_merge}
conn = CreateHttpConn(host, path, reqtype='POST', body=body)
return ReadHttpJsonResponse(conn)
def SetReview(
host: str,
change: str,
msg: Optional[str] = None,
labels: Optional[Dict[str, Any]] = None,
notify: bool = False,
ready: bool = False
) -> None:
"""Sets labels and/or adds a message to a code review."""
if not msg and not labels:
return
path = 'changes/%s/revisions/current/review' % change
body: Dict[str, Any] = {'drafts': 'KEEP'}
if msg:
body['message'] = msg
if labels:
body['labels'] = labels
if notify:
body['notify'] = 'ALL' if notify else 'NONE'
if ready:
body['ready'] = True
response = _SendGerritJsonRequest(host, path, reqtype='POST', body=body)
if response is None:
raise GerritError(200, 'No response from Gerrit.')
if labels:
for key, val in labels.items():
if ('labels' not in response or key not in response['labels'] or
int(response['labels'][key] != int(val))):
raise GerritError(200, 'Unable to set "%s" label on change %s.' % (
key, change))
def GetReviewers(host, change):
"""Gets information about all reviewers attached to a change."""
path = 'changes/%s/reviewers' % change
return _SendGerritJsonRequest(host, path)
def GetReview(host, change, revision):
"""Gets review information about a specific revision of a change."""
path = 'changes/%s/revisions/%s/review' % (change, revision)
return _SendGerritJsonRequest(host, path)
def ChangeIdentifier(project, change_number):
"""Returns change identifier "project~number" suitable for |change| arg of
this module API.
Such format is allows for more efficient Gerrit routing of HTTP requests,
comparing to specifying just change_number.
"""
assert int(change_number)
return '%s~%s' % (urllib.parse.quote(project, ''), change_number)