blob: edd414cf39db79dd3e88876e880f50820a7af583 [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import contextlib
import io
import shutil
import subprocess
import tempfile
import textwrap
import unittest
from unittest import mock
from api.log import log_pb2
from go.api.command import command_pb2
from google.protobuf import timestamp_pb2
import reproxy_logs
from pathlib import Path
def _write_file_contents(path: Path, contents: str):
with open(path, "w") as f:
f.write(contents)
def _digests_equal(left: log_pb2.LogRecord, right: log_pb2.LogRecord) -> bool:
return (
left.remote_metadata.action_digest
== right.remote_metadata.action_digest
)
class DownloadEventTests(unittest.TestCase):
def test_distribute_fits_in_first_interval(self):
event = reproxy_logs.DownloadEvent(
bytes_downloaded=1000,
start_time=5.0,
end_time=6.0,
)
intervals = list(event.distribute_over_intervals(interval_seconds=10.0))
for (actual_index, actual_bytes, actual_concurrent), (
expected_index,
expected_bytes,
expected_concurrent,
) in zip(intervals, [(0, 1000.0, 0.1)]):
self.assertEqual(actual_index, expected_index)
self.assertAlmostEqual(actual_bytes, expected_bytes)
self.assertAlmostEqual(actual_concurrent, expected_concurrent)
def test_distribute_fits_in_one_interval(self):
event = reproxy_logs.DownloadEvent(
bytes_downloaded=1000,
start_time=34.0,
end_time=37.0,
)
intervals = list(event.distribute_over_intervals(interval_seconds=10.0))
for (actual_index, actual_bytes, actual_concurrent), (
expected_index,
expected_bytes,
expected_concurrent,
) in zip(intervals, [(3, 1000.0, 0.3)]):
self.assertEqual(actual_index, expected_index)
self.assertAlmostEqual(actual_bytes, expected_bytes)
self.assertAlmostEqual(actual_concurrent, expected_concurrent)
def test_distribute_spans_two_intervals(self):
event = reproxy_logs.DownloadEvent(
bytes_downloaded=1000,
start_time=24.0,
end_time=34.0,
)
intervals = list(event.distribute_over_intervals(interval_seconds=10.0))
for (actual_index, actual_bytes, actual_concurrent), (
expected_index,
expected_bytes,
expected_concurrent,
) in zip(
intervals,
[
(2, 600.0, 0.6),
(3, 400.0, 0.4),
],
):
self.assertEqual(actual_index, expected_index)
self.assertAlmostEqual(actual_bytes, expected_bytes)
self.assertAlmostEqual(actual_concurrent, expected_concurrent)
def test_distribute_spans_one_exact_interval(self):
event = reproxy_logs.DownloadEvent(
bytes_downloaded=500,
start_time=20.0,
end_time=40.0,
)
intervals = list(event.distribute_over_intervals(interval_seconds=20.0))
for (actual_index, actual_bytes, actual_concurrent), (
expected_index,
expected_bytes,
expected_concurrent,
) in zip(
intervals,
[
(1, 500.0, 1.0),
],
):
self.assertEqual(actual_index, expected_index)
self.assertAlmostEqual(actual_bytes, expected_bytes)
self.assertAlmostEqual(actual_concurrent, expected_concurrent)
def test_distribute_spans_three_intervals(self):
event = reproxy_logs.DownloadEvent(
bytes_downloaded=1600,
start_time=28.0,
end_time=44.0,
)
intervals = list(event.distribute_over_intervals(interval_seconds=10.0))
for (actual_index, actual_bytes, actual_concurrent), (
expected_index,
expected_bytes,
expected_concurrent,
) in zip(
intervals,
[
(2, 200.0, 0.2),
(3, 1000.0, 1.0),
(4, 400.0, 0.4),
],
):
self.assertEqual(actual_index, expected_index)
self.assertAlmostEqual(actual_bytes, expected_bytes)
self.assertAlmostEqual(actual_concurrent, expected_concurrent)
class DistributeDownloadEventBytesOverTimeTests(unittest.TestCase):
def test_empty(self):
intervals = reproxy_logs.distribute_bytes_downloaded_over_time(
total_time_seconds=3.0,
interval_seconds=1.0,
events=[],
)
for (
(actual_bytes, actual_concurrent),
expected_bytes,
expected_concurrent,
) in zip(intervals, [0.0] * 4, [0.0] * 4):
self.assertAlmostEqual(actual_bytes, expected_bytes)
self.assertAlmostEqual(actual_concurrent, expected_concurrent)
def test_two_events(self):
events = [
reproxy_logs.DownloadEvent(
bytes_downloaded=100,
start_time=2.0,
end_time=12.0,
),
reproxy_logs.DownloadEvent(
bytes_downloaded=300,
start_time=12.0,
end_time=18.0,
),
]
intervals = reproxy_logs.distribute_bytes_downloaded_over_time(
total_time_seconds=20.0,
interval_seconds=5.0,
events=events,
)
# bytes[0]: (0, 30), (1, 50), (2, 20)
# bytes[1]: (2, 150), (3, 150)
# concurrent[0]: (0, 0.6), (1, 1.0), (2, 0.4)
# concurrent[1]: (2, 0.6), (3, 0.6)
for (
(actual_bytes, actual_concurrent),
expected_bytes,
expected_concurrent,
) in zip(
intervals,
[30.0, 50.0, 170.0, 150.0, 0.0],
[0.6, 1.0, 1.0, 0.6, 0.0],
):
self.assertAlmostEqual(actual_bytes, expected_bytes)
self.assertAlmostEqual(actual_concurrent, expected_concurrent)
class ReproxyLogTests(unittest.TestCase):
def test_construction(self):
record1 = log_pb2.LogRecord()
record1.command.output.output_files.extend(["obj/foo.o", "obj/foo2.o"])
record1.remote_metadata.action_digest = "ffff0000/13"
record2 = log_pb2.LogRecord()
record2.command.output.output_files.extend(["obj/bar.o"])
record2.remote_metadata.action_digest = "bbbbaaaa/9"
log_dump = log_pb2.LogDump(records=[record1, record2])
log = reproxy_logs.ReproxyLog(log_dump)
self.assertEqual(log.proto, log_dump)
self.assertEqual(
log.records_by_output_file,
{
Path("obj/foo.o"): record1,
Path("obj/foo2.o"): record1,
Path("obj/bar.o"): record2,
},
)
self.assertEqual(
log.records_by_action_digest,
{
"ffff0000/13": record1,
"bbbbaaaa/9": record2,
},
)
def test_diff_by_outputs_matching(self):
record1 = log_pb2.LogRecord()
record1.command.output.output_files.extend(["obj/foo.o"])
record1.remote_metadata.action_digest = "ffeeff001100/313"
log_dump = log_pb2.LogDump(records=[record1])
log = reproxy_logs.ReproxyLog(log_dump)
diffs = list(log.diff_by_outputs(log, _digests_equal))
self.assertEqual(diffs, [])
def test_diff_by_outputs_different(self):
output = Path("obj/foo")
record1 = log_pb2.LogRecord()
record1.command.output.output_files.extend([str(output)])
record1.remote_metadata.action_digest = "ffeeff001100/313"
record2 = log_pb2.LogRecord()
record2.command.output.output_files.extend([str(output)])
record2.remote_metadata.action_digest = "98231772bbbd/313"
log_dump1 = log_pb2.LogDump(records=[record1])
log_dump2 = log_pb2.LogDump(records=[record2])
log1 = reproxy_logs.ReproxyLog(log_dump1)
log2 = reproxy_logs.ReproxyLog(log_dump2)
diffs = list(log1.diff_by_outputs(log2, _digests_equal))
self.assertEqual(diffs, [(output, record1, record2)])
def test_bandwidth_no_remote_metadata(self):
record = log_pb2.LogRecord()
# no remote metadata
log_dump = log_pb2.LogDump(records=[record])
log = reproxy_logs.ReproxyLog(log_dump)
download_bytes, upload_bytes = log.bandwidth_summary()
self.assertEqual(download_bytes, 0)
self.assertEqual(upload_bytes, 0)
def test_bandwidth_no_upload_download(self):
record = log_pb2.LogRecord()
record.remote_metadata.action_digest = "ccddcdcdcdd/13"
log_dump = log_pb2.LogDump(records=[record])
log = reproxy_logs.ReproxyLog(log_dump)
download_bytes, upload_bytes = log.bandwidth_summary()
self.assertEqual(download_bytes, 0)
self.assertEqual(upload_bytes, 0)
def test_bandwidth_nonzero(self):
record1 = log_pb2.LogRecord()
record1.remote_metadata.real_bytes_downloaded = 100
record1.remote_metadata.real_bytes_uploaded = 10
record2 = log_pb2.LogRecord()
record2.remote_metadata.real_bytes_downloaded = 20
record2.remote_metadata.real_bytes_uploaded = 5
log_dump = log_pb2.LogDump(records=[record1, record2])
log = reproxy_logs.ReproxyLog(log_dump)
download_bytes, upload_bytes = log.bandwidth_summary()
self.assertEqual(download_bytes, 120)
self.assertEqual(upload_bytes, 15)
def test_download_profile_no_events(self):
log_dump = log_pb2.LogDump()
log = reproxy_logs.ReproxyLog(log_dump)
data = log.download_profile(interval_seconds=5.0)
self.assertEqual(data, [])
def test_download_profile_one_event(self):
record = log_pb2.LogRecord()
record.remote_metadata.real_bytes_downloaded = 100
# kwargs dict to workaround 'from' being a Python keyword
interval_kwargs = {
"from": timestamp_pb2.Timestamp(seconds=55, nanos=0),
"to": timestamp_pb2.Timestamp(seconds=65, nanos=0),
}
record.remote_metadata.event_times["DownloadResults"].MergeFrom(
command_pb2.TimeInterval(**interval_kwargs)
)
log_dump = log_pb2.LogDump(records=[record])
log = reproxy_logs.ReproxyLog(log_dump)
data = log.download_profile(interval_seconds=5.0)
for (
(actual_bytes, actual_concurrent),
expected_bytes,
expected_concurrent,
) in zip(data, [50.0, 50.0, 0.0], [1.0, 1.0, 0.0]):
self.assertAlmostEqual(actual_bytes, expected_bytes)
self.assertAlmostEqual(actual_concurrent, expected_concurrent)
class SetupLogdirForLogDumpTest(unittest.TestCase):
def test_already_a_directory(self):
with tempfile.TemporaryDirectory() as td:
reproxy_logs.setup_logdir_for_logdump(Path(td))
def test_log_file_copied_to_new_dir_missing_reproxy_prefix(self):
with tempfile.TemporaryDirectory() as td:
log_file = Path(td) / "test.rrpl"
log_contents = "fake log contents, not checked"
_write_file_contents(log_file, log_contents)
with mock.patch.object(Path, "mkdir") as mock_mkdir:
with mock.patch.object(shutil, "copy2") as mock_copy:
new_log_dir = reproxy_logs.setup_logdir_for_logdump(
log_file
)
mock_mkdir.assert_called_with(parents=True, exist_ok=True)
mock_copy.assert_called_with(
log_file, new_log_dir / "reproxy_test.rrpl"
)
def test_log_file_copied_to_new_dir_with_reproxy_prefix(self):
with tempfile.TemporaryDirectory() as td:
log_base = "reproxy_foo.rrpl"
log_file = Path(td) / log_base
log_contents = "fake log contents, not checked"
_write_file_contents(log_file, log_contents)
with mock.patch.object(Path, "mkdir") as mock_mkdir:
with mock.patch.object(shutil, "copy2") as mock_copy:
new_log_dir = reproxy_logs.setup_logdir_for_logdump(
log_file
)
mock_mkdir.assert_called_with(parents=True, exist_ok=True)
mock_copy.assert_called_with(log_file, new_log_dir / log_base)
class ConvertReproxyActionsLogTest(unittest.TestCase):
def test_basic(self):
empty_log = log_pb2.LogDump()
with mock.patch.object(subprocess, "check_call") as mock_process_call:
with mock.patch.object(
reproxy_logs, "_log_dump_from_pb", return_value=empty_log
) as mock_parse:
log_dump = reproxy_logs.convert_reproxy_actions_log(
reproxy_logdir=Path("/tmp/reproxy.log.dir"),
reclient_bindir=Path("/usr/local/reclient/bin"),
)
mock_process_call.assert_called_once()
mock_parse.assert_called_once()
self.assertEqual(log_dump, log_dump)
class DiffLogsTests(unittest.TestCase):
def test_flow(self):
empty_log = log_pb2.LogDump()
empty_logs = [reproxy_logs.ReproxyLog(empty_log)] * 2
args = argparse.Namespace(
reproxy_logs=[Path("log1"), Path("log2")],
)
with mock.patch.object(
reproxy_logs, "parse_logs", return_value=empty_logs
) as mock_multi_parse:
status = reproxy_logs.diff_logs(args)
self.assertEqual(status, 0)
class WarmthLogsTests(unittest.TestCase):
def test_logs_comparison(self):
record1 = log_pb2.LogRecord()
record1.remote_metadata.action_digest = "aaaa/44"
record1.remote_metadata.result.status = (
command_pb2.CommandResultStatus.SUCCESS
)
record2 = log_pb2.LogRecord()
record2.remote_metadata.action_digest = (
record1.remote_metadata.action_digest
)
record2.remote_metadata.result.status = (
command_pb2.CommandResultStatus.CACHE_HIT
)
record3 = log_pb2.LogRecord()
record3.remote_metadata.action_digest = "bbbb/55"
record3.remote_metadata.result.status = (
command_pb2.CommandResultStatus.CACHE_HIT
)
record4 = log_pb2.LogRecord()
record4.remote_metadata.action_digest = "cccc/66"
record4.remote_metadata.result.status = (
command_pb2.CommandResultStatus.CACHE_HIT
)
record5 = log_pb2.LogRecord()
record5.remote_metadata.action_digest = "dddd/77"
record5.remote_metadata.result.status = (
command_pb2.CommandResultStatus.CACHE_HIT
)
ref_actions = reproxy_logs.ReproxyLog(
log_pb2.LogDump(records=[record1, record3, record4])
)
test_actions = reproxy_logs.ReproxyLog(
log_pb2.LogDump(
records=[
record2,
record3,
record5,
]
)
)
with mock.patch.object(
reproxy_logs, "parse_logs", return_value=(ref_actions, test_actions)
) as mock_parse:
result = io.StringIO()
with contextlib.redirect_stdout(result):
status = reproxy_logs.warmth_logs(
args=argparse.Namespace(
reference_log=Path("first.rrpl"),
test_log=Path("second.rrpl"),
)
)
self.assertEqual(status, 0)
self.assertEqual(
result.getvalue(),
textwrap.dedent(
"""\
The first build had 3 actions.
The second build had 3 actions.
There are 2 actions in common with the first build.
Among those common actions, 2 were cache hits.
Cache hits warmed up directly by the first build: 1
Cache hits warmed up before the first build: 1
"""
),
)
class LookupOutputFileDigestTests(unittest.TestCase):
def test_output_path_not_found(self):
empty_log = log_pb2.LogDump()
log = reproxy_logs.ReproxyLog(empty_log)
args = argparse.Namespace(
log=Path("foo.rrpl"),
path=Path("output/does/not/exist.o"),
)
with mock.patch.object(
reproxy_logs, "parse_log", return_value=log
) as mock_parse:
self.assertEqual(
reproxy_logs.lookup_output_file_digest_command(args), 1
)
def test_output_path_found(self):
record = log_pb2.LogRecord()
path1 = "obj/aaa.o"
path2 = "obj/bbb.o"
digest1 = "ababababa/111"
digest2 = "efefefefe/222"
file_digests = [(path1, digest1), (path2, digest2)]
for path, digest in file_digests:
record.command.output.output_files.append(path)
record.remote_metadata.output_file_digests[path] = digest
log_dump = log_pb2.LogDump(records=[record])
log = reproxy_logs.ReproxyLog(log_dump)
for path, digest in file_digests:
args = argparse.Namespace(
log=Path("foo.rrpl"),
path=Path(path),
)
with mock.patch.object(
reproxy_logs, "parse_log", return_value=log
) as mock_parse:
result = io.StringIO()
with contextlib.redirect_stdout(result):
self.assertEqual(
reproxy_logs.lookup_output_file_digest_command(args), 0
)
self.assertEqual(result.getvalue(), digest + "\n")
class FilterRecordsTests(unittest.TestCase):
def test_basic_counter(self):
counter = 0
def action(unused):
nonlocal counter # reference the one in the above scope
counter += 1
record = log_pb2.LogRecord()
reproxy_logs.filter_and_apply_records(
records=[record, record],
predicate=lambda record: True,
action=action,
)
self.assertEqual(counter, 2)
def test_basic_no_matches(self):
counter = 0
def action(unused):
nonlocal counter # reference the one in the above scope
counter += 1
record = log_pb2.LogRecord()
reproxy_logs.filter_and_apply_records(
records=[record, record],
predicate=lambda record: False,
action=action,
)
self.assertEqual(counter, 0)
class FilterRlibsCommandTest(unittest.TestCase):
def test_filter(self):
record1 = log_pb2.LogRecord()
record2 = log_pb2.LogRecord()
record1.command.output.output_files.extend(["foo.rlib"]) # match this
record2.command.output.output_files.extend(["bar.cc.o"]) # but not this
logdump = log_pb2.LogDump(records=[record1, record2])
with mock.patch.object(
reproxy_logs,
"parse_log",
return_value=reproxy_logs.ReproxyLog(logdump),
) as mock_parse:
printed_text = io.StringIO()
with contextlib.redirect_stdout(printed_text):
exit_code = reproxy_logs.filter_rlibs_command(
argparse.Namespace(log=Path("x.rrpl"))
)
self.assertEqual(exit_code, 0)
mock_parse.assert_called_once()
self.assertEqual(printed_text.getvalue(), str(record1) + "\n\n")
class MainTests(unittest.TestCase):
def test_diff(self):
empty_log = log_pb2.LogDump()
empty_logs = [reproxy_logs.ReproxyLog(empty_log)] * 2
with mock.patch.object(
reproxy_logs, "parse_logs", return_value=empty_logs
) as mock_multi_parse:
status = reproxy_logs.main(["diff", "left.rrpl", "right.rrpl"])
self.assertEqual(status, 0)
mock_multi_parse.assert_called()
def test_output_file_digest(self):
record = log_pb2.LogRecord()
path1 = "obj/aaa.o"
digest1 = "ababababa/111"
record.command.output.output_files.append(path1)
record.remote_metadata.output_file_digests[path1] = digest1
log_dump = log_pb2.LogDump(records=[record])
log = reproxy_logs.ReproxyLog(log_dump)
with mock.patch.object(
reproxy_logs, "parse_log", return_value=log
) as mock_multi_parse:
status = reproxy_logs.main(
["output_file_digest", "--log", "log.rrpl", "--path", path1]
)
self.assertEqual(status, 0)
mock_multi_parse.assert_called_once()
def test_bandwidth(self):
up = 33
down = 999
record = log_pb2.LogRecord()
record.remote_metadata.real_bytes_uploaded = up
record.remote_metadata.real_bytes_downloaded = down
log_dump = log_pb2.LogDump(records=[record])
log = reproxy_logs.ReproxyLog(log_dump)
result = io.StringIO()
with mock.patch.object(
reproxy_logs, "parse_log", return_value=log
) as mock_multi_parse:
with contextlib.redirect_stdout(result):
status = reproxy_logs.main(["bandwidth", "log.rrpl"])
self.assertEqual(
result.getvalue(),
f"total_download_bytes = {down}\ntotal_upload_bytes = {up}\n",
)
self.assertEqual(status, 0)
mock_multi_parse.assert_called_once()
def test_plot_download(self):
record = log_pb2.LogRecord()
record.remote_metadata.real_bytes_downloaded = 1000000
# kwargs dict to workaround 'from' being a Python keyword
interval_kwargs = {
"from": timestamp_pb2.Timestamp(seconds=20, nanos=0),
"to": timestamp_pb2.Timestamp(seconds=40, nanos=0),
}
record.remote_metadata.event_times["DownloadResults"].MergeFrom(
command_pb2.TimeInterval(**interval_kwargs)
)
log_dump = log_pb2.LogDump(records=[record])
log = reproxy_logs.ReproxyLog(log_dump)
result = io.StringIO()
with mock.patch.object(
reproxy_logs, "parse_log", return_value=log
) as mock_multi_parse:
with contextlib.redirect_stdout(result):
status = reproxy_logs.main(
["plot_download", "--interval", "10", "log.rrpl"]
)
self.assertEqual(
result.getvalue(),
"\n".join(
[
"time,bytes,concurrent",
"0.0,500000.0,1.0",
"10.0,500000.0,1.0",
"20.0,0.0,0.0",
]
)
+ "\n",
)
self.assertEqual(status, 0)
mock_multi_parse.assert_called_once()
def test_filter_rlibs(self):
log = reproxy_logs.ReproxyLog(log_pb2.LogDump())
with mock.patch.object(
reproxy_logs, "parse_log", return_value=log
) as mock_parse:
with mock.patch.object(
reproxy_logs, "filter_and_apply_records"
) as mock_filter:
status = reproxy_logs.main(["filter_rlibs", "log.rrpl"])
self.assertEqual(status, 0)
mock_parse.assert_called_once()
mock_filter.assert_called_once()
if __name__ == "__main__":
unittest.main()