# blob: cc1dd016e5fe3d41f42438cc05835c47977bef24
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from PB.infra.fuchsia import Fuchsia
from recipe_engine.recipe_api import Property
# Recipe module dependencies. Kept alphabetized within each namespace
# (fuchsia/, recipe_engine/); 'recipe_engine/isolated' was previously
# out of order relative to 'recipe_engine/file'.
DEPS = [
    'fuchsia/artifacts',
    'fuchsia/build',
    'fuchsia/buildbucket_util',
    'fuchsia/checkout',
    'fuchsia/experimental',
    'fuchsia/fuchsia',
    'fuchsia/testing',
    'fuchsia/testing_requests',
    'fuchsia/testsharder',
    'recipe_engine/buildbucket',
    'recipe_engine/file',
    'recipe_engine/isolated',
    'recipe_engine/json',
    'recipe_engine/path',
    # Needed because we pass our own api into FuchsiaBuildResults
    'recipe_engine/platform',
    'recipe_engine/properties',
    'recipe_engine/raw_io',
    'recipe_engine/step',
    'recipe_engine/swarming',
    'recipe_engine/time',
]
# Input properties for this recipe. Most are passed straight through to the
# corresponding Fuchsia / Fuchsia.Test spec proto fields (see RunSteps);
# the help strings name the target field in each case.
PROPERTIES = {
    'gcs_bucket':
        Property(
            kind=str,
            help='GCS bucket for uploading checkout, build, and test results',
            default='fuchsia_infra'),
    'device_type':
        Property(
            kind=str,
            help='Passed through to spec field Fuchsia.Test.device_type',
            default='QEMU'),
    'pave':
        Property(
            kind=bool,
            help='Passed through to spec field Fuchsia.Test.pave',
            default=True),
    'test_in_shards':
        Property(
            kind=bool,
            help='Passed through to spec field Fuchsia.Test.test_in_shards',
            default=False),
    'upload_to_catapult':
        Property(
            kind=bool,
            help='Passed through to spec field Fuchsia.Test.upload_to_catapult',
            default=False),
    'collect_timeout_secs':
        Property(
            kind=int,
            help=('Passed through to spec field '
                  'Fuchsia.Test.collect_timeout_secs'),
            default=0),
    'debug_symbol_gcs_bucket':
        Property(
            kind=str,
            help='Passed through to spec field Fuchsia.debug_symbol_gcs_bucket',
            default='debug-symbols'),
    # Selects the deprecated async testing path in the non-sharded branch.
    'test_async':
        Property(
            kind=bool,
            help='Whether to call the deprecated_test_async method',
            default=False),
    'per_test_timeout_secs':
        Property(
            kind=int,
            help='Passed through to spec field Fuchsia.Test.per_test_timeout_secs',
            default=0),
    # When non-zero, RunSteps also forces max_attempts to 1.
    'rerun_budget_secs':
        Property(
            kind=int,
            help='Passed through to spec field Fuchsia.Test.rerun_budget_secs',
            default=0),
    'use_runtests':
        Property(kind=bool, help='Whether to use runtests', default=False),
    'variants':
        Property(kind=list, help='GN variants', default=[]),
}
def RunSteps(api, gcs_bucket, device_type, pave, test_in_shards,
             upload_to_catapult, collect_timeout_secs, debug_symbol_gcs_bucket,
             test_async, per_test_timeout_secs, rerun_budget_secs, use_runtests,
             variants):
  """Exercises the fuchsia/testing module end to end.

  Builds a minimal FuchsiaBuildResults object from a fake build directory,
  constructs a Fuchsia.Test spec from the recipe properties, then runs tests
  either in shards (testsharder path) or via the deprecated non-sharded path.
  Afterwards it optionally uploads results, processes coverage for 'profile'
  variants, and raises any test failures.
  """
  # Results are only uploaded when a destination bucket is configured.
  upload_results = bool(gcs_bucket)
  # Intended to be a minimal amount of code to create a valid FuchsiaBuildResults object.
  checkout_root = api.path['start_dir']
  fuchsia_build_dir = checkout_root.join('out', 'default')
  gn_results = api.build.GNResults(api, fuchsia_build_dir)
  # Read the image manifest produced by the build; keyed by image name below.
  images_list = api.json.read('read images',
                              fuchsia_build_dir.join('images.json')).json.output
  images_dict = {i['name']: i for i in images_list}
  build_results = api.build.FuchsiaBuildResults(
      api, checkout_root, 'arm64', variants, 'build-type', fuchsia_build_dir,
      checkout_root.join('out', 'default.zircon'), '//boards/foo.gni',
      '//products/foo.gni', gn_results, images_dict)
  # Configure context of uploaded artifacts for test task construction.
  api.artifacts.gcs_bucket = 'fuchsia-artifacts'
  api.artifacts.uuid = api.buildbucket_util.id
  # Most spec fields are passed straight through from recipe properties.
  test_spec = Fuchsia.Test(
      device_type=device_type,
      pave=pave,
      pool='fuchsia.tests',
      test_in_shards=test_in_shards,
      upload_to_catapult=upload_to_catapult,
      collect_timeout_secs=collect_timeout_secs,
      per_test_timeout_secs=per_test_timeout_secs,
      use_runtests=use_runtests,
      default_service_account='default_service_account',
      rerun_budget_secs=rerun_budget_secs,
  )
  # A rerun budget and retries are mutually exclusive: with a budget set,
  # each task gets exactly one attempt.
  if rerun_budget_secs:
    test_spec.max_attempts = 1
  spec = Fuchsia(
      test=test_spec,
      debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
  )
  if test_in_shards:
    # Sharded path: let testsharder partition the tests, then build one
    # swarming request per shard and run them via testing.test_in_shards.
    shards = api.testsharder.execute('load test shards', 'testsharder',
                                     fuchsia_build_dir)
    shard_requests = api.testing_requests.shard_requests(
        build_results,
        api.buildbucket.build,
        spec.test.per_test_timeout_secs,
        spec.test.pool,
        shards,
        spec.test.swarming_expiration_timeout_secs,
        spec.test.swarming_io_timeout_secs,
        spec.test.use_runtests,
        timeout_secs=spec.test.timeout_secs,
        default_service_account=test_spec.default_service_account,
    )
    orchestration_inputs = api.build.TestOrchestrationInputs.from_build_results(
        build_results, shard_requests)
    testing_tasks = api.testing.test_in_shards(
        collect_timeout_secs=spec.test.collect_timeout_secs,
        debug_symbol_gcs_bucket=spec.debug_symbol_gcs_bucket,
        orchestration_inputs=orchestration_inputs,
        max_attempts=spec.test.max_attempts,
        rerun_budget_secs=spec.test.rerun_budget_secs)
    all_results = api.testing.final_results(testing_tasks)
  else:
    # Deprecated non-sharded path: a single test task, optionally run
    # through the async variant when test_async is set.
    shard_requests = api.testing_requests.deprecated_shard_requests(
        build_results,
        api.testing_requests.deprecated_test_cmds(test_spec),
        test_spec.device_type,
        test_spec.pool,
        test_spec.timeout_secs,
        test_spec.pave,
        swarming_expiration_timeout_secs=spec.test
        .swarming_expiration_timeout_secs,
        swarming_io_timeout_secs=spec.test.swarming_io_timeout_secs,
        default_service_account=test_spec.default_service_account,
    )
    orchestration_inputs = api.build.TestOrchestrationInputs.from_build_results(
        build_results, shard_requests)
    if test_async:
      # deprecated_test_async returns a callable that is invoked to collect
      # the result.
      result = api.testing.deprecated_test_async(
          debug_symbol_gcs_bucket,
          device_type,
          orchestration_inputs,
      )()
    else:
      result = api.testing.deprecated_test(
          debug_symbol_gcs_bucket,
          device_type,
          orchestration_inputs,
      )
    all_results = [result]
  # Upload test results
  if upload_results:
    for test_results in all_results:
      test_results.upload_results(
          gcs_bucket=gcs_bucket,
          upload_to_catapult=upload_to_catapult,
      )
  # Coverage processing only applies to builds with the 'profile' variant,
  # and only to results that actually came from a Fuchsia device/emulator.
  if 'profile' in build_results.variants:
    api.testing.process_coverage(
        covargs_path=orchestration_inputs.covargs,
        test_results=[
            result for result in all_results if result.from_fuchsia
        ],
        ids_txt=orchestration_inputs.ids,
        llvm_profdata=orchestration_inputs.llvm_profdata,
        llvm_cov=orchestration_inputs.llvm_cov,
        gcs_bucket=gcs_bucket)
  # Raise test failures
  # defer_results lets both failure checks run before the step fails.
  with api.step.defer_results():
    api.testing.raise_failures()
    for test_results in all_results:
      test_results.raise_failures()
