| // Copyright 2022 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package main |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| "io" |
| "net/url" |
| "os" |
| "os/signal" |
| "runtime/pprof" |
| "strconv" |
| "strings" |
| "sync" |
| "syscall" |
| |
| "cloud.google.com/go/bigquery" |
| "github.com/maruel/subcommands" |
| "go.chromium.org/luci/auth" |
| buildbucketpb "go.chromium.org/luci/buildbucket/proto" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/common/logging/gologger" |
| gerritpb "go.chromium.org/luci/common/proto/gerrit" |
| "go.chromium.org/luci/common/proto/git" |
| "go.chromium.org/luci/grpc/prpc" |
| resultpb "go.chromium.org/luci/resultdb/proto/v1" |
| "golang.org/x/exp/maps" |
| "golang.org/x/sync/errgroup" |
| "google.golang.org/api/option" |
| "google.golang.org/protobuf/types/known/fieldmaskpb" |
| |
| "go.fuchsia.dev/infra/functools" |
| "go.fuchsia.dev/infra/gerrit" |
| "go.fuchsia.dev/infra/gitiles" |
| ) |
| |
// maxClusterCount is the maximum number of clusters that will be analyzed.
// When more clusters than this are found, only maxClusterCount of them are
// kept. If there are a large number of clusters to analyze then they probably
// have similar underlying causes, so it's not useful to analyze each one
// individually, which would take a really long time.
const maxClusterCount = 10
| |
| func cmdCulprit(authOpts auth.Options) *subcommands.Command { |
| return &subcommands.Command{ |
| UsageLine: "culprit [flags] BUILD_ID [BUILD_ID]...", |
| ShortDesc: "Find culprit changes for CI breakages", |
| LongDesc: "Find culprit changes for CI breakages", |
| CommandRun: func() subcommands.CommandRun { |
| c := &culpritCmd{} |
| c.Init(authOpts) |
| return c |
| }, |
| } |
| } |
| |
// culpritCmd holds the flags, parsed arguments and API clients used by the
// "culprit" subcommand.
type culpritCmd struct {
	commonFlags
	// jsonOutputFile is the value of the -json-output flag. Empty means that
	// only markdown is printed; "-" means that JSON is written to stdout.
	jsonOutputFile string

	// buildIDs are the build IDs parsed from the positional arguments by
	// parseArgs.
	buildIDs []int64
	// The clients below are populated by initClients.
	bqClient      *bigquery.Client
	bbClient      buildbucketpb.BuildsClient
	rdbClient     resultpb.ResultDBClient
	gitilesClient *gitiles.Client
	gerritClient  *gerritMultiClient
}
| |
| func (c *culpritCmd) Init(defaultAuthOpts auth.Options) { |
| c.commonFlags.Init(defaultAuthOpts) |
| c.Flags.StringVar( |
| &c.jsonOutputFile, |
| "json-output", |
| "", |
| "Path to which to write output JSON. Use '-' for stdout Use '-' for stdout.") |
| } |
| |
| func (c *culpritCmd) parseArgs(args []string) error { |
| if err := c.commonFlags.Parse(); err != nil { |
| return err |
| } |
| for _, rawBuildID := range args { |
| buildID, err := strconv.ParseInt(strings.TrimPrefix(rawBuildID, "b"), 10, 64) |
| if err != nil { |
| return fmt.Errorf("positional argument %q is not an integer or a valid build URL", rawBuildID) |
| } |
| c.buildIDs = append(c.buildIDs, buildID) |
| } |
| return nil |
| } |
| |
| func (c *culpritCmd) Run(a subcommands.Application, args []string, _ subcommands.Env) int { |
| if err := c.parseArgs(args); err != nil { |
| fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err) |
| return 1 |
| } |
| |
| if err := c.main(); err != nil { |
| fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err) |
| return 1 |
| } |
| return 0 |
| } |
| |
| func (c *culpritCmd) initClients(ctx context.Context) error { |
| authenticator := auth.NewAuthenticator(ctx, auth.OptionalLogin, c.parsedAuthOpts) |
| tokenSource, err := authenticator.TokenSource() |
| if err != nil { |
| if err == auth.ErrLoginRequired { |
| fmt.Fprintf(os.Stderr, "You need to login first by running:\n") |
| fmt.Fprintf(os.Stderr, " luci-auth login -scopes %q\n", strings.Join(c.parsedAuthOpts.Scopes, " ")) |
| } |
| return err |
| } |
| |
| c.bqClient, err = bigquery.NewClient(ctx, "fuchsia-infra", option.WithTokenSource(tokenSource)) |
| if err != nil { |
| return err |
| } |
| |
| authClient, err := authenticator.Client() |
| if err != nil { |
| return fmt.Errorf("failed to initialize auth client: %w", err) |
| } |
| |
| c.gitilesClient, err = gitiles.NewClient( |
| // TODO(olivernewman): Generalize the autogardener to other |
| // repositories. |
| "turquoise-internal.googlesource.com", "integration", authClient) |
| if err != nil { |
| return err |
| } |
| |
| c.bbClient = buildbucketpb.NewBuildsPRPCClient(&prpc.Client{ |
| C: authClient, |
| Host: "cr-buildbucket.appspot.com", |
| }) |
| |
| c.rdbClient = resultpb.NewResultDBPRPCClient(&prpc.Client{ |
| C: authClient, |
| // TODO(olivernewman): Get the ResultDB host name from the build proto. |
| Host: "results.api.cr.dev", |
| }) |
| |
| c.gerritClient = &gerritMultiClient{ |
| authClient: authClient, |
| clients: make(map[gerritClientKey]*gerrit.Client), |
| } |
| |
| return nil |
| } |
| |
| func (c *culpritCmd) main() error { |
| ctx := gologger.StdConfig.Use(context.Background()) |
| logging.SetLevel(ctx, logging.Debug) |
| |
| // Print goroutine stack trace to stderr after a Ctrl-C. This is helpful for |
| // debugging deadlocks and slow operations. |
| go func() { |
| ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) |
| defer cancel() |
| <-ctx.Done() |
| pprof.Lookup("goroutine").WriteTo(os.Stderr, 1) |
| }() |
| |
| if err := c.initClients(ctx); err != nil { |
| return err |
| } |
| |
| buildResults, err := c.fetchBuildResults(ctx) |
| if err != nil { |
| return err |
| } |
| |
| clusters := clusterFailureModes(buildResults) |
| if len(clusters) > maxClusterCount { |
| // Choose a random maxClusterCount clusters. |
| newClusters := make(map[failureSignature][]buildResult) |
| for k, v := range clusters { |
| newClusters[k] = v |
| if len(newClusters) == maxClusterCount { |
| break |
| } |
| } |
| clusters = newClusters |
| } |
| |
| noun := "cluster" |
| if len(clusters) != 1 { |
| noun += "s" |
| } |
| fmt.Printf("Found %d %s\n", len(clusters), noun) |
| |
| eg, _ := errgroup.WithContext(ctx) |
| results := make(map[failureSignature][]suspectCommit) |
| for signature, buildResults := range clusters { |
| // Make copies of the variables that can be safely used in the closure. |
| signature, buildResults := signature, buildResults |
| eg.Go(func() error { |
| suspects, err := c.diagnoseCluster(ctx, signature, buildResults) |
| results[signature] = suspects |
| return err |
| }) |
| } |
| |
| if err := eg.Wait(); err != nil { |
| return err |
| } |
| |
| var outputLines []string |
| |
| var jsonOutput culpritOutput |
| |
| // Sort results by confidence in the first suspect, so that failure modes |
| // with lower culprit confidence are less prominent, since they're more |
| // likely to not be actionable. |
| sortedSignatures := maps.Keys(results) |
| functools.SortBy(sortedSignatures, func(sig failureSignature) int { |
| if len(results[sig]) == 0 { |
| return 0 |
| } |
| return -results[sig][0].score() |
| }) |
| |
| for _, signature := range sortedSignatures { |
| suspects := results[signature] |
| // Don't bother emitting markdown output for failure modes with no |
| // suspects. It just adds noise to the output. |
| if len(suspects) == 0 { |
| continue |
| } |
| |
| outputLines = append(outputLines, "### "+signature.FailedTest, "") |
| |
| var outputSuspects []outputSuspect |
| |
| // Only show the top N suspects. |
| for i, suspect := range suspects { |
| outputLines = append(outputLines, fmt.Sprintf("%2d. (%3d%%) %s", i+1, suspect.score(), suspect.gerritURL())) |
| outputLines = append(outputLines, " "+suspect.commitSummary()) |
| |
| outputSuspects = append(outputSuspects, outputSuspect{ |
| suspectCommit: suspect, |
| GerritURL: suspect.gerritURL(), |
| ChangedFiles: suspect.changedFiles(), |
| CommitSummary: suspect.commitSummary(), |
| Score: suspect.score(), |
| Features: suspect.features(), |
| }) |
| } |
| |
| outputLines = append(outputLines, "") |
| |
| jsonOutput.Clusters = append(jsonOutput.Clusters, outputCluster{ |
| Signature: signature, |
| Suspects: outputSuspects, |
| }) |
| } |
| |
| markdownOutput := strings.Join(outputLines, "\n") |
| if c.jsonOutputFile == "" { |
| fmt.Println(markdownOutput) |
| return nil |
| } |
| jsonOutput.MarkdownOutput = markdownOutput |
| |
| rawJSON, err := json.MarshalIndent(jsonOutput, "", " ") |
| rawJSON = append(rawJSON, '\n') |
| if err != nil { |
| return err |
| } |
| |
| var outputWriter io.Writer |
| if c.jsonOutputFile == "-" { |
| outputWriter = os.Stdout |
| } else { |
| f, err := os.Create(c.jsonOutputFile) |
| if err != nil { |
| return err |
| } |
| defer f.Close() |
| outputWriter = f |
| } |
| |
| if _, err := outputWriter.Write(rawJSON); err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| // fetchBuildResults downloads the build proto and failed test names for each |
| // build. |
| func (c *culpritCmd) fetchBuildResults(ctx context.Context) ([]buildResult, error) { |
| var res []buildResult |
| for _, buildID := range c.buildIDs { |
| build, err := c.bbClient.GetBuild(ctx, &buildbucketpb.GetBuildRequest{ |
| Id: buildID, |
| Mask: &buildbucketpb.BuildMask{ |
| Fields: &fieldmaskpb.FieldMask{ |
| Paths: []string{ |
| "id", |
| "builder", |
| "status", |
| "start_time", |
| "summary_markdown", |
| "input", |
| "output.properties", |
| "infra.resultdb", |
| }, |
| }, |
| }, |
| }) |
| if err != nil { |
| return nil, err |
| } |
| |
| resp, err := c.rdbClient.QueryTestResults(ctx, &resultpb.QueryTestResultsRequest{ |
| Invocations: []string{build.Infra.Resultdb.Invocation}, |
| Predicate: &resultpb.TestResultPredicate{ |
| // Only include non-exonerated failed tests. |
| Expectancy: resultpb.TestResultPredicate_VARIANTS_WITH_UNEXPECTED_RESULTS, |
| }, |
| ReadMask: &fieldmaskpb.FieldMask{ |
| Paths: []string{"test_id", "status", "tags", "failure_reason"}, |
| }, |
| }) |
| if err != nil { |
| return nil, err |
| } |
| |
| failedTests := functools.Filter(resp.TestResults, func(t *resultpb.TestResult) bool { |
| for _, tag := range t.Tags { |
| // Only consider top-level test suites. Test cases don't yet |
| // have all the metadata we care about. |
| if tag.Key == "test_case_count" { |
| return true |
| } |
| } |
| return false |
| }) |
| |
| res = append(res, buildResult{ |
| Build: build, |
| FailedTests: failedTests, |
| }) |
| } |
| return res, nil |
| } |
| |
| func (c *culpritCmd) diagnoseCluster(ctx context.Context, sig failureSignature, buildResults []buildResult) ([]suspectCommit, error) { |
| // Sort in descending order by revision count. |
| functools.SortBy(buildResults, func(b buildResult) int { |
| irc, _ := b.integrationRevisionCount() |
| return -1 * irc |
| }) |
| mostRecentBuild := buildResults[0].Build |
| |
| windowEnd := mostRecentBuild.StartTime.AsTime() |
| |
| commits, err := c.gitilesClient.Log( |
| ctx, mostRecentBuild.Input.GitilesCommit.Id, 300) |
| if err != nil { |
| return nil, err |
| } |
| |
| baseCommitPosition, ok := buildResults[0].integrationRevisionCount() |
| if !ok { |
| baseCommitPosition = 0 |
| } |
| |
| var suspects []suspectCommit |
| for i, commit := range commits { |
| gerritChange, err := commitToGerritChange(commit) |
| if err != nil { |
| return nil, err |
| } |
| suspect := suspectCommit{ |
| signature: sig, |
| CommitInfo: commit, |
| GerritChange: gerritChange, |
| BlamelistDistances: make(map[string]int), |
| TagMatchesTest: hasMatchingTag(commit.Message, sig.FailedTest), |
| } |
| if baseCommitPosition > 0 { |
| suspect.CommitPosition = baseCommitPosition - i |
| } |
| |
| suspects = append(suspects, suspect) |
| } |
| |
| // Used to control write access to `suspects`. |
| var mu sync.Mutex |
| |
| eg, _ := errgroup.WithContext(ctx) |
| eg.Go(func() error { |
| affectingChanges, err := getChangesAffectingTest(ctx, c.bqClient, sig, windowEnd) |
| if err != nil { |
| return err |
| } |
| mu.Lock() |
| defer mu.Unlock() |
| for _, ac := range affectingChanges { |
| for i, suspect := range suspects { |
| if gerritChangesEqual(ac.Change, suspect.GerritChange) { |
| suspects[i].AffectedTest = true |
| } |
| } |
| } |
| return err |
| }) |
| |
| eg.Go(func() error { |
| results, err := getNearbyTestResults(ctx, c.bqClient, sig, windowEnd) |
| if err != nil { |
| return err |
| } |
| |
| mu.Lock() |
| defer mu.Unlock() |
| |
| return calculateBlamelistDistances(results, suspects) |
| }) |
| |
| if err := eg.Wait(); err != nil { |
| return nil, err |
| } |
| |
| // Filter out suspects with a score of zero. |
| suspects = functools.Filter(suspects, func(c suspectCommit) bool { |
| return c.score() > 0 |
| }) |
| |
| functools.SortBy(suspects, func(c suspectCommit) int { |
| return -1 * c.score() |
| }) |
| if len(suspects) > 10 { |
| suspects = suspects[:10] |
| } |
| |
| // Some data sources are expensive to query, so we only use them as |
| // additional data after the initial filtering has been done using cheaper data sources. |
| eg, _ = errgroup.WithContext(ctx) |
| eg.Go(func() error { |
| // The Gerrit API doesn't include changed files for the most recent |
| // revision by default, so we must explicitly request them. |
| opts := []gerritpb.QueryOption{ |
| gerritpb.QueryOption_ALL_FILES, |
| gerritpb.QueryOption_CURRENT_REVISION, |
| } |
| gerritEG, _ := errgroup.WithContext(ctx) |
| for i, suspect := range suspects { |
| // Make copies of the variables that can be safely used in the closure. |
| i, suspect := i, suspect |
| gerritEG.Go(func() error { |
| changeInfo, err := c.gerritClient.getChange(ctx, suspect.GerritChange, opts...) |
| if err != nil { |
| return err |
| } |
| mu.Lock() |
| suspects[i].ChangeInfo = changeInfo |
| mu.Unlock() |
| return nil |
| }) |
| } |
| return gerritEG.Wait() |
| }) |
| |
| if err := eg.Wait(); err != nil { |
| return nil, err |
| } |
| |
| // If all the remaining suspects have the same score, that indicates low |
| // confidence in each individual suspect since we can't distinguish |
| // between suspects. It's also just not very helpful to print a ranking |
| // of suspects where each one has the same score. |
| uniqueScores := make(map[int]struct{}) |
| for _, suspect := range suspects { |
| uniqueScores[suspect.score()] = struct{}{} |
| } |
| if len(uniqueScores) <= 1 { |
| suspects = nil |
| } |
| |
| functools.SortBy(suspects, func(c suspectCommit) int { |
| return -1 * c.score() |
| }) |
| |
| return suspects, nil |
| } |
| |
| // clusterFailureModes takes a set of failed builds and attempts to cluster them |
| // into groups by common failure reason. A single build might be included in |
| // multiple clusters if it had multiple failure reasons. |
| // |
| // If *any* cluster has more than one build, then we'll only return clusters |
| // that appear with more than one build. Any failure mode that appears only once |
| // is much less likely to be significant than a failure mode that appears |
| // multiple times. |
| func clusterFailureModes(buildResults []buildResult) map[failureSignature][]buildResult { |
| clusters := make(map[failureSignature][]buildResult) |
| for _, br := range buildResults { |
| if len(br.FailedTests) == 0 { |
| continue |
| } |
| |
| for _, test := range br.FailedTests { |
| sig := failureSignature{ |
| FailedTest: test.TestId, |
| } |
| for _, tag := range test.Tags { |
| if tag.Key == "gn_label" { |
| sig.TestGNLabel = tag.Value |
| } |
| } |
| if test.FailureReason != nil { |
| sig.FailureReason = test.FailureReason.PrimaryErrorMessage |
| } |
| clusters[sig] = append(clusters[sig], br) |
| } |
| } |
| |
| minRequiredSize := 1 |
| for _, cluster := range clusters { |
| if len(cluster) > 1 { |
| minRequiredSize = 2 |
| break |
| } |
| } |
| |
| for sig, buildResults := range clusters { |
| if len(buildResults) < minRequiredSize { |
| delete(clusters, sig) |
| } |
| } |
| |
| return clusters |
| } |
| |
| func gerritChangesEqual(c1, c2 *buildbucketpb.GerritChange) bool { |
| if c1 == nil || c2 == nil { |
| return false |
| } |
| return c1.Host == c2.Host && |
| c1.Project == c2.Project && |
| c1.Change == c2.Change |
| } |
| |
| func commitToGerritChange(commit *git.Commit) (*buildbucketpb.GerritChange, error) { |
| lines := strings.Split(commit.Message, "\n") |
| changeURL := parseFooter(lines, "Original-Reviewed-on") |
| if changeURL == "" { |
| changeURL = parseFooter(lines, "Reviewed-on") |
| } |
| if changeURL == "" { |
| return nil, fmt.Errorf("no reviewed-on footer for commit %s", commit.Id) |
| } |
| return parseGerritChangeURL(changeURL) |
| } |
| |
// parseFooter returns the value of the first line in msgLines that starts
// with "<footer>: ", i.e. everything after that prefix. It returns the empty
// string when no line matches.
func parseFooter(msgLines []string, footer string) string {
	prefix := fmt.Sprintf("%s: ", footer)
	for i := range msgLines {
		if line := msgLines[i]; strings.HasPrefix(line, prefix) {
			return line[len(prefix):]
		}
	}
	return ""
}
| |
| func parseGerritChangeURL(changeURL string) (*buildbucketpb.GerritChange, error) { |
| u, err := url.Parse(changeURL) |
| if err != nil { |
| return nil, err |
| } |
| path := strings.TrimPrefix(u.Path, "/c/") |
| project, changeNumStr, ok := strings.Cut(path, "/+/") |
| if !ok { |
| return nil, fmt.Errorf("malformed gerrit URL: %q", changeURL) |
| } |
| |
| var patchset int |
| changeNumStr, patchsetStr, ok := strings.Cut(changeNumStr, "/") |
| if ok { |
| patchset, err = strconv.Atoi(patchsetStr) |
| if err != nil { |
| return nil, err |
| } |
| } |
| changeNum, err := strconv.Atoi(changeNumStr) |
| if err != nil { |
| return nil, err |
| } |
| |
| return &buildbucketpb.GerritChange{ |
| Host: u.Host, |
| Project: project, |
| Change: int64(changeNum), |
| Patchset: int64(patchset), |
| }, nil |
| } |
| |
// culpritOutput is the top-level structure that is serialized as the JSON
// output of the culprit subcommand.
type culpritOutput struct {
	// Human-readable markdown output.
	MarkdownOutput string `json:"markdown_output"`

	// Clusters holds one entry per failure mode that has at least one
	// suspect. The field has no JSON tag, so it is serialized under the key
	// "Clusters".
	Clusters []outputCluster
}
| |
// outputCluster is the JSON representation of one failure mode together with
// the suspect commits found for it.
type outputCluster struct {
	// Signature identifies the failure mode.
	Signature failureSignature `json:"signature"`
	// Suspects are the suspect commits, in the order in which they are listed
	// in the markdown output.
	Suspects []outputSuspect `json:"suspects"`
}
| |
// outputSuspect is the JSON representation of a suspect commit. It embeds the
// suspectCommit itself and adds fields holding the results of the suspect's
// methods at the time the output is generated.
type outputSuspect struct {
	suspectCommit

	// The fields below are set from the suspect's gerritURL, changedFiles,
	// commitSummary, score and features methods respectively.
	GerritURL     string           `json:"gerrit_url"`
	ChangedFiles  []string         `json:"changed_files,omitempty"`
	CommitSummary string           `json:"commit_summary"`
	Score         int              `json:"score"`
	Features      []culpritFeature `json:"features"`
}