| // Copyright 2022 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package main |
| |
| import ( |
| "context" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "io/ioutil" |
| "log" |
| "os" |
| "sort" |
| "strings" |
| "time" |
| |
| "cloud.google.com/go/bigquery" |
| "github.com/maruel/subcommands" |
| "go.chromium.org/luci/auth" |
| "go.fuchsia.dev/infra/cmd/builder_oracle/queries" |
| "go.fuchsia.dev/infra/cmd/builder_oracle/types" |
| "go.fuchsia.dev/infra/flagutil" |
| "golang.org/x/exp/maps" |
| "golang.org/x/exp/slices" |
| "google.golang.org/api/iterator" |
| "google.golang.org/api/option" |
| ) |
| |
const (
	// simulateLongDesc is the long-form help text attached to the simulate
	// subcommand.
	simulateLongDesc = `
Simulate the impact of adding or removing builders to/from CQ or CI.
`

	// flakeThreshold is the highest flake rate we recommend for tests (3%).
	// Builders above it are reported with a warning.
	flakeThreshold = 0.03

	// secretBotStash is the fraction of each device pool that is withheld
	// from the simulation, leaving spare capacity for unforeseen changes to
	// our usage or availability.
	secretBotStash = 0.2
)
| |
| func cmdSimulate(authOpts auth.Options) *subcommands.Command { |
| return &subcommands.Command{ |
| UsageLine: "simulate -cq-add builder_a -cq-add builder_b -ci-remove builder_c", |
| ShortDesc: "Simulate builder configuration changes.", |
| LongDesc: simulateLongDesc, |
| CommandRun: func() subcommands.CommandRun { |
| c := &simulateCmd{} |
| c.Init(authOpts) |
| return c |
| }, |
| } |
| } |
| |
| type simulateCmd struct { |
| commonFlags |
| |
| cqAdd flagutil.RepeatedStringValue |
| cqRemove flagutil.RepeatedStringValue |
| ciAdd flagutil.RepeatedStringValue |
| ciRemove flagutil.RepeatedStringValue |
| } |
| |
| func (c *simulateCmd) Init(defaultAuthOpts auth.Options) { |
| descs := map[string]string{ |
| "ci-add": "Builder to be added to CI, multiple instances of this flag are allowed", |
| "ci-remove": "Builder to be removed from CI, multiple instances of this flag are allowed", |
| "cq-add": "Builder to be added to presubmit, multiple instances of this flag are allowed", |
| "cq-remove": "Builder to be removed from presubmit, multiple instances of this flag are allowed", |
| } |
| c.commonFlags.Init(defaultAuthOpts) |
| c.Flags.Var(&c.ciAdd, "ci-add", descs["ci-add"]) |
| c.Flags.Var(&c.ciRemove, "ci-remove", descs["ci-remove"]) |
| c.Flags.Var(&c.cqAdd, "cq-add", descs["cq-add"]) |
| c.Flags.Var(&c.cqRemove, "cq-remove", descs["cq-remove"]) |
| } |
| |
| func (c *simulateCmd) parseArgs() error { |
| if err := c.commonFlags.Parse(); err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| func (c *simulateCmd) Run(a subcommands.Application, _ []string, _ subcommands.Env) int { |
| if err := c.parseArgs(); err != nil { |
| fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err) |
| return 1 |
| } |
| |
| if err := c.main(); err != nil { |
| fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err) |
| return 1 |
| } |
| return 0 |
| } |
| |
| func (c *simulateCmd) main() error { |
| ctx := context.Background() |
| |
| authenticator := auth.NewAuthenticator(ctx, auth.OptionalLogin, c.parsedAuthOpts) |
| tokenSource, err := authenticator.TokenSource() |
| if err != nil { |
| if err == auth.ErrLoginRequired { |
| fmt.Fprintf(os.Stderr, "You need to login first by running:\n") |
| fmt.Fprintf(os.Stderr, " luci-auth login -scopes %q\n", strings.Join(c.parsedAuthOpts.Scopes, " ")) |
| } |
| return err |
| } |
| |
| if c.quiet { |
| log.SetOutput(ioutil.Discard) |
| } |
| |
| client, err := bigquery.NewClient(ctx, "fuchsia-infra", option.WithTokenSource(tokenSource)) |
| if err != nil { |
| return err |
| } |
| |
| return c.runSimulation(ctx, client) |
| } |
| |
| // Query strings are defined in queries.go for ease of readability |
| func (c *simulateCmd) runSimulation(ctx context.Context, client *bigquery.Client) error { |
| pst, err := time.LoadLocation("America/Los_Angeles") |
| if err != nil { |
| return err |
| } |
| |
| // The last two weeks |
| periodOfInterestDuration := 14 * 24 * time.Hour |
| |
| endTime := time.Now().In(pst).Truncate(24 * time.Hour) |
| startTime := endTime.Add(-periodOfInterestDuration) |
| log.Printf(" Period of interest: %s - %s", startTime, endTime) |
| |
| log.Print("Fetching average subtask runtime...") |
| var avgTaskRuntimeResult []types.AvgTaskDuration |
| timeframeParams := []bigquery.QueryParameter{ |
| { |
| Name: "StartTime", |
| Value: startTime, |
| }, |
| { |
| Name: "EndTime", |
| Value: endTime, |
| }, |
| } |
| if err := runQuery(ctx, client, queries.AvgTaskRuntime, timeframeParams, &avgTaskRuntimeResult); err != nil { |
| return err |
| } |
| if len(avgTaskRuntimeResult) == 0 { |
| return errors.New("no results for avgTaskRuntimeQuery") |
| } |
| avgTaskRuntime := avgTaskRuntimeResult[0].AvgDuration |
| |
| log.Print("Fetching device counts...") |
| var deviceCounts []types.DeviceCount |
| if err := runQuery(ctx, client, queries.BotCounts, timeframeParams, &deviceCounts); err != nil { |
| return err |
| } |
| |
| availableBots := map[string]int{} |
| for _, dc := range deviceCounts { |
| // ex: internal_Cavium |
| availableBots[dc.DeviceType] = int((1 - secretBotStash) * float64(dc.BotCount)) |
| } |
| |
| log.Print("Fetching testing task runs during period of interest...") |
| |
| var testingTaskRuns []types.TestingTask |
| if err := runQuery(ctx, client, queries.TestingTasks, timeframeParams, &testingTaskRuns); err != nil { |
| return err |
| } |
| sort.SliceStable(testingTaskRuns, func(i, j int) bool { |
| if testingTaskRuns[i].CreateTime != testingTaskRuns[j].CreateTime { |
| return testingTaskRuns[i].CreateTime.Before(testingTaskRuns[j].CreateTime) |
| } else { |
| return testingTaskRuns[i].Duration < testingTaskRuns[j].Duration |
| } |
| }) |
| |
| taskCountPerBuilder := map[string]int{} |
| |
| for _, run := range testingTaskRuns { |
| if run.Duration == 0 { |
| run.Duration = avgTaskRuntime |
| } |
| taskCountPerBuilder[run.Builder] += 1 |
| } |
| log.Printf(" %d rows found", len(testingTaskRuns)) |
| |
| if c.verbose { |
| fmt.Println("Tasks:") |
| for builder, count := range taskCountPerBuilder { |
| fmt.Println("Task Count: ", builder, " | ", count) |
| } |
| fmt.Println("Bots:") |
| for dt, count := range deviceCounts { |
| fmt.Println("Bot Count: ", dt, " | ", count) |
| } |
| } |
| |
| builderProfileMap := map[string]types.BuilderProfile{} |
| log.Print("Fetching statistics on proposed builder additions...") |
| |
| var builderProfileSlice []types.BuilderProfile |
| if err := runQuery(ctx, client, queries.BuilderTestingTaskProfiles, timeframeParams, &builderProfileSlice); err != nil { |
| return err |
| } |
| for _, profile := range builderProfileSlice { |
| builderProfileMap[profile.Builder] = profile |
| } |
| |
| var cqAttempts []types.CommitQueueAttempt |
| if len(c.cqAdd) > 0 { |
| log.Print("Fetching historical presubmit events...") |
| if err := runQuery(ctx, client, queries.CommitQueueAttempts, timeframeParams, &cqAttempts); err != nil { |
| return err |
| } |
| } |
| |
| sim := Simulator{ |
| baseline: true, |
| startTime: startTime, |
| endTime: endTime, |
| originalIdleBots: availableBots, |
| originalTaskRuns: testingTaskRuns, |
| originalCqAttempts: cqAttempts, |
| builderProfiles: builderProfileMap, |
| cqRemove: c.cqRemove, |
| ciRemove: c.ciRemove, |
| cqAdd: c.cqAdd, |
| ciAdd: c.ciAdd, |
| } |
| |
| log.Print("Running Simulation to gather baseline information...") |
| baseline := sim.simulate() |
| |
| log.Print("Running Simulation with proposed changes...") |
| sim.baseline = false |
| proposed := sim.simulate() |
| log.Print("Simulation Complete!") |
| |
| historicalPending := map[string]time.Duration{} |
| for _, testingTask := range testingTaskRuns { |
| historicalPending[testingTask.DeviceType] += testingTask.PendingDuration |
| } |
| |
| if c.veryVerbose { |
| reportSimulationDifferences(availableBots, baseline.StepResults, proposed.StepResults) |
| } |
| |
| c.reportRecommendation( |
| availableBots, |
| baseline, |
| proposed, |
| historicalPending, |
| builderProfileMap, |
| ) |
| |
| return nil |
| } |
| |
| // reportRecommendation is a function that prints out any recommended actions |
| // for proposed changes. If no changes are detected, it prints out the raw stats. |
| func (c *simulateCmd) reportRecommendation( |
| availableBots map[string]int, |
| baseline SimulateResults, |
| proposed SimulateResults, |
| historicalPending map[string]time.Duration, |
| builderProfiles map[string]types.BuilderProfile) { |
| |
| dts := maps.Keys(availableBots) |
| sort.Strings(dts) |
| |
| // special case if someone invokes simulate with no builder changes |
| if len(c.cqAdd)+len(c.ciAdd)+len(c.ciRemove)+len(c.cqRemove) == 0 { |
| fmt.Println("No changes to builders detected, reporting raw statistics") |
| for _, dt := range dts { |
| fmt.Println(dt) |
| fmt.Printf( |
| "Average simulated pending time: %.2f seconds\n", |
| baseline.BotStats[dt].PendingMean.Seconds(), |
| ) |
| |
| fmt.Printf( |
| "P75 simulated pending time: %.2f seconds\n", |
| baseline.BotStats[dt].PendingP75.Seconds(), |
| ) |
| fmt.Printf( |
| "P99 simulated pending time: %.2f seconds\n", |
| baseline.BotStats[dt].PendingP99.Seconds(), |
| ) |
| fmt.Printf( |
| "Simulated device availability: %.2f %%\n", |
| baseline.BotStats[dt].AvailableCapacityPercent, |
| ) |
| fmt.Println("Builders using this device:") |
| sort.Strings(baseline.BotStats[dt].Builders) |
| for _, builder := range baseline.BotStats[dt].Builders { |
| fmt.Println(" " + builder) |
| } |
| fmt.Print("\n") |
| } |
| return |
| } |
| |
| // Identify longpole builders. |
| // TODO(fxbug.dev/99233): refine this to per-project analysis. |
| builderDurations := []time.Duration{} |
| for _, bp := range builderProfiles { |
| builderDurations = append(builderDurations, bp.AverageBuildDuration) |
| } |
| slices.Sort(builderDurations) |
| durationP90 := percentile(builderDurations, 90) |
| |
| // Go through added builders and report any warnings. |
| addedBuilders := c.ciAdd |
| for _, builder := range c.cqAdd { |
| if !slices.Contains(addedBuilders, builder) { |
| addedBuilders = append(addedBuilders, builder) |
| } |
| } |
| |
| warningCount := 0 |
| benefitCount := 0 |
| for _, builder := range addedBuilders { |
| |
| profile := builderProfiles[builder] |
| // Raise warning if low count of builds recorded for added builders. |
| if profile.NumBuilds == 0 { |
| fmt.Printf( |
| "WARNING: zero runs detected for added builder %s. Proceed with caution, as impact to infrastructure is unquantified.\n\n", |
| builder, |
| ) |
| warningCount++ |
| } else if profile.NumBuilds < 10 { |
| fmt.Printf( |
| "WARNING: added builder %s's profile was based off of a statistically low number of recorded runs, simulation fidelity may be impacted.\n\n", |
| builder, |
| ) |
| warningCount++ |
| } |
| |
| // Raise warning if builder is a long pole. |
| // TODO(fxbug.dev/99233): Make this per project |
| if profile.AverageBuildDuration > durationP90 { |
| fmt.Printf( |
| "WARNING: added builder %s is a long pole for the context it is running in at %0.2f seconds.\n\n", |
| builder, |
| profile.AverageBuildDuration.Seconds(), |
| ) |
| warningCount++ |
| } |
| |
| // Raise warning if builder is flaky. |
| if profile.FlakeRate > flakeThreshold { |
| fmt.Printf( |
| "WARNING: builder %s added to presubmit has a high flake rate at %.2f%%, this exceeds our recommended flake rate for presubmit.\n\n", |
| builder, |
| 100*profile.FlakeRate, |
| ) |
| warningCount++ |
| } |
| } |
| |
| // Raise warning if bot pending times were impacted. |
| for dt, botstat := range baseline.BotStats { |
| deltaAvailableCapacityPercent := proposed.BotStats[dt].AvailableCapacityPercent - botstat.AvailableCapacityPercent |
| |
| // We choose the AvailableCapacityPercent as the best proxy for an improvement or detriment |
| // Since the pending times are derived from device availability. |
| if deltaAvailableCapacityPercent < 0 { |
| warningCount++ |
| fmt.Printf( |
| "WARNING: Proposed changes reduce the availability of device: %s (details below).\n", |
| dt, |
| ) |
| c.printPendingDeltas( |
| dt, |
| proposed.BotStats[dt], |
| botstat, |
| ) |
| } |
| if deltaAvailableCapacityPercent > 0 { |
| benefitCount++ |
| |
| fmt.Printf( |
| "This change will improve the pending times for device %s! (details below).\n", |
| dt, |
| ) |
| c.printPendingDeltas( |
| dt, |
| proposed.BotStats[dt], |
| botstat, |
| ) |
| } |
| } |
| |
| if warningCount > 0 { |
| fmt.Printf( |
| "Builder Oracle found %d issue(s) with the proposed builder changes, please review the warning output.\n", |
| warningCount, |
| ) |
| } else if benefitCount > 0 { |
| fmt.Println("Congratulations, these changes made improvements to bot availability, examine the benefits in the output above.") |
| } else { |
| fmt.Printf("No issues detected with proposed changes, you may proceed with confidence!\n") |
| } |
| } |
| |
| // printPendingDeltas is a function that prints the change in pendingTimes for |
| // the proposed change. |
| func (c *simulateCmd) printPendingDeltas( |
| dt string, |
| proposed DeviceStatistics, |
| baseline DeviceStatistics, |
| ) { |
| fmt.Println(dt) |
| fmt.Printf( |
| "PendingMean: (before -> after) %.2fs -> %.2fs | delta: %.2fs\n", |
| baseline.PendingMean.Seconds(), |
| proposed.PendingMean.Seconds(), |
| (proposed.PendingMean - baseline.PendingMean).Seconds(), |
| ) |
| fmt.Printf( |
| "PendingP75: (before -> after) %.2fs -> %.2fs | delta: %.2fs\n", |
| baseline.PendingP75.Seconds(), |
| proposed.PendingP75.Seconds(), |
| (proposed.PendingP75 - baseline.PendingP75).Seconds(), |
| ) |
| fmt.Printf( |
| "PendingP99: (before -> after) %.2fs -> %.2fs | delta: %.2fs\n", |
| baseline.PendingP99.Seconds(), |
| proposed.PendingP99.Seconds(), |
| (proposed.PendingP99 - baseline.PendingP99).Seconds(), |
| ) |
| fmt.Printf( |
| "Availability: (before -> after) %.2f%% -> %.2f%% | delta: %.2f%%\n\n", |
| baseline.AvailableCapacityPercent, |
| proposed.AvailableCapacityPercent, |
| proposed.AvailableCapacityPercent-baseline.AvailableCapacityPercent, |
| ) |
| } |
| |
| type SimulateResults struct { |
| BotStats map[string]DeviceStatistics |
| StepResults []types.StepStatus |
| } |
| |
// DeviceStatistics summarizes how one device type fared during a simulation.
type DeviceStatistics struct {
	// PendingMean is the mean pending time of the device's tasks.
	PendingMean time.Duration
	// PendingP75 is the 75th-percentile pending time of the device's tasks.
	PendingP75 time.Duration
	// PendingP99 is the 99th-percentile pending time of the device's tasks.
	PendingP99 time.Duration
	// AvailableCapacityPercent is the share of the simulated period, in
	// percent, during which at least one bot of this type was idle.
	AvailableCapacityPercent float64
	// Builders lists the distinct builders whose tasks used this device type.
	Builders []string
}
| |
// RecoveringBot is a bot that has finished a task and is unavailable until its
// recovery period is over.
type RecoveringBot struct {
	// DeviceType is the pool the bot returns to once recovered.
	DeviceType string
	// RecoveryTime is the simulated time after which the bot is idle again.
	RecoveryTime time.Time
}
| |
| type Simulator struct { |
| baseline bool // whether the simulation should factor in builder changes |
| cqAttempts []types.CommitQueueAttempt // working copy of originalCqAttempts |
| originalCqAttempts []types.CommitQueueAttempt // incoming presubmit verification requests |
| recoveringBots []RecoveringBot // bots that are performing maintenance between tasks |
| ciAdd []string // builders added to ci |
| ciRemove []string // builders removed from ci |
| cqAdd []string // builders added to cq |
| cqRemove []string // builders removed from cq |
| StepResults []types.StepStatus // debug logging for observing the simulation in detail |
| expiredTasks []types.TestingTask // tasks that were pending for longer than their expiration timeout |
| finishedTasks []types.TestingTask // tasks which have completed |
| runningTasks []types.TestingTask // tasks that are using a bot and have not completed |
| waitingTasks []types.TestingTask // tasks that are waiting for an available bot |
| originalTaskRuns []types.TestingTask // historical tasks observed over the simulated period |
| taskRuns []types.TestingTask // working copy of originalTaskRuns |
| builderProfiles map[string]types.BuilderProfile // builder information and bot footprint |
| botAvailability map[string]time.Duration // a mapping of bot names to duration that the bot had more than 0 available |
| idleBots map[string]int // working copy of originalIdleBots |
| originalIdleBots map[string]int // available bots for tasks |
| ciTriggers map[string]time.Time // next timestamp to trigger new ci builders |
| timestepIncrement time.Duration // how far in time the simulation should move between steps |
| currTime time.Time // the current time in the simulation |
| endTime time.Time // the end of the simulated period |
| startTime time.Time // the start of the simulated period |
| } |
| |
// recoveryExcludedBots names the device types that need no recovery overhead
// between tasks. Our hardware-backed bots do require it; gce and Cavium do not.
var recoveryExcludedBots = []string{
	"gce",
	"Cavium",
}
| |
| func (s *Simulator) simulate() SimulateResults { |
| // initialize working variables for the new simulation. |
| s.initVars() |
| |
| for s.currTime = s.startTime; s.currTime.Before(s.endTime); s.currTime = s.currTime.Add(s.timestepIncrement) { |
| // Check if any bots have finished their tasks and recovery period and |
| // make them available. |
| s.recoverBots() |
| |
| // Prune finished tasks from the runningTask queue. |
| s.processFinishedTasks() |
| |
| // Add incoming historical traffic to the waitingTask queue. |
| s.processIncomingTasks() |
| |
| // Add traffic from proposed changes to the waitingTask queue. |
| if !s.baseline { |
| s.processCITriggers() |
| s.processCQAttempts() |
| } |
| |
| // Start waiting tasks if there are available bots. |
| s.processWaitingTasks() |
| |
| // Increment our waiting and runtime counts. |
| for idx := range s.waitingTasks { |
| s.waitingTasks[idx].PendingDuration += s.timestepIncrement |
| } |
| |
| for bot, amt := range s.idleBots { |
| // track if the bot type had units available |
| if amt != 0 { |
| s.botAvailability[bot] += s.timestepIncrement |
| } |
| } |
| |
| // Log this step in the simulation for debugging. |
| s.StepResults = append(s.StepResults, types.StepStatus{ |
| CurrentTime: s.currTime, |
| RunningTasks: s.runningTasks, |
| WaitingTasks: s.waitingTasks, |
| IdleBots: maps.Clone(s.idleBots), |
| TaskRuns: s.taskRuns, |
| }) |
| } |
| |
| // Calculate the mean/p50/p99 pending time for tasks per device type |
| |
| pendingTimesByBots := map[string][]time.Duration{} |
| uniqueBuildersByBot := map[string][]string{} |
| BotStats := map[string]DeviceStatistics{} |
| |
| for _, task := range append(s.finishedTasks, s.expiredTasks...) { |
| pendingTimesByBots[task.DeviceType] = append(pendingTimesByBots[task.DeviceType], task.PendingDuration) |
| if !slices.Contains(uniqueBuildersByBot[task.DeviceType], task.Builder) { |
| uniqueBuildersByBot[task.DeviceType] = append(uniqueBuildersByBot[task.DeviceType], task.Builder) |
| } |
| } |
| |
| for bot, pt := range pendingTimesByBots { |
| slices.Sort(pt) |
| |
| sum := 0 * time.Second |
| for _, p := range pt { |
| sum += p |
| } |
| avg := sum / time.Duration(len(pt)) |
| p75 := percentile(pt, 75) |
| p99 := percentile(pt, 99) |
| BotStats[bot] = DeviceStatistics{ |
| PendingMean: avg, |
| PendingP75: p75, |
| PendingP99: p99, |
| AvailableCapacityPercent: 100 * float64(s.botAvailability[bot]) / float64(s.endTime.Sub(s.startTime)), |
| Builders: uniqueBuildersByBot[bot], |
| } |
| } |
| |
| return SimulateResults{ |
| BotStats: BotStats, |
| StepResults: s.StepResults, |
| } |
| } |
| |
| // initVars is a helper that initializes working variables before simulate() calls. |
| func (s *Simulator) initVars() { |
| s.timestepIncrement = 5 * time.Second |
| s.expiredTasks = []types.TestingTask{} |
| s.finishedTasks = []types.TestingTask{} |
| s.runningTasks = []types.TestingTask{} |
| s.waitingTasks = []types.TestingTask{} |
| s.recoveringBots = []RecoveringBot{} |
| s.idleBots = maps.Clone(s.originalIdleBots) |
| s.StepResults = []types.StepStatus{} |
| s.botAvailability = map[string]time.Duration{} |
| |
| // This is a rough approximation for adding builders to CI. This assumes |
| // that there is a constant influx of CLs to merit triggering the builder |
| // after each run has completed. |
| // TODO(fxbug.dev/95557): Track merge events and use |
| // max_concurrent_invocations and batching rules to simulate with higher |
| // fidelity. |
| s.ciTriggers = map[string]time.Time{} |
| for _, builder := range s.ciAdd { |
| s.ciTriggers[builder] = s.startTime |
| } |
| s.taskRuns = s.originalTaskRuns |
| s.cqAttempts = s.originalCqAttempts |
| } |
| |
| // recoverBots is a helper that adds bots to the idleBots pool if they've finished their tasks and recovered. |
| func (s *Simulator) recoverBots() { |
| var updatedRecoveringBots []RecoveringBot |
| for _, bot := range s.recoveringBots { |
| if s.currTime.After(bot.RecoveryTime) { |
| s.idleBots[bot.DeviceType] += 1 |
| } else { |
| updatedRecoveringBots = append(updatedRecoveringBots, bot) |
| } |
| } |
| s.recoveringBots = updatedRecoveringBots |
| } |
| |
| func (s *Simulator) processFinishedTasks() { |
| var updatedRunningTasks []types.TestingTask |
| for _, task := range s.runningTasks { |
| if s.currTime.After(task.CreateTime.Add(task.Duration)) { |
| s.finishedTasks = append(s.finishedTasks, task) |
| if slices.Contains(recoveryExcludedBots, task.DeviceType) { |
| s.idleBots[task.DeviceType] += 1 |
| } else { |
| s.recoveringBots = append( |
| s.recoveringBots, |
| RecoveringBot{ |
| DeviceType: task.DeviceType, |
| RecoveryTime: s.currTime.Add(75 * time.Second), |
| }, |
| ) |
| } |
| } else { |
| updatedRunningTasks = append(updatedRunningTasks, task) |
| } |
| } |
| s.runningTasks = updatedRunningTasks |
| } |
| |
| // processWaitingTasks is a helper that assigns waiting tasks to an available bot. |
| // If no bot is available, it stays in the waitingTasks queue. |
| func (s *Simulator) processWaitingTasks() { |
| var updatedWaitingTasks []types.TestingTask |
| for _, task := range s.waitingTasks { |
| |
| // Remove this task if it is pending over its configured expiration time. |
| if task.PendingDuration > task.Expiration { |
| s.expiredTasks = append(s.expiredTasks, task) |
| continue |
| } |
| |
| if !s.baseline { |
| // Drop the task if it is proposed for removal in ci/cqRemove. |
| if task.IsTry && len(s.cqRemove) > 0 { |
| if slices.Contains(s.cqRemove, task.Builder) { |
| continue |
| } |
| } else if !task.IsTry && len(s.ciRemove) > 0 { |
| if slices.Contains(s.ciRemove, task.Builder) { |
| continue |
| } |
| } |
| } |
| if s.idleBots[task.DeviceType] > 0 { |
| s.idleBots[task.DeviceType] -= 1 |
| s.runningTasks = append(s.runningTasks, task) |
| } else { |
| updatedWaitingTasks = append(updatedWaitingTasks, task) |
| } |
| } |
| s.waitingTasks = updatedWaitingTasks |
| } |
| |
| // processIncomingTasks is a helper that moves tasks from the taskRuns queue to the waitingTasks queue. |
| // Tasks are only moved if the simulation is past the point they were created. |
| func (s *Simulator) processIncomingTasks() { |
| for len(s.taskRuns) > 0 && s.taskRuns[0].CreateTime.Before(s.currTime) { |
| task := s.taskRuns[0] |
| task.PendingDuration = 0 |
| s.taskRuns = s.taskRuns[1:] |
| s.waitingTasks = append(s.waitingTasks, task) |
| } |
| } |
| |
| // processCiTriggers is a helper that enqueues subtasks for the simulate command. |
| // Added CI builders have an associated timestamp that indicates when we should |
| // trigger the next round of subtasks. |
| func (s *Simulator) processCITriggers() { |
| for builder, nextTrigger := range s.ciTriggers { |
| if s.currTime.After(nextTrigger) { |
| // Trigger all subtasks in profile. |
| for _, deviceFootprint := range s.builderProfiles[builder].DeviceFootprints { |
| for i := 0; i < deviceFootprint.AverageTasks; i++ { |
| // Internal CI |
| task := types.TestingTask{ |
| Builder: builder, |
| CreateTime: s.currTime, |
| DeviceType: "internal_" + deviceFootprint.DeviceType, |
| Duration: deviceFootprint.AverageTaskDuration, |
| Expiration: deviceFootprint.Expiration, |
| IsTry: false, |
| PendingDuration: 0, |
| } |
| s.waitingTasks = append(s.waitingTasks, task) |
| } |
| } |
| // Set the next trigger. |
| s.ciTriggers[builder] = s.currTime.Add(s.builderProfiles[builder].AverageBuildDuration) |
| } |
| } |
| } |
| |
| // processCqAttempts is a helper that enqueues subtasks for the simulate command. |
| // It loops through historical presubmit attempts and enqueues tasks for each |
| // added CQ builder. |
| // (TODO:fxbug.dev/95819) - Only trigger cq builders for the projects they're |
| // enabled for. |
| func (s *Simulator) processCQAttempts() { |
| for ; len(s.cqAttempts) > 0 && (s.cqAttempts)[0].StartTime.Before(s.currTime); s.cqAttempts = s.cqAttempts[1:] { |
| cqAttempt := s.cqAttempts[0] |
| externality := "internal_" |
| if strings.HasPrefix(cqAttempt.ConfigGroup, "fuchsia") { |
| externality = "external_" |
| } |
| for _, builder := range s.cqAdd { |
| // Trigger all subtasks in profile. |
| for _, deviceFootprint := range s.builderProfiles[builder].DeviceFootprints { |
| for i := 0; i < deviceFootprint.AverageTasks; i++ { |
| task := types.TestingTask{ |
| Builder: builder, |
| CreateTime: s.currTime, |
| DeviceType: externality + deviceFootprint.DeviceType, |
| Duration: deviceFootprint.AverageTaskDuration, |
| Expiration: deviceFootprint.Expiration, |
| IsTry: true, |
| PendingDuration: 0, |
| } |
| s.waitingTasks = append(s.waitingTasks, task) |
| } |
| } |
| } |
| } |
| } |
| |
| func runQuery( |
| ctx context.Context, |
| client *bigquery.Client, |
| query string, |
| params []bigquery.QueryParameter, ptr interface{}, |
| ) error { |
| q := client.Query(query) |
| q.Parameters = params |
| iter, err := q.Read(ctx) |
| if err != nil { |
| return err |
| } |
| |
| var rows []map[string]bigquery.Value |
| for { |
| row := make(map[string]bigquery.Value) |
| err := iter.Next(&row) |
| if err == iterator.Done { |
| break |
| } else if err != nil { |
| return err |
| } |
| rows = append(rows, row) |
| } |
| |
| // This is a hacky way to easily deserialize rows into objects |
| // Copy at your own peril! |
| jsonRows, err := json.Marshal(rows) |
| if err != nil { |
| return err |
| } |
| |
| return json.Unmarshal(jsonRows, &ptr) |
| } |
| |
| func reportSimulationDifferences( |
| deviceTypes map[string]int, |
| baselineSimulation, proposedSimulation []types.StepStatus, |
| ) { |
| for idx, baselineStep := range baselineSimulation { |
| proposedStep := proposedSimulation[idx] |
| diffStrings := []string{fmt.Sprintf("Diffs for Execution step %d", idx+1)} |
| |
| for dt := range deviceTypes { |
| bworkers := baselineStep.IdleBots[dt] |
| pworkers := proposedStep.IdleBots[dt] |
| if bworkers != pworkers { |
| s := fmt.Sprintf( |
| "Diff available workers: %s %d | %d", |
| dt, |
| bworkers, |
| pworkers, |
| ) |
| diffStrings = append(diffStrings, s) |
| } |
| bpending := baselineStep.WaitingTimes[dt] |
| ppending := proposedStep.WaitingTimes[dt] |
| if bpending != ppending { |
| s := fmt.Sprintf( |
| "Diff in pending minutes: %s %f | %f", |
| dt, |
| bpending.Minutes(), |
| ppending.Minutes(), |
| ) |
| diffStrings = append(diffStrings, s) |
| } |
| } |
| |
| if len(baselineStep.RunningTasks) != len(proposedStep.RunningTasks) { |
| s := fmt.Sprintf( |
| "Diff runningTasks length: %d | %d", |
| len(baselineStep.RunningTasks), |
| len(proposedStep.RunningTasks), |
| ) |
| diffStrings = append(diffStrings, s) |
| } |
| if len(baselineStep.WaitingTasks) != len(proposedStep.WaitingTasks) { |
| s := fmt.Sprintf( |
| "Diff waitingTasks length: %d | %d", |
| len(baselineStep.WaitingTasks), |
| len(proposedStep.WaitingTasks), |
| ) |
| diffStrings = append(diffStrings, s) |
| } |
| if len(baselineStep.TaskRuns) != len(proposedStep.TaskRuns) { |
| s := fmt.Sprintf( |
| "Diff scheduledRuns length: %d | %d", |
| len(baselineStep.TaskRuns), |
| len(proposedStep.TaskRuns), |
| ) |
| diffStrings = append(diffStrings, s) |
| } |
| |
| if len(diffStrings) > 1 { |
| for _, str := range diffStrings { |
| log.Print(str) |
| } |
| log.Print("\n") |
| } |
| } |
| } |
| |
// maxLength returns the byte length of the longest key in s, or 0 when s is
// empty or nil.
func maxLength(s map[string]int) int {
	longest := 0
	for key := range s {
		if n := len(key); n > longest {
			longest = n
		}
	}
	return longest
}
| |
// percentile returns the element of d at index len(d)*p/100. d must already be
// sorted in ascending order for the result to be the p-th percentile.
//
// An empty d yields 0. The index is clamped to the valid range, so p <= 0
// returns the first element and p >= 100 returns the last one instead of
// indexing out of bounds.
func percentile(d []time.Duration, p int) time.Duration {
	if len(d) == 0 {
		return 0
	}
	idx := len(d) * p / 100
	if idx < 0 {
		idx = 0
	}
	if last := len(d) - 1; idx > last {
		idx = last
	}
	return d[idx]
}