blob: b65bfa92d2adee80de63db15f17dc9651e148794 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sort"
"strings"
"time"
"cloud.google.com/go/bigquery"
"github.com/maruel/subcommands"
"go.chromium.org/luci/auth"
cvpb "go.chromium.org/luci/cv/api/config/v2"
schedulerpb "go.chromium.org/luci/scheduler/appengine/messages"
"go.fuchsia.dev/infra/cmd/builder_oracle/queries"
"go.fuchsia.dev/infra/cmd/builder_oracle/types"
"go.fuchsia.dev/infra/flagutil"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
)
// simulateLongDesc is the long-form help text shown for the "simulate"
// subcommand.
const simulateLongDesc = `
Simulate the impact of adding or removing builders to/from CQ or CI.
`

// We recommend that tests constrain their flake rates to 3% or lower.
const flakeThreshold = 0.03

// This scalar allows us to squirrel away an extra amount of capacity so that we
// can handle unforeseen changes to our usage or availability.
const secretBotStash = 0.2
// cmdSimulate builds the "simulate" subcommand definition, wiring the given
// auth options into every simulateCmd instance created for a run.
func cmdSimulate(authOpts auth.Options) *subcommands.Command {
	newRun := func() subcommands.CommandRun {
		cmd := &simulateCmd{}
		cmd.Init(authOpts)
		return cmd
	}
	return &subcommands.Command{
		UsageLine:  "simulate -config path/to/baseline,path/to/proposed",
		ShortDesc:  "Simulate builder configuration changes.",
		LongDesc:   simulateLongDesc,
		CommandRun: newRun,
	}
}
// CIAddBuilder describes a builder that the proposed configs add to CI.
type CIAddBuilder struct {
	Bucket      string           // buildbucket bucket the builder runs in
	Builder     string           // builder name
	Project     string           // gitiles project whose commits trigger the builder
	BatchConfig TriggeringPolicy // scheduler batching configuration for the builder
	Timer       time.Duration    // NOTE(review): never assigned in this file — confirm it is still used
}
// TriggeringPolicy carries the fields of the luci-scheduler TriggeringPolicy
// that the simulation consumes.
type TriggeringPolicy struct {
	MaxBatchSize             int64 // scheduler's max_batch_size
	MaxConcurrentInvocations int64 // scheduler's max_concurrent_invocations
}
// CIRemoveBuilder describes a builder that the proposed configs remove from CI.
type CIRemoveBuilder struct {
	Bucket  string // buildbucket bucket the builder runs in
	Builder string // builder name
	Project string // gitiles project whose commits trigger the builder
}
// CQAddBuilder describes a builder that the proposed configs add to CQ.
type CQAddBuilder struct {
	Bucket      string // buildbucket bucket the builder runs in
	Builder     string // builder name
	ConfigGroup string // commit-queue config group the builder belongs to
	Project     string // gerrit project whose CLs trigger the builder
	// TODO(catduncan): Respect the builder's location_filters.
}
// CQRemoveBuilder describes a builder that the proposed configs remove from CQ.
type CQRemoveBuilder struct {
	Bucket      string // buildbucket bucket the builder runs in
	Builder     string // builder name
	ConfigGroup string // commit-queue config group the builder belonged to
	Project     string // gerrit project whose CLs triggered the builder
}
// SimulateJSONOutput is the machine-readable result of a simulation run,
// written when the -json-output flag is set.
type SimulateJSONOutput struct {
	Warnings         int               `json:"warnings"`            // number of issues found with the proposed changes
	Benefits         int               `json:"benefits"`            // number of availability improvements found
	CIAddBuilders    []CIAddBuilder    `json:"added_ci_builders"`   // builders added to CI
	CIRemoveBuilders []CIRemoveBuilder `json:"removed_ci_builders"` // builders removed from CI
	CQAddBuilders    []CQAddBuilder    `json:"added_cq_builders"`   // builders added to CQ
	CQRemoveBuilders []CQRemoveBuilder `json:"removed_cq_builders"` // builders removed from CQ
	CLComment        string            `json:"cl_comment"`          // human-readable summary intended to be posted on the CL
}
// simulateCmd implements the "simulate" subcommand: it diffs baseline and
// proposed lucicfg outputs and simulates the infrastructure impact.
type simulateCmd struct {
	commonFlags
	ciAdd          []CIAddBuilder               // builders the proposal adds to CI
	ciRemove       []CIRemoveBuilder            // builders the proposal removes from CI
	cqAdd          []CQAddBuilder               // builders the proposal adds to CQ
	cqRemove       []CQRemoveBuilder            // builders the proposal removes from CQ
	configDirs     flagutil.RepeatedStringValue // -config flag: repeated "baseline,proposed" dir pairs
	jsonOutput     SimulateJSONOutput           // accumulated results emitted by outputJSON
	jsonOutputFile string                       // -json-output destination; "-" means stdout
	runBaseline    bool                         // -run-baseline: simulate even with no proposed changes
}
// Init registers the simulate-specific flags on top of the common flags.
func (c *simulateCmd) Init(defaultAuthOpts auth.Options) {
	// MixedCaps per Go naming convention (was cfg_desc).
	cfgDesc := `
Repeated flag, a comma-separated string of paths to the baseline and
corresponding proposed config directories. These directories are
expected to contain lucicfg generated files which will be parsed to
determine what changes (if any) were made.
example:
-config baseline/fuchsia,proposed/fuchsia
-config baseline/turquoise,proposed/turquoise`
	c.commonFlags.Init(defaultAuthOpts)
	c.Flags.Var(
		&c.configDirs,
		"config",
		cfgDesc,
	)
	c.Flags.BoolVar(
		&c.runBaseline,
		"run-baseline",
		false,
		"Whether to run the simulation without any proposed changes.",
	)
	c.Flags.StringVar(
		&c.jsonOutputFile,
		"json-output",
		"",
		"Filepath to write json output to. Use '-' for stdout.",
	)
}
// parseArgs validates the parsed command-line flags. Today it only delegates
// to the common flag validation; it exists so Run has a single entry point
// for argument checking.
func (c *simulateCmd) parseArgs() error {
	return c.commonFlags.Parse()
}
// Run implements subcommands.CommandRun. It validates flags, runs the
// simulation, and reports any error on the application's error stream.
func (c *simulateCmd) Run(a subcommands.Application, _ []string, _ subcommands.Env) int {
	fail := func(err error) int {
		fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
		return 1
	}
	if err := c.parseArgs(); err != nil {
		return fail(err)
	}
	if err := c.main(); err != nil {
		return fail(err)
	}
	return 0
}
// main authenticates, parses the baseline/proposed configs, prints the
// detected changes, and runs the simulation against BigQuery data.
func (c *simulateCmd) main() error {
	ctx := context.Background()
	authenticator := auth.NewAuthenticator(ctx, auth.OptionalLogin, c.parsedAuthOpts)
	tokenSource, err := authenticator.TokenSource()
	if err != nil {
		// errors.Is (rather than ==) so a wrapped ErrLoginRequired is still
		// recognized.
		if errors.Is(err, auth.ErrLoginRequired) {
			fmt.Fprintf(os.Stderr, "You need to login first by running:\n")
			fmt.Fprintf(os.Stderr, " luci-auth login -scopes %q\n", strings.Join(c.parsedAuthOpts.Scopes, " "))
		}
		return err
	}
	if c.quiet {
		log.SetOutput(io.Discard)
	}
	client, err := bigquery.NewClient(ctx, "fuchsia-infra", option.WithTokenSource(tokenSource))
	if err != nil {
		return err
	}
	if err = c.readConfigs(); err != nil {
		return err
	}
	if err = c.printProposedChanges(); err != nil {
		return err
	}
	return c.runSimulation(ctx, client)
}
// printProposedChanges records the added and removed builders in the JSON
// output (when -json-output is set) or prints them to the console.
func (c *simulateCmd) printProposedChanges() error {
	if c.jsonOutputFile != "" {
		c.jsonOutput.CIAddBuilders = c.ciAdd
		c.jsonOutput.CIRemoveBuilders = c.ciRemove
		c.jsonOutput.CQAddBuilders = c.cqAdd
		c.jsonOutput.CQRemoveBuilders = c.cqRemove
		return nil
	}
	row := func(project, bucket, builder string) {
		fmt.Printf(" %s : %s : %s\n", project, bucket, builder)
	}
	fmt.Println("Added CI Builders:")
	for _, b := range c.ciAdd {
		row(b.Project, b.Bucket, b.Builder)
	}
	fmt.Println("\nRemoved CI Builders:")
	for _, b := range c.ciRemove {
		row(b.Project, b.Bucket, b.Builder)
	}
	fmt.Println("\nAdded Presubmit Builders:")
	for _, b := range c.cqAdd {
		row(b.Project, b.Bucket, b.Builder)
	}
	fmt.Println("\nRemoved Presubmit Builders:")
	for _, b := range c.cqRemove {
		row(b.Project, b.Bucket, b.Builder)
	}
	fmt.Println("")
	return nil
}
// readConfigs parses lucicfg generated files to find what changes were proposed
// in the CL and adds them to the appropriate buckets for use in the simulation.
func (c *simulateCmd) readConfigs() error {
	// Fetch the contents of the baseline and proposed configs.
	for _, commaSepString := range c.configDirs {
		dirSlice := strings.Split(commaSepString, ",")
		// Guard against bad inputs: each -config value must be exactly one
		// baseline path and one proposed path.
		if len(dirSlice) != 2 {
			// Error strings are lowercase per Go convention.
			return fmt.Errorf(
				"bad -config flag input\n got: %s \nwant: <path>,<path>",
				commaSepString,
			)
		}
		baseline := readLuciConfigs(dirSlice[0])
		proposed := readLuciConfigs(dirSlice[1])
		if err := c.parseCQChanges(baseline.commitQueueConfig, proposed.commitQueueConfig); err != nil {
			return err
		}
		if err := c.parseCIChanges(baseline.luciSchedulerConfig, proposed.luciSchedulerConfig); err != nil {
			return err
		}
	}
	return nil
}
// parseCQChanges reads the differences between different versions of
// commit-queue.cfg and adds them to the appropriate buckets.
//
// It detects three kinds of changes: whole config groups removed, whole
// config groups added, and builders added/removed/toggled within groups that
// exist in both configs.
func (c *simulateCmd) parseCQChanges(baseline, proposed *cvpb.Config) error {
	// Throw the ConfigGroups into a map for easy cross referencing.
	baselineMap := map[string]*cvpb.ConfigGroup{}
	proposedMap := map[string]*cvpb.ConfigGroup{}
	for _, cfg := range baseline.GetConfigGroups() {
		baselineMap[cfg.Name] = cfg
	}
	for _, cfg := range proposed.GetConfigGroups() {
		proposedMap[cfg.Name] = cfg
	}
	// Find config groups removed by proposed changes.
	for name, cfg := range baselineMap {
		if _, ok := proposedMap[name]; !ok {
			for _, b := range cfg.GetVerifiers().GetTryjob().GetBuilders() {
				bdef, err := parseBuilderDef(b.GetName())
				if err != nil {
					return err
				}
				c.cqRemove = append(c.cqRemove, CQRemoveBuilder{
					// Bug fix: Bucket was previously omitted here, unlike the
					// three sibling append sites below.
					Bucket:      bdef.bucket,
					Builder:     bdef.name,
					ConfigGroup: cfg.GetName(),
					Project:     cfg.GetGerrit()[0].GetProjects()[0].GetName(),
				})
			}
		}
	}
	// Find the config groups added by proposed changes.
	for name, cfg := range proposedMap {
		if _, ok := baselineMap[name]; !ok {
			// Config exists in proposed but not baseline.
			for _, b := range cfg.GetVerifiers().GetTryjob().GetBuilders() {
				bdef, err := parseBuilderDef(b.GetName())
				if err != nil {
					return err
				}
				c.cqAdd = append(c.cqAdd, CQAddBuilder{
					Bucket:      bdef.bucket,
					Builder:     bdef.name,
					ConfigGroup: cfg.GetName(),
					Project:     cfg.GetGerrit()[0].GetProjects()[0].GetName(),
				})
			}
		}
	}
	// Find the config groups modified by proposed changes.
	for name, cfg := range proposedMap {
		if _, ok := baselineMap[name]; ok {
			// Throw the Builders into a map for easy cross referencing.
			baselineCfgBuilderMap := map[string]*cvpb.Verifiers_Tryjob_Builder{}
			proposedCfgBuilderMap := map[string]*cvpb.Verifiers_Tryjob_Builder{}
			for _, builder := range baselineMap[name].GetVerifiers().GetTryjob().GetBuilders() {
				baselineCfgBuilderMap[builder.GetName()] = builder
			}
			for _, builder := range cfg.GetVerifiers().GetTryjob().GetBuilders() {
				proposedCfgBuilderMap[builder.GetName()] = builder
			}
			// Find the Builders removed by proposed changes.
			for name, b := range baselineCfgBuilderMap {
				if _, ok := proposedCfgBuilderMap[name]; !ok {
					bdef, err := parseBuilderDef(b.GetName())
					if err != nil {
						return err
					}
					c.cqRemove = append(c.cqRemove, CQRemoveBuilder{
						Bucket:      bdef.bucket,
						Builder:     bdef.name,
						ConfigGroup: cfg.GetName(),
						Project:     cfg.GetGerrit()[0].GetProjects()[0].GetName(),
					})
				}
			}
			// Find the Builders added by proposed changes.
			for name, b := range proposedCfgBuilderMap {
				if _, ok := baselineCfgBuilderMap[name]; !ok {
					bdef, err := parseBuilderDef(b.GetName())
					if err != nil {
						return err
					}
					c.cqAdd = append(c.cqAdd, CQAddBuilder{
						Bucket:      bdef.bucket,
						Builder:     bdef.name,
						ConfigGroup: cfg.GetName(),
						Project:     cfg.GetGerrit()[0].GetProjects()[0].GetName(),
					})
				}
			}
			// Find the Builders modified by proposed changes (i.e. toggled
			// between includable-only and auto-triggering).
			for name, b := range proposedCfgBuilderMap {
				if pb, ok := baselineCfgBuilderMap[name]; ok {
					bdef, err := parseBuilderDef(b.GetName())
					if err != nil {
						return err
					}
					if pb.GetIncludableOnly() && !b.GetIncludableOnly() {
						// Builder was enabled to trigger.
						c.cqAdd = append(c.cqAdd, CQAddBuilder{
							Bucket:      bdef.bucket,
							Builder:     bdef.name,
							ConfigGroup: cfg.GetName(),
							Project:     cfg.GetGerrit()[0].GetProjects()[0].GetName(),
						})
					} else if !pb.GetIncludableOnly() && b.GetIncludableOnly() {
						// Builder was disabled from triggering.
						c.cqRemove = append(c.cqRemove, CQRemoveBuilder{
							Bucket:      bdef.bucket,
							Builder:     bdef.name,
							ConfigGroup: cfg.GetName(),
							Project:     cfg.GetGerrit()[0].GetProjects()[0].GetName(),
						})
					}
				}
			}
		}
	}
	return nil
}
// parseCIChanges reads the differences between different versions of
// luci-scheduler.cfg and adds them to the appropriate buckets.
// (Comment fix: this previously said "parseCQChanges".)
func (c *simulateCmd) parseCIChanges(baseline, proposed *schedulerpb.ProjectConfig) error {
	// Throw the Triggers and Jobs into maps for easy cross referencing.
	baselineMap := map[string]*schedulerpb.Trigger{}
	proposedMap := map[string]*schedulerpb.Trigger{}
	proposedJobsMap := map[string]*schedulerpb.Job{}
	baselineJobsMap := map[string]*schedulerpb.Job{}
	for _, trigger := range baseline.GetTrigger() {
		baselineMap[trigger.Id] = trigger
	}
	for _, trigger := range proposed.GetTrigger() {
		proposedMap[trigger.Id] = trigger
	}
	for _, job := range baseline.GetJob() {
		baselineJobsMap[job.Id] = job
	}
	for _, job := range proposed.GetJob() {
		proposedJobsMap[job.Id] = job
	}
	// Find Triggers removed by proposed changes.
	for id, trigger := range baselineMap {
		if _, ok := proposedMap[id]; !ok {
			// The project name is the path component after "googlesource.com/".
			repoSplit := strings.Split(trigger.GetGitiles().GetRepo(), "googlesource.com/")
			if len(repoSplit) < 2 {
				// Not a googlesource repo; skip rather than guess the project.
				continue
			}
			for _, t := range trigger.GetTriggers() {
				bkt := baselineJobsMap[t].GetBuildbucket().GetBucket()
				c.ciRemove = append(c.ciRemove, CIRemoveBuilder{
					Bucket:  bkt,
					Builder: parseCIBuilderName(t, bkt),
					Project: repoSplit[1],
				})
			}
		}
	}
	// Find Triggers added by proposed changes.
	for id, trigger := range proposedMap {
		if _, ok := baselineMap[id]; !ok {
			repoSplit := strings.Split(trigger.GetGitiles().GetRepo(), "googlesource.com/")
			if len(repoSplit) < 2 {
				continue
			}
			for _, t := range trigger.GetTriggers() {
				bkt := proposedJobsMap[t].GetBuildbucket().GetBucket()
				c.ciAdd = append(c.ciAdd, CIAddBuilder{
					Bucket:  bkt,
					Builder: parseCIBuilderName(t, bkt),
					Project: repoSplit[1],
					// Capture the scheduler's batching rules so the simulator
					// can model how often the new builder fires.
					BatchConfig: TriggeringPolicy{
						MaxBatchSize:             proposedJobsMap[t].GetTriggeringPolicy().GetMaxBatchSize(),
						MaxConcurrentInvocations: proposedJobsMap[t].GetTriggeringPolicy().GetMaxConcurrentInvocations(),
					},
				})
			}
		}
	}
	// Find Triggers modified by proposed changes.
	for id, trigger := range proposedMap {
		if _, ok := baselineMap[id]; ok {
			// Throw the BuilderTriggers into a set for easy cross referencing.
			baselineBuilderTriggersMap := map[string]struct{}{}
			proposedBuilderTriggersMap := map[string]struct{}{}
			for _, t := range baselineMap[id].GetTriggers() {
				baselineBuilderTriggersMap[t] = struct{}{}
			}
			for _, t := range proposedMap[id].GetTriggers() {
				proposedBuilderTriggersMap[t] = struct{}{}
			}
			repoSplit := strings.Split(trigger.GetGitiles().GetRepo(), "googlesource.com/")
			if len(repoSplit) < 2 {
				continue
			}
			// Find BuilderTriggers removed by proposed changes.
			for b := range baselineBuilderTriggersMap {
				if _, ok := proposedBuilderTriggersMap[b]; !ok {
					bkt := baselineJobsMap[b].GetBuildbucket().GetBucket()
					c.ciRemove = append(c.ciRemove, CIRemoveBuilder{
						Bucket:  bkt,
						Builder: parseCIBuilderName(b, bkt),
						Project: repoSplit[1],
					})
				}
			}
			// Find BuilderTriggers added by proposed changes.
			for p := range proposedBuilderTriggersMap {
				if _, ok := baselineBuilderTriggersMap[p]; !ok {
					bkt := proposedJobsMap[p].GetBuildbucket().GetBucket()
					c.ciAdd = append(c.ciAdd, CIAddBuilder{
						Bucket:  bkt,
						Builder: parseCIBuilderName(p, bkt),
						Project: repoSplit[1],
						BatchConfig: TriggeringPolicy{
							MaxBatchSize:             proposedJobsMap[p].GetTriggeringPolicy().GetMaxBatchSize(),
							MaxConcurrentInvocations: proposedJobsMap[p].GetTriggeringPolicy().GetMaxConcurrentInvocations(),
						},
					})
				}
			}
		}
	}
	return nil
}
// Query strings are defined in queries.go for ease of readability.
//
// runSimulation fetches two weeks of historical task, bot, and builder data
// from BigQuery, runs the simulator once without and once with the proposed
// changes, and reports the differences.
func (c *simulateCmd) runSimulation(ctx context.Context, client *bigquery.Client) error {
	// If there are no changes to simulate and we aren't running baseline
	// return early.
	numChanges := len(c.ciAdd) + len(c.cqAdd) + len(c.ciRemove) + len(c.cqRemove)
	if numChanges == 0 && !c.runBaseline {
		fmt.Println("No changes detected, exiting early.")
		return c.outputJSON()
	}
	pst, err := time.LoadLocation("America/Los_Angeles")
	if err != nil {
		return err
	}
	// The last two weeks.
	periodOfInterestDuration := 14 * 24 * time.Hour
	endTime := time.Now().In(pst).Truncate(24 * time.Hour)
	startTime := endTime.Add(-periodOfInterestDuration)
	log.Printf(" Period of interest: %s - %s", startTime, endTime)
	log.Print("Fetching average subtask runtime...")
	var avgTaskRuntimeResult []types.AvgTaskDuration
	timeframeParams := []bigquery.QueryParameter{
		{
			Name:  "StartTime",
			Value: startTime,
		},
		{
			Name:  "EndTime",
			Value: endTime,
		},
	}
	if err := runQuery(ctx, client, queries.AvgTaskRuntime, timeframeParams, &avgTaskRuntimeResult); err != nil {
		return err
	}
	if len(avgTaskRuntimeResult) == 0 {
		return errors.New("no results for avgTaskRuntimeQuery")
	}
	avgTaskRuntime := avgTaskRuntimeResult[0].AvgDuration
	log.Print("Fetching device counts...")
	var deviceCounts []types.DeviceCount
	if err := runQuery(ctx, client, queries.BotCounts, timeframeParams, &deviceCounts); err != nil {
		return err
	}
	availableBots := map[string]int{}
	for _, dc := range deviceCounts {
		// ex: internal_Cavium. Hold back secretBotStash of capacity.
		availableBots[dc.DeviceType] = int((1 - secretBotStash) * float64(dc.BotCount))
	}
	log.Print("Fetching testing task runs during period of interest...")
	var testingTaskRuns []types.TestingTask
	if err := runQuery(ctx, client, queries.TestingTasks, timeframeParams, &testingTaskRuns); err != nil {
		return err
	}
	sort.SliceStable(testingTaskRuns, func(i, j int) bool {
		if testingTaskRuns[i].CreateTime != testingTaskRuns[j].CreateTime {
			return testingTaskRuns[i].CreateTime.Before(testingTaskRuns[j].CreateTime)
		}
		return testingTaskRuns[i].Duration < testingTaskRuns[j].Duration
	})
	taskCountPerBuilder := map[string]int{}
	// Bug fix: iterate by index so substituting the average runtime for
	// zero-duration tasks updates the slice that is later fed into the
	// simulator. The previous range-value loop only mutated a copy.
	for i := range testingTaskRuns {
		if testingTaskRuns[i].Duration == 0 {
			testingTaskRuns[i].Duration = avgTaskRuntime
		}
		taskCountPerBuilder[testingTaskRuns[i].Builder]++
	}
	log.Printf(" %d rows found", len(testingTaskRuns))
	if c.verbose {
		fmt.Println("Tasks:")
		for builder, count := range taskCountPerBuilder {
			fmt.Println("Task Count: ", builder, " | ", count)
		}
		fmt.Println("Bots:")
		// Bug fix: the previous loop ranged over the slice with two variables,
		// printing the index and the whole struct; print device type and raw
		// bot count instead.
		for _, dc := range deviceCounts {
			fmt.Println("Bot Count: ", dc.DeviceType, " | ", dc.BotCount)
		}
	}
	builderProfileMap := map[string]types.BuilderProfile{}
	log.Print("Fetching statistics on proposed builder additions...")
	var builderProfileSlice []types.BuilderProfile
	if err := runQuery(ctx, client, queries.BuilderTestingTaskProfiles, timeframeParams, &builderProfileSlice); err != nil {
		return err
	}
	for _, profile := range builderProfileSlice {
		builderProfileMap[profile.Builder] = profile
	}
	var cqAttempts []types.CommitQueueAttempt
	if len(c.cqAdd) > 0 {
		log.Print("Fetching historical presubmit events...")
		if err := runQuery(ctx, client, queries.CommitQueueAttempts, timeframeParams, &cqAttempts); err != nil {
			return err
		}
	}
	sim := Simulator{
		baseline:           true,
		startTime:          startTime,
		endTime:            endTime,
		originalIdleBots:   availableBots,
		originalTaskRuns:   testingTaskRuns,
		originalCqAttempts: cqAttempts,
		builderProfiles:    builderProfileMap,
		cqRemove:           c.cqRemove,
		ciRemove:           c.ciRemove,
		cqAdd:              c.cqAdd,
		ciAdd:              c.ciAdd,
	}
	log.Print("Running Simulation to gather baseline information...")
	baseline := sim.simulate()
	log.Print("Running Simulation with proposed changes...")
	sim.baseline = false
	proposed := sim.simulate()
	log.Print("Simulation Complete!")
	// Total historical pending time per device type, for comparison against
	// the simulated numbers.
	historicalPending := map[string]time.Duration{}
	for _, testingTask := range testingTaskRuns {
		historicalPending[testingTask.DeviceType] += testingTask.PendingDuration
	}
	if c.veryVerbose {
		reportSimulationDifferences(
			availableBots,
			baseline.StepResults,
			proposed.StepResults,
		)
	}
	c.reportRecommendation(
		availableBots,
		baseline,
		proposed,
		historicalPending,
		builderProfileMap,
	)
	return c.outputJSON()
}
// outputJSON writes the performed analysis to stdout and, when -json-output
// is set, to a JSON file (or to stdout itself for "-").
func (c *simulateCmd) outputJSON() error {
	// Print the full CL comment to stdout, unless stdout is reserved for the
	// JSON document itself.
	if c.jsonOutputFile != "-" {
		fmt.Println(c.jsonOutput.CLComment)
	}
	// If there are too many warnings, abbreviate the CL comment.
	if c.jsonOutput.Warnings > 20 {
		c.jsonOutput.CLComment = fmt.Sprintf(
			"Builder Oracle has found %d issues with the proposed changes, check the stdout of the \"simulate proposed builder changes\" step for full details.",
			c.jsonOutput.Warnings,
		)
	}
	if c.jsonOutput.CLComment != "" {
		// Add escape characters to format CL comment.
		c.jsonOutput.CLComment = "```\n" + c.jsonOutput.CLComment + "```"
	}
	if c.jsonOutputFile == "" {
		return nil
	}
	// Write output to json.
	rawJSON, err := json.MarshalIndent(c.jsonOutput, "", " ")
	// Bug fix: check the marshal error before using rawJSON (previously the
	// append happened first).
	if err != nil {
		return err
	}
	rawJSON = append(rawJSON, '\n')
	var f io.WriteCloser
	if c.jsonOutputFile == "-" {
		f = os.Stdout
	} else {
		f, err = os.Create(c.jsonOutputFile)
		if err != nil {
			return err
		}
		defer f.Close()
	}
	if _, err := f.Write(rawJSON); err != nil {
		return err
	}
	return nil
}
// reportRecommendation prints out any recommended actions for proposed
// changes and records the warning/benefit counts and CL comment in the JSON
// output. If no changes are detected, it prints out the raw stats.
//
// NOTE(review): historicalPending is currently unused here — confirm whether
// it should feed into the report or be dropped at the call site.
func (c *simulateCmd) reportRecommendation(
	availableBots map[string]int,
	baseline SimulateResults,
	proposed SimulateResults,
	historicalPending map[string]time.Duration,
	builderProfiles map[string]types.BuilderProfile) {
	dts := maps.Keys(availableBots)
	sort.Strings(dts)
	availabilityWarnings := []string{}
	zeroRunWarnings := []string{}
	lowRunWarnings := []string{}
	longPoleWarnings := []string{}
	flakeRateWarnings := []string{}
	// Special case if someone invokes simulate with no builder changes.
	if len(c.cqAdd)+len(c.ciAdd)+len(c.ciRemove)+len(c.cqRemove) == 0 {
		fmt.Println("No changes to builders detected, reporting raw statistics")
		for _, dt := range dts {
			fmt.Println(dt)
			fmt.Printf(
				"Average simulated pending time: %.2f seconds\n",
				baseline.BotStats[dt].PendingMean.Seconds(),
			)
			fmt.Printf(
				"P75 simulated pending time: %.2f seconds\n",
				baseline.BotStats[dt].PendingP75.Seconds(),
			)
			fmt.Printf(
				"P99 simulated pending time: %.2f seconds\n",
				baseline.BotStats[dt].PendingP99.Seconds(),
			)
			fmt.Printf(
				"Simulated device availability: %.2f %%\n",
				baseline.BotStats[dt].AvailableCapacityPercent,
			)
			fmt.Println("Builders using this device:")
			builders := maps.Keys(proposed.BotStats[dt].Builders)
			sort.Strings(builders)
			// Renamed from "max" to avoid shadowing the builtin.
			width := maxlen(builders)
			for _, builder := range builders {
				// For pretty formatting to line up the output.
				buf := strings.Repeat(" ", width-len(builder))
				// NOTE(review): value is Minutes() but the label suffix is
				// "s" — confirm the intended unit.
				fmt.Printf(
					" %s%s | %0.2fs\n",
					builder,
					buf,
					proposed.BotStats[dt].Builders[builder].Minutes(),
				)
			}
			fmt.Println("")
		}
		return
	}
	// Identify longpole builders: each project's P90 average build duration
	// serves as the "too slow" threshold.
	projectBuilderDurations := map[string][]time.Duration{}
	projectP90s := map[string]time.Duration{}
	for _, bp := range builderProfiles {
		projectBuilderDurations[bp.Project] = append(projectBuilderDurations[bp.Project], bp.AverageBuildDuration)
	}
	for project, durations := range projectBuilderDurations {
		slices.Sort(durations)
		projectP90s[project] = percentile(durations, 90)
	}
	// Go through added builders and report any warnings.
	type addedBuilder struct {
		builder     string
		isPresubmit bool
	}
	addedBuilders := []addedBuilder{}
	for _, builderChange := range c.ciAdd {
		ab := addedBuilder{builder: builderChange.Builder, isPresubmit: false}
		if !slices.Contains(addedBuilders, ab) {
			addedBuilders = append(addedBuilders, ab)
		}
	}
	for _, builderChange := range c.cqAdd {
		ab := addedBuilder{builder: builderChange.Builder, isPresubmit: true}
		if !slices.Contains(addedBuilders, ab) {
			addedBuilders = append(addedBuilders, ab)
		}
	}
	warningCount := 0
	benefitCount := 0
	for _, builder := range addedBuilders {
		profile := builderProfiles[builder.builder]
		// Raise warning if low count of builds recorded for added builders.
		if profile.NumBuilds == 0 {
			warn := fmt.Sprintf(
				"WARNING: zero runs detected for added builder %s. Proceed with caution, as impact to infrastructure is unquantified.\n",
				builder.builder,
			)
			zeroRunWarnings = append(zeroRunWarnings, warn)
			warningCount++
		} else if profile.NumBuilds < 10 {
			warn := fmt.Sprintf(
				"WARNING: added builder %s's profile was based off of a statistically low number of recorded runs, simulation fidelity may be impacted.\n",
				builder.builder,
			)
			lowRunWarnings = append(lowRunWarnings, warn)
			warningCount++
		}
		// Raise warning if builder is a long pole.
		if profile.AverageBuildDuration > projectP90s[profile.Project] {
			warn := fmt.Sprintf(
				"WARNING: added builder %s is a long pole for the context it is running in at an average runtime of %0.2f seconds.\n",
				builder.builder,
				profile.AverageBuildDuration.Seconds(),
			)
			longPoleWarnings = append(longPoleWarnings, warn)
			warningCount++
		}
		// Raise warning if builder is flaky. (When NumBuilds is zero the
		// division yields NaN, and NaN > flakeThreshold is false, so the
		// zero-run warning above covers that case.)
		flakeRate := float32(profile.NumFlakes) / float32(profile.NumBuilds)
		if flakeRate > flakeThreshold && builder.isPresubmit {
			warn := fmt.Sprintf(
				"WARNING: builder %s added to presubmit has a high flake rate at %.2f%%, this exceeds our recommended flake rate for presubmit.\n",
				builder.builder,
				100*flakeRate,
			)
			flakeRateWarnings = append(flakeRateWarnings, warn)
			warningCount++
		}
	}
	// Raise warning if bot pending times were impacted.
	for dt, botstat := range baseline.BotStats {
		deltaAvailableCapacityPercent := proposed.BotStats[dt].AvailableCapacityPercent - botstat.AvailableCapacityPercent
		// We choose the AvailableCapacityPercent as the best proxy for an improvement or detriment
		// Since the pending times are derived from device availability.
		if deltaAvailableCapacityPercent < 0 {
			warningCount++
			warn := fmt.Sprintf(
				"WARNING: Proposed changes reduce the availability of device: %s (details below).",
				dt,
			)
			availabilityWarnings = append(availabilityWarnings, warn)
			availabilityWarnings = append(availabilityWarnings, c.formatPendingDeltas(
				dt,
				proposed.BotStats[dt],
				botstat,
			)...)
			out := "Builders using this device:"
			availabilityWarnings = append(availabilityWarnings, out)
			builders := maps.Keys(botstat.Builders)
			sort.Strings(builders)
			// Renamed from "max" to avoid shadowing the builtin.
			width := maxlen(builders)
			for _, builder := range builders {
				// For pretty formatting to line up the output
				buf := strings.Repeat(" ", width-len(builder))
				out := fmt.Sprintf(
					" %s%s | %0.2fs",
					builder,
					buf,
					botstat.Builders[builder].Minutes(),
				)
				availabilityWarnings = append(availabilityWarnings, out)
			}
			availabilityWarnings = append(availabilityWarnings, "\n")
		}
		if deltaAvailableCapacityPercent > 0 {
			benefitCount++
			benefit := fmt.Sprintf(
				"This change will improve the pending times for device %s! (details below).",
				dt,
			)
			availabilityWarnings = append(availabilityWarnings, benefit)
			// Bug fix: the formatted deltas were previously computed and
			// discarded, so the promised details never appeared; append them.
			availabilityWarnings = append(availabilityWarnings, c.formatPendingDeltas(
				dt,
				proposed.BotStats[dt],
				botstat,
			)...)
		}
	}
	c.jsonOutput.Warnings = warningCount
	c.jsonOutput.Benefits = benefitCount
	if warningCount > 0 {
		lang := "issues"
		if warningCount == 1 {
			lang = "issue"
		}
		verdict := fmt.Sprintf(
			"Builder Oracle found %d %s with the proposed builder changes, please review the warning output.\n",
			warningCount,
			lang,
		)
		outputLines := []string{verdict}
		outputLines = append(outputLines, availabilityWarnings...)
		outputLines = append(outputLines, longPoleWarnings...)
		outputLines = append(outputLines, flakeRateWarnings...)
		outputLines = append(outputLines, lowRunWarnings...)
		outputLines = append(outputLines, zeroRunWarnings...)
		clComment := strings.Join(outputLines, "\n")
		c.jsonOutput.CLComment = clComment
	} else if benefitCount > 0 {
		fmt.Println("Congratulations, these changes made improvements to bot availability, examine the benefits in the output above.")
	} else {
		fmt.Printf("No issues detected with proposed changes, you may proceed with confidence!\n")
	}
}
// formatPendingDeltas renders the change in pending times and availability
// between the baseline and proposed simulations for one device type as a
// slice of human-readable lines.
func (c *simulateCmd) formatPendingDeltas(
	dt string,
	proposed DeviceStatistics,
	baseline DeviceStatistics,
) []string {
	var lines []string
	add := func(format string, args ...any) {
		lines = append(lines, fmt.Sprintf(format, args...))
	}
	add("%s\n", dt)
	add(
		"PendingMean: (before -> after) %.2fs -> %.2fs | delta: %.2fs",
		baseline.PendingMean.Seconds(),
		proposed.PendingMean.Seconds(),
		(proposed.PendingMean - baseline.PendingMean).Seconds(),
	)
	add(
		"PendingP75: (before -> after) %.2fs -> %.2fs | delta: %.2fs",
		baseline.PendingP75.Seconds(),
		proposed.PendingP75.Seconds(),
		(proposed.PendingP75 - baseline.PendingP75).Seconds(),
	)
	add(
		"PendingP99: (before -> after) %.2fs -> %.2fs | delta: %.2fs",
		baseline.PendingP99.Seconds(),
		proposed.PendingP99.Seconds(),
		(proposed.PendingP99 - baseline.PendingP99).Seconds(),
	)
	add(
		"Availability: (before -> after) %.2f%% -> %.2f%% | delta: %.2f%%\n",
		baseline.AvailableCapacityPercent,
		proposed.AvailableCapacityPercent,
		proposed.AvailableCapacityPercent-baseline.AvailableCapacityPercent,
	)
	return lines
}
// SimulateResults is the outcome of one Simulator.simulate() pass.
type SimulateResults struct {
	BotStats    map[string]DeviceStatistics // per-device-type pending/availability statistics
	StepResults []types.StepStatus          // per-timestep debug snapshots of the simulation
}
// DeviceStatistics summarizes simulated task pending times and capacity for a
// single device type.
type DeviceStatistics struct {
	PendingMean              time.Duration            // mean task pending time
	PendingP75               time.Duration            // 75th percentile pending time
	PendingP99               time.Duration            // 99th percentile pending time
	AvailableCapacityPercent float64                  // % of the simulated period with at least one idle bot
	Builders                 map[string]time.Duration // total task duration consumed per builder
}
// RecoveringBot is a bot that finished a task and is unavailable until its
// recovery period elapses.
type RecoveringBot struct {
	DeviceType   string    // device type pool the bot returns to
	RecoveryTime time.Time // simulated time at which the bot becomes idle again
}
// Simulator replays historical task traffic through a discrete time-step
// model of the bot fleet, optionally overlaying the proposed builder
// additions and removals, to estimate pending times and availability.
type Simulator struct {
	baseline           bool                            // whether the simulation should factor in builder changes
	cqAttempts         []types.CommitQueueAttempt      // working copy of originalCqAttempts
	originalCqAttempts []types.CommitQueueAttempt      // incoming presubmit verification requests
	recoveringBots     []RecoveringBot                 // bots that are performing maintenance between tasks
	ciAdd              []CIAddBuilder                  // builders added to ci
	ciRemove           []CIRemoveBuilder               // builders removed from ci
	cqAdd              []CQAddBuilder                  // builders added to cq
	cqRemove           []CQRemoveBuilder               // builders removed from cq
	StepResults        []types.StepStatus              // debug logging for observing the simulation in detail
	expiredTasks       []types.TestingTask             // tasks that were pending for longer than their expiration timeout
	finishedTasks      []types.TestingTask             // tasks which have completed
	runningTasks       []types.TestingTask             // tasks that are using a bot and have not completed
	waitingTasks       []types.TestingTask             // tasks that are waiting for an available bot
	originalTaskRuns   []types.TestingTask             // historical tasks observed over the simulated period
	taskRuns           []types.TestingTask             // working copy of originalTaskRuns
	builderProfiles    map[string]types.BuilderProfile // builder information and bot footprint
	botAvailability    map[string]time.Duration        // a mapping of bot names to duration that the bot had more than 0 available
	idleBots           map[string]int                  // working copy of originalIdleBots
	originalIdleBots   map[string]int                  // available bots for tasks
	ciTriggers         map[string]time.Time            // next timestamp to trigger new ci builders
	timestepIncrement  time.Duration                   // how far in time the simulation should move between steps
	currTime           time.Time                       // the current time in the simulation
	endTime            time.Time                       // the end of the simulated period
	startTime          time.Time                       // the start of the simulated period
}
// Our hardware-backed bots require recovery overhead, which does not include gce or Cavium.
// recoveryExcludedBots lists the device types that skip the post-task
// recovery period and return to the idle pool immediately.
var recoveryExcludedBots = []string{"gce", "Cavium"}
// simulate steps through the simulated period in fixed increments, moving
// tasks between the waiting/running/finished queues and bots between the
// idle/recovering pools, then aggregates per-device pending-time and
// availability statistics.
func (s *Simulator) simulate() SimulateResults {
	// initialize working variables for the new simulation.
	s.initVars()
	for s.currTime = s.startTime; s.currTime.Before(s.endTime); s.currTime = s.currTime.Add(s.timestepIncrement) {
		// Check if any bots have finished their tasks and recovery period and
		// make them available.
		s.recoverBots()
		// Prune finished tasks from the runningTask queue.
		s.processFinishedTasks()
		// Add incoming historical traffic to the waitingTask queue.
		s.processIncomingTasks()
		// Add traffic from proposed changes to the waitingTask queue.
		if !s.baseline {
			s.processCITriggers()
			s.processCQAttempts()
		}
		// Start waiting tasks if there are available bots.
		s.processWaitingTasks()
		// Increment our waiting and runtime counts.
		for idx := range s.waitingTasks {
			s.waitingTasks[idx].PendingDuration += s.timestepIncrement
		}
		for bot, amt := range s.idleBots {
			// Track if the bot type had units available.
			if amt != 0 {
				s.botAvailability[bot] += s.timestepIncrement
			}
		}
		// Log this step in the simulation for debugging.
		s.StepResults = append(s.StepResults, types.StepStatus{
			CurrentTime:  s.currTime,
			RunningTasks: s.runningTasks,
			WaitingTasks: s.waitingTasks,
			IdleBots:     maps.Clone(s.idleBots),
			TaskRuns:     s.taskRuns,
		})
	}
	// Calculate the mean/p50/p99 pending time for tasks per device type.
	pendingTimesByBots := map[string][]time.Duration{}
	BotStats := map[string]DeviceStatistics{}
	// Both completed and expired tasks count toward pending-time statistics.
	endedTasks := append(s.finishedTasks, s.expiredTasks...)
	// Collate the pending times for tasks per device.
	for _, task := range endedTasks {
		pendingTimesByBots[task.DeviceType] = append(pendingTimesByBots[task.DeviceType], task.PendingDuration)
	}
	for bot, pt := range pendingTimesByBots {
		slices.Sort(pt)
		sum := 0 * time.Second
		for _, p := range pt {
			sum += p
		}
		avg := sum / time.Duration(len(pt))
		p75 := percentile(pt, 75)
		p99 := percentile(pt, 99)
		BotStats[bot] = DeviceStatistics{
			PendingMean: avg,
			PendingP75:  p75,
			PendingP99:  p99,
			// Fraction of the simulated period during which this device type
			// had at least one idle bot.
			AvailableCapacityPercent: 100 * float64(s.botAvailability[bot]) / float64(s.endTime.Sub(s.startTime)),
			Builders:                 map[string]time.Duration{},
		}
	}
	// Collate the builder usage of each device type.
	for _, task := range endedTasks {
		BotStats[task.DeviceType].Builders[task.Builder] += task.Duration
	}
	return SimulateResults{
		BotStats:    BotStats,
		StepResults: s.StepResults,
	}
}
// initVars is a helper that initializes working variables before simulate() calls.
func (s *Simulator) initVars() {
	s.timestepIncrement = 5 * time.Second
	s.expiredTasks = []types.TestingTask{}
	s.finishedTasks = []types.TestingTask{}
	s.runningTasks = []types.TestingTask{}
	s.waitingTasks = []types.TestingTask{}
	s.recoveringBots = []RecoveringBot{}
	s.idleBots = maps.Clone(s.originalIdleBots)
	s.StepResults = []types.StepStatus{}
	s.botAvailability = map[string]time.Duration{}
	// This is a rough approximation for adding builders to CI. This assumes
	// that there is a constant influx of CLs to merit triggering the builder
	// after each run has completed.
	// TODO(fxbug.dev/95557): Track merge events and use
	// max_concurrent_invocations and batching rules to simulate with higher
	// fidelity.
	s.ciTriggers = map[string]time.Time{}
	for _, builderChange := range s.ciAdd {
		s.ciTriggers[builderChange.Builder] = s.startTime
	}
	// NOTE(review): these assignments alias the original slices rather than
	// copying them — safe only if the simulation never mutates their elements
	// in place (it appears to only reslice/consume them); confirm.
	s.taskRuns = s.originalTaskRuns
	s.cqAttempts = s.originalCqAttempts
}
// recoverBots returns bots to the idleBots pool once their post-task recovery
// window has elapsed; bots still recovering stay in the recoveringBots list.
func (s *Simulator) recoverBots() {
	var stillRecovering []RecoveringBot
	for _, rb := range s.recoveringBots {
		if !s.currTime.After(rb.RecoveryTime) {
			stillRecovering = append(stillRecovering, rb)
			continue
		}
		s.idleBots[rb.DeviceType]++
	}
	s.recoveringBots = stillRecovering
}
// processFinishedTasks moves running tasks whose duration has elapsed into
// finishedTasks. The freed bot either returns straight to the idle pool (for
// device types exempt from recovery) or enters a fixed 75-second recovery
// period before becoming available again.
func (s *Simulator) processFinishedTasks() {
	var stillRunning []types.TestingTask
	for _, task := range s.runningTasks {
		if !s.currTime.After(task.CreateTime.Add(task.Duration)) {
			stillRunning = append(stillRunning, task)
			continue
		}
		s.finishedTasks = append(s.finishedTasks, task)
		if slices.Contains(recoveryExcludedBots, task.DeviceType) {
			// This device type skips recovery; the bot is immediately idle.
			s.idleBots[task.DeviceType]++
		} else {
			s.recoveringBots = append(s.recoveringBots, RecoveringBot{
				DeviceType:   task.DeviceType,
				RecoveryTime: s.currTime.Add(75 * time.Second),
			})
		}
	}
	s.runningTasks = stillRunning
}
// processWaitingTasks assigns each waiting task to an idle bot of its device
// type. Tasks pending past their expiration are moved to expiredTasks; tasks
// whose builder the proposal removes are dropped (non-baseline runs only);
// anything left unassigned stays in the waitingTasks queue.
func (s *Simulator) processWaitingTasks() {
	var stillWaiting []types.TestingTask
	for _, task := range s.waitingTasks {
		switch {
		case task.PendingDuration > task.Expiration:
			// Pending longer than its configured expiration: drop as expired.
			s.expiredTasks = append(s.expiredTasks, task)
		case !s.baseline && s.shouldRemoveTask(task):
			// Task belongs to a builder proposed for removal; discard it.
		case s.idleBots[task.DeviceType] > 0:
			s.idleBots[task.DeviceType]--
			s.runningTasks = append(s.runningTasks, task)
		default:
			stillWaiting = append(stillWaiting, task)
		}
	}
	s.waitingTasks = stillWaiting
}
// shouldRemoveTask reports whether task belongs to a builder that the
// proposed configuration removes: cqRemove is consulted for try (CQ) tasks,
// ciRemove for CI tasks. A match requires project, bucket, and builder to
// all agree.
func (s *Simulator) shouldRemoveTask(task types.TestingTask) bool {
	matches := func(project, bucket, builder string) bool {
		return task.Project == project &&
			task.Bucket == bucket &&
			task.Builder == builder
	}
	if task.IsTry {
		for _, bc := range s.cqRemove {
			if matches(bc.Project, bc.Bucket, bc.Builder) {
				return true
			}
		}
		return false
	}
	for _, bc := range s.ciRemove {
		if matches(bc.Project, bc.Bucket, bc.Builder) {
			return true
		}
	}
	return false
}
// processIncomingTasks drains tasks from the front of the (time-ordered)
// taskRuns queue into waitingTasks once the simulation clock has passed
// their creation time, resetting each task's pending duration.
func (s *Simulator) processIncomingTasks() {
	for len(s.taskRuns) > 0 {
		next := s.taskRuns[0]
		if !next.CreateTime.Before(s.currTime) {
			break
		}
		s.taskRuns = s.taskRuns[1:]
		next.PendingDuration = 0
		s.waitingTasks = append(s.waitingTasks, next)
	}
}
// processCITriggers enqueues subtasks for added CI builders. Each added CI
// builder carries a timestamp for its next trigger; once the simulation
// clock passes it, one waiting task is created per device in the builder's
// profile footprint, and the next trigger is scheduled one average build
// duration later.
func (s *Simulator) processCITriggers() {
	for builder, nextTrigger := range s.ciTriggers {
		if !s.currTime.After(nextTrigger) {
			continue
		}
		profile := s.builderProfiles[builder]
		// Enqueue every subtask described by the builder's device footprints.
		for _, fp := range profile.DeviceFootprints {
			for i := 0; i < fp.AverageTasks; i++ {
				// CI tasks always run on internal capacity.
				s.waitingTasks = append(s.waitingTasks, types.TestingTask{
					Builder:         builder,
					CreateTime:      s.currTime,
					DeviceType:      "internal_" + fp.DeviceType,
					Duration:        fp.AverageTaskDuration,
					Expiration:      fp.Expiration,
					IsTry:           false,
					PendingDuration: 0,
				})
			}
		}
		// Schedule the builder's next trigger.
		s.ciTriggers[builder] = s.currTime.Add(profile.AverageBuildDuration)
	}
}
// processCQAttempts enqueues subtasks for added CQ builders by replaying
// historical presubmit attempts: for each attempt whose start time the
// simulation clock has passed, every added builder in the attempt's config
// group gets tasks for each device in its profile footprint. Attempts whose
// config group starts with "fuchsia" run on external capacity; all others
// run internally.
func (s *Simulator) processCQAttempts() {
	for len(s.cqAttempts) > 0 && s.cqAttempts[0].StartTime.Before(s.currTime) {
		attempt := s.cqAttempts[0]
		s.cqAttempts = s.cqAttempts[1:]
		externality := "internal_"
		if strings.HasPrefix(attempt.ConfigGroup, "fuchsia") {
			externality = "external_"
		}
		for _, builderChange := range s.cqAdd {
			if attempt.ConfigGroup != builderChange.ConfigGroup {
				continue
			}
			// Enqueue every subtask in the builder's profile.
			for _, fp := range s.builderProfiles[builderChange.Builder].DeviceFootprints {
				for i := 0; i < fp.AverageTasks; i++ {
					s.waitingTasks = append(s.waitingTasks, types.TestingTask{
						Builder:         builderChange.Builder,
						CreateTime:      s.currTime,
						DeviceType:      externality + fp.DeviceType,
						Duration:        fp.AverageTaskDuration,
						Expiration:      fp.Expiration,
						IsTry:           true,
						PendingDuration: 0,
					})
				}
			}
		}
	}
}
// maxlen returns the length of the longest string in s, or 0 for an empty
// (or nil) slice.
func maxlen(s []string) int {
	longest := 0
	for _, str := range s {
		if l := len(str); l > longest {
			longest = l
		}
	}
	return longest
}
// builderDef identifies a single builder by its fully-qualified
// "project/bucket/builder" coordinates.
type builderDef struct {
	luciProject string
	bucket      string
	name        string
}

