# Source blob: 0048d2505bf3a88ebce8381569489d27dd73c31b
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Trace model data structures."""
import copy
import enum
from functools import total_ordering
from typing import Any, Iterator, Optional, Self, TypeVar
from trace_processing import trace_time
# Smallest value representable by a signed 32-bit integer (-2**31).
# `SchedulingRecord.is_idle` compares a record's priority against this value
# to recognise the idle thread.
INT32_MIN = -(1 << 31)
@enum.unique
class InstantEventScope(enum.Enum):
    """Scope within a trace that an `InstantEvent` applies to.

    Member values are generated with `enum.auto()`; the single-letter codes
    that select each member when parsing live in
    `InstantEvent.INSTANT_EVENT_SCOPE_MAP`.
    """

    # Selected by scope code "t".
    THREAD = enum.auto()
    # Selected by scope code "p".
    PROCESS = enum.auto()
    # Selected by scope code "g".
    GLOBAL = enum.auto()
@enum.unique
class FlowEventPhase(enum.Enum):
    """Phase within the control flow lifecycle that a `FlowEvent` applies to.

    Member values are generated with `enum.auto()`; the single-letter codes
    that select each member when parsing live in
    `FlowEvent.FLOW_EVENT_PHASE_MAP`.
    """

    # Selected by phase code "s".
    START = enum.auto()
    # Selected by phase code "t".
    STEP = enum.auto()
    # Selected by phase code "f".
    END = enum.auto()
class Event:
    """Common base for every kind of trace event in a trace model.

    Holds the fields shared by all event types: category, name, start time,
    process id, thread id and a dictionary of extra arguments.
    """

    def __init__(
        self,
        category: str,
        name: str,
        start: trace_time.TimePoint,
        pid: int,
        tid: int,
        args: dict[str, Any],
    ) -> None:
        """Store the common event fields.

        Args:
            category: Category of the event.
            name: Name of the event.
            start: Time point at which the event starts.
            pid: Id of the process the event belongs to.
            tid: Id of the thread the event belongs to.
            args: Extra arguments of the event. A shallow copy is stored, so
                adding or removing keys on the caller's dictionary afterwards
                does not affect this event.
        """
        self.category: str = category
        self.name: str = name
        self.start: trace_time.TimePoint = start
        self.pid: int = pid
        self.tid: int = tid
        # Any extra arguments that the event contains.
        self.args: dict[str, Any] = args.copy()

    @staticmethod
    # from_dict should not be called on an instance
    def from_dict(event_dict: dict[str, Any]) -> "Event":
        """Build an `Event` from a dictionary describing a trace event.

        Args:
            event_dict: Dictionary with the required keys "cat", "name", "ts"
                (interpreted as microseconds), "pid" and "tid", plus an
                optional "args" dictionary.

        Returns:
            A new `Event` populated from `event_dict`.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        # Keyword arguments are evaluated left to right, so the dictionary is
        # read in the order: cat, name, ts, pid, tid, args.
        return Event(
            category=event_dict["cat"],
            name=event_dict["name"],
            start=trace_time.TimePoint.from_epoch_delta(
                trace_time.TimeDelta.from_microseconds(event_dict["ts"])
            ),
            pid=event_dict["pid"],
            tid=event_dict["tid"],
            args=event_dict.get("args", {}),
        )
