blob: e0e6d169799e880cd34c8aa384387ad7318d372b [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main
import (
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"go.chromium.org/luci/common/api/swarming/swarming/v1"
ftxproto "go.fuchsia.dev/infra/cmd/ftxtest/proto"
)
// Swarming wraps a Swarming API service client together with the name of the
// instance it talks to.
type Swarming struct {
// instance is the Swarming instance name passed to NewSwarming. It is used to
// build the API base path and the CAS instance name of launched tasks.
instance string
// service is the underlying Swarming API client used to issue RPCs.
service *swarming.Service
}
// Defaults applied to Swarming tasks launched and polled by this file.
const (
// taskPriority is sent as the Priority of every new task request.
taskPriority = 200
// taskExpiration is sent (in seconds) as the ExpirationSecs of every new task
// request.
taskExpiration = 5 * time.Hour
// taskExecutionTimeout is presumably the limit on how long a task may run
// once started.
// NOTE(review): confirm that LaunchTask uses this for ExecutionTimeoutSecs.
taskExecutionTimeout = 2 * time.Hour
// poolTaskInterval is how long WaitTask sleeps between two polls of the task
// result.
// NOTE(review): the name looks like a typo of "pollTaskInterval".
poolTaskInterval = 10 * time.Second
)
// NewSwarming builds a Swarming client bound to the given instance.
//
// httpClient is handed to swarming.New and carries every RPC issued through
// the returned client. instance is the Swarming instance name; the API base
// path is derived from it as
// https://<instance>.appspot.com/_ah/api/swarming/v1/.
//
// It returns an error, and a nil client, when the underlying service cannot be
// created.
func NewSwarming(httpClient *http.Client, instance string) (*Swarming, error) {
	swarmingService, err := swarming.New(httpClient)
	// The error must be checked before the service is touched: on failure the
	// returned service may be nil and assigning BasePath would panic.
	if err != nil {
		return nil, fmt.Errorf("swarming.New: %w", err)
	}
	swarmingService.BasePath = fmt.Sprintf("https://%s.appspot.com/_ah/api/swarming/v1/", instance)
	return &Swarming{
		instance: instance,
		service:  swarmingService,
	}, nil
}
// LaunchTask schedules a new Swarming task that runs the host test described
// by buildInput and returns the metadata Swarming reports for the new request.
//
// The task's command and environment come from the host test; its CAS input is
// derived from buildInput.InputArtifactsDigest, its CIPD packages from
// buildInput.CipdPackages, and its dimensions from buildInput.TargetDimensions.
// The realm is picked by realm(buildInput).
//
// An error is returned when the CAS digest cannot be parsed, when buildInput
// carries no host test, or when the Swarming RPC fails.
func (s *Swarming) LaunchTask(buildInput *ftxproto.InputProperties) (*swarming.SwarmingRpcsTaskRequestMetadata, error) {
	casInput, err := s.casInput(buildInput)
	if err != nil {
		return nil, fmt.Errorf("casInput: %w", err)
	}
	hostTest := buildInput.Test.GetHost()
	// Without a host test there is no command to run; report that instead of
	// dereferencing a nil message below.
	if hostTest == nil {
		return nil, fmt.Errorf("launching task %q: no host test specified", buildInput.Name)
	}
	request := &swarming.SwarmingRpcsNewTaskRequest{
		Name: buildInput.Name,
		// How long the request may wait for a bot before it expires.
		ExpirationSecs: int64(taskExpiration.Seconds()),
		Priority:       taskPriority,
		Realm:          realm(buildInput),
		Properties: &swarming.SwarmingRpcsTaskProperties{
			// How long the task may run once started. This is deliberately the
			// execution timeout, not the (longer) scheduling expiration.
			ExecutionTimeoutSecs: int64(taskExecutionTimeout.Seconds()),
			Command:              hostTest.Command,
			CasInputRoot:         casInput,
			CipdInput:            cipdInput(buildInput.CipdPackages),
			Dimensions:           swarmmingStringPair(buildInput.TargetDimensions),
			Env:                  swarmmingStringPair(hostTest.Env),
		},
	}
	return s.service.Tasks.New(request).Do()
}
// WaitTask blocks until the Swarming task identified by taskId leaves the
// PENDING and RUNNING states, polling its result every poolTaskInterval.
//
// It returns nil only when the task reaches the COMPLETED state without the
// failure or internal_failure flags set. It returns an error when the result
// RPC fails, when either failure flag is set, or when the task ends in any
// other state (for example EXPIRED or CANCELED), since such a task did not
// run to a successful completion.
func (s *Swarming) WaitTask(taskId string) error {
	for {
		// Only the fields inspected below are requested.
		result, err := s.service.Task.Result(taskId).Fields("state", "failure", "internal_failure").Do()
		if err != nil {
			return fmt.Errorf("swarming.Task.Result RPC failed: %w", err)
		}
		if result.Failure || result.InternalFailure {
			return errors.New("Swarming task failed")
		}
		switch result.State {
		case "PENDING", "RUNNING":
			// Still in flight: wait, then poll again.
			time.Sleep(poolTaskInterval)
		case "COMPLETED":
			return nil
		default:
			// Terminal but not completed; the failure flags are not
			// necessarily set for these states, so check the state itself.
			return fmt.Errorf("swarming task %s ended in state %q", taskId, result.State)
		}
	}
}
// casInput converts buildInput.InputArtifactsDigest, expected in the form
// "<hash>/<size_bytes>", into a Swarming CAS reference that points at the
// default CAS instance of this client's Swarming project.
//
// It returns an error when the digest has no "/" separator or when the size
// component is not a base-10 integer.
func (s *Swarming) casInput(buildInput *ftxproto.InputProperties) (*swarming.SwarmingRpcsCASReference, error) {
	digestSplit := strings.Split(buildInput.InputArtifactsDigest, "/")
	// Guard the index below: an empty or separator-less digest yields a single
	// element and would otherwise panic.
	if len(digestSplit) < 2 {
		return nil, fmt.Errorf("invalid input artifacts digest %q: want <hash>/<size_bytes>", buildInput.InputArtifactsDigest)
	}
	sizeBytes, err := strconv.ParseInt(digestSplit[1], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("sizeBytes: %w", err)
	}
	return &swarming.SwarmingRpcsCASReference{
		CasInstance: fmt.Sprintf("projects/%s/instances/default_instance", s.instance),
		Digest: &swarming.SwarmingRpcsDigest{
			Hash:      digestSplit[0],
			SizeBytes: sizeBytes,
		},
	}, nil
}
// cipdInput translates the CIPD packages listed in the build input into the
// Swarming CIPD input message, preserving their order. The returned Packages
// slice is never nil, even when cipdPackages is empty.
func cipdInput(cipdPackages []*ftxproto.CipdPackage) *swarming.SwarmingRpcsCipdInput {
	input := &swarming.SwarmingRpcsCipdInput{
		Packages: make([]*swarming.SwarmingRpcsCipdPackage, 0, len(cipdPackages)),
	}
	for i := range cipdPackages {
		src := cipdPackages[i]
		converted := &swarming.SwarmingRpcsCipdPackage{
			PackageName: src.Name,
			Path:        src.Path,
			Version:     src.Version,
		}
		input.Packages = append(input.Packages, converted)
	}
	return input
}
// swarmmingStringPair flattens a string map into a list of Swarming key/value
// pairs, one per map entry. The order of the pairs follows Go's map iteration
// order and is therefore unspecified. The result is never nil.
func swarmmingStringPair(m map[string]string) []*swarming.SwarmingRpcsStringPair {
	pairs := make([]*swarming.SwarmingRpcsStringPair, 0, len(m))
	for k, v := range m {
		pair := &swarming.SwarmingRpcsStringPair{Key: k, Value: v}
		pairs = append(pairs, pair)
	}
	return pairs
}
// realm selects the realm a task is launched in: "fuchsia:try" when the build
// input is marked External, "turquoise:global.try" otherwise.
func realm(buildInput *ftxproto.InputProperties) string {
	if !buildInput.External {
		return "turquoise:global.try"
	}
	return "fuchsia:try"
}