blob: a391afc3bf87a6fcc1e0307a846d5cbd93be3e96 [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved
// Use of this source code is governed by a BSD-style license that can
// found in the LICENSE file.
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"go.chromium.org/luci/common/api/swarming/swarming/v1"
ftxproto "go.fuchsia.dev/infra/cmd/ftxtest/proto"
)
type Swarming struct {
instance string
service *swarming.Service
cas *CAS
}
const (
taskPriority = 200
taskExpiration = 5 * time.Hour
taskExecutionTimeout = 2 * time.Hour
poolTaskInterval = 10 * time.Second
)
func NewSwarming(httpClient *http.Client, instance string, cas *CAS) (*Swarming, error) {
swarmingService, err := swarming.New(httpClient)
swarmingService.BasePath = fmt.Sprintf("https://%s.appspot.com/_ah/api/swarming/v1/", instance)
if err != nil {
return nil, fmt.Errorf("swarming.New: %v", err)
}
swarming := &Swarming{
service: swarmingService,
instance: instance,
cas: cas,
}
return swarming, nil
}
func (s *Swarming) LaunchTask(buildInput *ftxproto.InputProperties, parentTaskId string) (*swarming.SwarmingRpcsTaskRequestMetadata, error) {
casInput, err := s.casInput(buildInput)
if err != nil {
return nil, fmt.Errorf("casInput: %v", err)
}
hostTest := buildInput.Test.GetHost()
properties := &swarming.SwarmingRpcsTaskProperties{
ExecutionTimeoutSecs: int64(taskExpiration.Seconds()),
Command: hostTest.Command,
CasInputRoot: casInput,
Outputs: []string{"out"},
Dimensions: swarmmingStringPair(buildInput.TargetDimensions),
Env: swarmmingStringPair(hostTest.Env),
}
if len(buildInput.CipdPackages) > 0 {
properties.CipdInput = cipdInput(buildInput.CipdPackages)
}
return s.service.Tasks.New(&swarming.SwarmingRpcsNewTaskRequest{
Name: buildInput.Name,
ParentTaskId: parentTaskId,
ServiceAccount: "internal-artifact-readers@fuchsia-infra.iam.gserviceaccount.com",
ExpirationSecs: int64(taskExpiration.Seconds()),
Priority: taskPriority,
Realm: realm(buildInput),
Properties: properties,
}).Do()
}
func (s *Swarming) WaitTask(taskId string) error {
for {
result, err := s.service.Task.Result(taskId).Fields("state", "failure", "internal_failure").Do()
if err != nil {
err = fmt.Errorf("swarming.Task.Result RPC failed: %v", err)
return err
}
if result.State == "PENDING" || result.State == "RUNNING" {
time.Sleep(poolTaskInterval)
continue
}
if result.State != "COMPLETED" || result.Failure || result.InternalFailure {
return fmt.Errorf("Task got %s want COMPLETED", result.State)
}
return nil
}
}
func (s *Swarming) casInput(buildInput *ftxproto.InputProperties) (*swarming.SwarmingRpcsCASReference, error) {
digestSplit := strings.Split(buildInput.InputArtifactsDigest, "/")
sizeBytes, err := strconv.ParseInt(digestSplit[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("sizeBytes: %v", err)
}
return &swarming.SwarmingRpcsCASReference{
CasInstance: fmt.Sprintf("projects/%s/instances/default_instance", s.instance),
Digest: &swarming.SwarmingRpcsDigest{
Hash: digestSplit[0],
SizeBytes: sizeBytes,
},
}, nil
}
func cipdInput(cipdPackages []*ftxproto.CipdPackage) *swarming.SwarmingRpcsCipdInput {
packages := []*swarming.SwarmingRpcsCipdPackage{}
for _, inputPkg := range cipdPackages {
packages = append(packages, &swarming.SwarmingRpcsCipdPackage{
Path: inputPkg.Path,
Version: inputPkg.Version,
PackageName: inputPkg.Name,
})
}
return &swarming.SwarmingRpcsCipdInput{
Packages: packages,
}
}
func swarmmingStringPair(m map[string]string) []*swarming.SwarmingRpcsStringPair {
result := []*swarming.SwarmingRpcsStringPair{}
for key, value := range m {
result = append(result, &swarming.SwarmingRpcsStringPair{
Key: key,
Value: value,
})
}
return result
}
func realm(buildInput *ftxproto.InputProperties) string {
if buildInput.External {
return "fuchsia:try"
} else {
return "turquoise:global.try"
}
}
// testSummary determines the data for out/summary.json
type testSummary struct {
Success bool `json:"success"`
}
func (s *Swarming) CheckTestFailure(ctx context.Context, taskId string, buildOutput *ftxproto.OutputProperties) error {
task, err := s.service.Task.Result(taskId).Do()
if err != nil {
return fmt.Errorf("task.request: %v", err)
}
if task.CasOutputRoot == nil || task.CasOutputRoot.Digest == nil {
return errors.New("Swarming task did not produce CAS output")
}
d := task.CasOutputRoot.Digest
buildOutput.OutputArtifactsDigest = fmt.Sprintf("%s/%d", d.Hash, d.SizeBytes)
dir, err := s.cas.Download(ctx, d.Hash, d.SizeBytes)
if err != nil {
return fmt.Errorf("cas.Download: %v", err)
}
summary := testSummary{}
err = readJSON(filepath.Join(dir, "out", "summary.json"), &summary)
if err != nil {
return fmt.Errorf("readJSON: %v", err)
}
if !summary.Success {
return errors.New("Test failure")
}
return nil
}
func readJSON(filename string, out any) error {
rawData, err := os.ReadFile(filename)
if err != nil {
return fmt.Errorf("os.ReadFile: %v", err)
}
err = json.Unmarshal(rawData, out)
if err != nil {
return fmt.Errorf("json.Unmarshal: %v", err)
}
return nil
}