class InstantEvent(Event):
    """A trace event that marks a single moment in time.

    On top of the common `Event` fields it carries a `scope`, which is one of
    the `InstantEventScope` members.
    """

    # Translates the one-letter codes accepted in an event dictionary's "s"
    # field into `InstantEventScope` members.
    INSTANT_EVENT_SCOPE_MAP: dict[str, InstantEventScope] = {
        "g": InstantEventScope.GLOBAL,
        "p": InstantEventScope.PROCESS,
        "t": InstantEventScope.THREAD,
    }

    def __init__(self, scope: InstantEventScope, base: Event) -> None:
        """Create an instant event from `scope` and the fields of `base`.

        Args:
            scope: Scope this instant event applies to.
            base: Event whose common fields are copied into this one.
        """
        super().__init__(
            category=base.category,
            name=base.name,
            start=base.start,
            pid=base.pid,
            tid=base.tid,
            args=base.args,
        )
        self.scope: InstantEventScope = scope

    @staticmethod
    def from_dict(event_dict: dict[str, Any]) -> "InstantEvent":
        """Build an `InstantEvent` from a dictionary describing a trace event.

        Args:
            event_dict: Dictionary accepted by `Event.from_dict` that also has
                an "s" field holding one of the keys of
                `INSTANT_EVENT_SCOPE_MAP`.

        Returns:
            A new `InstantEvent`.

        Raises:
            TypeError: If the "s" field is missing, is not a string, or is not
                a known scope code.
        """
        scope_key: str = "s"

        # A missing field and a field of the wrong type are reported with the
        # same message, so one lookup covers both cases.
        scope_str: Any = event_dict.get(scope_key)
        if not isinstance(scope_str, str):
            raise TypeError(
                f"Expected dictionary to have a field '{scope_key}' of type "
                f"str: {event_dict}"
            )

        scope: Optional[
            InstantEventScope
        ] = InstantEvent.INSTANT_EVENT_SCOPE_MAP.get(scope_str)
        if scope is None:
            raise TypeError(
                f"Expected '{scope_key}' (scope field) of dict to be one of "
                f"{list(InstantEvent.INSTANT_EVENT_SCOPE_MAP)}: {event_dict}"
            )

        # The common fields are only parsed once the scope has been validated.
        return InstantEvent(scope, base=Event.from_dict(event_dict))
class CounterEvent(Event):
    """An event that tracks the count of some quantity."""

    def __init__(self, id: Optional[int], base: Event) -> None:
        """Create a counter event from `id` and the fields of `base`.

        Args:
            id: Optional numeric id of the counter, or None when the event
                carries no id.
            base: Event whose common fields are copied into this one.
        """
        super().__init__(
            base.category, base.name, base.start, base.pid, base.tid, base.args
        )
        self.id: Optional[int] = id

    @staticmethod
    def from_dict(event_dict: dict[str, Any]) -> "CounterEvent":
        """Build a `CounterEvent` from a dictionary describing a trace event.

        Args:
            event_dict: Dictionary accepted by `Event.from_dict`. It may also
                contain an "id" field that is either an int or a string
                holding an integer literal (e.g. "12", "0x1f", "0o17").

        Returns:
            A new `CounterEvent`; its `id` is None when `event_dict` has no
            "id" field.

        Raises:
            TypeError: If the "id" field is present but is neither an int nor
                a string that parses as an int.
        """
        id_key: str = "id"
        counter_id: Optional[int] = None
        if id_key in event_dict:
            raw_id: Any = event_dict[id_key]
            # `int(x, 0)` only accepts str/bytes input and raises TypeError
            # for an actual int, so integers are taken as-is. bool is an int
            # subclass but is not a meaningful id; it keeps going through the
            # parsing path, which rejects it.
            if isinstance(raw_id, int) and not isinstance(raw_id, bool):
                counter_id = raw_id
            else:
                try:
                    # Base 0 lets the literal's own prefix pick the radix.
                    counter_id = int(raw_id, 0)
                except (TypeError, ValueError) as t:
                    raise TypeError(
                        f"Expected '{id_key}' field to be an int or a string that "
                        f"parses as int: {event_dict}"
                    ) from t
        return CounterEvent(counter_id, base=Event.from_dict(event_dict))
