| # Copyright 2023 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Utilities to filter and extract events and statistics from a trace Model.""" |
| |
| import math |
| import statistics |
| from typing import ( |
| Any, |
| Generator, |
| Iterable, |
| Iterator, |
| List, |
| Optional, |
| Set, |
| Tuple, |
| TypeVar, |
| ) |
| |
| from trace_processing import trace_metrics, trace_model, trace_time |
| |
| |
def percentile(values: Iterable[int | float], percentile: int) -> float:
    """Computes the linearly interpolated [percentile]th percentile
    (https://en.wikipedia.org/wiki/Percentile) of [values].

    Args:
        values: The values to compute the percentile over. Must be non-empty.
        percentile: The percentile to compute, in the range [0, 100].

    Returns:
        The interpolated percentile value.

    Raises:
        TypeError: [values] is empty.
    """
    # Sort into a list before checking for emptiness: `not values` is always
    # False for a generator, even an exhausted or empty one.
    values_list: List[int | float] = sorted(values)
    if not values_list:
        raise TypeError(
            "[values] must not be empty in order to compute percentile"
        )
| if percentile == 100: |
| return float(values_list[-1]) |
| |
| index_as_float: float = float(len(values_list) - 1) * (0.01 * percentile) |
| index: int = math.floor(index_as_float) |
| if (index + 1) == len(values_list): |
| return float(values_list[-1]) |
| index_fraction: float = index_as_float % 1 |
| return ( |
| values_list[index] * (1.0 - index_fraction) |
| + values_list[index + 1] * index_fraction |
| ) |
| |
| |
| T = TypeVar("T", bound=trace_model.Event) |
| |
| |
| def filter_events( |
| events: Iterable[trace_model.Event], |
| type: type[T], |
| category: Optional[str] = None, |
| name: Optional[str] = None, |
| ) -> Generator[T, None, None]: |
| """Filter |events| based on category, name, or type. |
| |
| Args: |
| events: The set of events to filter. |
| category: Category of events to include, or None to skip this filter. |
| name: name of events to include, or None to skip this filter. |
| type: Type of events to include. By default object to include all events. |
| |
| Returns: |
| An [Iterator] of filtered events. Note that Iterators can only be iterated a single time, so |
| the caller must create a local copy in order to loop over the filtered events more than once. |
| """ |
| |
| for event in events: |
| category_matches: bool = category is None or event.category == category |
| name_matches: bool = name is None or event.name == name |
| if isinstance(event, type) and category_matches and name_matches: |
| yield event |
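
# Example (illustrative; the category and name are hypothetical):
#   frames = list(
#       filter_events(
#           model.all_events(),
#           trace_model.DurationEvent,
#           category="gfx",
#           name="Frame",
#       )
#   )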
| |
| |
| U = TypeVar("U", bound=trace_model.SchedulingRecord) |
| |
| |
| def filter_records( |
| records: Iterable[trace_model.SchedulingRecord], type_: type[U] |
| ) -> Iterator[U]: |
| """Filters SchedulingRecords by type. |
| |
| Easier for mypy to grok than using types with filter(). |
| |
| Args: |
| records: Iterable of SchedulingRecords to be filtered. |
| type_: The subclass of SchedulingRecord by which to filter. |
| |
| Yields: |
| The elements of the iterable that are of the specified type. |
| """ |
| for record in records: |
| if isinstance(record, type_): |
| yield record |
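
# Example (illustrative; assumes trace_model defines a ContextSwitch subclass
# of SchedulingRecord):
#   context_switches = list(
#       filter_records(records, trace_model.ContextSwitch)
#   )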
| |
| |
| def total_event_duration( |
| events: Iterable[trace_model.Event], |
| ) -> trace_time.TimeDelta: |
| """Compute the total duration of all [Event]s in |events|. This is the end |
| of the last event minus the beginning of the first event. |
| |
| Args: |
| events: The set of events to compute the duration for. |
| |
| Returns: |
| Total event duration. |
| """ |
| |
| def event_times( |
| e: trace_model.Event, |
| ) -> Tuple[trace_time.TimePoint, trace_time.TimePoint]: |
| end: trace_time.TimePoint = e.start |
| if ( |
| isinstance(e, trace_model.AsyncEvent | trace_model.DurationEvent) |
| and e.duration |
| ): |
| end = e.start + e.duration |
| return (e.start, end) |
| |
    # Materialize the start/end pairs before unpacking: `zip(*...)` over an
    # empty input would raise despite the min/max defaults below.
    start_end_times = [event_times(e) for e in events]
    start_times = [start for start, _ in start_end_times]
    end_times = [end for _, end in start_end_times]
    min_time: trace_time.TimePoint = min(
        start_times, default=trace_time.TimePoint.zero()
    )
    max_time: trace_time.TimePoint = max(
        end_times, default=trace_time.TimePoint.zero()
    )
| |
| return max_time - min_time |
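
# Example (hypothetical timestamps): for one event spanning [t, t + 5ms] and
# another spanning [t + 2ms, t + 10ms], the total duration is 10ms: the
# latest end (t + 10ms) minus the earliest start (t).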
| |
| |
| def get_arg_values_from_events( |
| events: Iterable[trace_model.Event], |
| arg_key: str, |
| arg_types: type | Tuple[type, ...] = object, |
| ) -> Iterable[Any]: |
| """Collect values from the |args| maps in |events|. |
| |
| Args: |
| events: The events to collect args from. |
| arg_key: The key in the |args| maps to collect values from. |
| arg_type: |
| |
| Raises: |
| KeyError: The value corresponding to |arg_key| was not present in one of |
| the event's |args| map. |
| |
| Returns: |
| An [Iterable] of collected values. |
| """ |
| |
| def event_to_arg_type(event: trace_model.Event) -> Any: |
| has_arg: bool = arg_key in event.args |
| arg: Any = event.args.get(arg_key, None) |
| if not has_arg or not isinstance(arg, arg_types): |
| raise KeyError( |
| f"Error, expected events to include arg with key '{arg_key}' " |
| f"of type(s) {str(arg_types)}" |
| ) |
        return arg  # Present and of the expected type(s) if we reach here
| |
| return map(event_to_arg_type, events) |
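
# Example (illustrative; the arg key is hypothetical):
#   render_times = list(
#       get_arg_values_from_events(
#           frame_events, arg_key="render_time_ms", arg_types=(int, float)
#       )
#   )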
| |
| |
| def get_following_flow_events( |
| event: trace_model.Event, |
| ) -> Iterable[trace_model.Event]: |
| """Find all Events that are flow connected and follow |event|. |
| |
| Args: |
| event: The starting event. |
| |
| Returns: |
| An [Iterable] of flow connected events. |
| """ |
| frontier: List[trace_model.Event] = [event] |
| visited: Set[trace_model.Event] = set() |
| |
    # Depth-first traversal over the flow graph, skipping already-visited
    # events so that cycles cannot cause an infinite loop.
    while frontier:
        current: trace_model.Event = frontier.pop()
        if current in visited:
            continue
        visited.add(current)
| if isinstance(current, trace_model.DurationEvent): |
| frontier.extend(current.child_durations) |
| frontier.extend(current.child_flows) |
| elif isinstance(current, trace_model.FlowEvent): |
| if current.enclosing_duration: |
| frontier.append(current.enclosing_duration) |
| if current.next_flow: |
| frontier.append(current.next_flow) |
| |
| def by_start_time(event: trace_model.Event) -> trace_time.TimePoint: |
| return event.start |
| |
| for connected_event in sorted(visited, key=by_start_time): |
| yield connected_event |
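
# Example (illustrative): collect every event downstream of a flow-emitting
# duration event, materialized so it can be iterated more than once:
#   downstream = list(get_following_flow_events(begin_event))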
| |
| |
| def get_nearest_following_flow_event( |
| event: trace_model.Event, |
| following_event_category: str, |
| following_event_name: str, |
| ) -> trace_model.Event | None: |
| """Find the nearest target event that is flow connected and follow |event|. |
| |
| Args: |
| event: The starting event. |
| following_event_category: Trace category of the target event. |
| following_event_name: Trace event name of the target event. |
| |
    Returns:
        The nearest matching flow-connected event, or None if no such event
        exists.
    """
| filtered_following_events = filter_events( |
| get_following_flow_events(event), |
| category=following_event_category, |
| name=following_event_name, |
| type=trace_model.DurationEvent, |
| ) |
| return next(iter(filtered_following_events), None) |
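
# Example (illustrative; the category and name are hypothetical):
#   vsync = get_nearest_following_flow_event(
#       frame_start_event, "gfx", "Display::Vsync"
#   )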
| |
| |
# This method guards against a possible race between trace collection starting in
# multiple processes.
#
# This problem usually occurs between the Scenic, Magma and Display processes. The flow
# connection between these processes happens through koids of objects passed to Vulkan,
# which may be reused. When there is a gap between the trace collection start times of
# different processes, we may see the initial vsync event(s) connecting to multiple
# flows. We can safely skip these initial events. See https://fxbug.dev/322849857 for
# more detailed examples with screenshots.
#
# This method finds the first event flow the test is interested in, finds the processes
# participating in that flow, and cuts the trace at the point when the latest of those
# processes started tracing.
#
# This method used to match processes by given process and thread names. That did not
# work because different products may have different process setups, for example a
# Display process with a different name.
| def adjust_to_common_process_start( |
| model: trace_model.Model, |
| name: str, |
| type: type[T], |
| category: Optional[str] = None, |
| ) -> trace_model.Model: |
| """Adjust model to a consistent start time tracking the latest first event recorded from a |
| list of processes. The list of processes are selected through matching event flow. |
| |
| Args: |
| model: Trace model. |
| name: name of event, or None to skip this filter. |
| category: Category of event, or None to skip this filter. |
| type: Type of event. By default object to include all events. |
| |
| Returns: |
| Model adjusted to a start time tracking the latest first event recorded from processes from |
| the given event flow. |
| |
| Raises: |
| KeyError if no matched event is found. |
| """ |
| |
| filtered = filter_events( |
| model.all_events(), category=category, name=name, type=type |
| ) |
| # Get the first matched event. |
| begin_event = next(filtered, None) |
| |
| if begin_event is None: |
| raise KeyError(f"Error, expected event with name '{name}'") |
| |
| events_in_flow: Iterable[trace_model.Event] = get_following_flow_events( |
| begin_event |
| ) |
| |
    process_ids = {begin_event.pid}
    process_ids.update(event.pid for event in events_in_flow)
| |
| process_matches = [p for p in model.processes if p.pid in process_ids] |
| consistent_start_time = max( |
| min( |
| ( |
| event.start |
| for thread in process.threads |
| for event in thread.events |
| ), |
            default=trace_time.TimePoint.zero(),
| ) |
| for process in process_matches |
| ) |
| return model.slice(start=consistent_start_time) |
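
# Example (illustrative; the event name and category are hypothetical):
#   model = adjust_to_common_process_start(
#       model,
#       name="vsync",
#       type=trace_model.DurationEvent,
#       category="gfx",
#   )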
| |
| |
| def standard_metrics_set( |
| values: List[int | float], |
| label_prefix: str, |
| unit: trace_metrics.Unit, |
| percentiles: tuple[int, int, int, int, int] = (5, 25, 50, 75, 95), |
| durations: List[float] | None = None, |
| doc_prefix: str | None = None, |
| ) -> list[trace_metrics.TestCaseResult]: |
| """Generates min, max, average and percentiles metrics for the given values. |
| |
| Args: |
| values: Input to create the metrics from. |
| label_prefix: metric labels will be '{label_prefix}Min', '{label_prefix}Max', |
| '{label_prefix}Average' and '{label_prefix}P*' (percentiles). |
| unit: The metrics unit. |
| percentiles: Percentiles to output. |
        durations: Optional. If passed, the average is weighted by the
            duration of each event's CPU usage instead of being a plain mean
            of the values, which would assume every value had the same
            duration (e.g. 1 s). For example, given:
                cpu_percentages: [30, 2]
                durations: [1, 5]
            the weighted average is (30*1 + 2*5) / (1 + 5) = ~6.7% instead of
            statistics.mean([30, 2]) = 16%.
| doc_prefix: a descriptive string with which to document this set of metrics. |
| Defaults to label_prefix. |
| |
| Returns: |
| A list of TestCaseResults representing each of the generated metrics. |
| """ |
| doc_prefix = doc_prefix if doc_prefix else label_prefix |
| |
| results = [ |
| trace_metrics.TestCaseResult( |
| f"{label_prefix}P{p}", |
| unit, |
| [percentile(values, p)], |
| f"{doc_prefix}, {p}th percentile", |
| ) |
| for p in percentiles |
| ] |
| |
| results += [ |
| trace_metrics.TestCaseResult( |
| f"{label_prefix}Min", |
| unit, |
| [min(values)], |
| f"{doc_prefix}, minimum", |
| ), |
| trace_metrics.TestCaseResult( |
| f"{label_prefix}Max", |
| unit, |
| [max(values)], |
| f"{doc_prefix}, maximum", |
| ), |
| ] |
| |
    average: float = 0
    if durations is not None:
        proportional_percentages: list[float] = [
            duration * values[i] for i, duration in enumerate(durations)
        ]
        # The duration-weighted average is:
        # (sum of the duration-weighted values) / (total trace duration)
        average = sum(proportional_percentages) / sum(durations)
| else: |
| average = statistics.mean(values) |
| results += [ |
| trace_metrics.TestCaseResult( |
| f"{label_prefix}Average", |
| unit, |
| [average], |
| f"{doc_prefix}, mean", |
| ), |
| ] |
| |
| return results |
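
# Example (illustrative; the label, unit member, and values are hypothetical):
#   results = standard_metrics_set(
#       values=frame_latencies_ms,
#       label_prefix="FrameLatency",
#       unit=trace_metrics.Unit.milliseconds,
#       doc_prefix="Frame latency in milliseconds",
#   )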