blob: d1b0dfc464eacc48b3c88bef01d605e43038cd2e [file] [log] [blame]
# Copyright 2024 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import logging
import os
from time import monotonic
from typing import Any, List
from fuchsia_base_test import fuchsia_base_test
from mobly import test_runner
from trace_processing import trace_importing, trace_model
# TODO(b/396700496) Make this support parallel test-runs
# (or verify that it already does)
_LOGGER = logging.getLogger(__name__)
# This is a Lacewing-style test that wraps a hermetic device test,
# snoops its FIDL activity, and writes that activity to files in
# the test output directory.
class FidlReaderTest(fuchsia_base_test.FuchsiaBaseTest):
def setup_class(self) -> None:
"""Initialize all DUT(s)"""
super().setup_class()
self.device = self.fuchsia_devices[0]
def test_integration_testcase(self) -> None:
data_writer = JsonWriter(self.log_path)
device_test_url = self.user_params.get(
"device_test_url", "URL Not Found"
)
host_output_path = self.test_case_path
test_realm_name = self.user_params.get(
"device_test_realm", "/core/testing/ctf-tests"
)
min_severity = self.user_params.get("min_severity", None)
max_severity = self.user_params.get("max_severity", None)
progress = ProgressRecorder(
writer=data_writer,
url=device_test_url,
realm=test_realm_name,
log_severity=(min_severity, max_severity),
)
with self.device.tracing.trace_session(
categories=[
"kernel:meta",
"kernel:ipc",
"component:start",
],
buffer_size=72,
download=True,
directory=host_output_path,
trace_file="trace.fxt",
):
progress.record_milestone("started tracing")
ffx_test_args = [
"--realm",
test_realm_name,
]
if min_severity:
ffx_test_args += ["--min-severity-logs", min_severity]
if max_severity:
ffx_test_args += ["--max-severity-logs", max_severity]
self.device.ffx.run_test_component(
device_test_url,
# TODO(b/396759445): Audit these capabilities per-test.
capture_output=False,
ffx_test_args=ffx_test_args,
)
progress.record_milestone("ran the test")
progress.record_milestone("finished tracing")
# $ ffx trace symbolize --fxt trace.fxt --outfile trace.fxt
infile = os.path.join(host_output_path, "trace.fxt")
outfile = os.path.join(host_output_path, "trace.fxt")
self.device.ffx.run(
["trace", "symbolize", "--fxt", infile, "--outfile", outfile]
)
progress.record_milestone("symbolized the trace")
json_trace_file: str = trace_importing.convert_trace_file_to_json(
os.path.join(host_output_path, "trace.fxt")
)
progress.record_milestone("converted the trace")
model: trace_model.Model = trace_importing.create_model_from_file_path(
json_trace_file
)
progress.record_milestone("created the model")
scanned_model = EventScanner(model, device_test_url)
progress.record_milestone("scanned the model")
data_writer.write(
"intra_calls.freeform.json", scanned_model.intra_calls()
)
data_writer.write(
"incoming_calls.freeform.json", scanned_model.incoming_calls()
)
data_writer.write(
"outgoing_calls.freeform.json", scanned_model.outgoing_calls()
)
progress.record_milestone("wrote the outputs")
class JsonWriter:
def __init__(self, log_path: str):
self.log_path = log_path
def write(self, fname: str, data: Any) -> None:
out_path = os.path.join(self.log_path, fname)
with open(out_path, "w") as f:
json.dump(data, f, indent=4, default=vars)
# Data-holder classes are used for outputting a clean JSON schema.
# Data-holder class for FIDL client or server processes,
# inside or outside the hermetic test.
class FidlEnd:
def __init__(self) -> None:
self.name = ""
self.is_test = False
# Process outside the hermetic test. We may not know the URL or moniker.
class FidlExtern(FidlEnd):
def __init__(
self, pid: int, url: str | None = None, moniker: str | None = None
) -> None:
super().__init__()
self.pid = pid
if url != None:
self.url = url
if moniker != None:
self.moniker = moniker
# Process inside the hermetic test.
class FidlTest(FidlEnd):
def __init__(self, pid: int, url: str, moniker: str) -> None:
super().__init__()
self.pid = pid
self.url = url
self.moniker = moniker
self.is_test = True
# Data holder class containing client and server processes, and timestamps of calls
# from client to server. Two-way FIDL calls will show up as both client->server
# and server->client.
class FidlChannel:
def __init__(self, sender: FidlEnd, receiver: FidlEnd) -> None:
self.sender = sender
self.receiver = receiver
def add_timestamp(self, timestamp: int) -> None:
if not hasattr(self, "timestamps"):
self.timestamps = []
self.timestamps.append(timestamp)
# Data holder class for outputting progress data. Used by ProgressRecorder.
class ProgressData:
def __init__(
self, url: str, realm: str, log_severity: tuple[str | None, str | None]
):
self.log_severity = log_severity
self.realm = realm
self.url = url
self.timestamps: List[tuple[str, float]] = []
# Progress recorder for updating the progress output file.
class ProgressRecorder:
def __init__(
self,
writer: JsonWriter,
url: str,
realm: str,
log_severity: tuple[str | None, str | None],
):
self.writer = writer
self.progress = ProgressData(
url=url, realm=realm, log_severity=log_severity
)
self.write_file()
self.last_timestamp = monotonic()
def record_milestone(self, milestone_name: str) -> None:
now = monotonic()
milestone = (milestone_name, now - self.last_timestamp)
_LOGGER.info(f"Recording milestone {milestone}")
self.progress.timestamps.append(milestone)
self.write_file()
self.last_timestamp = now
def write_file(self) -> None:
self.writer.write("fidl_progress.freeform.json", self.progress)
# This class stores transaction data for a single FIDL method.
# It's intended to be a dictionary value, so it doesn't store the name.
class FidlMethod:
def __init__(self) -> None:
# Channel info for each (sender, receiver) pair.
self.channels: dict[tuple[FidlEnd, FidlEnd], FidlChannel] = {}
def add_call(
self, sender: FidlEnd, receiver: FidlEnd, timestamp: int | None = None
) -> None:
endpoints_key = (sender, receiver)
if endpoints_key not in self.channels:
self.channels[endpoints_key] = FidlChannel(sender, receiver)
if timestamp:
self.channels[endpoints_key].add_timestamp(timestamp)
def channels_for_scope(
self, sender_is_test: bool, receiver_is_test: bool
) -> list[FidlChannel]:
result = []
for sender, receiver in self.channels:
if (
sender
and sender.is_test == sender_is_test
and receiver
and receiver.is_test == receiver_is_test
):
endpoints_key = (sender, receiver)
result.append(self.channels[endpoints_key])
return result
# This class extracts and stores data on FIDL transactions from Duration events.
class FidlTransactions:
def __init__(self, pid_lookup: dict[int, FidlEnd]):
# Translates from pid to FidlEnd.
self.pid_lookup = pid_lookup
# Stores info about each call.
self.call_info: dict[str, FidlMethod] = {}
# Every FIDL transaction creates two durations, one on the receiving end,
# one on the sending end. Both durations point to both flow events.
# For transactions between two test components, we'll
# see both of those, and we only want to record one. So, if we get a duration
# where we've seen the sender event, we should ignore it.
self.seen_sender_events: set[trace_model.FlowEvent] = set()
# Some durations have empty child flows. Summarize rather than spam the log.
self.empty_child_flows: int = 0
def log_anomalies(self) -> None:
_LOGGER.info(
f"Found {self.empty_child_flows} empty duration.child_flows"
)
def process_duration(self, duration: trace_model.DurationEvent) -> None:
if len(duration.child_flows) == 0:
self.empty_child_flows += 1
return
if len(duration.child_flows) != 1:
_LOGGER.warning(
f"Unexpected event, wrong child_flows: {duration.child_flows}"
)
return
flow = duration.child_flows[0]
if "ordinal" not in duration.args:
# We see a few of these in the trace. Ignore them.
return
if "method" not in duration.args:
method_name = duration.args["ordinal"]
else:
method_name = duration.args["method"]
(sender, receiver) = self.extract_flows(flow)
if not sender or not receiver:
return
if sender in self.seen_sender_events:
# We saw the other half of this already.
return
self.seen_sender_events.add(sender)
if method_name not in self.call_info:
self.call_info[method_name] = FidlMethod()
self.call_info[method_name].add_call(
self.pid_lookup[sender.pid], self.pid_lookup[receiver.pid]
)
# Extract the sender and receiver of the message.
def extract_flows(
self, flow: trace_model.FlowEvent
) -> tuple[trace_model.FlowEvent | None, trace_model.FlowEvent | None]:
# We expect each flow to be either the first or second of a pair. If
# it's part of >2 flows, swallow the error and return (None, None).
if flow.next_flow:
if flow.previous_flow or flow.next_flow.next_flow:
return None, None
return flow, flow.next_flow
if flow.previous_flow:
if flow.previous_flow.previous_flow:
return None, None
return flow.previous_flow, flow
# We typically see a couple of durations without flows. Probably Archivist was
# in the middle of something when the tracing stopped, and the message
# didn't get fully recorded.
return None, None
def fidl_calls(
self, sender_is_test: bool, receiver_is_test: bool
) -> dict[str, list[FidlChannel]]:
output: dict[str, list[FidlChannel]] = {}
for method_name in self.call_info:
channels = self.call_info[method_name].channels_for_scope(
sender_is_test, receiver_is_test
)
if channels:
output[method_name] = channels
return output
# Scans a trace model and gathers info on FIDL calls. intra_calls(), incoming_calls(),
# and outgoing_calls() will return dict[message_string, FidlChannel]
class EventScanner:
def __init__(self, model: trace_model.Model, test_url: str):
self.model = model
self.test_url = test_url
self.start_events = [
event
for event in self.model.all_events()
if event.category == "component:start"
]
self.realm_prefixes = self.find_test_realm_prefixes()
self.fill_pid_lookup()
self.transactions = FidlTransactions(self.pid_lookup)
self.scan_for_fidl()
def find_test_realm_prefixes(self) -> List[str]:
test_runner_events = [
event for event in self.start_events if "-test-" in event.name
]
if not test_runner_events:
_LOGGER.error(
f"Expected at least one test-runner event for {self.test_url}, but found none"
)
return []
test_monikers = [event.args["moniker"] for event in test_runner_events]
# The part of the moniker before "test_wrapper/test" will be a prefix on all tests
# started by this runner.
realm_prefixes = [
test_moniker.split("test_wrapper/test")[0]
for test_moniker in test_monikers
]
for prefix in realm_prefixes:
_LOGGER.info(
f"Found realm prefix {prefix} for test URL {self.test_url}"
)
return realm_prefixes
def test_realm_prefix_for(self, moniker: str) -> str | None:
for prefix in self.realm_prefixes:
if moniker.startswith(prefix):
return prefix
return None
def fill_pid_lookup(self) -> None:
pid_lookup: dict[int, FidlEnd] = {}
for event in self.start_events:
moniker = event.args["moniker"]
url = event.args["url"]
pid = event.args["pid"]
realm_prefix = self.test_realm_prefix_for(moniker)
if realm_prefix:
# It's inside the hermetic test realm
pid_lookup[pid] = FidlTest(
pid=pid, url=url, moniker=moniker[len(realm_prefix) :]
)
else:
# We got a start event so we know its strings, but it's extern to the hermetic test.
pid_lookup[pid] = FidlExtern(pid=pid, url=url, moniker=moniker)
for process in self.model.processes:
# Fill in the pid_lookup dict with at least the process name.
if process.pid not in pid_lookup:
pid_lookup[process.pid] = FidlExtern(pid=process.pid)
pid_lookup[process.pid].name = process.name
self.pid_lookup = pid_lookup
def scan_for_fidl(self) -> Any:
self.calls: set[tuple[str, FidlEnd | None, FidlEnd | None]] = set()
for process in self.model.processes:
if process.pid not in self.pid_lookup:
_LOGGER.warning(f"Pid not found in lookup dict: {process.pid}")
continue
if not self.pid_lookup[process.pid].is_test:
# For messages between inside and outside the test, is_test will be true for at least one side.
# We want to ignore messages where both sides are outside the test.
# If the other side is inside, we'll see its Duration events.
continue
for thread in process.threads:
for event in thread.events:
if event.category == "kernel:ipc" and isinstance(
event, trace_model.DurationEvent
):
self.transactions.process_duration(event)
self.transactions.log_anomalies()
def intra_calls(self) -> dict[str, list[FidlChannel]]:
return self.transactions.fidl_calls(True, True)
def incoming_calls(self) -> dict[str, list[FidlChannel]]:
return self.transactions.fidl_calls(False, True)
def outgoing_calls(self) -> dict[str, list[FidlChannel]]:
return self.transactions.fidl_calls(True, False)
if __name__ == "__main__":
test_runner.main()