| # Copyright 2020 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from datetime import timedelta |
| |
| from RECIPE_MODULES.fuchsia.utils import ( |
| cached_property, |
| memoize, |
| nice_duration, |
| pluralize, |
| ) |
| |
| PYTHON_VERSION_COMPATIBILITY = "PY3" |
| |
| DEPS = [ |
| "fuchsia/status_check", |
| "fuchsia/utils", |
| "recipe_engine/step", |
| ] |
| |
| |
| def test_retry(api): |
| max_attempts = 2 |
| |
| call_count = {"count": 0} |
| |
| def func_to_retry(): |
| call_count["count"] += 1 |
| api.step.empty("step to retry") |
| # Only the last attempt is successful. |
| success = call_count["count"] == max_attempts |
| if not success: |
| raise api.step.StepFailure("step failed") |
| |
| api.utils.retry(func_to_retry, max_attempts) |
| assert call_count["count"] == max_attempts |
| |
| call_count = {"count": 0} |
| |
| def raising_func(): |
| call_count["count"] += 1 |
| return api.step.empty("failing step to retry", status=api.step.FAILURE) |
| |
| try: |
| api.utils.retry(raising_func, max_attempts) |
| except api.step.StepFailure: |
| pass |
| else: # pragma: no cover |
| assert False, "api.utils.retry should raise if the last attempt raises" |
| |
| assert call_count["count"] == max_attempts |
| |
| |
| def test_memoize(api): |
| unique_call_count = {"count": 0} |
| |
| @memoize |
| def sleep(seconds, add_one=False): |
| unique_call_count["count"] += 1 |
| if add_one: |
| seconds += 1 |
| api.step("sleep %d seconds" % seconds, ["sleep", str(seconds)]) |
| return seconds |
| |
| assert sleep(5) == 5 |
| assert unique_call_count["count"] == 1 |
| assert sleep(5) == 5 |
| assert unique_call_count["count"] == 1 |
| |
| # Cached inputs should be keyed by both args and kwargs. |
| assert sleep(5, add_one=True) == 6 |
| assert unique_call_count["count"] == 2 |
| assert sleep(5, add_one=True) == 6 |
| assert unique_call_count["count"] == 2 |
| |
| |
| def test_nice_duration(): |
| test_cases = { |
| timedelta(seconds=0): "0s", |
| timedelta(seconds=0.5): "0.5s", |
| timedelta(seconds=1): "1s", |
| timedelta(hours=5, seconds=1): "5h 1s", |
| timedelta(days=5, minutes=25): "5d 25m", |
| } |
| for td, expected in test_cases.items(): |
| actual = nice_duration(td.total_seconds()) |
| assert actual == expected, "%s != %s" % (actual, expected) |
| |
| |
| def test_pluralize(): |
| assert pluralize("cat", 2) == "2 cats" |
| assert pluralize("cat", 1) == "1 cat" |
| assert pluralize("cat", 0) == "0 cats" |
| assert pluralize("cat", ["Tom"]) == "1 cat" |
| assert pluralize("mouse", 2, plural="mice") == "2 mice" |
| |
| |
| def test_cached_property(): |
| calls = {"calls": 0} |
| |
| class CachedPropertyTest(object): |
| @cached_property |
| def x(self): |
| calls["calls"] += 1 |
| return calls["calls"] |
| |
| t = CachedPropertyTest() |
| # t.x should always return the same value. |
| for _ in range(3): |
| assert t.x == 1 |
| assert calls["calls"] == 1 |
| |
| |
| # Test that the interface of traceback_format_exc() stays the same |
| # whether or not its test mode is enabled. As a side effect, this |
| # enables its test mode (prune_tracebacks_for_testing()). |
| def test_traceback_format_exc(api): |
| try: |
| raise AssertionError() |
| except AssertionError: |
| text = api.utils.traceback_format_exc() |
| assert isinstance(text, str) |
| assert "line " not in text, repr(text) |
| |
| |
| def RunSteps(api): |
| test_retry(api) |
| test_memoize(api) |
| test_nice_duration() |
| test_pluralize() |
| test_cached_property() |
| test_traceback_format_exc(api) |
| |
| |
| def GenTests(api): |
| yield api.status_check.test("basic") |