blob: fb30c56e2dbc2a7c2cd75e12ba1e4dcd054f916d [file] [log] [blame]
# Copyright 2020 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from recipe_engine import recipe_test_api
from PB.go.chromium.org.luci.buildbucket.proto import (
builds_service as builds_service_pb2,
)
class CLUtilTestApi(recipe_test_api.RecipeTestApi):
def create_cl(self, step_name, test_data):
return self.step_data(step_name, self.m.proto.output(test_data))
def trigger_cq(self, step_name, success=True):
return self.step_data(step_name, retcode=0 if success else 1)
def trigger_tryjobs(self, step_name, tryjob_msgs):
responses = [dict(schedule_build=m) for m in tryjob_msgs]
return self.m.buildbucket.simulated_schedule_output(
batch_response=builds_service_pb2.BatchResponse(responses=responses),
step_name="%s.schedule" % step_name,
)
def collect_tryjobs(
self, step_name, builders=None, tryjob_msgs=None, status="SUCCESS"
):
if builders:
tryjob_msgs = []
for idx, builder in enumerate(builders):
project, bucket, builder_name = builder.split("/")
msg = self.m.buildbucket.try_build_message(
builder=builder_name,
bucket=bucket,
project=project,
build_id=(idx + 1) * 1234567890,
status=status,
)
tryjob_msgs.append(msg)
search_results = self.m.buildbucket.simulated_search_results(
tryjob_msgs, step_name="%s.search" % step_name,
)
collect_data = self.m.buildbucket.simulated_collect_output(
tryjob_msgs, step_name="%s.collect" % step_name,
)
if builders:
return search_results + collect_data
return collect_data