def GenTests(api):
  """Generates simulation test cases for both sharded and non-sharded paths.

  Yield order and step data must stay in sync with the checked-in
  expectation files; do not reorder cases.
  """
  # For coverage
  api.testing.task_requests_step_data([api.testing.task_request_jsonish(False)],
                                      '')

  def test(*args, **kwargs):
    # Every case must mock the 'read images' step, since RunSteps reads
    # images.json before anything else.
    return api.testing.test(*args, **kwargs) + api.step_data(
        'read images', api.build.mock_image_manifest())

  # Test cases for running Fuchsia tests as a swarming task.
  yield test(
      'isolated_tests_no_json',
      status='failure',
      # Test a missing summary.json file. Clear the default steps and manage
      # them manually to avoid providing the file, which is usually done by the
      # auto-included test_step_data step.
      clear_default_steps=True,
      steps=[
          api.testing.task_step_data([
              api.swarming.task_result(
                  id='1', name='test', outputs=['out.tar']),
          ]),
      ])
  yield test(
      'isolated_test_device_no_pave',
      properties={
          'device_type': 'Intel NUC Kit NUC6i3SYK',
          'pave': False,
      },
  )
  yield test(
      'isolated_tests_test_failure',
      expect_failure=True,  # Failure steps injected below.
      steps=[
          # Two iterations: the failing first attempt and its retry.
          api.testing.task_step_data(
              [
                  api.swarming.task_result(
                      id='1', name='test', outputs=['out.tar']),
              ],
              iteration=0,
          ),
          api.testing.task_step_data(
              [
                  api.swarming.task_result(
                      id='2', name='test', outputs=['out.tar']),
              ],
              iteration=1,
          ),
          api.testing.tests_json_data(iteration=0),
          api.testing.tests_json_data(iteration=1),
          api.testing.test_step_data(failure=True, iteration=0),
          api.testing.test_step_data(failure=True, iteration=1),
          api.step_data('run tests.attempt 0.task results.symbolize logs',
                        api.raw_io.stream_output('bt1\nbt2\n')),
          api.step_data('run tests.attempt 1.task results.symbolize logs',
                        api.raw_io.stream_output('bt1\nbt2\n')),
      ])
  # TODO(garymm): Remove retries to simplify this test.
  yield test(
      'isolated_tests_no_resource',
      status='infra_failure',
      expect_failure=True,  # Failure step injected below.
      steps=[
          # NO_RESOURCE on both attempts -> infra failure.
          api.testing.task_step_data(
              [
                  api.swarming.task_result(
                      id='1',
                      name='test',
                      state=api.swarming.TaskState.NO_RESOURCE,
                  ),
              ],
              iteration=0,
          ),
          api.testing.task_step_data(
              [
                  api.swarming.task_result(
                      id='2',
                      name='test',
                      state=api.swarming.TaskState.NO_RESOURCE,
                  ),
              ],
              iteration=1,
          ),
      ])
  yield test(
      'isolated_tests_kernel_panic',
      expect_failure=True,  # Failure step injected below.
      steps=[
          # A timed-out task whose output contains a kernel panic, on both
          # attempts.
          api.testing.task_step_data(
              [
                  api.swarming.task_result(
                      id='1',
                      name='test',
                      state=api.swarming.TaskState.TIMED_OUT,
                      output='KERNEL PANIC',
                  ),
              ],
              iteration=0,
          ),
          api.testing.task_step_data(
              [
                  api.swarming.task_result(
                      id='2',
                      name='test',
                      state=api.swarming.TaskState.TIMED_OUT,
                      output='KERNEL PANIC',
                  ),
              ],
              iteration=1,
          ),
      ],
  )
  # Test case for generating test coverage
  yield test(
      'upload_test_coverage',
      properties={
          'gcs_bucket': 'fuchsia-build',
          'variants': ['profile'],
      })
  # Test case for ASan.
  yield test(
      'asan_tests',
      properties={
          'gcs_bucket': 'fuchsia-build',
          'variants': ['profile'],
      })
  # Test cases for testing in shards.
  # TODO(fxb/9784): during mass clean-up, move into into api.testing.test_api.
  test_task_outputs = [
      'syslog.txt',
      'serial.txt',
      'out.tar',
      'benchmark.catapult_json',
  ]

  def test_task_data(*shard_names, **kwargs):  # pylint: disable=invalid-name
    # Builds combined step data for one successful task per named shard,
    # plus matching per-shard test step data.
    iteration = kwargs.pop('iteration', 0)
    assert not kwargs
    results = []
    step_data = api.step_data(None)
    for idx, name in enumerate(shard_names):
      results.append(
          api.swarming.task_result(
              id=str(idx), name=name, outputs=test_task_outputs))
      # 'EMU' in the shard name selects the QEMU flavor of step data.
      step_data += api.testing.test_step_data(
          shard_name=name, qemu='EMU' in name)
    step_data += api.testing.task_retry_step_data(results, iteration=iteration)
    return step_data

  # TODO(garymm): Remove retries to simplify this test.
  yield test(
      'sharded_kernel_panic',
      expect_failure=True,  # Failure step injected below.
      properties={'test_in_shards': True},
      steps=[
          api.testsharder.execute(
              'load test shards',
              shards=[
                  api.testsharder.shard(
                      name='Vim2',
                      tests=[api.testsharder.test('test', os='linux')],
                      dimensions=dict(device_type='Khadas Vim2 Max'),
                  ),
              ]),
          test_task_data('Vim2'),
          api.testing.task_retry_step_data([
              api.swarming.task_result(
                  id='1',
                  name='Vim2',
                  outputs=test_task_outputs,
                  output='KERNEL PANIC',
              ),
          ],
                                           iteration=0),
          api.testing.task_retry_step_data([
              api.swarming.task_result(
                  id='2',
                  name='Vim2',
                  outputs=test_task_outputs,
                  output='KERNEL PANIC',
              ),
          ],
                                           iteration=1),
      ],
  )
  # TODO(garymm): Remove retries to simplify this test.
  yield test(
      'sharded_failure_string',
      expect_failure=True,  # Failure step injected below.
      properties={'test_in_shards': True},
      steps=[
          api.testsharder.execute(
              'load test shards',
              shards=[
                  api.testsharder.shard(
                      name='Vim2',
                      tests=[api.testsharder.test('test', os='linux')],
                      dimensions=dict(device_type='Khadas Vim2 Max'),
                  ),
              ]),
          test_task_data('Vim2'),
          api.testing.task_retry_step_data(
              task_results=[
                  api.swarming.task_result(
                      id='1',
                      name='Vim2',
                      outputs=test_task_outputs,
                  ),
              ],
              iteration=0,
          ),
          # Failure strings in serial.txt should fail the shard even though
          # the task itself completed.
          api.testing.task_retry_log_data(
              iteration=0,
              task_name='Vim2',
              log_name='serial.txt',
              log_contents='ASSERT FAILED'),
          api.testing.task_retry_step_data(
              task_results=[
                  api.swarming.task_result(
                      id='2',
                      name='Vim2',
                      outputs=test_task_outputs,
                  ),
              ],
              iteration=1,
          ),
          api.testing.task_retry_log_data(
              iteration=1,
              task_name='Vim2',
              log_name='serial.txt',
              log_contents='DEVICE SUSPEND TIMED OUT'),
      ],
  )
  yield test(
      'test_with_no_shards',
      clear_default_steps=True,
      properties={
          'test_in_shards': True,
      },
      steps=[api.testsharder.execute('load test shards', shards=())])
  yield test(
      'test_with_shards_arm64_serial_failure',
      status='failure',
      clear_default_steps=True,
      properties={
          'test_in_shards': True,
      },
      steps=[
          api.testsharder.execute(
              'load test shards',
              shards=[
                  api.testsharder.shard(
                      name='Vim2',
                      tests=[api.testsharder.test('test', os='linux')],
                      dimensions=dict(device_type='Khadas Vim2 Max'),
                  ),
              ]),
          test_task_data('Vim2'),
          api.step_data('check log Vim2:serial.txt.read serial.txt',
                        api.raw_io.output_text('...DEVICE SUSPEND TIMED OUT\n'))
      ])
  # fuchsia-0000 passes the first time.
  # fuchsia-0001 has tests that always fail.
  # fuchsia-0002 always times out.
  # fuchsia-0003 has tests that fail the first time but pass the second time.
  yield test(
      'test_in_shards_mixed_failure',
      status='failure',
      clear_default_steps=True,
      properties={
          'test_in_shards': True,
          # Here to get coverage for this path without adding another test.
          'per_test_timeout_secs': 1,
      },
      steps=[
          api.testsharder.execute('load test shards', shards=[
              api.testsharder.shard(
                  name='fuchsia-0000',
                  tests=api.testing_requests.default_tests(),
                  dimensions=dict(device_type='QEMU'),
              ),
              api.testsharder.shard(
                  name='fuchsia-0001',
                  tests=[api.testsharder.test('test1')],
                  dimensions=dict(device_type='NUC'),
              ),
              api.testsharder.shard(
                  name='fuchsia-0002',
                  tests=api.testing_requests.default_tests(),
                  dimensions=dict(device_type='QEMU'),
              ),
              api.testsharder.shard(
                  name='fuchsia-0003',
                  tests=[api.testsharder.test('test3')],
                  dimensions=dict(device_type='NUC'),
              ),
          ]),
          # First attempt: all four shards run; 0002 times out.
          api.testing.task_retry_step_data([
              api.swarming.task_result(
                  id='610',
                  name='fuchsia-0000',
                  outputs=test_task_outputs,
              ),
              api.swarming.task_result(
                  id='710',
                  name='fuchsia-0001',
                  outputs=test_task_outputs,
              ),
              api.swarming.task_result(
                  id='810',
                  name='fuchsia-0002',
                  state=api.swarming.TaskState.TIMED_OUT,
                  outputs=['serial.txt', 'syslog.txt'],
              ),
              api.swarming.task_result(
                  id='910',
                  name='fuchsia-0003',
                  outputs=test_task_outputs,
              ),
          ], iteration=0),
          # Second attempt: only the shards that failed the first time.
          api.testing.task_retry_step_data([
              api.swarming.task_result(
                  id='711',
                  name='fuchsia-0001',
                  outputs=test_task_outputs,
              ),
              api.swarming.task_result(
                  id='811',
                  name='fuchsia-0002',
                  state=api.swarming.TaskState.TIMED_OUT,
                  outputs=['serial.txt', 'syslog.txt'],
              ),
              api.swarming.task_result(
                  id='911',
                  name='fuchsia-0003',
                  outputs=test_task_outputs,
              ),
          ], iteration=1),
          api.testing.test_step_data(
              # Test that multiplied shards (where the same test executable is
              # run many times) are handled correctly.
              shard_name='fuchsia-0000',
              qemu=False,
              tests_json=[
                  {
                      'test': {
                          'path': 'host_x64/foo_test',
                          'name': 'foo_test (%i)' % (i + 1),
                      },
                  } for i in range(5)
              ]),
          api.testing.test_step_data(
              shard_name='fuchsia-0001', qemu=False, failure=True, iteration=0),
          api.testing.test_step_data(
              shard_name='fuchsia-0001', failure=True, iteration=1),
          api.testing.test_step_data(
              shard_name='fuchsia-0003', qemu=False, failure=True, iteration=0),
          api.testing.test_step_data(
              shard_name='fuchsia-0003', qemu=False, iteration=1),
      ])  # yapf: disable
  yield test(
      'test_in_shards_single_attempt',
      status='failure',
      clear_default_steps=True,
      properties={
          'test_in_shards': True,
          'per_test_timeout_secs': 1,
      },
      steps=[
          api.testsharder.execute('load test shards', shards=[
              api.testsharder.shard(
                  name='multiplied:fuchsia-0000',
                  tests=api.testing_requests.default_tests(),
                  dimensions=dict(device_type='QEMU'),
              ),
          ]),
          api.testing.task_retry_step_data([
              api.swarming.task_result(
                  id='610',
                  name='multiplied:fuchsia-0000',
                  outputs=test_task_outputs,
              ),
          ], iteration=0),
          api.testing.test_step_data(
              shard_name='multiplied:fuchsia-0000', failure=True, iteration=0),
      ])  # yapf: disable
  # TODO(garymm): combine this with another test. Just set collect_timeout_secs on some
  # other test's properties to get the same coverage.
  yield test(
      'fail_then_timeout',
      status='failure',
      clear_default_steps=True,
      properties={
          'test_in_shards': True,
          'collect_timeout_secs': 2,
      },
      steps=[
          api.testsharder.execute('load test shards', shards=[
              api.testsharder.shard(
                  name='fuchsia-0000',
                  tests=api.testing_requests.default_tests(),
                  dimensions=dict(device_type='QEMU'),
              ),
          ]),
          api.testing.task_retry_step_data([
              api.swarming.task_result(
                  id='610',
                  name='fuchsia-0000',
                  outputs=test_task_outputs,
              ),
          ], iteration=0),
          api.testing.task_retry_step_data([
              api.swarming.task_result(
                  id='611',
                  name='fuchsia-0000',
                  state=api.swarming.TaskState.TIMED_OUT,
                  outputs=['serial.txt', 'syslog.txt'],
              ),
          ], iteration=1),
          api.testing.test_step_data(
              shard_name='fuchsia-0000', iteration=0, failure=True)
      ])  # yapf: disable
  yield test(
      'upload_to_catapult',
      clear_default_steps=True,
      properties={
          'test_in_shards': True,
          'upload_to_catapult': True,
      },
      steps=[
          api.testsharder.execute(
              'load test shards',
              shards=[
                  api.testsharder.shard(
                      name='Linux',
                      tests=api.testing_requests.default_tests(),
                      # For coverage of non-fuchsia specific code paths.
                      dimensions=dict(os='linux'),
                  ),
              ]),
          test_task_data('Linux'),
      ])
  yield test(
      'rerun',
      expect_failure=True,
      clear_default_steps=True,
      properties={
          'test_in_shards': True,
          'rerun_budget_secs': 100,
      },
      steps=[
          api.testsharder.execute(
              'load test shards',
              shards=[
                  api.testsharder.shard(
                      name='fuchsia-0000',
                      tests=api.testing_requests.default_tests(),
                      dimensions=dict(device_type='QEMU'),
                  ),
              ]),
          api.testing.task_retry_step_data(
              [
                  api.swarming.task_result(
                      id='611',
                      name='fuchsia-0000',
                      outputs=test_task_outputs,
                  ),
              ],
              iteration=0,
          ),
          api.testing.test_step_data(
              shard_name='fuchsia-0000', iteration=0, failure=True),
      ]
  ) + (
      # One less than rerun_budget_secs. Should result in the
      # testing task being run exactly once.
      api.time.step(99))
  yield test(
      'async',
      properties={'test_async': True},
      enable_retries=False,
  )