| // Copyright 2023 The Fuchsia Authors. All rights reserved |
| // Use of this source code is governed by a BSD-style license that can |
| // found in the LICENSE file. |
| package main |
| |
| import ( |
| "errors" |
| "fmt" |
| "net/http" |
| "strconv" |
| "strings" |
| "time" |
| |
| "go.chromium.org/luci/common/api/swarming/swarming/v1" |
| ftxproto "go.fuchsia.dev/infra/cmd/ftxtest/proto" |
| ) |
| |
| type Swarming struct { |
| instance string |
| service *swarming.Service |
| } |
| |
| const ( |
| taskPriority = 200 |
| taskExpiration = 5 * time.Hour |
| taskExecutionTimeout = 2 * time.Hour |
| poolTaskInterval = 10 * time.Second |
| ) |
| |
| func NewSwarming(httpClient *http.Client, instance string) (*Swarming, error) { |
| swarmingService, err := swarming.New(httpClient) |
| swarmingService.BasePath = fmt.Sprintf("https://%s.appspot.com/_ah/api/swarming/v1/", instance) |
| if err != nil { |
| return nil, fmt.Errorf("swarming.New: %v", err) |
| } |
| swarming := &Swarming{ |
| service: swarmingService, |
| instance: instance, |
| } |
| return swarming, nil |
| } |
| |
| func (s *Swarming) LaunchTask(buildInput *ftxproto.InputProperties) (*swarming.SwarmingRpcsTaskRequestMetadata, error) { |
| casInput, err := s.casInput(buildInput) |
| if err != nil { |
| return nil, fmt.Errorf("casInput: %v", err) |
| } |
| hostTest := buildInput.Test.GetHost() |
| return s.service.Tasks.New(&swarming.SwarmingRpcsNewTaskRequest{ |
| Name: buildInput.Name, |
| ExpirationSecs: int64(taskExpiration.Seconds()), |
| Priority: taskPriority, |
| Realm: realm(buildInput), |
| Properties: &swarming.SwarmingRpcsTaskProperties{ |
| ExecutionTimeoutSecs: int64(taskExpiration.Seconds()), |
| Command: hostTest.Command, |
| CasInputRoot: casInput, |
| CipdInput: cipdInput(buildInput.CipdPackages), |
| Dimensions: swarmmingStringPair(buildInput.TargetDimensions), |
| Env: swarmmingStringPair(hostTest.Env), |
| }, |
| }).Do() |
| } |
| |
| func (s *Swarming) WaitTask(taskId string) error { |
| for { |
| result, err := s.service.Task.Result(taskId).Fields("state", "failure", "internal_failure").Do() |
| if err != nil { |
| err = fmt.Errorf("swarming.Task.Result RPC failed: %v", err) |
| return err |
| } |
| if result.Failure || result.InternalFailure { |
| err = errors.New("Swarming task failed") |
| return err |
| } |
| if result.State == "PENDING" || result.State == "RUNNING" { |
| time.Sleep(poolTaskInterval) |
| continue |
| } |
| return nil |
| } |
| } |
| |
| func (s *Swarming) casInput(buildInput *ftxproto.InputProperties) (*swarming.SwarmingRpcsCASReference, error) { |
| digestSplit := strings.Split(buildInput.InputArtifactsDigest, "/") |
| sizeBytes, err := strconv.ParseInt(digestSplit[1], 10, 64) |
| if err != nil { |
| return nil, fmt.Errorf("sizeBytes: %v", err) |
| } |
| return &swarming.SwarmingRpcsCASReference{ |
| CasInstance: fmt.Sprintf("projects/%s/instances/default_instance", s.instance), |
| Digest: &swarming.SwarmingRpcsDigest{ |
| Hash: digestSplit[0], |
| SizeBytes: sizeBytes, |
| }, |
| }, nil |
| } |
| |
| func cipdInput(cipdPackages []*ftxproto.CipdPackage) *swarming.SwarmingRpcsCipdInput { |
| packages := []*swarming.SwarmingRpcsCipdPackage{} |
| for _, inputPkg := range cipdPackages { |
| packages = append(packages, &swarming.SwarmingRpcsCipdPackage{ |
| Path: inputPkg.Path, |
| Version: inputPkg.Version, |
| PackageName: inputPkg.Name, |
| }) |
| } |
| return &swarming.SwarmingRpcsCipdInput{ |
| Packages: packages, |
| } |
| } |
| |
| func swarmmingStringPair(m map[string]string) []*swarming.SwarmingRpcsStringPair { |
| result := []*swarming.SwarmingRpcsStringPair{} |
| for key, value := range m { |
| result = append(result, &swarming.SwarmingRpcsStringPair{ |
| Key: key, |
| Value: value, |
| }) |
| } |
| return result |
| } |
| |
| func realm(buildInput *ftxproto.InputProperties) string { |
| if buildInput.External { |
| return "fuchsia:try" |
| } else { |
| return "turquoise:global.try" |
| } |
| } |