blob: 289c4febae47ca12309d47716d57b4bbb9bf8537 [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package resultdb
import (
"encoding/json"
"fmt"
"net/http"
"os"
"strings"
sinkpb "go.chromium.org/luci/resultdb/sink/proto/v1"
"google.golang.org/protobuf/encoding/protojson"
)
// luciContext corresponds to the schema of the file identified by the
// LUCI_CONTEXT env var. See
// https://crsrc.org/i/go/src/go.chromium.org/luci/lucictx/sections.proto for
// the whole structure.
type luciContext struct {
ResultDB resultDB `json:"resultdb"`
ResultSink resultSink `json:"result_sink"`
}
// resultSink holds the result_sink information parsed from LUCI_CONTEXT.
type resultSink struct {
AuthToken string `json:"auth_token"`
ResultSinkAddr string `json:"address"`
}
type Client struct {
resultSink *resultSink
httpClient *http.Client
semaphore chan struct{}
}
type resultDB struct {
CurrentInvocation resultDBInvocation `json:"current_invocation"`
}
type resultDBInvocation struct {
Name string `json:"name"`
}
func (c *Client) ReportTestResults(requests []*sinkpb.ReportTestResultsRequest) error {
for _, request := range requests {
testResult := protojson.Format(request)
err := c.sendData(testResult, "ReportTestResults")
if err != nil {
return err
}
}
return nil
}
func (c *Client) ReportInvocationLevelArtifacts(outputRoot string, invocationArtifacts []string) error {
invocationRequest := &sinkpb.ReportInvocationLevelArtifactsRequest{
Artifacts: InvocationLevelArtifacts(outputRoot, invocationArtifacts),
}
testResult := protojson.Format(invocationRequest)
return c.sendData(testResult, "ReportInvocationLevelArtifacts")
}
func (c *Client) sendData(data, endpoint string) error {
<-c.semaphore
defer func() { c.semaphore <- struct{}{} }()
url := fmt.Sprintf("http://%s/prpc/luci.resultsink.v1.Sink/%s", c.resultSink.ResultSinkAddr, endpoint)
req, err := http.NewRequest("POST", url, strings.NewReader(data))
if err != nil {
return err
}
// ResultSink HTTP authorization scheme is documented at
// https://fuchsia.googlesource.com/third_party/luci-go/+/HEAD/resultdb/sink/proto/v1/sink.proto#29
req.Header.Add("Authorization", fmt.Sprintf("ResultSink %s", c.resultSink.AuthToken))
req.Header.Add("Accept", "application/json")
req.Header.Add("Content-Type", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("ResultDB Http Request errored with status code %s (%d)", http.StatusText(resp.StatusCode), resp.StatusCode)
}
return nil
}
func NewClient() (*Client, error) {
b, err := os.ReadFile(os.Getenv("LUCI_CONTEXT"))
if err != nil {
return nil, err
}
var ctx luciContext
if err = json.Unmarshal(b, &ctx); err != nil {
return nil, err
}
// We are clearly running inside a LUCI_CONTEXT luciexe environment but rdb
// stream was not started.
if ctx.ResultSink.AuthToken == "" || ctx.ResultSink.ResultSinkAddr == "" {
return nil, fmt.Errorf("resultdb is enabled but not resultsink for invocation. Make sure swarming is run under \"rdb stream\"")
}
client := &Client{
resultSink: &ctx.ResultSink,
httpClient: &http.Client{},
semaphore: make(chan struct{}, 64),
}
for i := 0; i < cap(client.semaphore); i++ {
client.semaphore <- struct{}{}
}
return client, nil
}