#!/usr/bin/env python3
"""Checks and enforces Gerrit configuration policies.
Run with "help" for usage.
"""
# TODO(dbort): Add more policy checks, refactoring as we go.
# Possible checks:
# - Projects are either public or hidden/read-only
# - No rules.pl files exist outside of All-Projects
# - rules.pl is in sync across all hosts
# - Upstream sync configs are in place for all third_party repos
# - All upstreams point to gob-mirrored repos
# TODO(dbort): Rewrite in Go if it seems worth the trouble.
import collections
import json
import logging
import subprocess
import sys
import textwrap
class GerritHost(object):
    """Represents a remote Gerrit Git-on-Borg host.

    Queries are issued by shelling out to the `gob-curl` command-line tool.
    """

    def __init__(self, gob_name):
        """Initializes the host.

        Args:
            gob_name: The short host name; e.g., 'fuchsia' selects
                fuchsia-review.googlesource.com.
        """
        self.__url = 'https://{}-review.googlesource.com/a'.format(gob_name)

    @property
    def url(self):
        """The base URL that REST queries are appended to (no trailing '/')."""
        return self.__url

    @staticmethod
    def __ParseGuardedJson(text, url):
        """Strips the leading guard line from a response and parses the rest.

        Args:
            text: The decoded response body, whose first line is expected to
                be the )]}' guard.
            url: The URL the response came from; only used in error messages.

        Returns:
            Parsed JSON from everything after the first line.

        Raises:
            ValueError: If the response has no line break, i.e. there is
                nothing after the guard line to parse. json.loads() may also
                raise json.JSONDecodeError (a ValueError) for a malformed
                body.
        """
        # Remove the first line, which contains the )]}' guard.
        _, newline, body = text.partition('\n')
        if not newline:
            # Without this check the failure would surface as an opaque
            # "not enough values to unpack" error.
            raise ValueError(
                'Unexpected response from {}: expected a guard line followed '
                'by a JSON body, got {!r}'.format(url, text[:80]))
        return json.loads(body)

    def __GetJson(self, query):
        """Performs a GET query against the host and returns parsed JSON.

        Args:
            query: The HTTP query string; e.g., '/projects/?t'.

        Returns:
            Parsed JSON from the body of the HTTP response.

        Raises:
            subprocess.CalledProcessError: If gob-curl exits with a failure.
            ValueError: If the response body cannot be parsed.
        """
        url = '{}/{}'.format(self.__url, query.lstrip('/'))
        args = ['gob-curl', '--nointeractive', url]
        logging.debug('Run: %s', ' '.join(args))
        # check=True raises subprocess.CalledProcessError on failure.
        p = subprocess.run(args, stdout=subprocess.PIPE, check=True)
        return self.__ParseGuardedJson(p.stdout.decode('utf-8'), url)

    def GetProjects(self):
        """Returns a dict that maps project names to ProjectInfo entries.

        https://gerrit-review.googlesource.com/Documentation/rest-api-projects.html#project-info
        """
        # 't' requests "tree view", which includes the parent projects.
        return self.__GetJson('/projects/?t')
