blob: 40d9d367dcb837ecd317f9bc2e382511efa4a3a7 [file] [log] [blame]
package fakes
import (
"context"
"fmt"
"sync/atomic"
"testing"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
regrpc "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
oppb "google.golang.org/genproto/googleapis/longrunning"
anypb "google.golang.org/protobuf/types/known/anypb"
)
// Exec implements the complete RE execution interface for a single execution, returning a fixed
// result or an error. The exported fields preset what the fake execution returns; Clear resets
// them. The fake is backed by a fake action cache and a fake CAS supplied to NewExec.
type Exec struct {
// Fake action cache. Execution checks it first (unless the request skips the cache lookup) and
// stores the result in it upon completion (unless the action is marked DoNotCache).
ac *ActionCache
// Fake CAS. The action is fetched from it at the start of execution, and OutputBlobs are put in
// it upon execution completion.
cas *CAS
// Fake result of an execution.
// The returned completed result, if any. Overridden by an action cache hit.
ActionResult *repb.ActionResult
// Returned completed execution status, if not Ok. Dropped on an action cache hit.
Status *status.Status
// Whether action was fake-fetched from the action cache upon execution (simulates a race between
// two executions). Reported as CachedResult in the response; forced to true on a real cache hit.
Cached bool
// Any blobs that will be put in the CAS after the fake execution completes.
OutputBlobs [][]byte
// Number of Execute calls that sent their operation successfully. Accessed atomically.
numExecCalls int32
// Used for errors: an unexpected action digest is reported through it.
t testing.TB
// The digest of the fake action; Execute rejects requests for any other digest.
// NOTE(review): not assigned anywhere in the visible code — presumably set by a helper elsewhere
// in the package; confirm.
adg digest.Digest
}
// NewExec builds an Exec fake wired to the given test handle, action cache and CAS, with no
// preset results and a zeroed Execute call counter.
func NewExec(t testing.TB, ac *ActionCache, cas *CAS) *Exec {
	e := new(Exec)
	e.t, e.ac, e.cas = t, ac, cas
	// Start from the same state Clear leaves behind.
	e.Clear()
	return e
}
// Clear drops every preset result (ActionResult, Status, Cached, OutputBlobs) and resets the
// Execute call counter to zero. The backing action cache, CAS and test handle are left untouched.
func (s *Exec) Clear() {
	atomic.StoreInt32(&s.numExecCalls, 0)
	s.ActionResult, s.Status = nil, nil
	s.OutputBlobs = nil
	s.Cached = false
}
// ExecuteCalls reports how many Execute calls have been counted since creation or the last Clear.
// The counter is read atomically.
func (s *Exec) ExecuteCalls() int {
	n := atomic.LoadInt32(&s.numExecCalls)
	return int(n)
}
// fakeExecution simulates one immediately-completed execution of the action identified by dg and
// returns it wrapped in a finished long-running Operation named "fake".
//
// Unless skipCacheLookup is set, an action cache hit overrides the preset result: the cached
// ActionResult is returned with no status and CachedResult set to true. The action blob must be
// present in the CAS and must unmarshal as an Action, otherwise an InvalidArgument error is
// returned. Unless the Action has DoNotCache set, the returned result is written to the action
// cache. All preset OutputBlobs are put in the CAS.
func (s *Exec) fakeExecution(dg digest.Digest, skipCacheLookup bool) (*oppb.Operation, error) {
	ar := s.ActionResult
	st := s.Status
	cached := s.Cached
	// Check action cache first, unless instructed not to.
	if !skipCacheLookup {
		if cr := s.ac.Get(dg); cr != nil {
			ar, st, cached = cr, nil, true
		}
	}
	// Fetch action from CAS.
	blob, ok := s.cas.Get(dg)
	if !ok {
		return nil, status.Errorf(codes.InvalidArgument, "action blob with digest %v not in the cas", dg)
	}
	apb := &repb.Action{}
	if err := proto.Unmarshal(blob, apb); err != nil {
		// Report the digest and the cause rather than dumping the raw blob bytes.
		return nil, status.Errorf(codes.InvalidArgument, "error unmarshalling action blob with digest %v as Action: %v", dg, err)
	}
	// NOTE(review): the result is stored even when ar is nil (e.g. only Status is preset) —
	// confirm this is intended.
	if !apb.DoNotCache {
		s.ac.Put(dg, ar)
	}
	for _, out := range s.OutputBlobs {
		s.cas.Put(out)
	}
	execResp := &repb.ExecuteResponse{
		Result: ar,
		// st is nil when no status is preset or on a cache hit; this relies on Proto accepting a
		// nil receiver.
		Status:       st.Proto(),
		CachedResult: cached,
	}
	// Named packed rather than any, to avoid shadowing the predeclared identifier.
	packed, err := anypb.New(execResp)
	if err != nil {
		return nil, err
	}
	return &oppb.Operation{
		Name:   "fake",
		Done:   true,
		Result: &oppb.Operation_Response{Response: packed},
	}, nil
}
// GetCapabilities reports the fixed capabilities of the fake: execution enabled, action cache
// updates enabled, the digest function reported by the digest package, the client's default
// maximum batch size, and absolute symlink paths disallowed. It never returns an error; ctx and
// req are not consulted.
func (s *Exec) GetCapabilities(ctx context.Context, req *repb.GetCapabilitiesRequest) (res *repb.ServerCapabilities, err error) {
	fn := digest.GetDigestFunction()
	execCaps := &repb.ExecutionCapabilities{
		DigestFunction: fn,
		ExecEnabled:    true,
	}
	cacheCaps := &repb.CacheCapabilities{
		DigestFunctions:               []repb.DigestFunction_Value{fn},
		ActionCacheUpdateCapabilities: &repb.ActionCacheUpdateCapabilities{UpdateEnabled: true},
		MaxBatchTotalSizeBytes:        client.DefaultMaxBatchSize,
		SymlinkAbsolutePathStrategy:   repb.SymlinkAbsolutePathStrategy_DISALLOWED,
	}
	return &repb.ServerCapabilities{
		ExecutionCapabilities: execCaps,
		CacheCapabilities:     cacheCaps,
	}, nil
}
// Execute serves a single fake execution and sends the completed operation, holding the saved
// ActionResult or Status, on the stream. The request's action digest must parse and must equal
// the digest this fake expects; otherwise an InvalidArgument error is returned, and a mismatch is
// also reported on the test handle. The result is put in the action cache unless the action has
// DoNotCache set, and the cache lookup is skipped when the request sets SkipCacheLookup. The
// Execute call counter is incremented only after the operation was sent successfully.
func (s *Exec) Execute(req *repb.ExecuteRequest, stream regrpc.Execution_ExecuteServer) (err error) {
	actionDg, parseErr := digest.NewFromProto(req.ActionDigest)
	if parseErr != nil {
		return status.Error(codes.InvalidArgument, fmt.Sprintf("invalid digest received: %v", req.ActionDigest))
	}
	if actionDg != s.adg {
		s.t.Errorf("unexpected action digest received by fake: expected %v, got %v", s.adg, actionDg)
		return status.Error(codes.InvalidArgument, fmt.Sprintf("unexpected digest received: %v", req.ActionDigest))
	}
	op, execErr := s.fakeExecution(actionDg, req.SkipCacheLookup)
	if execErr != nil {
		return execErr
	}
	if sendErr := stream.Send(op); sendErr != nil {
		return sendErr
	}
	atomic.AddInt32(&s.numExecCalls, 1)
	return nil
}
// WaitExecution is not supported by this fake; it always fails with an Unimplemented status and
// never touches the request or the stream.
func (s *Exec) WaitExecution(req *repb.WaitExecutionRequest, stream regrpc.Execution_WaitExecutionServer) (err error) {
	const msg = "method WaitExecution not implemented by test fake"
	err = status.Error(codes.Unimplemented, msg)
	return err
}