class DurationEvent(Event):
    """An event describing work that happens synchronously on one thread.

    In the Fuchsia trace model, matching begin/end duration in the raw Chrome
    trace format are merged into a single `DurationEvent`. Chrome complete
    events become `DurationEvent`s as well. Dangling Chrome begin/end events
    (i.e. they don't have a matching end/begin event) are dropped.
    """

    # Length of the event, or None when it is not known.
    duration: Optional[trace_time.TimeDelta]
    # Enclosing duration event, if any.
    parent: Optional[Self]
    # Duration events nested inside this one.
    child_durations: list[Self]
    # Flow events attached to this duration.
    child_flows: list["FlowEvent"]

    def __init__(
        self,
        duration: Optional[trace_time.TimeDelta],
        parent: Optional[Self],
        child_durations: list[Self],
        child_flows: list["FlowEvent"],
        base: Event,
    ) -> None:
        """Create a duration event from the given links and `base` fields.

        Args:
            duration: Length of the event, or None.
            parent: Enclosing duration event, or None.
            child_durations: Nested duration events. The list is stored as
                given, not copied.
            child_flows: Attached flow events. The list is stored as given,
                not copied.
            base: Event whose common fields are copied into this one.
        """
        super().__init__(
            category=base.category,
            name=base.name,
            start=base.start,
            pid=base.pid,
            tid=base.tid,
            args=base.args,
        )
        self.duration = duration
        self.parent = parent
        self.child_durations = child_durations
        self.child_flows = child_flows

    @staticmethod
    def from_dict(event_dict: dict[str, Any]) -> "DurationEvent":
        """Build a `DurationEvent` from a dictionary describing a trace event.

        Args:
            event_dict: Dictionary accepted by `Event.from_dict`. An optional
                "dur" field gives the duration in microseconds.

        Returns:
            A new `DurationEvent` with no parent and empty child lists. Its
            `duration` is None when "dur" is absent or None.

        Raises:
            TypeError: If "dur" is present but is neither an int nor a float.
        """
        duration_key: str = "dur"
        raw_duration: Any = event_dict.get(duration_key)

        parsed: Optional[trace_time.TimeDelta]
        if raw_duration is None:
            parsed = None
        elif isinstance(raw_duration, (int, float)):
            parsed = trace_time.TimeDelta.from_microseconds(float(raw_duration))
        else:
            raise TypeError(
                f"Expected dictionary to have a field '{duration_key}' of "
                f"type float or int: {event_dict}"
            )

        return DurationEvent(
            duration=parsed,
            parent=None,
            child_durations=[],
            child_flows=[],
            base=Event.from_dict(event_dict),
        )
class AsyncEvent(Event):
    """An event describing work that happens asynchronously and which may
    span multiple threads.

    Dangling Chrome async begin/end events are dropped.
    """

    def __init__(
        self, id: int, duration: Optional[trace_time.TimeDelta], base: Event
    ) -> None:
        """Create an async event from `id`, `duration` and `base` fields.

        Args:
            id: Id of the async event.
            duration: Length of the event, or None when it is not known.
            base: Event whose common fields are copied into this one.
        """
        super().__init__(
            category=base.category,
            name=base.name,
            start=base.start,
            pid=base.pid,
            tid=base.tid,
            args=base.args,
        )
        self.id: int = id
        self.duration: Optional[trace_time.TimeDelta] = duration

    @staticmethod
    # from_dict should not be called on an instance
    # type: ignore[override]
    def from_dict(id: int, event_dict: dict[str, Any]) -> "AsyncEvent":
        """Build an `AsyncEvent` from an id and a trace event dictionary.

        Args:
            id: Id to give the new event.
            event_dict: Dictionary accepted by `Event.from_dict`.

        Returns:
            A new `AsyncEvent` whose `duration` is None.
        """
        common_fields = Event.from_dict(event_dict)
        return AsyncEvent(id, None, common_fields)
