| // Copyright 2023 The Fuchsia Authors. All rights reserved |
| // Use of this source code is governed by a BSD-style license that can |
| // found in the LICENSE file. |
| package main |
| |
| import ( |
| "context" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "net/http" |
| "os" |
| "path/filepath" |
| "strconv" |
| "strings" |
| "time" |
| |
| "go.chromium.org/luci/common/api/swarming/swarming/v1" |
| ftxproto "go.fuchsia.dev/infra/cmd/ftxtest/proto" |
| ) |
| |
| type Swarming struct { |
| instance string |
| service *swarming.Service |
| cas *CAS |
| } |
| |
| const ( |
| taskPriority = 200 |
| taskExpiration = 5 * time.Hour |
| taskExecutionTimeout = 2 * time.Hour |
| poolTaskInterval = 10 * time.Second |
| ) |
| |
| func NewSwarming(httpClient *http.Client, instance string, cas *CAS) (*Swarming, error) { |
| swarmingService, err := swarming.New(httpClient) |
| swarmingService.BasePath = fmt.Sprintf("https://%s.appspot.com/_ah/api/swarming/v1/", instance) |
| if err != nil { |
| return nil, fmt.Errorf("swarming.New: %v", err) |
| } |
| swarming := &Swarming{ |
| service: swarmingService, |
| instance: instance, |
| cas: cas, |
| } |
| return swarming, nil |
| } |
| |
| func (s *Swarming) LaunchTask(buildInput *ftxproto.InputProperties, parentTaskId string) (*swarming.SwarmingRpcsTaskRequestMetadata, error) { |
| casInput, err := s.casInput(buildInput) |
| if err != nil { |
| return nil, fmt.Errorf("casInput: %v", err) |
| } |
| hostTest := buildInput.Test.GetHost() |
| properties := &swarming.SwarmingRpcsTaskProperties{ |
| ExecutionTimeoutSecs: int64(taskExpiration.Seconds()), |
| Command: hostTest.Command, |
| CasInputRoot: casInput, |
| Outputs: []string{"out"}, |
| Dimensions: swarmmingStringPair(buildInput.TargetDimensions), |
| Env: swarmmingStringPair(hostTest.Env), |
| } |
| if len(buildInput.CipdPackages) > 0 { |
| properties.CipdInput = cipdInput(buildInput.CipdPackages) |
| } |
| return s.service.Tasks.New(&swarming.SwarmingRpcsNewTaskRequest{ |
| Name: buildInput.Name, |
| ParentTaskId: parentTaskId, |
| ServiceAccount: "internal-artifact-readers@fuchsia-infra.iam.gserviceaccount.com", |
| ExpirationSecs: int64(taskExpiration.Seconds()), |
| Priority: taskPriority, |
| Realm: realm(buildInput), |
| Properties: properties, |
| }).Do() |
| } |
| |
| func (s *Swarming) WaitTask(taskId string) error { |
| for { |
| result, err := s.service.Task.Result(taskId).Fields("state", "failure", "internal_failure").Do() |
| if err != nil { |
| err = fmt.Errorf("swarming.Task.Result RPC failed: %v", err) |
| return err |
| } |
| if result.State == "PENDING" || result.State == "RUNNING" { |
| time.Sleep(poolTaskInterval) |
| continue |
| } |
| if result.State != "COMPLETED" || result.Failure || result.InternalFailure { |
| return fmt.Errorf("Task got %s want COMPLETED", result.State) |
| } |
| return nil |
| } |
| } |
| |
| func (s *Swarming) casInput(buildInput *ftxproto.InputProperties) (*swarming.SwarmingRpcsCASReference, error) { |
| digestSplit := strings.Split(buildInput.InputArtifactsDigest, "/") |
| sizeBytes, err := strconv.ParseInt(digestSplit[1], 10, 64) |
| if err != nil { |
| return nil, fmt.Errorf("sizeBytes: %v", err) |
| } |
| return &swarming.SwarmingRpcsCASReference{ |
| CasInstance: fmt.Sprintf("projects/%s/instances/default_instance", s.instance), |
| Digest: &swarming.SwarmingRpcsDigest{ |
| Hash: digestSplit[0], |
| SizeBytes: sizeBytes, |
| }, |
| }, nil |
| } |
| |
| func cipdInput(cipdPackages []*ftxproto.CipdPackage) *swarming.SwarmingRpcsCipdInput { |
| packages := []*swarming.SwarmingRpcsCipdPackage{} |
| for _, inputPkg := range cipdPackages { |
| packages = append(packages, &swarming.SwarmingRpcsCipdPackage{ |
| Path: inputPkg.Path, |
| Version: inputPkg.Version, |
| PackageName: inputPkg.Name, |
| }) |
| } |
| return &swarming.SwarmingRpcsCipdInput{ |
| Packages: packages, |
| } |
| } |
| |
| func swarmmingStringPair(m map[string]string) []*swarming.SwarmingRpcsStringPair { |
| result := []*swarming.SwarmingRpcsStringPair{} |
| for key, value := range m { |
| result = append(result, &swarming.SwarmingRpcsStringPair{ |
| Key: key, |
| Value: value, |
| }) |
| } |
| return result |
| } |
| |
| func realm(buildInput *ftxproto.InputProperties) string { |
| if buildInput.External { |
| return "fuchsia:try" |
| } else { |
| return "turquoise:global.try" |
| } |
| } |
| |
| // testSummary determines the data for out/summary.json |
| type testSummary struct { |
| Success bool `json:"success"` |
| } |
| |
| func (s *Swarming) CheckTestFailure(ctx context.Context, taskId string, buildOutput *ftxproto.OutputProperties) error { |
| task, err := s.service.Task.Result(taskId).Do() |
| if err != nil { |
| return fmt.Errorf("task.request: %v", err) |
| } |
| if task.CasOutputRoot == nil || task.CasOutputRoot.Digest == nil { |
| return errors.New("Swarming task did not produce CAS output") |
| } |
| d := task.CasOutputRoot.Digest |
| buildOutput.OutputArtifactsDigest = fmt.Sprintf("%s/%d", d.Hash, d.SizeBytes) |
| dir, err := s.cas.Download(ctx, d.Hash, d.SizeBytes) |
| if err != nil { |
| return fmt.Errorf("cas.Download: %v", err) |
| } |
| summary := testSummary{} |
| err = readJSON(filepath.Join(dir, "out", "summary.json"), &summary) |
| if err != nil { |
| return fmt.Errorf("readJSON: %v", err) |
| } |
| if !summary.Success { |
| return errors.New("Test failure") |
| } |
| return nil |
| } |
| |
| func readJSON(filename string, out any) error { |
| rawData, err := os.ReadFile(filename) |
| if err != nil { |
| return fmt.Errorf("os.ReadFile: %v", err) |
| } |
| err = json.Unmarshal(rawData, out) |
| if err != nil { |
| return fmt.Errorf("json.Unmarshal: %v", err) |
| } |
| return nil |
| } |