class ThirdPartyParents(object):
    """Policy for the parent project of every third_party/ project.

    Projects are classified by their current parent:
      - valid: the parent is already one of the third-party parents.
      - change: the parent is recognized and maps to a required third-party
        parent.
      - unknown: any other parent (including no parent at all).
    """

    # XParent fields:
    #   name: Gerrit project name
    #   id: Gerrit project id
    #   has: Project's current parent ID
    #   needs: Project's required parent ID, according to policy
    ValidParent = collections.namedtuple('ValidParent', ('name', 'id', 'has'))
    ChangeParent = collections.namedtuple('ChangeParent',
                                          ('name', 'id', 'has', 'needs'))
    UnknownParent = collections.namedtuple('UnknownParent',
                                           ('name', 'id', 'has'))

    # Contains sequences of ValidParent/ChangeParent/UnknownParent.
    Checks = collections.namedtuple('Checks', ('valid', 'change', 'unknown'))

    # Parents that already satisfy the policy.
    __VALID_PARENTS = (
        'Third-Party-Projects',
        'Third-Party-Commit-Queue-Projects',
    )

    # Maps a recognized-but-wrong parent to the parent the policy requires.
    __REQUIRED_PARENT = {
        'Public-Projects': 'Third-Party-Projects',
        'Commit-Queue': 'Third-Party-Commit-Queue-Projects',
        'gerrit/commit-queue-projects': 'Third-Party-Commit-Queue-Projects',
    }

    def __init__(self, projects):
        """Initializes the policy.

        Args:
            projects: A dict mapping project names to ProjectInfo dicts, as
                returned by GerritHost.GetProjects().
        """
        # Only hold onto //third_party projects.
        self.__projects = {
            name: project
            for name, project in projects.items()
            if name.startswith('third_party/')
        }

    def __Validate(self):
        """Classifies every held project by its current parent.

        Returns:
            A Checks tuple whose fields are sorted tuples of ValidParent,
            ChangeParent, and UnknownParent entries.

        Raises:
            KeyError: If a project has a missing or empty 'id'.
        """
        valid = []
        change = []
        unknown = []
        for name, info in self.__projects.items():
            pid = info.get('id', '')
            if not pid:
                raise KeyError('No id for project "{}"'.format(name))
            parent = info.get('parent', '')
            if parent in self.__VALID_PARENTS:
                valid.append(self.ValidParent(name=name, id=pid, has=parent))
            elif parent in self.__REQUIRED_PARENT:
                change.append(
                    self.ChangeParent(
                        name=name,
                        id=pid,
                        has=parent,
                        needs=self.__REQUIRED_PARENT[parent]))
            else:
                # TODO(dbort): Figure out what to do with All-Projects.
                # If they're supposed to be visible, they should at
                # least be Public-Projects. Look at the hidden and
                # read-only states. That should be a separate policy
                # even for non-third_party repos.
                unknown.append(
                    self.UnknownParent(name=name, id=pid, has=parent))
        return self.Checks(
            valid=tuple(sorted(valid)),
            change=tuple(sorted(change)),
            unknown=tuple(sorted(unknown)))

    def Check(self):
        """Prints every classified project followed by a summary."""
        checks = self.__Validate()
        for group in (checks.valid, checks.change, checks.unknown):
            for p in group:
                print(p)
        print("%d total //third_party projects:" %
              (len(checks.valid) + len(checks.change) + len(checks.unknown)))
        print(" %d with valid parents" % len(checks.valid))
        print(" %d need to change parents" % len(checks.change))
        print(" %d with unrecognized parents" % len(checks.unknown))
        if checks.unknown:
            # List each distinct unrecognized parent once.
            print(" [%s]" % ', '.join(
                sorted({p.has for p in checks.unknown})))
        # TODO(dbort): Return failure if change/unknown are non-empty

    def Enforce(self, gerrit_host, out_script):
        """Creates a shell script to enforce the policy.

        The script only changes parents of projects in the "change" group;
        projects with unrecognized parents are counted but left alone. The
        generated script refuses to run until BUG_ID is filled in, and only
        prints its commands until DRY_RUN is set to 0.

        Args:
            gerrit_host: The host that the script should modify.
            out_script: The file-like object to write a bash script to.
        """
        checks = self.__Validate()
        # Raw string: the backslashes (line continuations and the \n inside
        # the JSON payload) must reach bash untouched. '%%' survives the
        # %-formatting as a literal '%' for printf.
        # ${new_parent} and ${BUG_ID} are double-quoted inside the -d payload
        # so that whitespace or glob characters cannot split the argument.
        header = textwrap.dedent(r"""
            # Generated by gerrit-policy.py:%(gen)s
            set -o nounset
            set -o errexit
            set -o pipefail
            ### TODO: Set to a non-empty value
            readonly BUG_ID=""
            ### TODO: Set DRY_RUN=0 to enable this script
            readonly DRY_RUN=1
            ### No edits required below this line ###
            readonly HOST_URL='%(host_url)s'
            if [[ -z "${BUG_ID}" ]]; then
            echo "ERROR: Must set BUG_ID by editing this file ($0)"
            exit 1
            fi
            # run [<args>...]
            # Prints the args, and runs them if DRY_RUN == 0
            run() {
            if (( DRY_RUN )); then
            echo -n "DRY_RUN:"
            printf " %%q" "$@"
            echo ""
            else
            echo -n "RUNNING:"
            printf " %%q" "$@"
            echo ""
            "$@"
            return $?
            fi
            }
            # change-parent <project-name> <new-parent>
            change-parent() {
            local project="$1"
            local new_parent="$2"
            run gob-curl \
            -d '{"parent": "'"${new_parent}"'", "commit_message": "Set parent to '"${new_parent}"'\n\nBug: '"${BUG_ID}"'"}' \
            -H "Content-Type: application/json" \
            -X PUT \
            "${HOST_URL}/projects/${project}/parent"
            }
            """ % {
                'gen': self.__class__.__name__,
                'host_url': gerrit_host.url
            })
        lines = [
            '#!/bin/bash',
            header,
        ]
        if checks.valid:
            lines.extend([
                '# Nothing to do for %d valid projects' % len(checks.valid),
                '',
            ])
        lines.append('# %d projects to change' % len(checks.change))
        lines.extend(
            "change-parent '%s' '%s'" % (p.id, p.needs)
            for p in checks.change)
        lines.append(
            '\n# %d projects with unhandled status' % len(checks.unknown))
        lines.append('\necho DONE')
        # End the script with a newline so it is a well-formed text file.
        out_script.write('\n'.join(lines) + '\n')
