blob: 4db58a7f4e7ca8772f4b0a2496e9101dbd41d2e2 [file] [log] [blame]
// Package rexec provides a top-level client for executing remote commands.
package rexec
import (
"context"
"fmt"
"strings"
"time"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/chunker"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/command"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/filemetadata"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/outerr"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/tree"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
rc "github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
log "github.com/golang/glog"
tspb "github.com/golang/protobuf/ptypes/timestamp"
)
// Client is a remote execution client.
type Client struct {
// FileMetadataCache is handed to the Merkle tree computation, the output
// download and the computation of outputs to upload performed by Context.
FileMetadataCache filemetadata.Cache
// GrpcClient is the underlying SDK client that Context uses for all remote
// operations in this file: reading and uploading blobs, querying and updating
// the action cache, and executing actions.
GrpcClient *rc.Client
}
// Context allows more granular control over various stages of command execution.
// At any point, any errors that occurred will be stored in the Result.
type Context struct {
// ctx is the context produced by rc.ContextWithMetadata in NewContext; it is
// passed to every GrpcClient call made through this Context.
ctx context.Context
// cmd is the command being executed. NewContext fills in its default field
// values and validates it.
cmd *command.Command
// opt holds the execution options; this file reads AcceptCached, DoNotCache
// and DownloadOutputs from it.
opt *command.ExecutionOptions
// oe receives the action's stdout and stderr, as well as any message
// returned with the execution response.
oe outerr.OutErr
// client is the Client that created this Context.
client *Client
// inputBlobs are the blobs checked/uploaded before remote execution: those
// returned by tree.ComputeMerkleTree plus cmdCh and acCh.
inputBlobs []*chunker.Chunker
// cmdCh and acCh are the chunkers built from the Command and Action protos
// respectively; both are set by computeInputs.
cmdCh, acCh *chunker.Chunker
// resPb is the most recent action result: from the action cache, from remote
// execution, or computed from local outputs in UpdateCachedResult.
resPb *repb.ActionResult
// The metadata of the current execution.
Metadata *command.Metadata
// The result of the current execution, if available.
Result *command.Result
}
// NewContext creates a Context for executing cmd with the given options.
//
// The command's default field values are filled in and the command is
// validated first; a validation failure is returned as the error. The supplied
// ctx is then decorated via rc.ContextWithMetadata using the command's tool
// name, command ID and invocation ID. The returned Context starts with an
// empty, non-nil EventTimes map in its Metadata and no Result.
func (c *Client) NewContext(ctx context.Context, cmd *command.Command, opt *command.ExecutionOptions, oe outerr.OutErr) (*Context, error) {
	cmd.FillDefaultFieldValues()
	if err := cmd.Validate(); err != nil {
		return nil, err
	}

	ids := cmd.Identifiers
	grpcCtx, err := rc.ContextWithMetadata(ctx, ids.ToolName, ids.CommandID, ids.InvocationID)
	if err != nil {
		return nil, err
	}

	ec := &Context{
		ctx:    grpcCtx,
		cmd:    cmd,
		opt:    opt,
		oe:     oe,
		client: c,
		Metadata: &command.Metadata{
			EventTimes: make(map[string]*command.TimeInterval),
		},
	}
	return ec, nil
}
// downloadStream delivers the contents of one output stream to write.
//
// If raw is non-nil it is written as is. Otherwise, if dgPb is non-nil, the
// blob with that digest is read through the gRPC client and written. When both
// are nil nothing is written. Digest conversion and blob read errors are
// returned unchanged.
func (ec *Context) downloadStream(raw []byte, dgPb *repb.Digest, write func([]byte)) error {
	if raw != nil {
		write(raw)
		return nil
	}
	if dgPb == nil {
		return nil
	}

	dg, err := digest.NewFromProto(dgPb)
	if err != nil {
		return err
	}
	contents, err := ec.client.GrpcClient.ReadBlob(ec.ctx, dg)
	if err != nil {
		return err
	}
	write(contents)
	return nil
}
// setOutputMetadata fills the output-related fields of ec.Metadata from the
// current action result. It does nothing when there is no action result.
//
// File and directory counts include the corresponding symlink entries.
// OutputDigests and TotalOutputBytes are reset and rebuilt: the total covers
// the output files plus stdout and stderr (raw bytes when present, otherwise
// the size recorded in the stream's digest).
func (ec *Context) setOutputMetadata() {
	res, md := ec.resPb, ec.Metadata
	if res == nil {
		return
	}

	md.OutputFiles = len(res.OutputFiles) + len(res.OutputFileSymlinks)
	md.OutputDirectories = len(res.OutputDirectories) + len(res.OutputDirectorySymlinks)
	md.OutputDigests = make(map[string]digest.Digest)
	md.TotalOutputBytes = 0

	for _, f := range res.OutputFiles {
		d := digest.NewFromProtoUnvalidated(f.Digest)
		md.OutputDigests[f.Path] = d
		md.TotalOutputBytes += d.Size
	}

	switch {
	case res.StdoutRaw != nil:
		md.TotalOutputBytes += int64(len(res.StdoutRaw))
	case res.StdoutDigest != nil:
		md.TotalOutputBytes += res.StdoutDigest.SizeBytes
	}

	switch {
	case res.StderrRaw != nil:
		md.TotalOutputBytes += int64(len(res.StderrRaw))
	case res.StderrDigest != nil:
		md.TotalOutputBytes += res.StderrDigest.SizeBytes
	}
}
// downloadResults turns the current action result into a command.Result.
//
// When downloadOutputs is false, nothing is fetched and the result is built
// from the exit code alone. Otherwise the output metadata is recorded, stdout
// and stderr are written to ec.oe, and the action outputs are downloaded under
// execRoot; the whole step is timed under command.EventDownloadResults. Any
// failure along the way is returned as a remote error result.
//
// The caller must ensure ec.resPb is non-nil.
func (ec *Context) downloadResults(execRoot string, downloadOutputs bool) *command.Result {
	if !downloadOutputs {
		return command.NewResultFromExitCode((int)(ec.resPb.ExitCode))
	}

	ec.setOutputMetadata()
	ec.Metadata.EventTimes[command.EventDownloadResults] = &command.TimeInterval{From: time.Now()}
	defer func() { ec.Metadata.EventTimes[command.EventDownloadResults].To = time.Now() }()

	// Stdout first, then stderr; stop at the first failure.
	streams := []struct {
		raw   []byte
		dg    *repb.Digest
		write func([]byte)
	}{
		{ec.resPb.StdoutRaw, ec.resPb.StdoutDigest, ec.oe.WriteOut},
		{ec.resPb.StderrRaw, ec.resPb.StderrDigest, ec.oe.WriteErr},
	}
	for _, s := range streams {
		if err := ec.downloadStream(s.raw, s.dg, s.write); err != nil {
			return command.NewRemoteErrorResult(err)
		}
	}

	err := ec.client.GrpcClient.DownloadActionOutputs(ec.ctx, ec.resPb, execRoot, ec.client.FileMetadataCache)
	if err != nil {
		return command.NewRemoteErrorResult(err)
	}
	// TODO(olaola): save output stats onto metadata here.
	return command.NewResultFromExitCode((int)(ec.resPb.ExitCode))
}
// computeInputs prepares everything needed to identify and upload the action:
// the Command proto chunker, the input Merkle tree blobs and the Action proto
// chunker. The resulting digests and input statistics are stored in
// ec.Metadata, and the blobs (including the command and action chunkers) in
// ec.inputBlobs.
//
// The work is done once: if the action digest in the metadata already has a
// positive size, the call returns immediately. The step is timed under
// command.EventComputeMerkleTree.
func (ec *Context) computeInputs() error {
	if ec.Metadata.ActionDigest.Size > 0 {
		// Inputs were computed by an earlier call.
		return nil
	}
	ec.Metadata.EventTimes[command.EventComputeMerkleTree] = &command.TimeInterval{From: time.Now()}
	defer func() { ec.Metadata.EventTimes[command.EventComputeMerkleTree].To = time.Now() }()

	id := ec.cmd.Identifiers.ExecutionID
	commandPb := ec.cmd.ToREProto()
	log.V(2).Infof("%s> Command: \n%s\n", id, proto.MarshalTextString(commandPb))

	chunkSize := int(ec.client.GrpcClient.ChunkMaxSize)
	var err error
	ec.cmdCh, err = chunker.NewFromProto(commandPb, chunkSize)
	if err != nil {
		return err
	}
	commandDg := ec.cmdCh.Digest()
	ec.Metadata.CommandDigest = commandDg
	log.V(1).Infof("%s> Command digest: %s", id, commandDg)

	log.V(1).Infof("%s> Computing input Merkle tree...", id)
	root, blobs, stats, err := tree.ComputeMerkleTree(ec.cmd.ExecRoot, ec.cmd.InputSpec, chunkSize, ec.client.FileMetadataCache)
	log.V(1).Infof("%s> Finished Merkle tree...", id)
	if err != nil {
		return err
	}
	ec.inputBlobs = blobs
	ec.Metadata.InputFiles = stats.InputFiles
	ec.Metadata.InputDirectories = stats.InputDirectories
	ec.Metadata.TotalInputBytes = stats.TotalInputBytes

	actionPb := &repb.Action{
		CommandDigest:   commandDg.ToProto(),
		InputRootDigest: root.ToProto(),
		DoNotCache:      ec.opt.DoNotCache,
	}
	if timeout := ec.cmd.Timeout; timeout > 0 {
		actionPb.Timeout = ptypes.DurationProto(timeout)
	}
	ec.acCh, err = chunker.NewFromProto(actionPb, chunkSize)
	if err != nil {
		return err
	}
	actionDg := ec.acCh.Digest()
	log.V(1).Infof("%s> Action digest: %s", id, actionDg)

	// The command and action protos have to be uploaded along with the inputs.
	ec.inputBlobs = append(ec.inputBlobs, ec.cmdCh, ec.acCh)
	ec.Metadata.ActionDigest = actionDg
	ec.Metadata.TotalInputBytes += commandDg.Size + actionDg.Size
	return nil
}
// GetCachedResult looks the action up in the remote action cache and, on a
// hit, downloads its results. On a cache miss the Result is set to nil, and
// the Context is left ready either to execute the action or to update the
// remote cache with a local result.
//
// The cache is only queried when the ExecutionOptions accept cached results
// and the action is not marked do-not-cache; the query is timed under
// command.EventCheckActionCache. Failures to compute inputs are reported as a
// local error Result, failures of the cache query as a remote error Result.
func (ec *Context) GetCachedResult() {
	if err := ec.computeInputs(); err != nil {
		ec.Result = command.NewLocalErrorResult(err)
		return
	}

	if ec.opt.AcceptCached && !ec.opt.DoNotCache {
		span := &command.TimeInterval{From: time.Now()}
		ec.Metadata.EventTimes[command.EventCheckActionCache] = span
		cached, err := ec.client.GrpcClient.CheckActionCache(ec.ctx, ec.Metadata.ActionDigest.ToProto())
		span.To = time.Now()
		if err != nil {
			ec.Result = command.NewRemoteErrorResult(err)
			return
		}
		ec.resPb = cached
	}

	if ec.resPb == nil {
		// Cache miss (or lookup skipped): nothing to report yet.
		ec.Result = nil
		return
	}

	log.V(1).Infof("%s> Found cached result, downloading outputs...", ec.cmd.Identifiers.ExecutionID)
	res := ec.downloadResults(ec.cmd.ExecRoot, ec.opt.DownloadOutputs)
	if res.Err == nil {
		res.Status = command.CacheHitResultStatus
	}
	ec.Result = res
}
// UpdateCachedResult tries to write local results of the execution to the remote cache.
//
// The Result is set to success up front and replaced by a local or remote
// error Result if any step fails. If the ExecutionOptions mark the command as
// do-not-cache, nothing is uploaded and the success Result is kept. Otherwise
// the outputs listed in the command (files and directories) are read from the
// exec root, uploaded together with the Action and Command protos if missing
// remotely, and the action result is written to the remote cache. The whole
// step is timed under command.EventUpdateCachedResult.
// TODO(olaola): optional arguments to override values of local outputs, and also stdout/err.
func (ec *Context) UpdateCachedResult() {
	executionID := ec.cmd.Identifiers.ExecutionID
	ec.Result = &command.Result{Status: command.SuccessResultStatus}
	if ec.opt.DoNotCache {
		log.V(1).Infof("%s> Command is marked do-not-cache, skipping remote caching.", executionID)
		return
	}
	if err := ec.computeInputs(); err != nil {
		ec.Result = command.NewLocalErrorResult(err)
		return
	}
	ec.Metadata.EventTimes[command.EventUpdateCachedResult] = &command.TimeInterval{From: time.Now()}
	defer func() { ec.Metadata.EventTimes[command.EventUpdateCachedResult].To = time.Now() }()

	chunkSize := int(ec.client.GrpcClient.ChunkMaxSize)
	// Cap the capacity of OutputFiles so that append must allocate a fresh
	// backing array; otherwise OutputDirs could be written into spare capacity
	// of the command's own slice and alias or clobber caller-owned data.
	files := ec.cmd.OutputFiles
	outPaths := append(files[:len(files):len(files)], ec.cmd.OutputDirs...)
	blobs, resPb, err := tree.ComputeOutputsToUpload(ec.cmd.ExecRoot, outPaths, chunkSize, ec.client.FileMetadataCache)
	if err != nil {
		ec.Result = command.NewLocalErrorResult(err)
		return
	}
	ec.resPb = resPb
	ec.setOutputMetadata()

	// The Action and Command protos are uploaded alongside the output blobs.
	toUpload := make([]*chunker.Chunker, 0, len(blobs)+2)
	toUpload = append(toUpload, ec.acCh, ec.cmdCh)
	for _, ch := range blobs {
		toUpload = append(toUpload, ch)
	}
	log.V(1).Infof("%s> Uploading local outputs...", executionID)
	missing, err := ec.client.GrpcClient.UploadIfMissing(ec.ctx, toUpload...)
	if err != nil {
		ec.Result = command.NewRemoteErrorResult(err)
		return
	}
	ec.Metadata.MissingDigests = missing

	log.V(1).Infof("%s> Updating remote cache...", executionID)
	req := &repb.UpdateActionResultRequest{
		InstanceName: ec.client.GrpcClient.InstanceName,
		ActionDigest: ec.Metadata.ActionDigest.ToProto(),
		ActionResult: resPb,
	}
	if _, err := ec.client.GrpcClient.UpdateActionResult(ec.ctx, req); err != nil {
		ec.Result = command.NewRemoteErrorResult(err)
		return
	}
}
// ExecuteRemotely runs the command remotely and downloads its results,
// uploading any inputs the remote side is missing beforehand. The outcome is
// stored in ec.Result.
//
// Input upload is timed under command.EventUploadInputs and the execution
// itself under command.EventExecuteRemotely; server-side timings from the
// response are recorded as well. A message in the response is written to
// stderr when the status is not OK or the exit code is non-zero. If an action
// result is present its outputs are downloaded (per the DownloadOutputs
// option) even when the status is not OK, but a non-OK status then replaces
// the Result: DeadlineExceeded becomes a timeout result, anything else a
// remote error result.
func (ec *Context) ExecuteRemotely() {
	if err := ec.computeInputs(); err != nil {
		ec.Result = command.NewLocalErrorResult(err)
		return
	}
	id := ec.cmd.Identifiers.ExecutionID
	grpcClient := ec.client.GrpcClient

	log.V(1).Infof("%s> Checking inputs to upload...", id)
	// TODO(olaola): compute input cache hit stats.
	uploadSpan := &command.TimeInterval{From: time.Now()}
	ec.Metadata.EventTimes[command.EventUploadInputs] = uploadSpan
	missing, err := grpcClient.UploadIfMissing(ec.ctx, ec.inputBlobs...)
	uploadSpan.To = time.Now()
	if err != nil {
		ec.Result = command.NewRemoteErrorResult(err)
		return
	}
	ec.Metadata.MissingDigests = missing

	log.V(1).Infof("%s> Executing remotely...\n%s", id, strings.Join(ec.cmd.Args, " "))
	execSpan := &command.TimeInterval{From: time.Now()}
	ec.Metadata.EventTimes[command.EventExecuteRemotely] = execSpan
	req := &repb.ExecuteRequest{
		InstanceName:    grpcClient.InstanceName,
		SkipCacheLookup: !(ec.opt.AcceptCached && !ec.opt.DoNotCache),
		ActionDigest:    ec.Metadata.ActionDigest.ToProto(),
	}
	op, err := grpcClient.ExecuteAndWait(ec.ctx, req)
	execSpan.To = time.Now()
	if err != nil {
		ec.Result = command.NewRemoteErrorResult(err)
		return
	}

	packed := op.GetResponse()
	if packed == nil {
		ec.Result = command.NewRemoteErrorResult(fmt.Errorf("unexpected operation result type: %v", packed))
		return
	}
	resp := &repb.ExecuteResponse{}
	if err := ptypes.UnmarshalAny(packed, resp); err != nil {
		ec.Result = command.NewRemoteErrorResult(err)
		return
	}
	ec.resPb = resp.Result
	setTimingMetadata(ec.Metadata, resp.Result.GetExecutionMetadata())

	st := status.FromProto(resp.Status)
	code := st.Code()
	failed := code != codes.OK || (ec.resPb != nil && ec.resPb.ExitCode != 0)
	if msg := resp.Message; msg != "" && failed {
		ec.oe.WriteErr([]byte(msg + "\n"))
	}

	if ec.resPb != nil {
		log.V(1).Infof("%s> Downloading outputs...", id)
		ec.Result = ec.downloadResults(ec.cmd.ExecRoot, ec.opt.DownloadOutputs)
		if resp.CachedResult && ec.Result.Err == nil {
			ec.Result.Status = command.CacheHitResultStatus
		}
	}

	// A non-OK status takes precedence over whatever was downloaded above.
	switch code {
	case codes.DeadlineExceeded:
		ec.Result = command.NewTimeoutResult()
	case codes.OK:
		if ec.resPb == nil {
			ec.Result = command.NewRemoteErrorResult(fmt.Errorf("execute did not return action result"))
		}
	default:
		ec.Result = command.NewRemoteErrorResult(st.Err())
	}
}
// DownloadResults downloads the result of the command in the context to the specified directory.
//
// Outputs are always downloaded, regardless of the DownloadOutputs option. On
// success the status of the previous Result (if any) is preserved; on failure
// the Result describes the download error.
//
// If no action result is available yet (for example after a cache miss, or
// after an execution that failed before producing a result), nothing is
// downloaded: an existing error Result is left untouched, otherwise the Result
// is set to a local error. Previously this case caused a nil pointer panic.
func (ec *Context) DownloadResults(execRoot string) {
	if ec.resPb == nil {
		if ec.Result == nil || ec.Result.Err == nil {
			ec.Result = command.NewLocalErrorResult(fmt.Errorf("no action result available to download"))
		}
		return
	}
	prev := ec.Result
	ec.Result = ec.downloadResults(execRoot, true)
	if prev != nil && ec.Result.Err == nil {
		ec.Result.Status = prev.Status
	}
}
// timeFromProto converts a protobuf timestamp into a time.Time. A nil
// timestamp yields the zero time. If the conversion reports an error, the
// error is logged and whatever value the conversion produced is returned.
func timeFromProto(tPb *tspb.Timestamp) time.Time {
	if tPb != nil {
		converted, err := ptypes.Timestamp(tPb)
		if err != nil {
			log.Errorf("Failed to parse RBE timestamp: %+v - > %v", tPb, err)
		}
		return converted
	}
	return time.Time{}
}
// setEventTimes records the interval [start, end] for event in cm.EventTimes,
// replacing any interval previously stored under that name. Both timestamps
// are converted with timeFromProto.
func setEventTimes(cm *command.Metadata, event string, start, end *tspb.Timestamp) {
	from := timeFromProto(start)
	to := timeFromProto(end)
	cm.EventTimes[event] = &command.TimeInterval{From: from, To: to}
}
// setTimingMetadata copies the server-reported timings from em into
// cm.EventTimes: time spent queued, total worker time, input fetch, execution
// and output upload. It does nothing when em is nil.
func setTimingMetadata(cm *command.Metadata, em *repb.ExecutedActionMetadata) {
	if em == nil {
		return
	}
	intervals := []struct {
		event      string
		start, end *tspb.Timestamp
	}{
		{command.EventServerQueued, em.QueuedTimestamp, em.WorkerStartTimestamp},
		{command.EventServerWorker, em.WorkerStartTimestamp, em.WorkerCompletedTimestamp},
		{command.EventServerWorkerInputFetch, em.InputFetchStartTimestamp, em.InputFetchCompletedTimestamp},
		{command.EventServerWorkerExecution, em.ExecutionStartTimestamp, em.ExecutionCompletedTimestamp},
		{command.EventServerWorkerOutputUpload, em.OutputUploadStartTimestamp, em.OutputUploadCompletedTimestamp},
	}
	for _, iv := range intervals {
		setEventTimes(cm, iv.event, iv.start, iv.end)
	}
}
// Run executes a command remotely and returns its result together with the
// execution metadata.
//
// The action cache is consulted first; if that produces a Result (a hit or an
// error), it is returned as is. Otherwise the command is executed remotely. If
// the Context cannot be created, a local error result and an empty Metadata
// are returned.
func (c *Client) Run(ctx context.Context, cmd *command.Command, opt *command.ExecutionOptions, oe outerr.OutErr) (*command.Result, *command.Metadata) {
	ec, err := c.NewContext(ctx, cmd, opt, oe)
	if err != nil {
		return command.NewLocalErrorResult(err), &command.Metadata{}
	}
	if ec.GetCachedResult(); ec.Result == nil {
		ec.ExecuteRemotely()
		// TODO(olaola): implement the cache-miss-retry loop.
	}
	return ec.Result, ec.Metadata
}