// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"net/url"
"os"
"os/signal"
"runtime/pprof"
"strconv"
"strings"
"sync"
"syscall"
"cloud.google.com/go/bigquery"
"github.com/maruel/subcommands"
"go.chromium.org/luci/auth"
buildbucketpb "go.chromium.org/luci/buildbucket/proto"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/logging/gologger"
gerritpb "go.chromium.org/luci/common/proto/gerrit"
"go.chromium.org/luci/common/proto/git"
"go.chromium.org/luci/grpc/prpc"
resultpb "go.chromium.org/luci/resultdb/proto/v1"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"google.golang.org/api/option"
"google.golang.org/protobuf/types/known/fieldmaskpb"
"go.fuchsia.dev/infra/functools"
"go.fuchsia.dev/infra/gerrit"
"go.fuchsia.dev/infra/gitiles"
)
// maxClusterCount is the maximum number of clusters that will be analyzed. If
// there are many clusters to analyze, they probably share similar underlying
// causes, so it's not useful to analyze each one individually, which would
// take a very long time.
const maxClusterCount = 10
func cmdCulprit(authOpts auth.Options) *subcommands.Command {
return &subcommands.Command{
UsageLine: "culprit [flags] BUILD_ID [BUILD_ID]...",
ShortDesc: "Find culprit changes for CI breakages",
LongDesc: "Find culprit changes for CI breakages",
CommandRun: func() subcommands.CommandRun {
c := &culpritCmd{}
c.Init(authOpts)
return c
},
}
}
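// culpritCmd implements the `culprit` subcommand. It holds the parsed flags
// and the API clients used to fetch build, test, git, and Gerrit data.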
type culpritCmd struct {
commonFlags
jsonOutputFile string
buildIDs []int64
bqClient *bigquery.Client
bbClient buildbucketpb.BuildsClient
rdbClient resultpb.ResultDBClient
gitilesClient *gitiles.Client
gerritClient *gerritMultiClient
}
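// Init registers the subcommand's flags on top of the common flags.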
func (c *culpritCmd) Init(defaultAuthOpts auth.Options) {
c.commonFlags.Init(defaultAuthOpts)
c.Flags.StringVar(
&c.jsonOutputFile,
"json-output",
"",
"Path to which to write output JSON. Use '-' for stdout Use '-' for stdout.")
}
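// parseArgs validates flags and parses each positional argument as a
// Buildbucket build ID (an integer, optionally prefixed with "b").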
func (c *culpritCmd) parseArgs(args []string) error {
if err := c.commonFlags.Parse(); err != nil {
return err
}
for _, rawBuildID := range args {
buildID, err := strconv.ParseInt(strings.TrimPrefix(rawBuildID, "b"), 10, 64)
if err != nil {
return fmt.Errorf("positional argument %q is not an integer or a valid build URL", rawBuildID)
}
c.buildIDs = append(c.buildIDs, buildID)
}
return nil
}
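// Run implements subcommands.CommandRun.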
func (c *culpritCmd) Run(a subcommands.Application, args []string, _ subcommands.Env) int {
if err := c.parseArgs(args); err != nil {
fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
return 1
}
if err := c.main(); err != nil {
fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
return 1
}
return 0
}
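// initClients constructs the authenticated BigQuery, Buildbucket, ResultDB,
// Gitiles, and Gerrit clients used by the rest of the command.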
func (c *culpritCmd) initClients(ctx context.Context) error {
authenticator := auth.NewAuthenticator(ctx, auth.OptionalLogin, c.parsedAuthOpts)
tokenSource, err := authenticator.TokenSource()
if err != nil {
if err == auth.ErrLoginRequired {
fmt.Fprintf(os.Stderr, "You need to login first by running:\n")
fmt.Fprintf(os.Stderr, " luci-auth login -scopes %q\n", strings.Join(c.parsedAuthOpts.Scopes, " "))
}
return err
}
c.bqClient, err = bigquery.NewClient(ctx, "fuchsia-infra", option.WithTokenSource(tokenSource))
if err != nil {
return err
}
authClient, err := authenticator.Client()
if err != nil {
return fmt.Errorf("failed to initialize auth client: %w", err)
}
c.gitilesClient, err = gitiles.NewClient(
// TODO(olivernewman): Generalize the autogardener to other
// repositories.
"turquoise-internal.googlesource.com", "integration", authClient)
if err != nil {
return err
}
c.bbClient = buildbucketpb.NewBuildsPRPCClient(&prpc.Client{
C: authClient,
Host: "cr-buildbucket.appspot.com",
})
c.rdbClient = resultpb.NewResultDBPRPCClient(&prpc.Client{
C: authClient,
// TODO(olivernewman): Get the ResultDB host name from the build proto.
Host: "results.api.cr.dev",
})
c.gerritClient = &gerritMultiClient{
authClient: authClient,
clients: make(map[gerritClientKey]*gerrit.Client),
}
return nil
}
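// main drives the end-to-end analysis: fetch build and test results, cluster
// them by failure signature, diagnose each cluster concurrently, and emit
// markdown and (optionally) JSON output.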
func (c *culpritCmd) main() error {
ctx := gologger.StdConfig.Use(context.Background())
logging.SetLevel(ctx, logging.Debug)
	// Print a goroutine stack trace to stderr on Ctrl-C or SIGTERM. This is
	// helpful for debugging deadlocks and slow operations.
go func() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer cancel()
<-ctx.Done()
pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
}()
if err := c.initClients(ctx); err != nil {
return err
}
buildResults, err := c.fetchBuildResults(ctx)
if err != nil {
return err
}
clusters := clusterFailureModes(buildResults)
if len(clusters) > maxClusterCount {
		// Keep an arbitrary maxClusterCount clusters (Go map iteration order
		// is randomized).
newClusters := make(map[failureSignature][]buildResult)
for k, v := range clusters {
newClusters[k] = v
if len(newClusters) == maxClusterCount {
break
}
}
clusters = newClusters
}
noun := "cluster"
if len(clusters) != 1 {
noun += "s"
}
fmt.Printf("Found %d %s\n", len(clusters), noun)
	eg, _ := errgroup.WithContext(ctx)
	results := make(map[failureSignature][]suspectCommit)
	// Guards writes to `results` from the goroutines below.
	var resultsMu sync.Mutex
	for signature, buildResults := range clusters {
		// Make copies of the variables that can be safely used in the closure.
		signature, buildResults := signature, buildResults
		eg.Go(func() error {
			suspects, err := c.diagnoseCluster(ctx, signature, buildResults)
			resultsMu.Lock()
			results[signature] = suspects
			resultsMu.Unlock()
			return err
		})
	}
if err := eg.Wait(); err != nil {
return err
}
var outputLines []string
var jsonOutput culpritOutput
	// Sort results by confidence in the first suspect, so that failure modes
	// with lower culprit confidence are less prominent, since they're less
	// likely to be actionable.
sortedSignatures := maps.Keys(results)
functools.SortBy(sortedSignatures, func(sig failureSignature) int {
if len(results[sig]) == 0 {
return 0
}
return -results[sig][0].score()
})
for _, signature := range sortedSignatures {
suspects := results[signature]
// Don't bother emitting markdown output for failure modes with no
// suspects. It just adds noise to the output.
if len(suspects) == 0 {
continue
}
outputLines = append(outputLines, "### "+signature.FailedTest, "")
var outputSuspects []outputSuspect
		// Suspects are already limited to the top 10 by diagnoseCluster.
for i, suspect := range suspects {
outputLines = append(outputLines, fmt.Sprintf("%2d. (%3d%%) %s", i+1, suspect.score(), suspect.gerritURL()))
outputLines = append(outputLines, " "+suspect.commitSummary())
outputSuspects = append(outputSuspects, outputSuspect{
suspectCommit: suspect,
GerritURL: suspect.gerritURL(),
ChangedFiles: suspect.changedFiles(),
CommitSummary: suspect.commitSummary(),
Score: suspect.score(),
Features: suspect.features(),
})
}
outputLines = append(outputLines, "")
jsonOutput.Clusters = append(jsonOutput.Clusters, outputCluster{
Signature: signature,
Suspects: outputSuspects,
})
}
markdownOutput := strings.Join(outputLines, "\n")
if c.jsonOutputFile == "" {
fmt.Println(markdownOutput)
return nil
}
jsonOutput.MarkdownOutput = markdownOutput
	rawJSON, err := json.MarshalIndent(jsonOutput, "", " ")
	if err != nil {
		return err
	}
	rawJSON = append(rawJSON, '\n')
var outputWriter io.Writer
if c.jsonOutputFile == "-" {
outputWriter = os.Stdout
} else {
f, err := os.Create(c.jsonOutputFile)
if err != nil {
return err
}
defer f.Close()
outputWriter = f
}
if _, err := outputWriter.Write(rawJSON); err != nil {
return err
}
return nil
}
// fetchBuildResults downloads the build proto and failed test names for each
// build.
func (c *culpritCmd) fetchBuildResults(ctx context.Context) ([]buildResult, error) {
var res []buildResult
for _, buildID := range c.buildIDs {
build, err := c.bbClient.GetBuild(ctx, &buildbucketpb.GetBuildRequest{
Id: buildID,
Mask: &buildbucketpb.BuildMask{
Fields: &fieldmaskpb.FieldMask{
Paths: []string{
"id",
"builder",
"status",
"start_time",
"summary_markdown",
"input",
"output.properties",
"infra.resultdb",
},
},
},
})
if err != nil {
return nil, err
}
resp, err := c.rdbClient.QueryTestResults(ctx, &resultpb.QueryTestResultsRequest{
Invocations: []string{build.Infra.Resultdb.Invocation},
Predicate: &resultpb.TestResultPredicate{
				// Only include tests with unexpected (failing) results.
Expectancy: resultpb.TestResultPredicate_VARIANTS_WITH_UNEXPECTED_RESULTS,
},
ReadMask: &fieldmaskpb.FieldMask{
Paths: []string{"test_id", "status", "tags", "failure_reason"},
},
})
if err != nil {
return nil, err
}
failedTests := functools.Filter(resp.TestResults, func(t *resultpb.TestResult) bool {
for _, tag := range t.Tags {
// Only consider top-level test suites. Test cases don't yet
// have all the metadata we care about.
if tag.Key == "test_case_count" {
return true
}
}
return false
})
res = append(res, buildResult{
Build: build,
FailedTests: failedTests,
})
}
return res, nil
}
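// diagnoseCluster ranks the commits preceding the most recent failed build as
// culprit suspects for the given failure signature, using Gitiles, Gerrit,
// and BigQuery test history data to score each suspect.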
func (c *culpritCmd) diagnoseCluster(ctx context.Context, sig failureSignature, buildResults []buildResult) ([]suspectCommit, error) {
// Sort in descending order by revision count.
functools.SortBy(buildResults, func(b buildResult) int {
irc, _ := b.integrationRevisionCount()
return -1 * irc
})
mostRecentBuild := buildResults[0].Build
windowEnd := mostRecentBuild.StartTime.AsTime()
commits, err := c.gitilesClient.Log(
ctx, mostRecentBuild.Input.GitilesCommit.Id, 300)
if err != nil {
return nil, err
}
baseCommitPosition, ok := buildResults[0].integrationRevisionCount()
if !ok {
baseCommitPosition = 0
}
var suspects []suspectCommit
for i, commit := range commits {
gerritChange, err := commitToGerritChange(commit)
if err != nil {
return nil, err
}
suspect := suspectCommit{
signature: sig,
CommitInfo: commit,
GerritChange: gerritChange,
BlamelistDistances: make(map[string]int),
TagMatchesTest: hasMatchingTag(commit.Message, sig.FailedTest),
}
if baseCommitPosition > 0 {
suspect.CommitPosition = baseCommitPosition - i
}
suspects = append(suspects, suspect)
}
// Used to control write access to `suspects`.
var mu sync.Mutex
eg, _ := errgroup.WithContext(ctx)
eg.Go(func() error {
affectingChanges, err := getChangesAffectingTest(ctx, c.bqClient, sig, windowEnd)
if err != nil {
return err
}
mu.Lock()
defer mu.Unlock()
for _, ac := range affectingChanges {
for i, suspect := range suspects {
if gerritChangesEqual(ac.Change, suspect.GerritChange) {
suspects[i].AffectedTest = true
}
}
}
		return nil
})
eg.Go(func() error {
results, err := getNearbyTestResults(ctx, c.bqClient, sig, windowEnd)
if err != nil {
return err
}
mu.Lock()
defer mu.Unlock()
return calculateBlamelistDistances(results, suspects)
})
if err := eg.Wait(); err != nil {
return nil, err
}
// Filter out suspects with a score of zero.
suspects = functools.Filter(suspects, func(c suspectCommit) bool {
return c.score() > 0
})
functools.SortBy(suspects, func(c suspectCommit) int {
return -1 * c.score()
})
if len(suspects) > 10 {
suspects = suspects[:10]
}
	// Some data sources are expensive to query, so we only consult them after
	// the initial filtering has been done using cheaper data sources.
eg, _ = errgroup.WithContext(ctx)
eg.Go(func() error {
// The Gerrit API doesn't include changed files for the most recent
// revision by default, so we must explicitly request them.
opts := []gerritpb.QueryOption{
gerritpb.QueryOption_ALL_FILES,
gerritpb.QueryOption_CURRENT_REVISION,
}
gerritEG, _ := errgroup.WithContext(ctx)
for i, suspect := range suspects {
// Make copies of the variables that can be safely used in the closure.
i, suspect := i, suspect
gerritEG.Go(func() error {
changeInfo, err := c.gerritClient.getChange(ctx, suspect.GerritChange, opts...)
if err != nil {
return err
}
mu.Lock()
suspects[i].ChangeInfo = changeInfo
mu.Unlock()
return nil
})
}
return gerritEG.Wait()
})
if err := eg.Wait(); err != nil {
return nil, err
}
// If all the remaining suspects have the same score, that indicates low
// confidence in each individual suspect since we can't distinguish
// between suspects. It's also just not very helpful to print a ranking
// of suspects where each one has the same score.
uniqueScores := make(map[int]struct{})
for _, suspect := range suspects {
uniqueScores[suspect.score()] = struct{}{}
}
if len(uniqueScores) <= 1 {
suspects = nil
}
functools.SortBy(suspects, func(c suspectCommit) int {
return -1 * c.score()
})
return suspects, nil
}
// clusterFailureModes takes a set of failed builds and attempts to cluster them
// into groups by common failure reason. A single build might be included in
// multiple clusters if it had multiple failure reasons.
//
// If *any* cluster has more than one build, then we'll only return clusters
// that contain more than one build. Any failure mode that appears only once is
// much less likely to be significant than a failure mode that appears multiple
// times.
func clusterFailureModes(buildResults []buildResult) map[failureSignature][]buildResult {
clusters := make(map[failureSignature][]buildResult)
for _, br := range buildResults {
if len(br.FailedTests) == 0 {
continue
}
for _, test := range br.FailedTests {
sig := failureSignature{
FailedTest: test.TestId,
}
for _, tag := range test.Tags {
if tag.Key == "gn_label" {
sig.TestGNLabel = tag.Value
}
}
if test.FailureReason != nil {
sig.FailureReason = test.FailureReason.PrimaryErrorMessage
}
clusters[sig] = append(clusters[sig], br)
}
}
minRequiredSize := 1
for _, cluster := range clusters {
if len(cluster) > 1 {
minRequiredSize = 2
break
}
}
for sig, buildResults := range clusters {
if len(buildResults) < minRequiredSize {
delete(clusters, sig)
}
}
return clusters
}
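// gerritChangesEqual reports whether two Gerrit changes refer to the same
// change, ignoring patchset.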
func gerritChangesEqual(c1, c2 *buildbucketpb.GerritChange) bool {
if c1 == nil || c2 == nil {
return false
}
return c1.Host == c2.Host &&
c1.Project == c2.Project &&
c1.Change == c2.Change
}
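// commitToGerritChange extracts the Gerrit change that a commit landed from,
// based on the commit message's "Reviewed-on" (or "Original-Reviewed-on")
// footer.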
func commitToGerritChange(commit *git.Commit) (*buildbucketpb.GerritChange, error) {
lines := strings.Split(commit.Message, "\n")
changeURL := parseFooter(lines, "Original-Reviewed-on")
if changeURL == "" {
changeURL = parseFooter(lines, "Reviewed-on")
}
if changeURL == "" {
return nil, fmt.Errorf("no reviewed-on footer for commit %s", commit.Id)
}
return parseGerritChangeURL(changeURL)
}
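// parseFooter returns the value of the first commit message line of the form
// "<footer>: <value>", or "" if no such line exists.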
func parseFooter(msgLines []string, footer string) string {
prefix := fmt.Sprintf("%s: ", footer)
for _, line := range msgLines {
if strings.HasPrefix(line, prefix) {
return strings.TrimPrefix(line, prefix)
}
}
return ""
}
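// parseGerritChangeURL parses a Gerrit change URL of the form
// https://<host>/c/<project>/+/<change>[/<patchset>]. For example (values are
// illustrative only), "https://fuchsia-review.googlesource.com/c/fuchsia/+/123456/7"
// would yield change 123456 at patchset 7 in project "fuchsia".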
func parseGerritChangeURL(changeURL string) (*buildbucketpb.GerritChange, error) {
u, err := url.Parse(changeURL)
if err != nil {
return nil, err
}
path := strings.TrimPrefix(u.Path, "/c/")
project, changeNumStr, ok := strings.Cut(path, "/+/")
if !ok {
return nil, fmt.Errorf("malformed gerrit URL: %q", changeURL)
}
var patchset int
changeNumStr, patchsetStr, ok := strings.Cut(changeNumStr, "/")
if ok {
patchset, err = strconv.Atoi(patchsetStr)
if err != nil {
return nil, err
}
}
changeNum, err := strconv.Atoi(changeNumStr)
if err != nil {
return nil, err
}
return &buildbucketpb.GerritChange{
Host: u.Host,
Project: project,
Change: int64(changeNum),
Patchset: int64(patchset),
}, nil
}
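// culpritOutput is the top-level schema of the JSON written to -json-output.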
type culpritOutput struct {
// Human-readable markdown output.
MarkdownOutput string `json:"markdown_output"`
	Clusters []outputCluster `json:"clusters"`
}
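// outputCluster pairs a failure signature with its ranked suspects.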
type outputCluster struct {
Signature failureSignature `json:"signature"`
Suspects []outputSuspect `json:"suspects"`
}
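// outputSuspect augments a suspectCommit with derived, human-readable fields
// for JSON serialization.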
type outputSuspect struct {
suspectCommit
GerritURL string `json:"gerrit_url"`
ChangedFiles []string `json:"changed_files,omitempty"`
CommitSummary string `json:"commit_summary"`
Score int `json:"score"`
Features []culpritFeature `json:"features"`
}