blob: f9fda304aa9a42d0687aa4fcf714bb689f5016b2 [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2024 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for ../metrics/wakeup.py."""
import logging
import os
import unittest
from reporting import metrics
from trace_processing import trace_importing, trace_model
from trace_processing.metrics import wakeup
_LOGGER: logging.Logger = logging.getLogger(__name__)
_LABEL = "WakeupTime"
_EVENTS = ["WakeupEvent1", "WakeupEvent2", "WakeupEvent3"]
# Boilerplate-busting constants:
U = metrics.Unit
TCR = metrics.TestCaseResult
class WakeupMetricsTest(unittest.TestCase):
"""Wakeup Metrics tests."""
@staticmethod
def _load_model(model_file_name: str) -> trace_model.Model:
# A second dirname is required to account for the .pyz archive which
# contains the test and a third one since data is a sibling of the test.
runtime_deps_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
"runtime_deps/wakeup_metrics_processor",
)
return trace_importing.create_model_from_file_path(
os.path.join(runtime_deps_path, model_file_name)
)
def test_process_metrics_handles_zero_wakeups(self) -> None:
model = WakeupMetricsTest._load_model("zero_wakeups.json")
result = wakeup.WakeupMetricsProcessor(_LABEL, _EVENTS).process_metrics(
model
)
self.assertEqual(result, [])
def test_process_metrics_raises_on_zero_wakeups(self) -> None:
model = WakeupMetricsTest._load_model("zero_wakeups.json")
with self.assertRaises(RuntimeError) as context:
wakeup.WakeupMetricsProcessor(
_LABEL, _EVENTS, require_wakeup=True
).process_metrics(model)
self.assertEqual(
str(context.exception),
"Required wakeup not present in trace, observed events: '', missing event: 'WakeupEvent1'",
)
def test_process_metrics_raises_on_incomplete_wakeup(self) -> None:
model = WakeupMetricsTest._load_model("incomplete_wakeup.json")
with self.assertRaises(RuntimeError) as context:
wakeup.WakeupMetricsProcessor(
_LABEL, _EVENTS, require_wakeup=True
).process_metrics(model)
self.assertEqual(
str(context.exception),
"Required wakeup not present in trace, observed events: 'WakeupEvent1,WakeupEvent2', missing event: 'WakeupEvent3'",
)
def test_process_metrics_handles_one_wakeup(self) -> None:
model = WakeupMetricsTest._load_model("one_wakeup.json")
result = wakeup.WakeupMetricsProcessor(_LABEL, _EVENTS).process_metrics(
model
)
self.assertEqual(
result,
[
TCR(
label=_LABEL,
unit=U.nanoseconds,
# Measured from end of WakeupEvent1 to end of WakeupEvent3.
values=[31000000],
)
],
)
def test_process_metrics_handles_many_wakeup(self) -> None:
model = WakeupMetricsTest._load_model("many_wakeups.json")
result = wakeup.WakeupMetricsProcessor(_LABEL, _EVENTS).process_metrics(
model
)
self.assertEqual(
result,
[
TCR(
label=_LABEL,
unit=U.nanoseconds,
# Measured from end of WakeupEvent1 to end of WakeupEvent3.
values=[31000000, 22000000],
)
],
)
def test_event_0_restarts_sequence(self) -> None:
model = WakeupMetricsTest._load_model("restarted_wakeup.json")
result = wakeup.WakeupMetricsProcessor(_LABEL, _EVENTS).process_metrics(
model
)
self.assertEqual(
result,
[
TCR(
label=_LABEL,
unit=U.nanoseconds,
# Measured from end of the second instance WakeupEvent1 to end of WakeupEvent3.
values=[22000000],
)
],
)