class FlowEvent(Event):
    """An event describing control flow handoffs between threads or across
    processes.

    Malformed flow events are dropped. Malformed flow events could be any of:
      * A begin flow event with a (category, name, id) tuple already in
        progress (this is uncommon in practice).
      * A step flow event with no preceding (category, name, id) tuple.
      * An end flow event with no preceding (category, name, id) tuple.
    """

    # Translates the one-letter codes accepted in an event dictionary's "ph"
    # field into `FlowEventPhase` members.
    FLOW_EVENT_PHASE_MAP: dict[str, FlowEventPhase] = {
        "s": FlowEventPhase.START,
        "t": FlowEventPhase.STEP,
        "f": FlowEventPhase.END,
    }

    def __init__(
        self,
        id: str,
        phase: FlowEventPhase,
        enclosing_duration: Optional[DurationEvent],
        previous_flow: Optional[Self],
        next_flow: Optional[Self],
        base: Event,
    ) -> None:
        """Create a flow event from the given links and `base` fields.

        Args:
            id: Id of the flow.
            phase: Phase of the flow this event represents.
            enclosing_duration: Duration event this flow event is attached
                to, or None.
            previous_flow: Preceding flow event, or None.
            next_flow: Following flow event, or None.
            base: Event whose common fields are copied into this one.
        """
        super().__init__(
            category=base.category,
            name=base.name,
            start=base.start,
            pid=base.pid,
            tid=base.tid,
            args=base.args,
        )
        self.id: str = id
        self.phase: FlowEventPhase = phase
        self.enclosing_duration: Optional[DurationEvent] = enclosing_duration
        self.previous_flow: Optional[Self] = previous_flow
        self.next_flow: Optional[Self] = next_flow

    @staticmethod
    # from_dict should not be called on an instance
    # type: ignore[override]
    def from_dict(
        id: str,
        enclosing_duration: Optional[DurationEvent],
        event_dict: dict[str, Any],
    ) -> "FlowEvent":
        """Build a `FlowEvent` from an id, a duration and an event dictionary.

        Args:
            id: Id to give the new event.
            enclosing_duration: Duration event to attach the flow to, or None.
            event_dict: Dictionary accepted by `Event.from_dict` that also has
                a "ph" field holding one of the keys of
                `FLOW_EVENT_PHASE_MAP`.

        Returns:
            A new `FlowEvent` with no previous or next flow.

        Raises:
            TypeError: If the "ph" field is missing, is not a string, or is
                not a known phase code.
        """
        phase_key: str = "ph"

        # A missing field and a field of the wrong type are reported with the
        # same message, so one lookup covers both cases.
        phase_str: Any = event_dict.get(phase_key)
        if not isinstance(phase_str, str):
            raise TypeError(
                f"Expected dictionary to have a field '{phase_key}' of type "
                f"str: {event_dict}"
            )

        phase: Optional[FlowEventPhase] = FlowEvent.FLOW_EVENT_PHASE_MAP.get(
            phase_str
        )
        if phase is None:
            raise TypeError(
                f"Expected '{phase_key}' (phase field) of dict to be one of "
                f"{list(FlowEvent.FLOW_EVENT_PHASE_MAP)}: {event_dict}"
            )

        # The common fields are only parsed once the phase has been validated.
        return FlowEvent(
            id,
            phase,
            enclosing_duration,
            previous_flow=None,
            next_flow=None,
            base=Event.from_dict(event_dict),
        )
@total_ordering
class ThreadState(enum.Enum):
    """State of a thread, using the values of `zx_info_thread_t.state`.

    Members are ordered by their numeric value; `total_ordering` derives the
    remaining comparison operators from `__lt__`.
    """

    # Basic thread states, in zx_info_thread_t.state.
    ZX_THREAD_STATE_NEW = 0x0000
    ZX_THREAD_STATE_RUNNING = 0x0001
    ZX_THREAD_STATE_SUSPENDED = 0x0002
    # BLOCKED is never returned by itself.
    # It is always returned with a more precise reason.
    # See below.
    ZX_THREAD_STATE_BLOCKED = 0x0003
    ZX_THREAD_STATE_DYING = 0x0004
    ZX_THREAD_STATE_DEAD = 0x0005

    # More precise thread states. Each value has 0x03 (BLOCKED) as its low
    # byte and a distinct high byte.
    ZX_THREAD_STATE_BLOCKED_EXCEPTION = 0x0103
    ZX_THREAD_STATE_BLOCKED_SLEEPING = 0x0203
    ZX_THREAD_STATE_BLOCKED_FUTEX = 0x0303
    ZX_THREAD_STATE_BLOCKED_PORT = 0x0403
    ZX_THREAD_STATE_BLOCKED_CHANNEL = 0x0503
    ZX_THREAD_STATE_BLOCKED_WAIT_ONE = 0x0603
    ZX_THREAD_STATE_BLOCKED_WAIT_MANY = 0x0703
    ZX_THREAD_STATE_BLOCKED_INTERRUPT = 0x0803
    ZX_THREAD_STATE_BLOCKED_PAGER = 0x0903

    def __lt__(self, other: Any) -> bool:
        """Compare two thread states by numeric value.

        Returns `NotImplemented` when `other` is not a `ThreadState`, so that
        Python can fall back to the other operand or raise `TypeError`.
        """
        if not isinstance(other, ThreadState):
            return NotImplemented
        return self.value < other.value
