| # Copyright 2020 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """ |
| Utilities for requesting information for a Gerrit server via HTTPS. |
| |
| https://gerrit-review.googlesource.com/Documentation/rest-api.html |
| """ |
| |
| import base64 |
| import http.client |
| import http.cookiejar as cookielib |
| import io |
| import json |
| import logging |
| import os |
| import subprocess |
| import sys |
| import urllib |
| import urllib.error |
| import urllib.parse |
| import urllib.request |
| from typing import Any, Dict, FrozenSet, List, Optional, Tuple, cast |
| |
| import util |
| |
| LOGGER = logging.getLogger() |
| |
| # Maximum number of times to retry a failing HTTP request. |
| _MAX_HTTP_RETRIES = 5 |
| |
| # Controls the transport protocol used to communicate with Gerrit. |
| # This is parameterized primarily to enable GerritTestCase. |
| GERRIT_PROTOCOL = "https" |
| |
| |
| def read_file(path: str) -> str: |
| """Read the contents of the given file as a string.""" |
| with open(path, "rb") as f: |
| return f.read().decode("utf-8", errors="surrogateescape") |
| |
| |
| class GerritError(Exception): |
| """Exception class for errors communicating with the gerrit-on-borg service.""" |
| |
| def __init__(self, http_status: int, message: str): |
| self.http_status = http_status |
| self.message = "(%d) %s" % (self.http_status, message) |
| super().__init__(self.message) |
| |
| |
| def _QueryString(params, first_param=None): |
| """Encodes query parameters in the key:val[+key:val...] format specified here: |
| |
| https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#list-changes |
| """ |
| q = [urllib.parse.quote(first_param)] if first_param else [] |
| q.extend(["%s:%s" % (key, val) for key, val in params]) |
| return "+".join(q) |
| |
| |
| class Authenticator: |
| """Authenticator implementation that uses ".gitcookies" for token.""" |
| |
| def __init__(self) -> None: |
| # Credentials will be loaded lazily on first use. This ensures Authenticator |
| # get() can always construct an authenticator, even if something is broken. |
| # This allows 'creds-check' to proceed to actually checking creds later, |
| # rigorously (instead of blowing up with a cryptic error if they are wrong). |
| self._gitcookies: Optional[Dict[str, Tuple[str, str]]] = None |
| |
| @property |
| def gitcookies(self) -> Dict[str, Tuple[str, str]]: |
| if self._gitcookies is None: |
| self._gitcookies = self._get_gitcookies() |
| return self._gitcookies |
| |
| @classmethod |
| def get_new_password_url(cls, host: str) -> str: |
| """Generate a URL to instructions for setting up a ".gitcookies" entry.""" |
| assert not host.startswith("http") |
| # Assume *.googlesource.com pattern. |
| parts = host.split(".") |
| # Remove the suffix '-review' from the host subdomain. |
| if parts[0].endswith("-review"): |
| parts[0] = parts[0][: -len("-review")] |
| return "https://%s/new-password" % (".".join(parts)) |
| |
| @classmethod |
| def get_new_password_message(cls, host: str) -> str: |
| if host is None: |
| return ( |
| "Git host for Gerrit upload is unknown. Check your remote " |
| "and the branch your branch is tracking. This tool assumes " |
| "that you are using a git server at *.googlesource.com." |
| ) |
| url = cls.get_new_password_url(host) |
| return "You can (re)generate your credentials by visiting %s" % url |
| |
| @classmethod |
| def get_gitcookies_path(cls) -> str: |
| # Read from the environment. |
| env_path = os.getenv("GIT_COOKIES_PATH") |
| if env_path is not None: |
| return env_path |
| |
| # Attempt to read from git's config. |
| try: |
| path = subprocess.check_output( |
| ["git", "config", "--path", "http.cookiefile"] |
| ) |
| return path.decode("utf-8", "ignore").strip() |
| except subprocess.CalledProcessError: |
| # Guess a default. |
| return os.path.expanduser(os.path.join("~", ".gitcookies")) |
| |
| @classmethod |
| def _get_gitcookies(cls) -> Dict[str, Tuple[str, str]]: |
| # Read the cookies file. |
| path = cls.get_gitcookies_path() |
| if not os.path.exists(path): |
| return {} |
| try: |
| f = read_file(path) |
| except IOError: |
| return {} |
| |
| # Parse each line. |
| gitcookies: Dict[str, Tuple[str, str]] = {} |
| for line in f.splitlines(): |
| try: |
| fields = line.strip().split("\t") |
| if line.strip().startswith("#") or len(fields) != 7: |
| continue |
| domain, xpath, key, value = ( |
| fields[0], |
| fields[2], |
| fields[5], |
| fields[6], |
| ) |
| if xpath == "/" and key == "o": |
| if value.startswith("git-"): |
| login, secret_token = value.split("=", 1) |
| gitcookies[domain] = (login, secret_token) |
| else: |
| gitcookies[domain] = ("", value) |
| except (IndexError, ValueError, TypeError) as exc: |
| LOGGER.warning(exc) |
| return gitcookies |
| |
| def _get_auth_for_host(self, host: str) -> Optional[Tuple[str, str]]: |
| for domain, creds in self.gitcookies.items(): |
| if cookielib.domain_match(host, domain): # type: ignore |
| return creds |
| return None |
| |
| def get_auth_header(self, host: str) -> Optional[str]: |
| a = self._get_auth_for_host(host) |
| if a: |
| if a[0]: |
| secret = base64.b64encode( |
| ("%s:%s" % (a[0], a[1])).encode("utf-8") |
| ) |
| return "Basic %s" % secret.decode("utf-8") |
| else: |
| return "Bearer %s" % a[1] |
| return None |
| |
| def get_auth_email(self, host: str) -> Optional[str]: |
| """Best effort parsing of email to be used for auth for the given host.""" |
| a = self._get_auth_for_host(host) |
| if not a: |
| return None |
| login = a[0] |
| # login typically looks like 'git-xxx.example.com' |
| if not login.startswith("git-") or "." not in login: |
| return None |
| username, domain = login[len("git-") :].split(".", 1) |
| return "%s@%s" % (username, domain) |
| |
| |
| def EnsureAuthenticated(host: str) -> None: |
| """Attempt to determine if we are authenticated with Gerrit server.""" |
| # See if we have an authentication header available for the given host. |
| auth = Authenticator() |
| if auth.get_auth_header(host): |
| return |
| |
| # If not, print instructions and quit. |
| print( |
| 'Could not find credentials for host "%(host)s".\n' |
| "\n" |
| "Credentials are read from %(filename)s.\n" |
| "\n" |
| "%(instructions)s" |
| % { |
| "host": host, |
| "filename": auth.get_gitcookies_path(), |
| "instructions": auth.get_new_password_message(host), |
| } |
| ) |
| sys.exit(1) |
| |
| |
| class GerritHttpRequest: |
| """A HTTP request to a URL that can be issued multiple times.""" |
| |
| def __init__( |
| self, |
| url: str, |
| data: bytes = None, |
| headers: Optional[Dict[str, str]] = None, |
| method: str = "GET", |
| ): |
| self.url: str = url |
| self.data: Optional[bytes] = data |
| self.headers: Dict[str, str] = headers or {} |
| self.method: str = method |
| |
| @property |
| def host(self) -> str: |
| return urllib.parse.urlparse(self.url).netloc |
| |
| def execute(self) -> Tuple[http.client.HTTPResponse, bytes]: |
| request = urllib.request.Request( |
| self.url, data=self.data, headers=self.headers, method=self.method |
| ) |
| try: |
| response = urllib.request.urlopen(request) |
| return (response, response.read()) |
| except urllib.error.HTTPError as e: |
| return (cast(http.client.HTTPResponse, e), e.read()) |
| |
| |
| def _SendGerritHttpRequest( |
| host: str, |
| path: str, |
| reqtype: str = "GET", |
| headers: Optional[Dict[str, str]] = None, |
| body: Any = None, |
| accept_statuses: FrozenSet[int] = frozenset([200]), |
| ) -> io.StringIO: |
| """Send a request to the given Gerrit host. |
| |
| Args: |
| host: Gerrit host to connect to. |
| path: Path to send request to. |
| reqtype: HTTP request type (or, "HTTP verb"). |
| body: JSON-encodable object to send. |
| accept_statuses: Treat any of these statuses as success. Default: [200] |
| Common additions include 204, 400, and 404. |
| |
| Returns: A string buffer containing the connection's reply. |
| """ |
| headers = headers or {} |
| bare_host = host.partition(":")[0] |
| |
| # Set authentication header if available. |
| a = Authenticator().get_auth_header(bare_host) |
| if a: |
| headers.setdefault("Authorization", a) |
| |
| # If we have an authentication header, use an authenticated URL path. |
| # |
| # From Gerrit docs: "Users (and programs) can authenticate with HTTP |
| # passwords by prefixing the endpoint URL with /a/. For example to |
| # authenticate to /projects/, request the URL /a/projects/. Gerrit will use |
| # HTTP basic authentication with the HTTP password from the user’s account |
| # settings page". |
| url = path |
| if not url.startswith("/"): |
| url = "/" + url |
| if "Authorization" in headers and not url.startswith("/a/"): |
| url = "/a%s" % url |
| |
| body_bytes: Optional[bytes] = None |
| if body: |
| body_bytes = json.dumps(body, sort_keys=True).encode("utf-8") |
| headers.setdefault("Content-Type", "application/json") |
| |
| # Create a request |
| request = GerritHttpRequest( |
| urllib.parse.urljoin("%s://%s" % (GERRIT_PROTOCOL, host), url), |
| data=body_bytes, |
| headers=headers, |
| method=reqtype, |
| ) |
| |
| # Send the request, retrying if there are transient errors. |
| backoff = util.ExponentialBackoff( |
| util.Clock(), min_poll_seconds=10.0, max_poll_seconds=600.0 |
| ) |
| attempts = 0 |
| while True: |
| attempts += 1 |
| |
| # Attempt to perform the fetch. |
| response, contents_bytes = request.execute() |
| contents = contents_bytes.decode("utf-8", "replace") |
| |
| # If we have a valid status, return the contents. |
| if response.status in accept_statuses: |
| return io.StringIO(contents) |
| |
| # If the error looks transient, retry. |
| # |
| # We treat the following errors as transient: |
| # * Internal server errors (>= 500) |
| # * Errors caused by conflicts (409 "conflict"). |
| # * Quota issues (429 "too many requests") |
| if ( |
| response.status >= 500 or response.status in [409, 429] |
| ) and attempts < _MAX_HTTP_RETRIES: |
| LOGGER.warn( |
| "A transient error occurred while querying %s (%s): %s", |
| request.url, |
| request.method, |
| response.msg, |
| ) |
| backoff.wait() |
| continue |
| |
| LOGGER.debug( |
| "got response %d for %s %s", |
| response.status, |
| request.method, |
| request.url, |
| ) |
| |
| # If we got a 400 error ("bad request"), that may indicate bad authentication. |
| # |
| # Add some more context. |
| if response.status == 400: |
| raise GerritError( |
| response.status, |
| "HTTP Error: %s: %s.\n\n" |
| "This may indicate a bad request (likely caused by a bug) " |
| "or that authentication failed.\n\n" |
| "Try generating a new authentication password: <%s>.\n" |
| % ( |
| response.msg, |
| contents.strip(), |
| Authenticator.get_new_password_url(host), |
| ), |
| ) |
| |
| # Otherwise, throw a generic error. |
| raise GerritError( |
| response.status, |
| "HTTP Error: %s: %s" % (response.msg, contents.strip()), |
| ) |
| |
| |
| def _SendGerritJsonRequest( |
| host: str, |
| path: str, |
| reqtype: str = "GET", |
| headers: Optional[Dict[str, str]] = None, |
| body: Any = None, |
| accept_statuses: FrozenSet[int] = frozenset([200]), |
| ) -> Optional[Any]: |
| """Send a request to Gerrit, expecting a JSON response.""" |
| result = _SendGerritHttpRequest( |
| host, path, reqtype, headers, body, accept_statuses |
| ) |
| |
| # The first line of the response should always be: )]}' |
| s = result.readline() |
| if s and s.rstrip() != ")]}'": |
| raise GerritError(200, "Unexpected json output: %s" % s) |
| |
| # Read the rest of the response. |
| s = result.read() |
| if not s: |
| return None |
| return json.loads(s) |
| |
| |
| def QueryChanges( |
| host: str, |
| params: List[Tuple[str, str]], |
| first_param: Optional[Any] = None, |
| limit: Optional[int] = None, |
| o_params: Optional[List[Any]] = None, |
| start: Optional[int] = None, |
| ) -> List[Any]: |
| """ |
| Queries a gerrit-on-borg server for changes matching query terms. |
| |
| Args: |
| params: A list of key:value pairs for search parameters, as documented |
| here (e.g. ('is', 'owner') for a parameter 'is:owner'): |
| https://gerrit-review.googlesource.com/Documentation/user-search.html#search-operators |
| first_param: A change identifier |
| limit: Maximum number of results to return. |
| start: how many changes to skip (starting with the most recent) |
| o_params: A list of additional output specifiers, as documented here: |
| https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#list-changes |
| |
| Returns: |
| A list of json-decoded query results. |
| """ |
| # Note that no attempt is made to escape special characters; YMMV. |
| if not params and not first_param: |
| raise RuntimeError("QueryChanges requires search parameters") |
| path = "changes/?q=%s" % _QueryString(params, first_param) |
| if start: |
| path = "%s&start=%s" % (path, start) |
| if limit: |
| path = "%s&n=%d" % (path, limit) |
| if o_params: |
| path = "%s&%s" % (path, "&".join(["o=%s" % p for p in o_params])) |
| try: |
| response = _SendGerritJsonRequest(host, path) |
| except GerritError as e: |
| if e.http_status == 404: |
| # Not found. |
| return [] |
| raise |
| if response is None: |
| raise GerritError(200, "No response from Gerrit.") |
| return response |
| |
| |
| def GetGerritFetchUrl(host): |
| """Given a Gerrit host name returns URL of a Gerrit instance to fetch from.""" |
| return "%s://%s/" % (GERRIT_PROTOCOL, host) |
| |
| |
| def GetCodeReviewTbrScore(host, project): |
| """Given a Gerrit host name and project, return the Code-Review score for TBR.""" |
| conn = CreateHttpConn( |
| host, "/projects/%s" % urllib.parse.quote(project, "") |
| ) |
| project = ReadHttpJsonResponse(conn) |
| if ( |
| "labels" not in project |
| or "Code-Review" not in project["labels"] |
| or "values" not in project["labels"]["Code-Review"] |
| ): |
| return 1 |
| return max([int(x) for x in project["labels"]["Code-Review"]["values"]]) |
| |
| |
| def GetChangePageUrl(host, change_number): |
| """Given a Gerrit host name and change number, returns change page URL.""" |
| return "%s://%s/#/c/%d/" % (GERRIT_PROTOCOL, host, change_number) |
| |
| |
| def GetChangeUrl(host, change): |
| """Given a Gerrit host name and change ID, returns a URL for the change.""" |
| return "%s://%s/a/changes/%s" % (GERRIT_PROTOCOL, host, change) |
| |
| |
| def GetChange(host, change): |
| """Queries a Gerrit server for information about a single change.""" |
| path = "changes/%s" % change |
| return _SendGerritJsonRequest(host, path) |
| |
| |
| def GetChangeDetail(host, change, o_params=None): |
| """Queries a Gerrit server for extended information about a single change.""" |
| path = "changes/%s/detail" % change |
| if o_params: |
| path += "?%s" % "&".join(["o=%s" % p for p in o_params]) |
| return _SendGerritJsonRequest(host, path) |
| |
| |
| def GetChangeCommit(host, change, revision="current"): |
| """Query a Gerrit server for a revision associated with a change.""" |
| path = "changes/%s/revisions/%s/commit?links" % (change, revision) |
| return _SendGerritJsonRequest(host, path) |
| |
| |
| def GetChangeCurrentRevision(host, change): |
| """Get information about the latest revision for a given change.""" |
| return QueryChanges(host, [], change, o_params=("CURRENT_REVISION",)) |
| |
| |
| def GetChangeRevisions(host, change): |
| """Gets information about all revisions associated with a change.""" |
| return QueryChanges(host, [], change, o_params=("ALL_REVISIONS",)) |
| |
| |
| def GetChangeReview(host, change, revision=None): |
| """Gets the current review information for a change.""" |
| if not revision: |
| jmsg = GetChangeRevisions(host, change) |
| if not jmsg: |
| return None |
| elif len(jmsg) > 1: |
| raise GerritError( |
| 200, "Multiple changes found for ChangeId %s." % change |
| ) |
| revision = jmsg[0]["current_revision"] |
| path = "changes/%s/revisions/%s/review" |
| return _SendGerritJsonRequest(host, path) |
| |
| |
| def GetChangeComments(host, change): |
| """Get the line- and file-level comments on a change.""" |
| path = "changes/%s/comments" % change |
| return _SendGerritJsonRequest(host, path) |
| |
| |
| def GetRelatedChanges(host: str, change: str, revision: str = "current") -> Any: |
| """Gets information about changes related to a given change.""" |
| path = "changes/%s/revisions/%s/related" % (change, revision) |
| return _SendGerritJsonRequest(host, path) |
| |
| |
| def SubmitChange(host, change, wait_for_merge=True): |
| """Submits a Gerrit change via Gerrit.""" |
| path = "changes/%s/submit" % change |
| body = {"wait_for_merge": wait_for_merge} |
| conn = CreateHttpConn(host, path, reqtype="POST", body=body) |
| return ReadHttpJsonResponse(conn) |
| |
| |
| def SetReview( |
| host: str, |
| change: str, |
| msg: Optional[str] = None, |
| labels: Optional[Dict[str, Any]] = None, |
| notify: bool = False, |
| ready: bool = False, |
| ) -> None: |
| """Sets labels and/or adds a message to a code review.""" |
| if not msg and not labels: |
| return |
| path = "changes/%s/revisions/current/review" % change |
| body: Dict[str, Any] = {"drafts": "KEEP"} |
| if msg: |
| body["message"] = msg |
| if labels: |
| body["labels"] = labels |
| if notify: |
| body["notify"] = "ALL" if notify else "NONE" |
| if ready: |
| body["ready"] = True |
| response = _SendGerritJsonRequest(host, path, reqtype="POST", body=body) |
| if response is None: |
| raise GerritError(200, "No response from Gerrit.") |
| if labels: |
| for key, val in labels.items(): |
| if ( |
| "labels" not in response |
| or key not in response["labels"] |
| or int(response["labels"][key] != int(val)) |
| ): |
| raise GerritError( |
| 200, |
| 'Unable to set "%s" label on change %s.' % (key, change), |
| ) |
| |
| |
| def GetReviewers(host, change): |
| """Gets information about all reviewers attached to a change.""" |
| path = "changes/%s/reviewers" % change |
| return _SendGerritJsonRequest(host, path) |
| |
| |
| def GetReview(host, change, revision): |
| """Gets review information about a specific revision of a change.""" |
| path = "changes/%s/revisions/%s/review" % (change, revision) |
| return _SendGerritJsonRequest(host, path) |
| |
| |
| def ChangeIdentifier(project, change_number): |
| """Returns change identifier "project~number" suitable for |change| arg of |
| this module API. |
| |
| Such format is allows for more efficient Gerrit routing of HTTP requests, |
| comparing to specifying just change_number. |
| """ |
| assert int(change_number) |
| return "%s~%s" % (urllib.parse.quote(project, ""), change_number) |