// parseBuilderDef parses a "project/bucket/builder" string into a builderDef.
// It returns an error if the input does not contain exactly three
// slash-separated components.
func parseBuilderDef(input string) (builderDef, error) {
	strSplit := strings.Split(input, "/")
	if len(strSplit) != 3 {
		// fmt.Errorf replaces the redundant errors.New(fmt.Sprintf(...))
		// pattern (staticcheck S1028).
		return builderDef{}, fmt.Errorf(
			"Malformed input to parseBuilderDef, expected 'project/bucket/builder', got %s",
			input,
		)
	}
	return builderDef{
		luciProject: strSplit[0],
		bucket:      strSplit[1],
		name:        strSplit[2],
	}, nil
}
// parseCIBuilderName recovers a builder name from a LUCI scheduler trigger
// name by stripping the trailing 9-character hash suffix and, when present,
// a leading "<bucket>-" namespace (the bucket gets prepended when two
// triggers would otherwise share a name).
func parseCIBuilderName(name string, bucket string) string {
	// Guard: a name shorter than the hash suffix would make the slice below
	// panic; return it unchanged instead.
	if len(name) < 9 {
		return name
	}
	// Strip the last 9 chars, the hash suffix.
	res := name[:len(name)-9]
	// TrimPrefix is a no-op when the prefix is absent, and (unlike manual
	// slicing) cannot index past the end of a short remainder.
	return strings.TrimPrefix(res, bucket+"-")
}
func percentile(d []time.Duration, p int) time.Duration {
return d[len(d)*p/100]
}
// LuciConfigurationFiles bundles the parsed LUCI configuration protos read
// from a lucicfg output directory.
type LuciConfigurationFiles struct {
	// commitQueueConfig is the parsed commit-queue.cfg (CV config).
	commitQueueConfig *cvpb.Config
	// luciSchedulerConfig is the parsed luci-scheduler.cfg.
	luciSchedulerConfig *schedulerpb.ProjectConfig
}
// readLuciConfigs reads commit-queue.cfg and luci-scheduler.cfg from the
// given lucicfg output directory. Rather than erroring out if a file is
// missing or improperly written, it logs the error and continues with an
// empty proto, since lucicfg directories are not guaranteed to contain all
// expected files.
func readLuciConfigs(path string) LuciConfigurationFiles {
	configs := LuciConfigurationFiles{
		commitQueueConfig:   &cvpb.Config{},
		luciSchedulerConfig: &schedulerpb.ProjectConfig{},
	}
	// Read each config file in a fixed order; failures are best-effort.
	files := []struct {
		name string
		msg  proto.Message
	}{
		{"commit-queue.cfg", configs.commitQueueConfig},
		{"luci-scheduler.cfg", configs.luciSchedulerConfig},
	}
	for _, f := range files {
		if err := readTextprotoFile(filepath.Join(path, f.name), f.msg); err != nil {
			log.Print(err)
		}
	}
	return configs
}
// readTextprotoFile reads the file at path and unmarshals its textproto
// contents into msg. AllowPartial is set so configs missing required fields
// still parse.
func readTextprotoFile(path string, msg proto.Message) error {
	in, err := os.ReadFile(path)
	if err != nil {
		return err
	}
	// os.ReadFile already returns []byte, so no conversion is needed, and
	// the Unmarshal error can be returned directly.
	return (prototext.UnmarshalOptions{AllowPartial: true}).Unmarshal(in, msg)
}
// reportSimulationDifferences logs, step by step, how the proposed
// simulation diverged from the baseline: per-device idle-worker counts and
// pending times, plus the lengths of the running, waiting, and scheduled
// task queues. Steps with no differences produce no output.
func reportSimulationDifferences(
	deviceTypes map[string]int,
	baselineSimulation, proposedSimulation []types.StepStatus,
) {
	for idx, baselineStep := range baselineSimulation {
		proposedStep := proposedSimulation[idx]
		diffs := []string{fmt.Sprintf("Diffs for Execution step %d", idx+1)}
		for dt := range deviceTypes {
			if b, p := baselineStep.IdleBots[dt], proposedStep.IdleBots[dt]; b != p {
				diffs = append(diffs, fmt.Sprintf(
					"Diff available workers: %s %d | %d", dt, b, p,
				))
			}
			if b, p := baselineStep.WaitingTimes[dt], proposedStep.WaitingTimes[dt]; b != p {
				diffs = append(diffs, fmt.Sprintf(
					"Diff in pending minutes: %s %f | %f", dt, b.Minutes(), p.Minutes(),
				))
			}
		}
		if b, p := len(baselineStep.RunningTasks), len(proposedStep.RunningTasks); b != p {
			diffs = append(diffs, fmt.Sprintf(
				"Diff runningTasks length: %d | %d", b, p,
			))
		}
		if b, p := len(baselineStep.WaitingTasks), len(proposedStep.WaitingTasks); b != p {
			diffs = append(diffs, fmt.Sprintf(
				"Diff waitingTasks length: %d | %d", b, p,
			))
		}
		if b, p := len(baselineStep.TaskRuns), len(proposedStep.TaskRuns); b != p {
			diffs = append(diffs, fmt.Sprintf(
				"Diff scheduledRuns length: %d | %d", b, p,
			))
		}
		// The slice always holds the header; anything beyond it is a real diff.
		if len(diffs) > 1 {
			for _, line := range diffs {
				log.Print(line)
			}
			log.Print("\n")
		}
	}
}
// runQuery executes a parameterized BigQuery query and deserializes the
// resulting rows into ptr, which should be a pointer to a slice of structs
// with JSON tags matching the query's column names.
func runQuery(
	ctx context.Context,
	client *bigquery.Client,
	query string,
	params []bigquery.QueryParameter, ptr any,
) error {
	q := client.Query(query)
	q.Parameters = params
	it, err := q.Read(ctx)
	if err != nil {
		return err
	}
	var rows []map[string]bigquery.Value
	for {
		row := map[string]bigquery.Value{}
		if err := it.Next(&row); err == iterator.Done {
			break
		} else if err != nil {
			return err
		}
		rows = append(rows, row)
	}
	// This is a hacky way to easily deserialize rows into objects: round-trip
	// the generic row maps through JSON. Copy at your own peril!
	encoded, err := json.Marshal(rows)
	if err != nil {
		return err
	}
	return json.Unmarshal(encoded, &ptr)
}