blob: 23954b15b1bccde3ca9713f9060000a852722b03 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main
import (
buildbucketpb ""
gerritpb ""
resultpb ""
// maxClusterCount is the maximum number of clusters that will be analyzed. If
// there are a large number of clusters to analyze then they probably have
// similar underlying causes, so it's not useful to analyze each one
// individually, which would take a really long time. When clustering yields
// more than this many clusters, main keeps only a subset of them.
const maxClusterCount = 10
// cmdCulprit returns the subcommands.Command definition for the "culprit"
// subcommand. Its CommandRun factory allocates a fresh culpritCmd for each
// invocation.
//
// NOTE(review): authOpts is not referenced anywhere in the visible body, and
// the closing braces of the literal are not visible either. Presumably the new
// culpritCmd is initialized with authOpts before being returned — confirm
// against the complete file.
func cmdCulprit(authOpts auth.Options) *subcommands.Command {
return &subcommands.Command{
UsageLine: "culprit [flags] BUILD_ID [BUILD_ID]...",
ShortDesc: "Find culprit changes for CI breakages",
LongDesc: "Find culprit changes for CI breakages",
CommandRun: func() subcommands.CommandRun {
c := &culpritCmd{}
return c
// culpritCmd holds the parsed arguments and the API clients used by the
// "culprit" subcommand.
//
// NOTE(review): methods on this type reference c.commonFlags and
// c.parsedAuthOpts, neither of which is declared in the visible field list;
// presumably they come from an embedded type — confirm against the complete
// file.
type culpritCmd struct {
// Path the JSON report is written to; "-" selects stdout and the empty
// string disables JSON output (see main).
jsonOutputFile string
// Build IDs parsed from the positional arguments by parseArgs.
buildIDs []int64
// The clients below are populated by initClients.
bqClient *bigquery.Client
bbClient buildbucketpb.BuildsClient
rdbClient resultpb.ResultDBClient
gitilesClient *gitiles.Client
gerritClient *gerritMultiClient
// Init registers the command's flags.
//
// NOTE(review): only the help text of one flag is visible here; the call that
// registers it (and any use of defaultAuthOpts) is missing from this view, so
// the flag name and target cannot be confirmed. Presumably it populates
// c.jsonOutputFile.
// NOTE(review): the help string repeats "Use '-' for stdout" twice and lacks
// punctuation between the copies; it should read "... output JSON. Use '-' for
// stdout."
func (c *culpritCmd) Init(defaultAuthOpts auth.Options) {
"Path to which to write output JSON. Use '-' for stdout Use '-' for stdout.")
func (c *culpritCmd) parseArgs(args []string) error {
if err := c.commonFlags.Parse(); err != nil {
return err
for _, rawBuildID := range args {
buildID, err := strconv.ParseInt(strings.TrimPrefix(rawBuildID, "b"), 10, 64)
if err != nil {
return fmt.Errorf("positional argument %q is not an integer or a valid build URL", rawBuildID)
c.buildIDs = append(c.buildIDs, buildID)
return nil
func (c *culpritCmd) Run(a subcommands.Application, args []string, _ subcommands.Env) int {
if err := c.parseArgs(args); err != nil {
fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
return 1
if err := c.main(); err != nil {
fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
return 1
return 0
// initClients creates the BigQuery, Gitiles, Buildbucket, ResultDB and Gerrit
// clients and stores them on c. All of them are authenticated using
// c.parsedAuthOpts. It returns the first error encountered.
//
// NOTE(review): every host name string in this function is empty in this view
// (the Gitiles host and both pRPC Host fields). Confirm the real values
// against the complete file; empty hosts would not be usable.
func (c *culpritCmd) initClients(ctx context.Context) error {
authenticator := auth.NewAuthenticator(ctx, auth.OptionalLogin, c.parsedAuthOpts)
tokenSource, err := authenticator.TokenSource()
if err != nil {
// Tell the user how to log in before returning the error.
if err == auth.ErrLoginRequired {
fmt.Fprintf(os.Stderr, "You need to login first by running:\n")
fmt.Fprintf(os.Stderr, " luci-auth login -scopes %q\n", strings.Join(c.parsedAuthOpts.Scopes, " "))
return err
// BigQuery queries run against the "fuchsia-infra" project.
c.bqClient, err = bigquery.NewClient(ctx, "fuchsia-infra", option.WithTokenSource(tokenSource))
if err != nil {
return err
// The remaining clients share one authenticated HTTP client.
authClient, err := authenticator.Client()
if err != nil {
return fmt.Errorf("failed to initialize auth client: %w", err)
c.gitilesClient, err = gitiles.NewClient(
// TODO(olivernewman): Generalize the autogardener to other
// repositories.
"", "integration", authClient)
if err != nil {
return err
c.bbClient = buildbucketpb.NewBuildsPRPCClient(&prpc.Client{
C: authClient,
Host: "",
c.rdbClient = resultpb.NewResultDBPRPCClient(&prpc.Client{
C: authClient,
// TODO(olivernewman): Get the ResultDB host name from the build proto.
Host: "",
// The Gerrit multi-client starts with an empty client map.
c.gerritClient = &gerritMultiClient{
authClient: authClient,
clients: make(map[gerritClientKey]*gerrit.Client),
return nil
// main runs the culprit-finding pipeline: it initializes the API clients,
// fetches results for the requested builds, groups them into clusters by
// failure signature, diagnoses each cluster concurrently, and then renders the
// suspects as markdown lines and, when c.jsonOutputFile is set, as indented
// JSON written to that file ("-" meaning stdout).
//
// NOTE(review): this view of the function is missing lines (closing braces at
// the very least), so the notes below describe only what is visible and
// should be confirmed against the complete file.
func (c *culpritCmd) main() error {
ctx := gologger.StdConfig.Use(context.Background())
logging.SetLevel(ctx, logging.Debug)
// Print goroutine stack trace to stderr after a Ctrl-C. This is helpful for
// debugging deadlocks and slow operations.
// NOTE(review): no wait on the signal context is visible before the dump
// below; confirm the goroutine blocks until a signal arrives. The ctx declared
// inside the goroutine shadows the outer ctx.
go func() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer cancel()
pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
if err := c.initClients(ctx); err != nil {
return err
buildResults, err := c.fetchBuildResults(ctx)
if err != nil {
return err
clusters := clusterFailureModes(buildResults)
if len(clusters) > maxClusterCount {
// Choose a random maxClusterCount clusters.
// The selection relies on Go's unspecified map iteration order.
// NOTE(review): no statement that stops the loop once the limit is reached
// is visible here — confirm one exists.
newClusters := make(map[failureSignature][]buildResult)
for k, v := range clusters {
newClusters[k] = v
if len(newClusters) == maxClusterCount {
clusters = newClusters
noun := "cluster"
if len(clusters) != 1 {
noun += "s"
fmt.Printf("Found %d %s\n", len(clusters), noun)
// NOTE(review): the context derived by errgroup.WithContext is discarded, so
// a failing diagnosis does not cancel the others; they keep running on ctx.
eg, _ := errgroup.WithContext(ctx)
results := make(map[failureSignature][]suspectCommit)
for signature, buildResults := range clusters {
// Make copies of the variables that can be safely used in the closure.
signature, buildResults := signature, buildResults
eg.Go(func() error {
suspects, err := c.diagnoseCluster(ctx, signature, buildResults)
// NOTE(review): results is a plain map written from several errgroup
// goroutines with no lock visible in this function. Concurrent map writes
// are a data race in Go — confirm, and guard the write with a mutex.
results[signature] = suspects
return err
if err := eg.Wait(); err != nil {
return err
var outputLines []string
var jsonOutput culpritOutput
// Sort results by confidence in the first suspect, so that failure modes
// with lower culprit confidence are less prominent, since they're more
// likely to not be actionable.
// The sort key is the negated score of the first suspect, or 0 when a
// signature has no suspects; presumably functools.SortBy sorts ascending by
// key — confirm against its definition.
sortedSignatures := maps.Keys(results)
functools.SortBy(sortedSignatures, func(sig failureSignature) int {
if len(results[sig]) == 0 {
return 0
return -results[sig][0].score()
for _, signature := range sortedSignatures {
suspects := results[signature]
// Don't bother emitting markdown output for failure modes with no
// suspects. It just adds noise to the output.
// NOTE(review): the statement that skips such signatures is not visible.
if len(suspects) == 0 {
outputLines = append(outputLines, "### "+signature.FailedTest, "")
var outputSuspects []outputSuspect
// Only show the top N suspects.
// No truncation is visible in this loop; diagnoseCluster already caps its
// result at ten suspects.
for i, suspect := range suspects {
outputLines = append(outputLines, fmt.Sprintf("%2d. (%3d%%) %s", i+1, suspect.score(), suspect.gerritURL()))
outputLines = append(outputLines, " "+suspect.commitSummary())
outputSuspects = append(outputSuspects, outputSuspect{
suspectCommit: suspect,
GerritURL: suspect.gerritURL(),
ChangedFiles: suspect.changedFiles(),
CommitSummary: suspect.commitSummary(),
Score: suspect.score(),
Features: suspect.features(),
outputLines = append(outputLines, "")
jsonOutput.Clusters = append(jsonOutput.Clusters, outputCluster{
Signature: signature,
Suspects: outputSuspects,
markdownOutput := strings.Join(outputLines, "\n")
// NOTE(review): in the visible code markdownOutput is only consumed by the
// JSON report below; presumably it is also printed — confirm.
// JSON output is optional: nothing more to do when no path was given.
if c.jsonOutputFile == "" {
return nil
jsonOutput.MarkdownOutput = markdownOutput
rawJSON, err := json.MarshalIndent(jsonOutput, "", " ")
// NOTE(review): the newline is appended before err is checked. This is
// harmless because rawJSON is discarded on error, but the check would
// conventionally come first.
rawJSON = append(rawJSON, '\n')
if err != nil {
return err
var outputWriter io.Writer
if c.jsonOutputFile == "-" {
outputWriter = os.Stdout
} else {
f, err := os.Create(c.jsonOutputFile)
if err != nil {
return err
// NOTE(review): the error returned by Close is ignored, so a failure while
// closing the freshly written file would go unreported.
defer f.Close()
outputWriter = f
if _, err := outputWriter.Write(rawJSON); err != nil {
return err
return nil
// fetchBuildResults downloads the build proto and failed test names for each
// build.
//
// Builds are processed one at a time, in the order of c.buildIDs. For each
// build it fetches the build from Buildbucket, queries ResultDB for the
// build's invocation, and keeps only the test results that carry a
// "test_case_count" tag. The first RPC error aborts the whole operation.
//
// NOTE(review): the list of build field mask paths is missing from this view.
func (c *culpritCmd) fetchBuildResults(ctx context.Context) ([]buildResult, error) {
var res []buildResult
for _, buildID := range c.buildIDs {
build, err := c.bbClient.GetBuild(ctx, &buildbucketpb.GetBuildRequest{
Id: buildID,
Mask: &buildbucketpb.BuildMask{
Fields: &fieldmaskpb.FieldMask{
Paths: []string{
if err != nil {
return nil, err
// NOTE(review): build.Infra.Resultdb is dereferenced without a nil check;
// presumably the field mask guarantees it is populated — confirm.
// NOTE(review): no page-token handling is visible, so if the response is
// paginated only the first page is used — confirm against the ResultDB API.
resp, err := c.rdbClient.QueryTestResults(ctx, &resultpb.QueryTestResultsRequest{
Invocations: []string{build.Infra.Resultdb.Invocation},
Predicate: &resultpb.TestResultPredicate{
// Only include non-exonerated failed tests.
Expectancy: resultpb.TestResultPredicate_VARIANTS_WITH_UNEXPECTED_RESULTS,
ReadMask: &fieldmaskpb.FieldMask{
Paths: []string{"test_id", "status", "tags", "failure_reason"},
if err != nil {
return nil, err
failedTests := functools.Filter(resp.TestResults, func(t *resultpb.TestResult) bool {
for _, tag := range t.Tags {
// Only consider top-level test suites. Test cases don't yet
// have all the metadata we care about.
if tag.Key == "test_case_count" {
return true
return false
res = append(res, buildResult{
Build: build,
FailedTests: failedTests,
return res, nil
// diagnoseCluster produces a ranked list of suspect commits for one failure
// cluster.
//
// It takes the build with the highest integration revision count as the
// reference point, loads up to 300 commits of Gitiles history ending at that
// build's input commit, and turns each into a suspectCommit. Two BigQuery
// lookups then run concurrently to annotate the suspects. Suspects with a
// zero score are dropped, the best ten are kept, and Gerrit change details
// are fetched for those before the final ranking. It returns nil suspects
// when the survivors cannot be told apart by score.
//
// NOTE(review): this view of the function is missing lines (closing braces at
// the very least); the notes below should be confirmed against the complete
// file.
func (c *culpritCmd) diagnoseCluster(ctx context.Context, sig failureSignature, buildResults []buildResult) ([]suspectCommit, error) {
// Sort in descending order by revision count.
// Builds whose revision count is unavailable sort with whatever value
// integrationRevisionCount returns alongside ok == false.
functools.SortBy(buildResults, func(b buildResult) int {
irc, _ := b.integrationRevisionCount()
return -1 * irc
// Indexing element 0 assumes buildResults is non-empty; clusters built by
// clusterFailureModes always contain at least one build.
mostRecentBuild := buildResults[0].Build
windowEnd := mostRecentBuild.StartTime.AsTime()
commits, err := c.gitilesClient.Log(
ctx, mostRecentBuild.Input.GitilesCommit.Id, 300)
if err != nil {
return nil, err
// A base position of 0 means "unknown"; CommitPosition is then left unset.
baseCommitPosition, ok := buildResults[0].integrationRevisionCount()
if !ok {
baseCommitPosition = 0
var suspects []suspectCommit
for i, commit := range commits {
// A commit without a parseable reviewed-on footer fails the whole
// diagnosis.
gerritChange, err := commitToGerritChange(commit)
if err != nil {
return nil, err
suspect := suspectCommit{
signature: sig,
CommitInfo: commit,
GerritChange: gerritChange,
BlamelistDistances: make(map[string]int),
TagMatchesTest: hasMatchingTag(commit.Message, sig.FailedTest),
// Positions count backwards from the reference build's commit; this
// presumes the log is ordered newest first — confirm.
if baseCommitPosition > 0 {
suspect.CommitPosition = baseCommitPosition - i
suspects = append(suspects, suspect)
// Used to control write access to `suspects`.
var mu sync.Mutex
// NOTE(review): the context derived by errgroup.WithContext is discarded
// here and for the groups below, so one failing goroutine does not cancel its
// siblings.
eg, _ := errgroup.WithContext(ctx)
eg.Go(func() error {
affectingChanges, err := getChangesAffectingTest(ctx, c.bqClient, sig, windowEnd)
if err != nil {
return err
// NOTE(review): no mu.Lock() is visible before this deferred Unlock.
// Unlocking an unlocked sync.Mutex is a fatal runtime error — confirm the
// lock is acquired in the complete file.
defer mu.Unlock()
// Flag every suspect whose Gerrit change is known to have affected the test.
for _, ac := range affectingChanges {
for i, suspect := range suspects {
if gerritChangesEqual(ac.Change, suspect.GerritChange) {
suspects[i].AffectedTest = true
return err
eg.Go(func() error {
results, err := getNearbyTestResults(ctx, c.bqClient, sig, windowEnd)
if err != nil {
return err
// NOTE(review): as above, no matching mu.Lock() is visible.
defer mu.Unlock()
return calculateBlamelistDistances(results, suspects)
if err := eg.Wait(); err != nil {
return nil, err
// Filter out suspects with a score of zero.
suspects = functools.Filter(suspects, func(c suspectCommit) bool {
return c.score() > 0
functools.SortBy(suspects, func(c suspectCommit) int {
return -1 * c.score()
// Keep at most the ten highest-scoring suspects.
if len(suspects) > 10 {
suspects = suspects[:10]
// Some data sources are expensive to query, so we only use them as
// additional data after the initial filtering has been done using cheaper data sources.
eg, _ = errgroup.WithContext(ctx)
eg.Go(func() error {
// The Gerrit API doesn't include changed files for the most recent
// revision by default, so we must explicitly request them.
// NOTE(review): the option values are missing from this view.
opts := []gerritpb.QueryOption{
gerritEG, _ := errgroup.WithContext(ctx)
for i, suspect := range suspects {
// Make copies of the variables that can be safely used in the closure.
i, suspect := i, suspect
gerritEG.Go(func() error {
changeInfo, err := c.gerritClient.getChange(ctx, suspect.GerritChange, opts...)
if err != nil {
return err
// Each goroutine writes only its own slice element.
suspects[i].ChangeInfo = changeInfo
return nil
return gerritEG.Wait()
if err := eg.Wait(); err != nil {
return nil, err
// If all the remaining suspects have the same score, that indicates low
// confidence in each individual suspect since we can't distinguish
// between suspects. It's also just not very helpful to print a ranking
// of suspects where each one has the same score.
// NOTE(review): this check also discards a lone remaining suspect, since one
// suspect yields exactly one unique score — confirm that is intended.
uniqueScores := make(map[int]struct{})
for _, suspect := range suspects {
uniqueScores[suspect.score()] = struct{}{}
if len(uniqueScores) <= 1 {
suspects = nil
// Rank again after the Gerrit details were attached; presumably score()
// takes ChangeInfo into account — confirm.
functools.SortBy(suspects, func(c suspectCommit) int {
return -1 * c.score()
return suspects, nil
// clusterFailureModes takes a set of failed builds and attempts to cluster them
// into groups by common failure reason. A single build might be included in
// multiple clusters if it had multiple failure reasons.
//
// If *any* cluster has more than one build, then we'll only return clusters
// that appear with more than one build. Any failure mode that appears only once
// is much less likely to be significant than a failure mode that appears
// multiple times.
//
// The cluster key is a failureSignature built from the failed test's ID, the
// value of its "gn_label" tag (if present) and its primary error message (if
// a failure reason is set).
//
// NOTE(review): this view is missing lines (closing braces at the very least,
// possibly continue/break statements); confirm the control flow against the
// complete file.
func clusterFailureModes(buildResults []buildResult) map[failureSignature][]buildResult {
clusters := make(map[failureSignature][]buildResult)
for _, br := range buildResults {
// Builds without failed tests contribute to no cluster.
if len(br.FailedTests) == 0 {
for _, test := range br.FailedTests {
sig := failureSignature{
FailedTest: test.TestId,
for _, tag := range test.Tags {
if tag.Key == "gn_label" {
sig.TestGNLabel = tag.Value
if test.FailureReason != nil {
sig.FailureReason = test.FailureReason.PrimaryErrorMessage
clusters[sig] = append(clusters[sig], br)
// Require two builds per cluster as soon as any cluster has more than one.
minRequiredSize := 1
for _, cluster := range clusters {
if len(cluster) > 1 {
minRequiredSize = 2
// Deleting entries while ranging over a map is permitted in Go.
for sig, buildResults := range clusters {
if len(buildResults) < minRequiredSize {
delete(clusters, sig)
return clusters
func gerritChangesEqual(c1, c2 *buildbucketpb.GerritChange) bool {
if c1 == nil || c2 == nil {
return false
return c1.Host == c2.Host &&
c1.Project == c2.Project &&
c1.Change == c2.Change
func commitToGerritChange(commit *git.Commit) (*buildbucketpb.GerritChange, error) {
lines := strings.Split(commit.Message, "\n")
changeURL := parseFooter(lines, "Original-Reviewed-on")
if changeURL == "" {
changeURL = parseFooter(lines, "Reviewed-on")
if changeURL == "" {
return nil, fmt.Errorf("no reviewed-on footer for commit %s", commit.Id)
return parseGerritChangeURL(changeURL)
// parseFooter returns the value of the first line in msgLines that has the
// form "<footer>: <value>", or the empty string if there is no such line.
//
// Matching is case-sensitive and anchored at the start of the line, so
// looking up "Reviewed-on" does not match an "Original-Reviewed-on" line.
// Surrounding whitespace is trimmed from the value, so messages with CRLF
// line endings do not leave a trailing "\r" on it.
func parseFooter(msgLines []string, footer string) string {
	prefix := footer + ": "
	for _, line := range msgLines {
		if strings.HasPrefix(line, prefix) {
			return strings.TrimSpace(strings.TrimPrefix(line, prefix))
		}
	}
	return ""
}
func parseGerritChangeURL(changeURL string) (*buildbucketpb.GerritChange, error) {
u, err := url.Parse(changeURL)
if err != nil {
return nil, err
path := strings.TrimPrefix(u.Path, "/c/")
project, changeNumStr, ok := strings.Cut(path, "/+/")
if !ok {
return nil, fmt.Errorf("malformed gerrit URL: %q", changeURL)
var patchset int
changeNumStr, patchsetStr, ok := strings.Cut(changeNumStr, "/")
if ok {
patchset, err = strconv.Atoi(patchsetStr)
if err != nil {
return nil, err
changeNum, err := strconv.Atoi(changeNumStr)
if err != nil {
return nil, err
return &buildbucketpb.GerritChange{
Host: u.Host,
Project: project,
Change: int64(changeNum),
Patchset: int64(patchset),
}, nil
// culpritOutput is the top-level structure serialized to the JSON output file
// by main.
type culpritOutput struct {
// Human-readable markdown output.
MarkdownOutput string `json:"markdown_output"`
// One entry per failure cluster.
// NOTE(review): unlike its siblings this field has no json tag, so
// encoding/json emits it under the key "Clusters" — confirm that consumers
// expect that spelling before changing it.
Clusters []outputCluster
// outputCluster is the JSON representation of one failure cluster: its
// failure signature and the suspects collected for it by main.
type outputCluster struct {
Signature failureSignature `json:"signature"`
Suspects []outputSuspect `json:"suspects"`
// outputSuspect is the JSON representation of one suspect commit. main fills
// each field from the corresponding method on the suspect.
//
// NOTE(review): main also sets a suspectCommit field when constructing this
// type, but no such field is visible here; presumably suspectCommit is
// embedded — confirm against the complete file.
type outputSuspect struct {
GerritURL string `json:"gerrit_url"`
// Omitted from the JSON when empty.
ChangedFiles []string `json:"changed_files,omitempty"`
CommitSummary string `json:"commit_summary"`
Score int `json:"score"`
Features []culpritFeature `json:"features"`