blob: c4677abd00980bf57dc700001a663cb0b1b6dd67 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"sort"
"strings"
"time"
"cloud.google.com/go/bigquery"
"github.com/maruel/subcommands"
"go.chromium.org/luci/auth"
"go.fuchsia.dev/infra/cmd/builder_oracle/queries"
"go.fuchsia.dev/infra/cmd/builder_oracle/types"
"go.fuchsia.dev/infra/flagutil"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)
// simulateLongDesc is the long help text attached to the simulate subcommand
// (see cmdSimulate).
const simulateLongDesc = `
Simulate the impact of adding or removing builders to/from CQ or CI.
`
// flakeThreshold is the flake rate above which reportRecommendation warns
// about an added builder.
// We recommend that tests constrain their flake rates to 3% or lower.
const flakeThreshold = 0.03
// secretBotStash is the fraction of each device type's bot count that is
// withheld when computing the bots available to the simulation.
// This scalar allows us to squirrel away an extra amount of capacity so that we
// can handle unforeseen changes to our usage or availability.
const secretBotStash = 0.2
// cmdSimulate builds the "simulate" subcommand. Every run of the command gets
// a fresh simulateCmd whose flags are initialized with authOpts.
func cmdSimulate(authOpts auth.Options) *subcommands.Command {
	newRun := func() subcommands.CommandRun {
		cmd := new(simulateCmd)
		cmd.Init(authOpts)
		return cmd
	}
	return &subcommands.Command{
		UsageLine:  "simulate -cq-add builder_a -cq-add builder_b -ci-remove builder_c",
		ShortDesc:  "Simulate builder configuration changes.",
		LongDesc:   simulateLongDesc,
		CommandRun: newRun,
	}
}
// simulateCmd implements the "simulate" subcommand. It embeds commonFlags and
// adds four repeatable flags naming the builders whose addition to or removal
// from CQ (presubmit) and CI should be simulated.
type simulateCmd struct {
commonFlags
cqAdd flagutil.RepeatedStringValue // -cq-add: builders to be added to presubmit
cqRemove flagutil.RepeatedStringValue // -cq-remove: builders to be removed from presubmit
ciAdd flagutil.RepeatedStringValue // -ci-add: builders to be added to CI
ciRemove flagutil.RepeatedStringValue // -ci-remove: builders to be removed from CI
}
// Init registers the common flags and then the four repeatable builder flags
// (-ci-add, -ci-remove, -cq-add, -cq-remove) on the command's flag set.
func (c *simulateCmd) Init(defaultAuthOpts auth.Options) {
	const (
		ciAddDesc    = "Builder to be added to CI, multiple instances of this flag are allowed"
		ciRemoveDesc = "Builder to be removed from CI, multiple instances of this flag are allowed"
		cqAddDesc    = "Builder to be added to presubmit, multiple instances of this flag are allowed"
		cqRemoveDesc = "Builder to be removed from presubmit, multiple instances of this flag are allowed"
	)

	c.commonFlags.Init(defaultAuthOpts)

	c.Flags.Var(&c.ciAdd, "ci-add", ciAddDesc)
	c.Flags.Var(&c.ciRemove, "ci-remove", ciRemoveDesc)
	c.Flags.Var(&c.cqAdd, "cq-add", cqAddDesc)
	c.Flags.Var(&c.cqRemove, "cq-remove", cqRemoveDesc)
}
// parseArgs validates the parsed flags. Only the common flags need checking;
// any error from them is returned unchanged.
func (c *simulateCmd) parseArgs() error {
	err := c.commonFlags.Parse()
	if err != nil {
		return err
	}
	return nil
}
// Run is the subcommand entry point. It validates the flags and then runs the
// simulation. On failure the error is written to the application's error
// stream, prefixed with the application name, and 1 is returned; otherwise 0.
func (c *simulateCmd) Run(a subcommands.Application, _ []string, _ subcommands.Env) int {
	err := c.parseArgs()
	if err == nil {
		// Only start the real work once the arguments are known to be valid.
		err = c.main()
	}
	if err != nil {
		fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
		return 1
	}
	return 0
}
// main authenticates, creates a BigQuery client for the "fuchsia-infra"
// project and runs the simulation with it.
//
// If no login is available, instructions for logging in are printed to stderr
// before the error is returned. When the quiet flag is set, log output is
// discarded for the remainder of the process.
func (c *simulateCmd) main() error {
	ctx := context.Background()
	authenticator := auth.NewAuthenticator(ctx, auth.OptionalLogin, c.parsedAuthOpts)
	tokenSource, err := authenticator.TokenSource()
	if err != nil {
		// errors.Is also matches when the sentinel has been wrapped.
		if errors.Is(err, auth.ErrLoginRequired) {
			fmt.Fprintf(os.Stderr, "You need to login first by running:\n")
			fmt.Fprintf(os.Stderr, " luci-auth login -scopes %q\n", strings.Join(c.parsedAuthOpts.Scopes, " "))
		}
		return err
	}
	if c.quiet {
		log.SetOutput(ioutil.Discard)
	}
	client, err := bigquery.NewClient(ctx, "fuchsia-infra", option.WithTokenSource(tokenSource))
	if err != nil {
		return err
	}
	// Release the client's underlying connections once the simulation is done.
	defer client.Close()
	return c.runSimulation(ctx, client)
}
// runSimulation fetches the last two weeks of data from BigQuery, runs the
// simulator once as a baseline and once with the proposed builder changes,
// and prints the resulting recommendation.
//
// Query strings are defined in queries.go for ease of readability.
func (c *simulateCmd) runSimulation(ctx context.Context, client *bigquery.Client) error {
	pst, err := time.LoadLocation("America/Los_Angeles")
	if err != nil {
		return err
	}
	// The last two weeks, ending at the most recent Pacific-time midnight.
	// Time.Truncate operates on absolute time since the zero time and ignores
	// the Location, so midnight is constructed explicitly instead.
	periodOfInterestDuration := 14 * 24 * time.Hour
	now := time.Now().In(pst)
	endTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, pst)
	startTime := endTime.Add(-periodOfInterestDuration)
	log.Printf(" Period of interest: %s - %s", startTime, endTime)

	timeframeParams := []bigquery.QueryParameter{
		{
			Name:  "StartTime",
			Value: startTime,
		},
		{
			Name:  "EndTime",
			Value: endTime,
		},
	}

	log.Print("Fetching average subtask runtime...")
	var avgTaskRuntimeResult []types.AvgTaskDuration
	if err := runQuery(ctx, client, queries.AvgTaskRuntime, timeframeParams, &avgTaskRuntimeResult); err != nil {
		return err
	}
	if len(avgTaskRuntimeResult) == 0 {
		return errors.New("no results for avgTaskRuntimeQuery")
	}
	avgTaskRuntime := avgTaskRuntimeResult[0].AvgDuration

	log.Print("Fetching device counts...")
	var deviceCounts []types.DeviceCount
	if err := runQuery(ctx, client, queries.BotCounts, timeframeParams, &deviceCounts); err != nil {
		return err
	}
	availableBots := map[string]int{}
	for _, dc := range deviceCounts {
		// ex: internal_Cavium
		// Hold back secretBotStash of every device type as spare capacity.
		availableBots[dc.DeviceType] = int((1 - secretBotStash) * float64(dc.BotCount))
	}

	log.Print("Fetching testing task runs during period of interest...")
	var testingTaskRuns []types.TestingTask
	if err := runQuery(ctx, client, queries.TestingTasks, timeframeParams, &testingTaskRuns); err != nil {
		return err
	}
	// Order by creation time, breaking ties by duration, so the simulator can
	// consume the tasks front to back.
	sort.SliceStable(testingTaskRuns, func(i, j int) bool {
		a, b := testingTaskRuns[i], testingTaskRuns[j]
		if !a.CreateTime.Equal(b.CreateTime) {
			return a.CreateTime.Before(b.CreateTime)
		}
		return a.Duration < b.Duration
	})
	taskCountPerBuilder := map[string]int{}
	for i := range testingTaskRuns {
		// Index into the slice so the default is stored on the element itself
		// rather than on a per-iteration copy.
		if testingTaskRuns[i].Duration == 0 {
			testingTaskRuns[i].Duration = avgTaskRuntime
		}
		taskCountPerBuilder[testingTaskRuns[i].Builder]++
	}
	log.Printf(" %d rows found", len(testingTaskRuns))
	if c.verbose {
		fmt.Println("Tasks:")
		for builder, count := range taskCountPerBuilder {
			fmt.Println("Task Count: ", builder, " | ", count)
		}
		fmt.Println("Bots:")
		for _, dc := range deviceCounts {
			fmt.Println("Bot Count: ", dc.DeviceType, " | ", dc.BotCount)
		}
	}

	log.Print("Fetching statistics on proposed builder additions...")
	var builderProfileSlice []types.BuilderProfile
	if err := runQuery(ctx, client, queries.BuilderTestingTaskProfiles, timeframeParams, &builderProfileSlice); err != nil {
		return err
	}
	builderProfileMap := make(map[string]types.BuilderProfile, len(builderProfileSlice))
	for _, profile := range builderProfileSlice {
		builderProfileMap[profile.Builder] = profile
	}

	// Historical presubmit attempts are only needed to drive builders that
	// are being added to CQ.
	var cqAttempts []types.CommitQueueAttempt
	if len(c.cqAdd) > 0 {
		log.Print("Fetching historical presubmit events...")
		if err := runQuery(ctx, client, queries.CommitQueueAttempts, timeframeParams, &cqAttempts); err != nil {
			return err
		}
	}

	sim := Simulator{
		baseline:           true,
		startTime:          startTime,
		endTime:            endTime,
		originalIdleBots:   availableBots,
		originalTaskRuns:   testingTaskRuns,
		originalCqAttempts: cqAttempts,
		builderProfiles:    builderProfileMap,
		cqRemove:           c.cqRemove,
		ciRemove:           c.ciRemove,
		cqAdd:              c.cqAdd,
		ciAdd:              c.ciAdd,
	}
	log.Print("Running Simulation to gather baseline information...")
	baseline := sim.simulate()
	log.Print("Running Simulation with proposed changes...")
	sim.baseline = false
	proposed := sim.simulate()
	log.Print("Simulation Complete!")

	// Total pending time actually observed per device type.
	historicalPending := map[string]time.Duration{}
	for _, testingTask := range testingTaskRuns {
		historicalPending[testingTask.DeviceType] += testingTask.PendingDuration
	}
	if c.veryVerbose {
		reportSimulationDifferences(availableBots, baseline.StepResults, proposed.StepResults)
	}
	c.reportRecommendation(
		availableBots,
		baseline,
		proposed,
		historicalPending,
		builderProfileMap,
	)
	return nil
}
// reportRecommendation prints any recommended actions for the proposed
// changes. If no builder changes were requested, it prints the raw baseline
// statistics for every device type in availableBots instead.
//
// Otherwise it warns about added builders that have few or no recorded
// builds, are long poles (average build duration above the P90 across all
// builder profiles) or exceed flakeThreshold, and it reports every device
// type whose simulated availability changed between baseline and proposed.
// Device types are visited in sorted order so the output is stable.
//
// historicalPending is accepted for interface compatibility and is currently
// unused.
func (c *simulateCmd) reportRecommendation(
	availableBots map[string]int,
	baseline SimulateResults,
	proposed SimulateResults,
	historicalPending map[string]time.Duration,
	builderProfiles map[string]types.BuilderProfile) {
	dts := maps.Keys(availableBots)
	sort.Strings(dts)
	// special case if someone invokes simulate with no builder changes
	if len(c.cqAdd)+len(c.ciAdd)+len(c.ciRemove)+len(c.cqRemove) == 0 {
		fmt.Println("No changes to builders detected, reporting raw statistics")
		for _, dt := range dts {
			stats := baseline.BotStats[dt]
			fmt.Println(dt)
			fmt.Printf(
				"Average simulated pending time: %.2f seconds\n",
				stats.PendingMean.Seconds(),
			)
			fmt.Printf(
				"P75 simulated pending time: %.2f seconds\n",
				stats.PendingP75.Seconds(),
			)
			fmt.Printf(
				"P99 simulated pending time: %.2f seconds\n",
				stats.PendingP99.Seconds(),
			)
			fmt.Printf(
				"Simulated device availability: %.2f %%\n",
				stats.AvailableCapacityPercent,
			)
			fmt.Println("Builders using this device:")
			sort.Strings(stats.Builders)
			for _, builder := range stats.Builders {
				fmt.Println(" " + builder)
			}
			fmt.Print("\n")
		}
		return
	}

	// Identify longpole builders.
	// TODO(fxbug.dev/99233): refine this to per-project analysis.
	builderDurations := make([]time.Duration, 0, len(builderProfiles))
	for _, bp := range builderProfiles {
		builderDurations = append(builderDurations, bp.AverageBuildDuration)
	}
	slices.Sort(builderDurations)
	// With no profiles there is nothing to take a percentile of; leave the
	// threshold at zero rather than indexing into an empty slice.
	var durationP90 time.Duration
	if len(builderDurations) > 0 {
		durationP90 = percentile(builderDurations, 90)
	}

	// Go through added builders and report any warnings. Copy the CI list
	// first so appending CQ builders can never write into the flag's backing
	// array.
	addedBuilders := append([]string(nil), c.ciAdd...)
	for _, builder := range c.cqAdd {
		if !slices.Contains(addedBuilders, builder) {
			addedBuilders = append(addedBuilders, builder)
		}
	}
	warningCount := 0
	benefitCount := 0
	for _, builder := range addedBuilders {
		profile := builderProfiles[builder]
		// Raise warning if low count of builds recorded for added builders.
		switch {
		case profile.NumBuilds == 0:
			fmt.Printf(
				"WARNING: zero runs detected for added builder %s. Proceed with caution, as impact to infrastructure is unquantified.\n\n",
				builder,
			)
			warningCount++
		case profile.NumBuilds < 10:
			fmt.Printf(
				"WARNING: added builder %s's profile was based off of a statistically low number of recorded runs, simulation fidelity may be impacted.\n\n",
				builder,
			)
			warningCount++
		}
		// Raise warning if builder is a long pole.
		// TODO(fxbug.dev/99233): Make this per project
		if profile.AverageBuildDuration > durationP90 {
			fmt.Printf(
				"WARNING: added builder %s is a long pole for the context it is running in at %0.2f seconds.\n\n",
				builder,
				profile.AverageBuildDuration.Seconds(),
			)
			warningCount++
		}
		// Raise warning if builder is flaky.
		if profile.FlakeRate > flakeThreshold {
			fmt.Printf(
				"WARNING: builder %s added to presubmit has a high flake rate at %.2f%%, this exceeds our recommended flake rate for presubmit.\n\n",
				builder,
				100*profile.FlakeRate,
			)
			warningCount++
		}
	}

	// Raise warning if bot pending times were impacted. Iterate in sorted
	// order so repeated runs print the devices in the same sequence.
	simulatedDevices := maps.Keys(baseline.BotStats)
	sort.Strings(simulatedDevices)
	for _, dt := range simulatedDevices {
		botstat := baseline.BotStats[dt]
		proposedStat := proposed.BotStats[dt]
		deltaAvailableCapacityPercent := proposedStat.AvailableCapacityPercent - botstat.AvailableCapacityPercent
		// We choose the AvailableCapacityPercent as the best proxy for an improvement or detriment
		// Since the pending times are derived from device availability.
		switch {
		case deltaAvailableCapacityPercent < 0:
			warningCount++
			fmt.Printf(
				"WARNING: Proposed changes reduce the availability of device: %s (details below).\n",
				dt,
			)
			c.printPendingDeltas(dt, proposedStat, botstat)
		case deltaAvailableCapacityPercent > 0:
			benefitCount++
			fmt.Printf(
				"This change will improve the pending times for device %s! (details below).\n",
				dt,
			)
			c.printPendingDeltas(dt, proposedStat, botstat)
		}
	}

	switch {
	case warningCount > 0:
		fmt.Printf(
			"Builder Oracle found %d issue(s) with the proposed builder changes, please review the warning output.\n",
			warningCount,
		)
	case benefitCount > 0:
		fmt.Println("Congratulations, these changes made improvements to bot availability, examine the benefits in the output above.")
	default:
		fmt.Printf("No issues detected with proposed changes, you may proceed with confidence!\n")
	}
}
// printPendingDeltas prints, for device type dt, the before/after values and
// the delta of the mean, P75 and P99 pending times and of the availability
// percentage. "Before" is the baseline run and "after" is the proposed run.
func (c *simulateCmd) printPendingDeltas(
	dt string,
	proposed DeviceStatistics,
	baseline DeviceStatistics,
) {
	fmt.Println(dt)

	meanBefore, meanAfter := baseline.PendingMean, proposed.PendingMean
	fmt.Printf(
		"PendingMean: (before -> after) %.2fs -> %.2fs | delta: %.2fs\n",
		meanBefore.Seconds(), meanAfter.Seconds(), (meanAfter - meanBefore).Seconds(),
	)

	p75Before, p75After := baseline.PendingP75, proposed.PendingP75
	fmt.Printf(
		"PendingP75: (before -> after) %.2fs -> %.2fs | delta: %.2fs\n",
		p75Before.Seconds(), p75After.Seconds(), (p75After - p75Before).Seconds(),
	)

	p99Before, p99After := baseline.PendingP99, proposed.PendingP99
	fmt.Printf(
		"PendingP99: (before -> after) %.2fs -> %.2fs | delta: %.2fs\n",
		p99Before.Seconds(), p99After.Seconds(), (p99After - p99Before).Seconds(),
	)

	availBefore, availAfter := baseline.AvailableCapacityPercent, proposed.AvailableCapacityPercent
	fmt.Printf(
		"Availability: (before -> after) %.2f%% -> %.2f%% | delta: %.2f%%\n\n",
		availBefore, availAfter, availAfter-availBefore,
	)
}
// SimulateResults is the output of one Simulator.simulate run.
type SimulateResults struct {
BotStats map[string]DeviceStatistics // per-device-type statistics, keyed by device type
StepResults []types.StepStatus // one entry per simulated time step, for debugging
}
// DeviceStatistics summarizes one device type over a simulation run. The
// pending figures are computed over that device type's finished and expired
// tasks.
type DeviceStatistics struct {
PendingMean time.Duration // mean pending time
PendingP75 time.Duration // 75th percentile pending time
PendingP99 time.Duration // 99th percentile pending time
AvailableCapacityPercent float64 // percentage of the simulated period with at least one idle bot
Builders []string // distinct builders whose tasks used this device type
}
// RecoveringBot is a bot that has finished a task but is not yet back in the
// idle pool.
type RecoveringBot struct {
DeviceType string // device type whose idle count is incremented on recovery
RecoveryTime time.Time // simulated time after which the bot becomes idle again
}
// Simulator replays historical testing tasks in fixed time steps against a
// pool of bots and records how long tasks wait for a bot. The original*
// fields hold the inputs; the matching working copies are reset by initVars
// at the start of every simulate call, so one Simulator can be run several
// times (e.g. baseline, then proposed).
type Simulator struct {
baseline bool // when true, the proposed builder additions/removals are not applied
cqAttempts []types.CommitQueueAttempt // working copy of originalCqAttempts
originalCqAttempts []types.CommitQueueAttempt // incoming presubmit verification requests
recoveringBots []RecoveringBot // bots that are performing maintenance between tasks
ciAdd []string // builders added to ci
ciRemove []string // builders removed from ci
cqAdd []string // builders added to cq
cqRemove []string // builders removed from cq
StepResults []types.StepStatus // debug logging for observing the simulation in detail
expiredTasks []types.TestingTask // tasks that were pending for longer than their expiration timeout
finishedTasks []types.TestingTask // tasks which have completed
runningTasks []types.TestingTask // tasks that are using a bot and have not completed
waitingTasks []types.TestingTask // tasks that are waiting for an available bot
originalTaskRuns []types.TestingTask // historical tasks observed over the simulated period
taskRuns []types.TestingTask // working copy of originalTaskRuns
builderProfiles map[string]types.BuilderProfile // builder information and bot footprint
botAvailability map[string]time.Duration // per device type, total simulated time during which at least one bot was idle
idleBots map[string]int // working copy of originalIdleBots
originalIdleBots map[string]int // available bots for tasks
ciTriggers map[string]time.Time // next timestamp to trigger new ci builders
timestepIncrement time.Duration // how far in time the simulation should move between steps
currTime time.Time // the current time in the simulation
endTime time.Time // the end of the simulated period
startTime time.Time // the start of the simulated period
}
// recoveryExcludedBots lists the device types whose bots return to the idle
// pool as soon as a task finishes, skipping the recovery period.
// Our hardware-backed bots require recovery overhead, which does not include gce or Cavium
// NOTE(review): device types elsewhere in this file carry an "internal_" or
// "external_" prefix (e.g. "internal_Cavium"); confirm that lookups against
// this list account for that.
var recoveryExcludedBots = []string{"gce", "Cavium"}
// simulate runs one pass of the simulation from startTime to endTime in
// timestepIncrement steps and returns per-device statistics together with a
// per-step debug log. All working state is reset first, so it can be called
// repeatedly on the same Simulator.
func (s *Simulator) simulate() SimulateResults {
	// Start from a clean slate.
	s.initVars()

	s.currTime = s.startTime
	for s.currTime.Before(s.endTime) {
		// Bots whose recovery period has elapsed go back to the idle pool.
		s.recoverBots()
		// Running tasks that are done leave the running queue.
		s.processFinishedTasks()
		// Historical traffic that has been created by now starts waiting.
		s.processIncomingTasks()
		// Traffic generated by the proposed changes starts waiting.
		if !s.baseline {
			s.processCITriggers()
			s.processCQAttempts()
		}
		// Hand idle bots to waiting tasks.
		s.processWaitingTasks()

		// Everything still waiting has now waited one more step.
		for idx := range s.waitingTasks {
			s.waitingTasks[idx].PendingDuration += s.timestepIncrement
		}
		// Credit availability to every device type with at least one idle bot.
		for deviceType, idle := range s.idleBots {
			if idle == 0 {
				continue
			}
			s.botAvailability[deviceType] += s.timestepIncrement
		}

		// Snapshot this step for debugging.
		snapshot := types.StepStatus{
			CurrentTime:  s.currTime,
			RunningTasks: s.runningTasks,
			WaitingTasks: s.waitingTasks,
			IdleBots:     maps.Clone(s.idleBots),
			TaskRuns:     s.taskRuns,
		}
		s.StepResults = append(s.StepResults, snapshot)

		s.currTime = s.currTime.Add(s.timestepIncrement)
	}

	// Gather pending times and the set of builders per device type from every
	// task that either finished or expired.
	pendingByDevice := map[string][]time.Duration{}
	buildersByDevice := map[string][]string{}
	completed := append(s.finishedTasks, s.expiredTasks...)
	for _, task := range completed {
		deviceType := task.DeviceType
		pendingByDevice[deviceType] = append(pendingByDevice[deviceType], task.PendingDuration)
		if known := buildersByDevice[deviceType]; !slices.Contains(known, task.Builder) {
			buildersByDevice[deviceType] = append(known, task.Builder)
		}
	}

	// Calculate the mean/p75/p99 pending time for tasks per device type.
	simulatedSpan := float64(s.endTime.Sub(s.startTime))
	stats := map[string]DeviceStatistics{}
	for deviceType, pending := range pendingByDevice {
		slices.Sort(pending)
		var total time.Duration
		for _, p := range pending {
			total += p
		}
		stats[deviceType] = DeviceStatistics{
			PendingMean:              total / time.Duration(len(pending)),
			PendingP75:               percentile(pending, 75),
			PendingP99:               percentile(pending, 99),
			AvailableCapacityPercent: 100 * float64(s.botAvailability[deviceType]) / simulatedSpan,
			Builders:                 buildersByDevice[deviceType],
		}
	}

	return SimulateResults{
		BotStats:    stats,
		StepResults: s.StepResults,
	}
}
// initVars resets every piece of working state so that simulate() starts from
// the original inputs each time it is called.
func (s *Simulator) initVars() {
	s.timestepIncrement = 5 * time.Second

	// Working copies of the inputs.
	s.taskRuns = s.originalTaskRuns
	s.cqAttempts = s.originalCqAttempts
	s.idleBots = maps.Clone(s.originalIdleBots)

	// Empty queues and bookkeeping.
	s.waitingTasks = make([]types.TestingTask, 0)
	s.runningTasks = make([]types.TestingTask, 0)
	s.finishedTasks = make([]types.TestingTask, 0)
	s.expiredTasks = make([]types.TestingTask, 0)
	s.recoveringBots = make([]RecoveringBot, 0)
	s.StepResults = make([]types.StepStatus, 0)
	s.botAvailability = make(map[string]time.Duration)

	// This is a rough approximation for adding builders to CI. This assumes
	// that there is a constant influx of CLs to merit triggering the builder
	// after each run has completed.
	// TODO(fxbug.dev/95557): Track merge events and use
	// max_concurrent_invocations and batching rules to simulate with higher
	// fidelity.
	s.ciTriggers = make(map[string]time.Time, len(s.ciAdd))
	for _, builder := range s.ciAdd {
		s.ciTriggers[builder] = s.startTime
	}
}
// recoverBots returns to the idle pool every recovering bot whose recovery
// time lies strictly before the current simulation time; the rest keep
// recovering.
func (s *Simulator) recoverBots() {
	var stillRecovering []RecoveringBot
	for _, bot := range s.recoveringBots {
		if !s.currTime.After(bot.RecoveryTime) {
			stillRecovering = append(stillRecovering, bot)
			continue
		}
		s.idleBots[bot.DeviceType]++
	}
	s.recoveringBots = stillRecovering
}
// processFinishedTasks moves every running task whose end time (CreateTime +
// Duration) has passed into finishedTasks and releases its bot. Bots of the
// device types in recoveryExcludedBots become idle immediately; all other
// bots spend 75 seconds recovering first.
//
// Device types in the simulation carry an "internal_" or "external_" prefix
// (e.g. "internal_Cavium"), while recoveryExcludedBots holds bare names, so
// the prefix is stripped before the lookup.
//
// NOTE(review): the end time does not include the time the task spent
// pending, so a task that waited longer than its Duration is considered
// finished on the step after it starts — confirm whether Duration is meant to
// cover pending time.
func (s *Simulator) processFinishedTasks() {
	var updatedRunningTasks []types.TestingTask
	for _, task := range s.runningTasks {
		if !s.currTime.After(task.CreateTime.Add(task.Duration)) {
			updatedRunningTasks = append(updatedRunningTasks, task)
			continue
		}
		s.finishedTasks = append(s.finishedTasks, task)

		bareDeviceType := strings.TrimPrefix(task.DeviceType, "internal_")
		bareDeviceType = strings.TrimPrefix(bareDeviceType, "external_")
		if slices.Contains(recoveryExcludedBots, bareDeviceType) {
			s.idleBots[task.DeviceType]++
			continue
		}
		s.recoveringBots = append(
			s.recoveringBots,
			RecoveringBot{
				DeviceType:   task.DeviceType,
				RecoveryTime: s.currTime.Add(75 * time.Second),
			},
		)
	}
	s.runningTasks = updatedRunningTasks
}
// processWaitingTasks walks the waiting queue once. A task that has been
// pending longer than its expiration is moved to expiredTasks. Outside the
// baseline run, a task whose builder is proposed for removal is dropped. Any
// other task starts running if a bot of its device type is idle and keeps
// waiting otherwise.
func (s *Simulator) processWaitingTasks() {
	var stillWaiting []types.TestingTask
	for _, task := range s.waitingTasks {
		// Expiry is checked before anything else.
		if task.PendingDuration > task.Expiration {
			s.expiredTasks = append(s.expiredTasks, task)
			continue
		}

		if !s.baseline {
			// Try tasks are checked against cqRemove, all others against
			// ciRemove.
			removed := s.ciRemove
			if task.IsTry {
				removed = s.cqRemove
			}
			if slices.Contains(removed, task.Builder) {
				continue
			}
		}

		if s.idleBots[task.DeviceType] <= 0 {
			stillWaiting = append(stillWaiting, task)
			continue
		}
		s.idleBots[task.DeviceType]--
		s.runningTasks = append(s.runningTasks, task)
	}
	s.waitingTasks = stillWaiting
}
// processIncomingTasks pops tasks off the front of taskRuns and appends them
// to waitingTasks for as long as the front task was created strictly before
// the current simulation time. Each moved task starts with a pending duration
// of zero.
func (s *Simulator) processIncomingTasks() {
	for len(s.taskRuns) > 0 {
		next := s.taskRuns[0]
		if !next.CreateTime.Before(s.currTime) {
			break
		}
		next.PendingDuration = 0
		s.taskRuns = s.taskRuns[1:]
		s.waitingTasks = append(s.waitingTasks, next)
	}
}
// processCITriggers enqueues the subtasks of every added CI builder whose
// trigger time has passed. For each device footprint in the builder's profile
// it enqueues AverageTasks internal tasks, then schedules the builder's next
// trigger one average build duration from now.
func (s *Simulator) processCITriggers() {
	for builder, nextTrigger := range s.ciTriggers {
		if !s.currTime.After(nextTrigger) {
			continue
		}
		profile := s.builderProfiles[builder]
		// Trigger all subtasks in profile.
		for _, footprint := range profile.DeviceFootprints {
			for n := 0; n < footprint.AverageTasks; n++ {
				// Internal CI
				s.waitingTasks = append(s.waitingTasks, types.TestingTask{
					Builder:         builder,
					CreateTime:      s.currTime,
					DeviceType:      "internal_" + footprint.DeviceType,
					Duration:        footprint.AverageTaskDuration,
					Expiration:      footprint.Expiration,
					IsTry:           false,
					PendingDuration: 0,
				})
			}
		}
		// Set the next trigger.
		s.ciTriggers[builder] = s.currTime.Add(profile.AverageBuildDuration)
	}
}
// processCQAttempts consumes every historical presubmit attempt that started
// strictly before the current simulation time and, for each one, enqueues the
// subtasks of every added CQ builder. Attempts whose config group starts with
// "fuchsia" use external device types; all others use internal ones.
// (TODO:fxbug.dev/95819) - Only trigger cq builders for the projects they're
// enabled for.
func (s *Simulator) processCQAttempts() {
	for len(s.cqAttempts) > 0 && s.cqAttempts[0].StartTime.Before(s.currTime) {
		attempt := s.cqAttempts[0]
		s.cqAttempts = s.cqAttempts[1:]

		devicePrefix := "internal_"
		if strings.HasPrefix(attempt.ConfigGroup, "fuchsia") {
			devicePrefix = "external_"
		}

		for _, builder := range s.cqAdd {
			profile := s.builderProfiles[builder]
			// Trigger all subtasks in profile.
			for _, footprint := range profile.DeviceFootprints {
				for n := 0; n < footprint.AverageTasks; n++ {
					s.waitingTasks = append(s.waitingTasks, types.TestingTask{
						Builder:         builder,
						CreateTime:      s.currTime,
						DeviceType:      devicePrefix + footprint.DeviceType,
						Duration:        footprint.AverageTaskDuration,
						Expiration:      footprint.Expiration,
						IsTry:           true,
						PendingDuration: 0,
					})
				}
			}
		}
	}
}
// runQuery executes query with params against BigQuery and decodes all result
// rows into ptr.
//
// ptr must be a non-nil pointer to a slice of a JSON-decodable type. Rows are
// read as generic maps, marshalled to JSON and unmarshalled into ptr, so the
// destination's JSON field names must match the query's column names.
func runQuery(
	ctx context.Context,
	client *bigquery.Client,
	query string,
	params []bigquery.QueryParameter, ptr interface{},
) error {
	q := client.Query(query)
	q.Parameters = params
	iter, err := q.Read(ctx)
	if err != nil {
		return fmt.Errorf("running query: %w", err)
	}
	var rows []map[string]bigquery.Value
	for {
		row := make(map[string]bigquery.Value)
		err := iter.Next(&row)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("reading query results: %w", err)
		}
		rows = append(rows, row)
	}
	// This is a hacky way to easily deserialize rows into objects
	// Copy at your own peril!
	jsonRows, err := json.Marshal(rows)
	if err != nil {
		return fmt.Errorf("encoding query results: %w", err)
	}
	// Decode straight into the caller's pointer. Passing &ptr would hand
	// json.Unmarshal a *interface{}, which accepts a nil or non-pointer
	// destination without reporting an error.
	if err := json.Unmarshal(jsonRows, ptr); err != nil {
		return fmt.Errorf("decoding query results: %w", err)
	}
	return nil
}
// reportSimulationDifferences logs, step by step, every difference between
// the baseline and proposed simulations: idle bots and waiting times per
// device type in deviceTypes, and the lengths of the running, waiting and
// scheduled task queues. Steps without differences log nothing.
// proposedSimulation must have at least as many steps as baselineSimulation.
func reportSimulationDifferences(
	deviceTypes map[string]int,
	baselineSimulation, proposedSimulation []types.StepStatus,
) {
	for step := range baselineSimulation {
		before, after := baselineSimulation[step], proposedSimulation[step]
		// The header is always first; it is only logged when something follows.
		diffs := []string{fmt.Sprintf("Diffs for Execution step %d", step+1)}

		for dt := range deviceTypes {
			if b, p := before.IdleBots[dt], after.IdleBots[dt]; b != p {
				diffs = append(diffs, fmt.Sprintf("Diff available workers: %s %d | %d", dt, b, p))
			}
			if b, p := before.WaitingTimes[dt], after.WaitingTimes[dt]; b != p {
				diffs = append(diffs, fmt.Sprintf("Diff in pending minutes: %s %f | %f", dt, b.Minutes(), p.Minutes()))
			}
		}

		if b, p := len(before.RunningTasks), len(after.RunningTasks); b != p {
			diffs = append(diffs, fmt.Sprintf("Diff runningTasks length: %d | %d", b, p))
		}
		if b, p := len(before.WaitingTasks), len(after.WaitingTasks); b != p {
			diffs = append(diffs, fmt.Sprintf("Diff waitingTasks length: %d | %d", b, p))
		}
		if b, p := len(before.TaskRuns), len(after.TaskRuns); b != p {
			diffs = append(diffs, fmt.Sprintf("Diff scheduledRuns length: %d | %d", b, p))
		}

		if len(diffs) <= 1 {
			continue
		}
		for _, line := range diffs {
			log.Print(line)
		}
		log.Print("\n")
	}
}
// maxLength returns the length in bytes of the longest key in s, or 0 when s
// has no keys.
func maxLength(s map[string]int) int {
	longest := 0
	for key := range s {
		if n := len(key); n > longest {
			longest = n
		}
	}
	return longest
}
// percentile returns the element at the p-th percentile of d, which must
// already be sorted in ascending order. The element is chosen by the index
// len(d)*p/100, clamped to the valid range so that p <= 0 yields the first
// element and p >= 100 yields the last. An empty slice yields 0.
func percentile(d []time.Duration, p int) time.Duration {
	if len(d) == 0 {
		return 0
	}
	idx := len(d) * p / 100
	if idx < 0 {
		idx = 0
	}
	if last := len(d) - 1; idx > last {
		idx = last
	}
	return d[idx]
}