blob: 397e0903ec200a8f30ef46268e3096ee09f07f37 [file] [log] [blame]
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
from collections.abc import Iterator
from dataclasses import dataclass
import gzip
import json
import math
import sys
import typing
import zlib
import environment
import event
async def writer(
recorder: event.EventRecorder,
out_stream: typing.TextIO,
) -> None:
"""Asynchronously serialize events to the given stream.
Args:
recorder (event.EventRecorder): The source of events to
drain. Continues until all events are written.
out_stream (typing.TextIO): Output text stream.
"""
value: event.Event
async for value in recorder.iter():
try:
json.dump(value.to_dict(), out_stream) # type:ignore
out_stream.write("\n")
# Eagerly flush after each line. This task may terminate at any time,
# including from an interrupt, so this ensures we at least see
# the most recently written lines.
out_stream.flush()
except TypeError as e:
print(f"LOG ERROR: {e} {value}")
@dataclass
class LogIterElement:
# If set, this element provides the path of the log that was opened.
log_path: str | None = None
# If set, this elements provides one event from the file.
log_event: event.Event | None = None
# If set, this element provides a warning message. Parsing should continue.
warning: str | None = None
# If set, this element provides a fatal error message. Parsing should stop.
error: str | None = None
class LogSource:
__static_key = object()
def __init__(
self,
__static_key: typing.Any,
exec_env: environment.ExecutionEnvironment | None = None,
stream: typing.TextIO | None = None,
):
assert (
__static_key == self.__static_key
), "LogSource must be created by from_env() or from_stream()."
assert (
exec_env is None or stream is None
), "LogSource must be either an environment or stream."
self._exec_env = exec_env
self._stream = stream
@classmethod
def from_env(
cls, exec_env: environment.ExecutionEnvironment
) -> "LogSource":
return LogSource(cls.__static_key, exec_env=exec_env)
@classmethod
def from_stream(cls, stream: typing.TextIO | None = None) -> "LogSource":
return LogSource(cls.__static_key, stream=stream)
def read_log(self) -> Iterator[LogIterElement]:
stream = self._stream
try:
if stream is None and self._exec_env is not None:
log_path = self._exec_env.get_most_recent_log()
yield LogIterElement(log_path=log_path)
stream = gzip.open(log_path, "rt")
assert stream is not None
for line in stream:
if not line:
continue
json_contents = json.loads(line)
log_event: event.Event = event.Event.from_dict(json_contents) # type: ignore[attr-defined]
yield LogIterElement(log_event=log_event)
except environment.EnvironmentError as e:
yield LogIterElement(error=f"Failed to read log: {e}")
return
except gzip.BadGzipFile as e:
yield LogIterElement(
error=f"File does not appear to be a gzip file. ({e})"
)
return
except json.JSONDecodeError as e:
yield LogIterElement(
warning=f"Found invalid JSON data, skipping the rest and proceeding. ({e})"
)
except (EOFError, zlib.error) as e:
yield LogIterElement(
warning=f"File may be corrupt, skipping the rest and proceeding. ({e})",
)
def pretty_print(
log_source: LogSource,
) -> bool:
suite_names: dict[int, str] = dict()
command_to_suite: dict[int, int] = dict()
formatted_suite_events: collections.defaultdict[
int, list[str]
] = collections.defaultdict(list)
time_base: float | None = None
def format_time(e: event.Event) -> str:
assert time_base
ts = e.timestamp - time_base
seconds = math.floor(ts)
millis = int(ts * 1e3 % 1e3)
return f"{seconds:04}.{millis:03}"
for element in log_source.read_log():
if (e := element.log_event) is not None:
if e.id == 0:
time_base = e.timestamp
if not e.payload:
continue
if ex := e.payload.test_suite_started:
assert e.id
suite_names[e.id] = ex.name
formatted_suite_events[e.id].append(
f"[{format_time(e)}] Starting suite {ex.name}"
)
if (
(pid := e.parent)
and pid in suite_names
and (command := e.payload.program_execution)
):
assert e.id
command_to_suite[e.id] = pid
args = " ".join([command.command] + command.flags)
env = command.environment
formatted_suite_events[pid].append(
f"[{format_time(e)}] Running command\n Args: {args}\n Env: {env}"
)
if e.id in command_to_suite:
if output := e.payload.program_output:
formatted_suite_events[command_to_suite[e.id]].append(
f"[{format_time(e)}] {output.data}"
)
if termination := e.payload.program_termination:
formatted_suite_events[command_to_suite[e.id]].append(
f"[{format_time(e)}] Command terminated: {termination.return_code}"
)
if e.id in suite_names and (outcome := e.payload.test_suite_ended):
formatted_suite_events[e.id].append(
f"[{format_time(e)}] Suite ended with status {outcome.status.value}"
)
elif (warning := element.warning) is not None:
print(warning, file=sys.stderr)
elif (error := element.error) is not None:
print(error, file=sys.stderr)
return False
print(f"{len(suite_names)} tests were run")
for id in sorted(suite_names.keys()):
name = suite_names[id]
print(f"\n[START {name}]")
for line in formatted_suite_events[id]:
print(line.strip())
print(f"[END {name}]\n")
return True