class SchedulingRecord:
    """A record giving us information about cpu scheduling decisions."""

    def __init__(
        self,
        start: trace_time.TimePoint,
        tid: int,
        prio: int | None,
        args: dict[str, Any],
    ) -> None:
        """Store the fields of a scheduling record.

        Args:
            start: Time point of the record.
            tid: Id of the thread the record is about.
            prio: Priority of that thread, or None.
            args: Extra arguments of the record; a shallow copy is stored.
        """
        self.args: dict[str, Any] = args.copy()
        self.prio: int | None = prio
        self.tid: int = tid
        self.start: trace_time.TimePoint = start

    def is_idle(self) -> bool:
        """Return True if the incoming thread is the idle thread.

        The idle thread is recognised by a priority equal to `INT32_MIN`.
        """
        return self.prio == INT32_MIN
class ContextSwitch(SchedulingRecord):
    """A record indicating that a thread has been scheduled on a given cpu."""

    def __init__(
        self,
        start: trace_time.TimePoint,
        incoming_tid: int,
        outgoing_tid: int,
        incoming_prio: int | None,
        outgoing_prio: int | None,
        outgoing_state: ThreadState,
        args: dict[str, Any],
    ):
        """Store the fields of a context switch.

        The incoming thread's id and priority become the base record's `tid`
        and `prio`; the outgoing thread is described by the extra attributes.

        Args:
            start: Time point of the switch.
            incoming_tid: Id of the thread being switched in.
            outgoing_tid: Id of the thread being switched out.
            incoming_prio: Priority of the incoming thread, or None.
            outgoing_prio: Priority of the outgoing thread, or None.
            outgoing_state: State of the outgoing thread.
            args: Extra arguments of the record.
        """
        super().__init__(
            start=start,
            tid=incoming_tid,
            prio=incoming_prio,
            args=args.copy(),
        )
        self.outgoing_state: ThreadState = outgoing_state
        self.outgoing_prio: int | None = outgoing_prio
        self.outgoing_tid: int = outgoing_tid
class Waking(SchedulingRecord):
    """A record indicating that a thread has been unblocked and is waiting to
    run on a given cpu.
    """

    def __init__(
        self,
        start: trace_time.TimePoint,
        tid: int,
        prio: int | None,
        args: dict[str, Any],
    ) -> None:
        """Store the fields of a waking record.

        Args:
            start: Time point of the record.
            tid: Id of the thread that was woken.
            prio: Priority of that thread, or None.
            args: Extra arguments of the record.
        """
        super().__init__(start=start, tid=tid, prio=prio, args=args.copy())
class Thread:
    """A thread within a trace model."""

    def __init__(
        self,
        tid: int,
        name: Optional[str] = None,
        events: Optional[list[Event]] = None,
    ) -> None:
        """Create a thread.

        Args:
            tid: Id of the thread.
            name: Name of the thread; None is stored as the empty string.
            events: Events of the thread. A given list is stored as-is (not
                copied); None gives the thread a fresh empty list.
        """
        self.tid: int = tid
        self.name: str = name if name is not None else ""
        # A fresh list per instance avoids sharing one default between
        # threads.
        self.events: list[Event] = events if events is not None else []
class Process:
    """A process within a trace model."""

    def __init__(
        self,
        pid: int,
        name: Optional[str] = None,
        threads: Optional[list[Thread]] = None,
    ) -> None:
        """Create a process.

        Args:
            pid: Id of the process.
            name: Name of the process; None is stored as the empty string.
            threads: Threads of the process. A given list is stored as-is
                (not copied); None gives the process a fresh empty list.
        """
        self.pid: int = pid
        self.name: str = name if name is not None else ""
        # A fresh list per instance avoids sharing one default between
        # processes.
        self.threads: list[Thread] = threads if threads is not None else []
