blob: 9dfd5ec6ddffda840fb2da13d110670d4697367a [file] [log] [blame]
// Copyright 2023 The Shac Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package reporting
import (
"bytes"
"context"
"encoding/json"
"fmt"
"html"
"io"
"net/http"
"os"
"path/filepath"
"sync"
"time"
resultpb "go.chromium.org/luci/resultdb/proto/v1"
sinkpb "go.chromium.org/luci/resultdb/sink/proto/v1"
"go.fuchsia.dev/shac-project/shac/internal/engine"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
resultSinkMaxBatchSize = 500
// https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/pbutil/test_result.go;l=40;drc=1eaf63ee80de3e7de3139800ebe7d0d5497a42e2
resultDBMaxSummaryHTMLLength = 4 * 1024
// https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/pbutil/test_result.go;l=41;drc=1eaf63ee80de3e7de3139800ebe7d0d5497a42e2
resultDBMaxFailureReasonLength = 1024
)
type luci struct {
doneChecks chan *sinkpb.TestResult
// batchWaitDuration is the duration after having a test result enqueued
// that the reporter should wait before uploading it to ResultDB, in case
// more results are reported soon after that can be batched together.
// Overrideable to allow better determinism during tests.
batchWaitDuration time.Duration
mu sync.Mutex
eg errgroup.Group
liveChecks map[string]*sinkpb.TestResult
}
func (l *luci) init(ctx context.Context) error {
l.doneChecks = make(chan *sinkpb.TestResult, resultSinkMaxBatchSize)
l.liveChecks = map[string]*sinkpb.TestResult{}
r, err := resultSinkCtx()
if err != nil {
return err
}
// Do uploads in a persistent goroutine so HTTP requests don't block checks
// from running.
l.eg.Go(func() error {
client := &http.Client{}
requests := &sinkpb.ReportTestResultsRequest{}
for {
res, ok := <-l.doneChecks
if res == nil || !ok {
return nil
}
requests.TestResults = append(requests.TestResults, res)
for loop := true; loop && len(requests.TestResults) < resultSinkMaxBatchSize; {
select {
case res, ok = <-l.doneChecks:
if res == nil || !ok {
loop = false
} else {
requests.TestResults = append(requests.TestResults, res)
}
// Wait a bit in case we get more results that we can upload in
// the same batch.
case <-time.After(l.batchWaitDuration):
loop = false
}
}
b, marshalErr := protojson.MarshalOptions{}.Marshal(requests)
requests.TestResults = requests.TestResults[:0]
l.eg.Go(func() error {
if marshalErr != nil {
// Return error from here instead of from the outer function
// so that the outer function can continue doing uploads
// even if one upload attempt fails.
return marshalErr
}
// TODO(olivernewman): Implement HTTP retries.
return r.sendData(ctx, client, "ReportTestResults", b)
})
}
})
return nil
}
func (l *luci) Close() error {
close(l.doneChecks)
// Wait for the upload goroutine to complete before exiting.
return l.eg.Wait()
}
func (l *luci) EmitFinding(ctx context.Context, check string, level engine.Level, message, root, file string, s engine.Span, replacements []string) error {
r := l.getTestResult(check)
check = html.EscapeString(check)
lev := html.EscapeString(string(level))
file = html.EscapeString(file)
message = html.EscapeString(message)
if file != "" {
// TODO(maruel): Do not drop span and replacements!
if s.Start.Line > 0 {
r.SummaryHtml += fmt.Sprintf("[%s/%s] %s(%d): %s<br>", check, lev, file, s.Start.Line, message)
return nil
}
r.SummaryHtml += fmt.Sprintf("[%s/%s] %s: %s<br>", check, lev, file, message)
return nil
}
r.SummaryHtml += fmt.Sprintf("[%s/%s] %s<br>", check, lev, message)
return nil
}
func (l *luci) EmitArtifact(ctx context.Context, check, root, file string, content []byte) error {
r := l.getTestResult(check)
if content != nil {
r.Artifacts[file] = &sinkpb.Artifact{
Body: &sinkpb.Artifact_Contents{Contents: content},
ContentType: "text/plain",
}
} else {
r.Artifacts[file] = &sinkpb.Artifact{
Body: &sinkpb.Artifact_FilePath{FilePath: filepath.Join(root, file)},
ContentType: "text/plain",
}
}
return nil
}
func (l *luci) CheckCompleted(ctx context.Context, check string, start time.Time, d time.Duration, level engine.Level, err error) {
r := l.getTestResult(check)
r.StartTime = timestamppb.New(start)
r.Duration = durationpb.New(d)
truncationNotice := "... (truncated)"
if err != nil {
r.Status = resultpb.TestStatus_CRASH
msg := err.Error()
if len(msg) > resultDBMaxFailureReasonLength {
msg = msg[:resultDBMaxFailureReasonLength-len(truncationNotice)]
msg = msg + truncationNotice
}
r.FailureReason = &resultpb.FailureReason{PrimaryErrorMessage: msg}
} else if level == engine.Error {
r.Status = resultpb.TestStatus_FAIL
} else {
r.Status = resultpb.TestStatus_PASS
r.Expected = true
}
if len(r.SummaryHtml) > resultDBMaxSummaryHTMLLength {
// TODO(olivernewman): Be careful not to truncate in the middle of an
// HTML tag.
r.SummaryHtml = r.SummaryHtml[:resultDBMaxSummaryHTMLLength-len(truncationNotice)] + truncationNotice
}
// TODO(maruel): Tag r.Tags with "shac".
l.mu.Lock()
delete(l.liveChecks, check)
l.mu.Unlock()
l.doneChecks <- r
}
func (l *luci) Print(ctx context.Context, check, file string, line int, message string) {
}
func (l *luci) getTestResult(check string) *sinkpb.TestResult {
l.mu.Lock()
r := l.liveChecks[check]
if r == nil {
r = &sinkpb.TestResult{
TestId: "shac/" + check,
Artifacts: map[string]*sinkpb.Artifact{},
}
l.liveChecks[check] = r
}
l.mu.Unlock()
return r
}
// Support code.
// luciContext corresponds to the schema of the file identified by the
// LUCI_CONTEXT env var. See
// https://crsrc.org/i/go/src/go.chromium.org/luci/lucictx/sections.proto for
// the whole structure.
type luciContext struct {
ResultDB resultDB `json:"resultdb"`
ResultSink resultSinkContext `json:"result_sink"`
}
// resultSinkContext holds the result_sink information parsed from LUCI_CONTEXT.
type resultSinkContext struct {
AuthToken string `json:"auth_token"`
ResultSinkAddr string `json:"address"`
}
type resultDB struct {
CurrentInvocation resultDBInvocation `json:"current_invocation"`
}
type resultDBInvocation struct {
Name string `json:"name"`
}
func (r *resultSinkContext) sendData(ctx context.Context, client *http.Client, endpoint string, data []byte) error {
url := fmt.Sprintf("http://%s/prpc/luci.resultsink.v1.Sink/%s", r.ResultSinkAddr, endpoint)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(data))
if err != nil {
return err
}
// ResultSink HTTP authorization scheme is documented at
// https://crsrc.org/i/go/src/go.chromium.org/luci/resultdb/sink/proto/v1/sink.proto;l=30
req.Header.Add("Authorization", "ResultSink "+r.AuthToken)
req.Header.Add("Accept", "application/json")
req.Header.Add("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return err
}
_, err = io.Copy(io.Discard, resp.Body)
if err2 := resp.Body.Close(); err == nil {
err = err2
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("ResultDB HTTP Request error: %s (%d)", http.StatusText(resp.StatusCode), resp.StatusCode)
}
return err
}
// resultSinkCtx returns the rdb stream port if available.
func resultSinkCtx() (*resultSinkContext, error) {
path := os.Getenv("LUCI_CONTEXT")
b, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to load LUCI_CONTEXT=%q: %w", path, err)
}
var ctx luciContext
if err = json.Unmarshal(b, &ctx); err != nil {
return nil, fmt.Errorf("failed to load LUCI_CONTEXT=%q: %w", path, err)
}
// We are clearly running inside a LUCI_CONTEXT luciexe environment but rdb
// stream was not started. Hard fail since it means we need to fix the recipe.
if ctx.ResultDB.CurrentInvocation.Name == "" || ctx.ResultSink.AuthToken == "" || ctx.ResultSink.ResultSinkAddr == "" {
return nil, fmt.Errorf("resultdb is enabled but not resultsink for invocation %s. Make sure shac is run under \"rdb stream\"", ctx.ResultDB.CurrentInvocation.Name)
}
return &ctx.ResultSink, nil
}