def main(argv):
    """Runs the command named on the command line.

    Args:
        argv: The full argument vector, including the program name; expected
            to look like [prog, command, gob-host].

    Exits with status 0 after printing help, and with status 1 when the
    arguments are missing or the command is not recognized.
    """

    def Usage(argv):
        """Prints the usage message to stdout."""
        # Fall back to a fixed name so an empty argv still gets a usage
        # message rather than an IndexError.
        argv0 = argv[0] if argv else 'gerrit-policy.py'
        print(textwrap.dedent("""\
            Usage: {argv0} <command> <gob-host>
            command:
            check: Print repo policy violations but do not modify anything
            enforce: Generate a script to fix policy violations
            gob-host: "fuchsia", "cobalt-analytics", etc.
            """.format(argv0=argv0)))

    # Handle help before the argument-count check so that a bare "help"
    # (with no gob-host) is treated as a successful request for usage.
    if len(argv) >= 2 and argv[1] in ('help', '-help', '--help', '-h'):
        Usage(argv)
        sys.exit(0)
    if len(argv) < 3:
        Usage(argv)
        sys.exit(1)

    command = argv[1]
    # Reject unknown commands before contacting the remote host.
    if command not in ('check', 'enforce'):
        logging.error('Unknown command "%s"', command)
        Usage(argv)
        sys.exit(1)

    host = GerritHost(argv[2])
    projects = host.GetProjects()
    tpp = ThirdPartyParents(projects)
    if command == 'check':
        tpp.Check()
    else:
        # TODO(dbort): Add a flag to allow specifying the bug ID to be injected
        # into the generated script
        # TODO(dbort): Make out_script a flag
        out_script = '/tmp/policy.sh'
        with open(out_script, 'w') as f:
            tpp.Enforce(host, f)
        logging.info('Enforcement script written to %s', out_script)
if __name__ == '__main__':
    # Script entry point: turn on debug-level logging, then hand the raw
    # command line to main().
    logging.basicConfig(level=logging.DEBUG)
    main(argv=sys.argv)