class Model:
    """The root of the trace model.

    Attributes:
        processes: The processes of the model, each holding its threads and
            their events.
        scheduling_records: Lists of scheduling records keyed by an int
            (treated as a cpu number by `slice`).
    """

    def __init__(self) -> None:
        self.processes: list[Process] = []
        self.scheduling_records: dict[int, list[SchedulingRecord]] = {}

    def all_events(self) -> Iterator[Event]:
        """Yield every event of every thread of every process, in order."""
        for process in self.processes:
            for thread in process.threads:
                yield from thread.events

    def slice(
        self,
        start: Optional[trace_time.TimePoint] = None,
        end: Optional[trace_time.TimePoint] = None,
    ) -> "Model":
        """Extract a sub-Model defined by a time interval.

        Args:
            start: Start of the time interval. If None, the start time of the
                source Model is used. Only trace events that begin at or after
                `start` will be included in the sub-model.
            end: End of the time interval. If None, the end time of the source
                Model is used. Only trace events that end at or before `end`
                will be included in the sub-model. An async or duration event
                whose `duration` is None is judged by its start time alone.

        Returns:
            A new `Model` with every process and thread of this model and
            shallow copies of the retained events (their `args` dictionaries
            are copied too). References between events are rewritten to point
            at the copies; references to events that were not retained are
            dropped. Scheduling records are filtered by their start time and
            are shared with this model, not copied.
        """
        result = Model()

        # The various event types have references to other events, which we
        # will need to update so that all the relations in the new model stay
        # within the new model. This dict tracks for each event of the old
        # model, which event in the new model it corresponds to.
        new_event_map: dict[Any, Event] = {}

        T = TypeVar("T")

        def get_new_event(old_event: T) -> T | None:
            # None (no reference) and events that were filtered out are both
            # absent from the map and resolve to None.
            new_event: Event | None = new_event_map.get(old_event)
            if new_event is None:
                return None
            assert isinstance(new_event, type(old_event))
            return new_event

        # Step 1: Populate the model with new event objects. These events
        # will have references into the old model.
        for process in self.processes:
            new_process = Process(process.pid, process.name)
            result.processes.append(new_process)
            for thread in process.threads:
                new_thread = Thread(thread.tid, thread.name)
                new_process.threads.append(new_thread)
                for event in thread.events:
                    # Exclude any event that starts outside of the specified
                    # range.
                    if start is not None and event.start < start:
                        continue
                    if end is not None:
                        if event.start > end:
                            continue
                        # Exclude any event whose duration ends outside of
                        # the specified range. `duration` is Optional, and
                        # comparing None against a time delta is not
                        # meaningful, so events without a duration are kept
                        # once their start is in range.
                        if (
                            isinstance(event, (AsyncEvent, DurationEvent))
                            and event.duration is not None
                            and event.duration > end - event.start
                        ):
                            continue

                    new_event = copy.copy(event)
                    new_event.args = event.args.copy()
                    new_thread.events.append(new_event)
                    new_event_map[event] = new_event

        # Step 2: Replace all referenced events by their corresponding ones
        # in the new model.
        for event in result.all_events():
            if isinstance(event, DurationEvent):
                event.parent = get_new_event(event.parent)
                event.child_durations = [
                    new_child
                    for old_child in event.child_durations
                    if (new_child := get_new_event(old_child)) is not None
                ]
                event.child_flows = [
                    new_flow
                    for old_flow in event.child_flows
                    if (new_flow := get_new_event(old_flow)) is not None
                ]
            elif isinstance(event, FlowEvent):
                event.enclosing_duration = get_new_event(
                    event.enclosing_duration
                )
                event.previous_flow = get_new_event(event.previous_flow)
                event.next_flow = get_new_event(event.next_flow)

        def slice_scheduling_records(record: SchedulingRecord) -> bool:
            # Scheduling records are instantaneous: only their start time is
            # tested against the (inclusive) interval.
            return (start is None or record.start >= start) and (
                end is None or record.start <= end
            )

        result.scheduling_records = {
            cpu: [r for r in records if slice_scheduling_records(r)]
            for cpu, records in self.scheduling_records.items()
        }
        return result