# Copyright 2025 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# This tool is to be invoked by the CTF trybot. It is intended for
# comparing the FIDL methods that were exercised in CTF tests
# to the FIDL methods that exist in the SDK.
import argparse
import json
import os
import sys
from dataclasses import dataclass, fields
from io import TextIOWrapper
from pathlib import Path
from typing import Any, Dict, List
# TODO(b/379766709) (comment 21): Rewrite this in Go.
# The dataclasses form the output schemas of the script.
#
# The "expanded" schema has full info about endpoints on every call,
# and its schema is a list of `ExpandedMessage`.
#
# The "condensed" schema stores a dict of interned endpoints that calls refer into,
# and its schema is `CondensedSchema`.
#
# The "summary" schema is a dict from "fuchsia.library/Protocol.Message" to a dict with
# keys: "incoming", "outgoing", "intra"; values: int number of calls, and "unstable": bool.
#
# The "flat" schema is an array of dicts with keys "incoming", "outgoing", "intra" (int),
# "message" with value "fuchsia.library/Protocol.Message", and "unstable": bool. Same
# information as "summary" but it's all in the values, which is easier for SQL to unpack.
@dataclass(frozen=True)
class Endpoint:
name: str
url: str = ""
moniker: str = ""
@dataclass
class CondensedCall:
sender: int
receiver: int
direction: str
@dataclass
class CondensedMessage:
name: str
ordinal: str
unstable: bool
calls: List[CondensedCall]
@dataclass
class CondensedSchema:
endpoints: Dict[int, Endpoint]
messages: List[CondensedMessage]
@dataclass
class ExpandedCall:
sender: Endpoint
receiver: Endpoint
direction: str
@dataclass
class ExpandedMessage:
name: str
ordinal: str
unstable: bool
calls: List[ExpandedCall]
@dataclass
class CallsSummary:
unstable: bool
incoming: int
outgoing: int
intra: int
@dataclass
class CallsFlat:
message: str
unstable: bool
incoming: int
outgoing: int
intra: int
# Maintain an interned set of `Endpoint`s and return a different key for
# each unique `Endpoint`.
class Endpoints:
def __init__(self) -> None:
# This will be written to the condensed JSON file.
self.endpoints_by_key: Dict[int, Endpoint] = {}
# This interns endpoints we've seen before.
self.keys_by_endpoint: Dict[Endpoint, int] = {}
self.next_key = 101
def get_key(self, endpoint: Endpoint) -> int:
if endpoint in self.keys_by_endpoint:
return self.keys_by_endpoint[endpoint]
key = self.next_key
self.next_key += 1
self.keys_by_endpoint[endpoint] = key
self.endpoints_by_key[key] = endpoint
return key
def data_to_write(self) -> Dict[int, Endpoint]:
return self.endpoints_by_key
# The one-and-only intern-er for `Endpoint`s
ENDPOINTS = Endpoints()
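# Example (hypothetical endpoint values): structurally equal `Endpoint`s intern to the same
# key, so each unique endpoint is stored only once in the condensed output.
#
#   key_a = ENDPOINTS.get_key(Endpoint(name="client", url="", moniker="/core/client"))
#   key_b = ENDPOINTS.get_key(Endpoint(name="client", url="", moniker="/core/client"))
#   assert key_a == key_b  # keys start at 101 and increment once per unique Endpoint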
# The endpoint info from the snoop files has fields we don't care about.
# ENDPOINT_FIELDS helps `cleanup_endpoint()` scrub the unwanted fields.
ENDPOINT_FIELDS = [f.name for f in fields(Endpoint)]
def cleanup_endpoint(endpoint: Dict[str, Any]) -> Dict[str, Any]:
for key in list(endpoint.keys()):
if key not in ENDPOINT_FIELDS:
del endpoint[key]
return endpoint
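# Example (hypothetical snoop data): keys such as "pid" and "is_test" are scrubbed so that
# the remaining keys match the `Endpoint` fields.
#
#   cleanup_endpoint({"name": "client", "pid": 123, "is_test": True, "moniker": "/core/client"})
#   # -> {"name": "client", "moniker": "/core/client"}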
# Stores a single FIDL message. `name` is the name of the message, as
# `fuchsia.library/Protocol.Message` - unless the name is not in the CTF ordinal<>message
# map, in which case `name` is the same as `ordinal`. Accumulates both "expanded" and "condensed"
# versions of the call info for its message, and can supply output-schema classes for either.
class Message:
def __init__(self, name: str, ordinal: str, unstable: bool) -> None:
self.name = name
self.ordinal = ordinal
self.unstable = unstable
self.condensed_calls: List[CondensedCall] = []
self.expanded_calls: List[ExpandedCall] = []
def add_calls(
self, calls: List[Dict[str, Any]], test_path: Path, direction: str
) -> None:
for call in calls:
sender = Endpoint(**cleanup_endpoint(call["sender"]))
receiver = Endpoint(**cleanup_endpoint(call["receiver"]))
self.expanded_calls.append(
ExpandedCall(
sender=sender,
                    receiver=receiver,
direction=direction,
)
)
self.condensed_calls.append(
CondensedCall(
sender=ENDPOINTS.get_key(sender),
receiver=ENDPOINTS.get_key(receiver),
direction=direction,
)
)
def call_data(self) -> Dict[str, Any]:
data: Dict[str, Any] = {
"incoming": 0,
"outgoing": 0,
"intra": 0,
"unstable": self.unstable,
}
for call in self.condensed_calls:
data[call.direction] += 1
return data
def summary(self) -> CallsSummary:
data = self.call_data()
return CallsSummary(**data)
def flat(self) -> CallsFlat:
data: Dict[str, Any] = self.call_data()
data["message"] = self.name
return CallsFlat(**data)
def condensed_version(self) -> CondensedMessage:
return CondensedMessage(
name=self.name,
ordinal=self.ordinal,
unstable=self.unstable,
calls=self.condensed_calls,
)
def expanded_version(self) -> ExpandedMessage:
return ExpandedMessage(
name=self.name,
ordinal=self.ordinal,
unstable=self.unstable,
calls=self.expanded_calls,
)
# Tells json how to serialize our dataclasses. Use it by passing `default=vars_or_obj` to json.dump().
def vars_or_obj(obj: Any) -> Any:
"""Extracts fields from classes, or passes through built-in data types."""
try:
return vars(obj)
    except TypeError:
return obj
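# Example (hypothetical values): dataclass instances serialize via their instance attributes;
# anything vars() can't handle (ints, strings, lists, dicts) is returned unchanged for json to
# encode directly.
#
#   vars_or_obj(Endpoint(name="client"))  # -> {"name": "client", "url": "", "moniker": ""}
#   vars_or_obj(7)                        # -> 7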
# Main data structure of the program. Gathers data from all input files and outputs it
# to JSON in the correct schema - a list of Message with an item for every entry in the
# SDK API name<>ordinal map file (whether or not it has calls) and for every event in
# every scanned FIDL-snoop file (whether or not it's in the API).
class Messages:
def __init__(self, message_file: TextIOWrapper) -> None:
self.setup_data_structures(message_file)
def setup_data_structures(self, message_file: TextIOWrapper) -> None:
self.raw_methods = json.load(message_file)
self.name_lookup: Dict[str, Message] = {}
self.ordinal_lookup: Dict[str, Message] = {}
# Schema is [{name: "fuchsia.library",
# methods: [{name: "fuchsia.library/Protocol.Message", ordinal: 1234}, ...]}, ...]
for entry in self.raw_methods:
for method in entry["methods"]:
message = Message(
name=method["name"],
ordinal=method["ordinal"],
unstable=method.get("unstable", False),
)
self.name_lookup[method["name"]] = message
self.ordinal_lookup[method["ordinal"]] = message
def process_calls(self, path: Path, direction: str) -> None:
with open(path) as f:
calls = json.load(f)
# Schema is { message: [call, call...], message: [call, call...]...}
        # message is either an ordinal (as a string) or "fuchsia.library/Protocol.Message"
# call is a dict: {"sender": process, "receiver": process}
# process is a dict with keys: name, is_test, pid, url, moniker
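        # For illustration only (hypothetical values), a snoop file might contain:
        #   {"fuchsia.io/Directory.Open":
        #       [{"sender": {"name": "client", "is_test": true, "pid": 1, "url": "",
        #                    "moniker": "/core/client"},
        #         "receiver": {"name": "server", "is_test": false, "pid": 2, "url": "",
        #                      "moniker": "/core/server"}}]}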
for message, call_list in calls.items():
if message in self.ordinal_lookup:
self.ordinal_lookup[message].add_calls(
calls=call_list, test_path=path, direction=direction
)
else:
if message not in self.name_lookup:
self.name_lookup[message] = Message(
name=message, ordinal=message, unstable=False
)
self.name_lookup[message].add_calls(
calls=call_list, test_path=path, direction=direction
)
    def data_to_write(self) -> List[Message]:
return list(self.name_lookup.values())
def write_expanded_to_json(self, path: Path) -> None:
"""Writes the desired data to the path in JSON format."""
with open(path, "w") as f:
json.dump(
[
message.expanded_version()
for message in self.data_to_write()
],
f,
default=vars_or_obj,
indent=2,
)
def write_condensed_to_json(self, path: Path) -> None:
data = CondensedSchema(
endpoints=ENDPOINTS.data_to_write(),
messages=[
message.condensed_version() for message in self.data_to_write()
],
)
with open(path, "w") as f:
json.dump(data, f, default=vars_or_obj, indent=2)
def write_summary_to_json(self, path: Path) -> None:
data = dict(
[
(message.name, message.summary())
for message in self.data_to_write()
]
)
with open(path, "w") as f:
json.dump(data, f, default=vars_or_obj, indent=2)
def write_flat_to_json(self, path: Path) -> None:
data = [message.flat() for message in self.data_to_write()]
with open(path, "w") as f:
json.dump(data, f, default=vars_or_obj, indent=1)
# The core of this script. Scans for FIDL-snoop files, accumulates calls from them, and writes
# the accumulated information to specified files in the correct schema.
def check_coverage(args: argparse.Namespace) -> None:
api_path = Path(args.api_file)
with open(api_path) as f:
messages = Messages(f)
for root_dir in args.results_dir:
for dir_path, _, file_names in os.walk(root_dir):
for name in file_names:
if name.endswith("intra_calls.freeform.json"):
messages.process_calls(Path(dir_path) / name, "intra")
elif name.endswith("incoming_calls.freeform.json"):
messages.process_calls(Path(dir_path) / name, "incoming")
elif name.endswith("outgoing_calls.freeform.json"):
messages.process_calls(Path(dir_path) / name, "outgoing")
if args.json_output:
messages.write_flat_to_json(Path(args.json_output))
if args.expanded_json_output:
messages.write_expanded_to_json(Path(args.expanded_json_output))
if args.condensed_json_output:
messages.write_condensed_to_json(Path(args.condensed_json_output))
if args.summary_json_output:
messages.write_summary_to_json(Path(args.summary_json_output))
def main(argv: List[str]) -> None:
"""This script takes an API file summarizing SDK FIDL methods (name <-> ordinal) and one or
more directories containing test outputs, which should include FIDL-snoop files named
`intra_calls.freeform.json`. It can write either of two schemas. (For soft-migration reasons,
`--json_output` is the same as `--expanded_json_output`.)"""
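    # Example invocation (hypothetical script name and paths):
    #   python3 fidl_coverage.py check_coverage --api_file fidl_methods.json \
    #       --results_dir out/test_results --json_output coverage_flat.json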
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(required=True)
subparser = subparsers.add_parser(
"check_coverage",
help="Summarize FIDL coverage from an API file and test result directories.",
)
subparser.add_argument(
"--api_file",
required=True,
help="File summarizing FIDL API, produced by summarize_fidl_methods.py.",
)
subparser.add_argument(
"--json_output",
help="Path to write a flat (array of struct) FIDL coverage summary to.",
)
subparser.add_argument(
"--expanded_json_output",
help="Path to write expanded FIDL coverage summary to. Deprecated; use condensed.",
)
subparser.add_argument(
"--condensed_json_output",
help="Path to write condensed FIDL coverage summary to, with full call info.",
)
subparser.add_argument(
"--summary_json_output",
help="Path to write a brief (dict by message name) FIDL coverage summary to.",
)
subparser.add_argument(
"--results_dir",
nargs="+",
help="Root directories of test outputs",
required=True,
)
subparser.set_defaults(func=check_coverage)
args = parser.parse_args(argv)
args.func(args)
if __name__ == "__main__":
main(sys.argv[1:])