blob: 25d5313ee34ff12f1479675edd99ec519a915597 [file] [log] [blame]
#!/usr/bin/env python3.8
# Copyright 2020 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import textwrap
import unittest
from unittest import mock
from typing import AbstractSet, Iterable
import action_tracer
class ToolCommandTests(unittest.TestCase):
    """Checks how ToolCommand exposes env tokens, tool, args and unwrap()."""

    def _assert_parts(self, tokens, env_tokens, tool, args):
        # Build a command from `tokens` and compare its three derived views,
        # in the order env_tokens -> tool -> args.
        parsed = action_tracer.ToolCommand(tokens=tokens)
        self.assertEqual(parsed.env_tokens, env_tokens)
        self.assertEqual(parsed.tool, tool)
        self.assertEqual(parsed.args, args)

    def test_empty_command(self):
        # A command constructed without tokens has no tool to report.
        with self.assertRaises(IndexError):
            action_tracer.ToolCommand().tool

    def test_command_no_args(self):
        self._assert_parts(['echo'], env_tokens=[], tool='echo', args=[])

    def test_command_with_env(self):
        self._assert_parts(
            ['TMPDIR=/my/tmp', 'echo'],
            env_tokens=['TMPDIR=/my/tmp'],
            tool='echo',
            args=[],
        )

    def test_command_with_args(self):
        self._assert_parts(
            ['ln', '-f', '-s', 'foo', 'bar'],
            env_tokens=[],
            tool='ln',
            args=['-f', '-s', 'foo', 'bar'],
        )

    def test_unwrap_no_change(self):
        # No '--' separator: unwrap() yields the same tokens.
        original_tokens = ['ln', '-f', '-s', 'foo', 'bar']
        unwrapped = action_tracer.ToolCommand(tokens=original_tokens).unwrap()
        self.assertEqual(unwrapped.tokens, original_tokens)

    def test_unwrap_one_level(self):
        wrapped = action_tracer.ToolCommand(
            tokens=['wrapper', '--opt', 'foo', '--', 'bar.sh', 'arg'])
        self.assertEqual(wrapped.unwrap().tokens, ['bar.sh', 'arg'])

    def test_unwrap_one_of_many_level(self):
        # Each unwrap() call peels exactly one '--'-delimited layer.
        outer = action_tracer.ToolCommand(
            tokens=[
                'wrapper', '--opt', 'foo', '--', 'bar.sh', 'arg', '--',
                'inner.sh'
            ])
        middle = outer.unwrap()
        self.assertEqual(middle.tokens, ['bar.sh', 'arg', '--', 'inner.sh'])
        innermost = middle.unwrap()
        self.assertEqual(innermost.tokens, ['inner.sh'])
class IsKnownWrapperTests(unittest.TestCase):
    """Checks which command lines is_known_wrapper() recognizes."""

    @staticmethod
    def _is_wrapper(*tokens):
        # Wrap the given tokens in a ToolCommand and classify it.
        candidate = action_tracer.ToolCommand(tokens=list(tokens))
        return action_tracer.is_known_wrapper(candidate)

    def test_action_tracer_is_not_wrapper(self):
        verdict = self._is_wrapper(
            'path/to/python3.x', 'path/to/not_a_wrapper.py', '--opt1', 'arg1')
        self.assertFalse(verdict)

    def test_action_tracer_is_not_wrapper_implicit_interpreter(self):
        verdict = self._is_wrapper(
            'path/to/not_a_wrapper.py', '--opt1', 'arg1')
        self.assertFalse(verdict)

    def test_action_tracer_is_wrapper(self):
        verdict = self._is_wrapper(
            'path/to/python3.x', 'path/to/action_tracer.py', '--', 'foo.sh',
            'arg1', 'arg2')
        self.assertTrue(verdict)

    def test_action_tracer_is_wrapper_extra_python_flag(self):
        # An interpreter flag sits between the interpreter and the script.
        verdict = self._is_wrapper(
            'path/to/python3.x', '-S', 'path/to/action_tracer.py', '--',
            'foo.sh', 'arg1', 'arg2')
        self.assertTrue(verdict)

    def test_action_tracer_is_wrapper_implicit_interpreter(self):
        verdict = self._is_wrapper(
            'path/to/action_tracer.py', '--', 'foo.sh', 'arg1', 'arg2')
        self.assertTrue(verdict)
class DepEdgesParseTests(unittest.TestCase):
    """Checks parse_dep_edges() on single "outputs: inputs" lines."""

    # NOTE(review): this file was received with its whitespace altered; the
    # literal in test_output_with_multiple_inputs_unusual_spacing may have had
    # wider runs of spaces originally -- confirm against the upstream file.

    def _assert_edges(self, line, ins, outs):
        # Parse one dependency line and compare inputs first, then outputs.
        edges = action_tracer.parse_dep_edges(line)
        self.assertEqual(edges.ins, ins)
        self.assertEqual(edges.outs, outs)

    def test_invalid_input(self):
        malformed = "output.txt input1.txt"  # missing ":"
        with self.assertRaises(ValueError):
            action_tracer.parse_dep_edges(malformed)

    def test_output_only(self):
        self._assert_edges("output.txt:", ins=set(), outs={"output.txt"})

    def test_multipl_outputs_only(self):
        self._assert_edges(
            "output.txt output2.txt :",
            ins=set(),
            outs={"output.txt", "output2.txt"},
        )

    def test_output_with_one_input(self):
        self._assert_edges(
            "output.txt:input.cc", ins={"input.cc"}, outs={"output.txt"})

    def test_output_with_multiple_inputs(self):
        self._assert_edges(
            "output.txt:input.cc includes/header.h",
            ins={"input.cc", "includes/header.h"},
            outs={"output.txt"},
        )

    def test_output_with_multiple_inputs_unusual_spacing(self):
        self._assert_edges(
            " output.txt : input.cc includes/header.h ",
            ins={"input.cc", "includes/header.h"},
            outs={"output.txt"},
        )

    def test_file_name_with_escaped_space(self):
        # A backslash-escaped space is part of the file name.
        self._assert_edges(
            "output.txt: source\\ input.cc includes/header.h",
            ins={"source input.cc", "includes/header.h"},
            outs={"output.txt"},
        )
class ParseDepFileTests(unittest.TestCase):
    """Checks parse_depfile() on multi-line depfile contents.

    Accepted inputs are compared against the expected DepEdges list and the
    aggregated all_ins / all_outs sets; rejected inputs must raise ValueError.
    """

    def _assert_parsed(self, lines, deps, all_ins, all_outs):
        # Parse `lines` and compare edges, then aggregate inputs and outputs.
        depfile = action_tracer.parse_depfile(lines)
        self.assertEqual(depfile.deps, deps)
        self.assertEqual(depfile.all_ins, all_ins)
        self.assertEqual(depfile.all_outs, all_outs)

    def _assert_rejected(self, lines):
        # The result is deliberately not bound to a name: only the raised
        # exception matters.
        with self.assertRaises(ValueError):
            action_tracer.parse_depfile(lines)

    def test_empty(self):
        self._assert_parsed([], deps=[], all_ins=set(), all_outs=set())

    def test_multiple_ins_multiple_outs(self):
        self._assert_parsed(
            ["a b: c d"],
            deps=[
                action_tracer.DepEdges(ins={"c", "d"}, outs={"a", "b"}),
            ],
            all_ins={"c", "d"},
            all_outs={"a", "b"},
        )

    def test_two_deps(self):
        self._assert_parsed(
            [
                "A: B",
                "C: D E",
            ],
            deps=[
                action_tracer.DepEdges(ins={"B"}, outs={"A"}),
                action_tracer.DepEdges(ins={"D", "E"}, outs={"C"}),
            ],
            all_ins={"B", "D", "E"},
            all_outs={"A", "C"},
        )

    def test_continuation(self):
        # Backslash-newline joins physical lines into one logical dep.
        self._assert_parsed(
            [
                "a \\\n",
                "b: \\\n",
                "c \\\n",
                "d",
            ],
            deps=[
                action_tracer.DepEdges(ins={"c", "d"}, outs={"a", "b"}),
            ],
            all_ins={"c", "d"},
            all_outs={"a", "b"},
        )

    def test_carriage_continuation(self):
        # Same as above, but with CRLF line endings.
        self._assert_parsed(
            [
                "a \\\r\n",
                "b: c \\\r\n",
                "d e",
            ],
            deps=[
                action_tracer.DepEdges(ins={"c", "d", "e"}, outs={"a", "b"}),
            ],
            all_ins={"c", "d", "e"},
            all_outs={"a", "b"},
        )

    def test_space_in_filename(self):
        self._assert_parsed(
            ["a\\ b: c d"],
            deps=[
                action_tracer.DepEdges(ins={"c", "d"}, outs={"a b"}),
            ],
            all_ins={"c", "d"},
            all_outs={"a b"},
        )

    def test_consecutive_backslashes(self):
        self._assert_rejected(["a\\\\ b: c"])

    def test_trailing_escaped_whitespace(self):
        # NOTE(review): the second and third first-lines are identical as
        # received; the third presumably had different trailing whitespace
        # before this file's whitespace was altered -- confirm upstream.
        for first_line in ("a \\ \r\n", "a \\ \n", "a \\ \n"):
            self._assert_rejected([
                first_line,
                "b: c",
            ])

    def test_unfinished_line_continuation(self):
        # The last line ends in a continuation with nothing following it.
        self._assert_rejected([
            "a \\\n",
            "b: c \\\n",
        ])

    def test_blank_line(self):
        self._assert_parsed(
            [
                "a:b",
                " ",
                "b:",
            ],
            deps=[
                action_tracer.DepEdges(ins={"b"}, outs={"a"}),
                action_tracer.DepEdges(ins=set(), outs={"b"}),
            ],
            all_ins={"b"},
            all_outs={"a", "b"},
        )

    def test_comment(self):
        self._assert_parsed(
            [
                " # a:b",
                "b:",
            ],
            deps=[
                action_tracer.DepEdges(ins=set(), outs={"b"}),
            ],
            all_ins=set(),
            all_outs={"b"},
        )

    def test_continuation_blank_line(self):
        # A blank line in the middle of a continued dep is an error.
        self._assert_rejected([
            "a: \\\n",
            "",
            "b",
        ])

    def test_continuation_comment(self):
        # A comment in the middle of a continued dep is an error.
        self._assert_rejected([
            "a: \\\n",
            "# comment",
            "b",
        ])
class ParseFsatraceOutputTests(unittest.TestCase):
    """Checks translation of fsatrace lines into Read/Write/Delete accesses."""

    @staticmethod
    def _parse(trace_lines):
        # Materialize the parser's output so it can be compared to a list.
        return list(action_tracer.parse_fsatrace_output(trace_lines))

    def test_empty_stream(self):
        parsed = self._parse([])
        self.assertEqual(parsed, [])

    def test_ignore_malformed_line(self):
        # A line without the "op|path" shape produces no accesses.
        parsed = self._parse(["invalid_line"])
        self.assertEqual(parsed, [])

    def test_read(self):
        parsed = self._parse(["r|README.md"])
        self.assertEqual(parsed, [action_tracer.Read("README.md")])

    def test_write(self):
        parsed = self._parse(["w|main.o"])
        self.assertEqual(parsed, [action_tracer.Write("main.o")])

    def test_touch(self):
        # A touch is reported as a write.
        parsed = self._parse(["t|file.stamp"])
        self.assertEqual(parsed, [action_tracer.Write("file.stamp")])

    def test_delete(self):
        parsed = self._parse(["d|remove-me.tmp"])
        self.assertEqual(parsed, [action_tracer.Delete("remove-me.tmp")])

    def test_move(self):
        # A move is reported as delete-of-source followed by write-of-dest.
        parsed = self._parse(["m|dest.txt|source.txt"])
        expected = [
            action_tracer.Delete("source.txt"),
            action_tracer.Write("dest.txt"),
        ]
        self.assertEqual(parsed, expected)

    def test_sequence(self):
        parsed = self._parse(
            [
                "m|dest.txt|source.txt",
                "r|input.txt",
                "w|output.log",
            ])
        expected = [
            action_tracer.Delete("source.txt"),
            action_tracer.Write("dest.txt"),
            action_tracer.Read("input.txt"),
            action_tracer.Write("output.log"),
        ]
        self.assertEqual(parsed, expected)
class MatchConditionsTests(unittest.TestCase):
    """Checks MatchConditions.matches() for prefix/suffix/component rules."""

    def test_no_conditions(self):
        # With nothing configured, no path matches.
        conditions = action_tracer.MatchConditions()
        self.assertFalse(conditions.matches("foo/bar"))

    def test_prefix_matches(self):
        conditions = action_tracer.MatchConditions(prefixes={"fo"})
        self.assertTrue(conditions.matches("foo/bar"))

    def test_suffix_matches(self):
        conditions = action_tracer.MatchConditions(suffixes={"ar"})
        self.assertTrue(conditions.matches("foo/bar"))

    def test_component_matches(self):
        # One of the listed components ("bar") appears in the path.
        conditions = action_tracer.MatchConditions(components={"bar", "bq"})
        self.assertTrue(conditions.matches("foo/bar/baz.txt"))
class AccessShouldCheckTests(unittest.TestCase):
    """Checks should_check() against required prefixes and ignore conditions."""

    def _expect(self, expected, accesses, **should_check_kwargs):
        # `accesses` is a sequence of (constructor, path) pairs. Each access
        # is built and asserted in turn, so the order matches the sequence.
        verify = self.assertTrue if expected else self.assertFalse
        for make_access, path in accesses:
            verify(make_access(path).should_check(**should_check_kwargs))

    def test_no_required_prefix(self):
        self._expect(
            True,
            [(action_tracer.Read, "book"), (action_tracer.Write, "block")],
            ignore_conditions=action_tracer.MatchConditions(),
            required_path_prefix="",
        )

    def test_required_prefix_matches(self):
        self._expect(
            True,
            [
                (action_tracer.Read, "/home/project/book"),
                (action_tracer.Write, "/home/project/out/block"),
            ],
            ignore_conditions=action_tracer.MatchConditions(),
            required_path_prefix="/home/project",
        )

    def test_required_prefix_no_match(self):
        self._expect(
            False,
            [
                (action_tracer.Read, "book"),
                (action_tracer.Write, "output/log"),
            ],
            ignore_conditions=action_tracer.MatchConditions(),
            required_path_prefix="/home/project",
        )

    def test_no_ignored_prefix(self):
        # Note: {} here is an empty dict, passed through exactly as written.
        self._expect(
            True,
            [
                (action_tracer.Read, "book"),
                (action_tracer.Write, "output/log"),
            ],
            ignore_conditions=action_tracer.MatchConditions(prefixes={}),
        )

    def test_ignored_prefix_matches(self):
        self._expect(
            False,
            [
                (action_tracer.Read, "/tmp/book"),
                (action_tracer.Write, "/tmp/log"),
            ],
            ignore_conditions=action_tracer.MatchConditions(prefixes={"/tmp"}),
        )

    def test_ignored_prefix_no_match(self):
        self._expect(
            True,
            [(action_tracer.Read, "book"), (action_tracer.Write, "out/log")],
            ignore_conditions=action_tracer.MatchConditions(
                prefixes={"/tmp", "/no/look/here"}),
        )

    def test_no_ignored_suffix(self):
        # Note: {} here is an empty dict, passed through exactly as written.
        self._expect(
            True,
            [
                (action_tracer.Read, "book"),
                (action_tracer.Write, "output/log"),
            ],
            ignore_conditions=action_tracer.MatchConditions(suffixes={}),
        )

    def test_ignored_suffix_matches(self):
        # e.g. from compiler --save-temps
        self._expect(
            False,
            [
                (action_tracer.Read, "book.ii"),
                (action_tracer.Write, "tmp/log.ii"),
            ],
            ignore_conditions=action_tracer.MatchConditions(suffixes={".ii"}),
        )

    def test_ignored_suffix_no_match(self):
        # e.g. from compiler --save-temps
        self._expect(
            True,
            [
                (action_tracer.Read, "book.txt"),
                (action_tracer.Write, "out/process.log"),
            ],
            ignore_conditions=action_tracer.MatchConditions(
                suffixes={".ii", ".S"}),
        )

    def test_ignored_path_components_no_match(self):
        self._expect(
            True,
            [(action_tracer.Read, "book"), (action_tracer.Write, "out/log")],
            ignore_conditions=action_tracer.MatchConditions(
                components={"__auto__", ".generated"}),
        )

    def test_ignored_path_components_matches(self):
        self._expect(
            False,
            [
                (action_tracer.Read, "library/__auto__/book"),
                (action_tracer.Write, ".generated/out/log"),
            ],
            ignore_conditions=action_tracer.MatchConditions(
                components={"__auto__", ".generated"}),
        )
class CheckAccessAllowedTests(unittest.TestCase):
    """Checks allowed() for reads, writes and deletes.

    The `{}` arguments below are empty dicts, kept exactly as written.
    """

    def test_allowed_read(self):
        access = action_tracer.Read("foo.txt")
        verdict = access.allowed(allowed_reads={"foo.txt"}, allowed_writes={})
        self.assertTrue(verdict)

    def test_forbiddden_read(self):
        access = action_tracer.Read("bar.txt")
        verdict = access.allowed(allowed_reads={}, allowed_writes={})
        self.assertFalse(verdict)

    def test_allowed_write(self):
        access = action_tracer.Write("foo.txt")
        verdict = access.allowed(allowed_reads={}, allowed_writes={"foo.txt"})
        self.assertTrue(verdict)

    def test_forbiddden_write(self):
        access = action_tracer.Write("baz.txt")
        verdict = access.allowed(allowed_reads={}, allowed_writes={})
        self.assertFalse(verdict)

    def test_allowed_delete(self):
        # A delete is permitted when the path is in allowed_writes.
        access = action_tracer.Delete("foo.txt")
        verdict = access.allowed(allowed_reads={}, allowed_writes={"foo.txt"})
        self.assertTrue(verdict)

    def test_forbiddden_delete(self):
        access = action_tracer.Delete("baz.txt")
        verdict = access.allowed(allowed_reads={}, allowed_writes={})
        self.assertFalse(verdict)
class FormatAccessSetTest(unittest.TestCase):
    """Checks the str() rendering of FSAccessSet."""

    # NOTE(review): this file was received with leading whitespace removed, so
    # any indentation of the path lines relative to their heading inside the
    # expected blocks below cannot be recovered here. The expected text is
    # reproduced exactly as received (no relative indentation) -- confirm
    # against the upstream file.

    def test_empty(self):
        rendered = str(action_tracer.FSAccessSet())
        self.assertEqual(rendered, "[empty accesses]")

    def test_reads(self):
        # Paths are expected in sorted order regardless of set order.
        rendered = str(action_tracer.FSAccessSet(reads={"c", "a", "b"}))
        expected = textwrap.dedent(
            """\
            Reads:
            a
            b
            c""")
        self.assertEqual(rendered, expected)

    def test_writes(self):
        rendered = str(action_tracer.FSAccessSet(writes={"e", "f", "d"}))
        expected = textwrap.dedent(
            """\
            Writes:
            d
            e
            f""")
        self.assertEqual(rendered, expected)

    def test_deletes(self):
        rendered = str(action_tracer.FSAccessSet(deletes={"r", "q", "p"}))
        expected = textwrap.dedent(
            """\
            Deletes:
            p
            q
            r""")
        self.assertEqual(rendered, expected)

    def test_reads_writes(self):
        paths = {"c", "a", "b"}
        rendered = str(action_tracer.FSAccessSet(reads=paths, writes=paths))
        expected = textwrap.dedent(
            """\
            Reads:
            a
            b
            c
            Writes:
            a
            b
            c""")
        self.assertEqual(rendered, expected)

    def test_writes_deletes(self):
        paths = {"c", "a", "b"}
        rendered = str(action_tracer.FSAccessSet(writes=paths, deletes=paths))
        expected = textwrap.dedent(
            """\
            Writes:
            a
            b
            c
            Deletes:
            a
            b
            c""")
        self.assertEqual(rendered, expected)
class FinalizeFileSystemAccessesTest(unittest.TestCase):
    """Checks how an access sequence collapses into a final FSAccessSet."""

    def _assert_finalized(self, accesses, expected):
        # Run the sequence through finalize_filesystem_accesses and compare.
        outcome = action_tracer.finalize_filesystem_accesses(accesses)
        self.assertEqual(outcome, expected)

    def test_no_accesses(self):
        self._assert_finalized([], action_tracer.FSAccessSet())

    def test_reads(self):
        sequence = [
            action_tracer.Read("r1.txt"),
            action_tracer.Read("r2.txt"),
        ]
        self._assert_finalized(
            sequence, action_tracer.FSAccessSet(reads={"r1.txt", "r2.txt"}))

    def test_writes(self):
        sequence = [
            action_tracer.Write("wb.txt"),
            action_tracer.Write("wa.txt"),
        ]
        self._assert_finalized(
            sequence, action_tracer.FSAccessSet(writes={"wa.txt", "wb.txt"}))

    def test_reads_writes_no_deletes(self):
        sequence = [
            action_tracer.Read("r2.txt"),
            action_tracer.Write("wb.txt"),
            action_tracer.Write("wa.txt"),
            action_tracer.Read("r1.txt"),
        ]
        self._assert_finalized(
            sequence,
            action_tracer.FSAccessSet(
                reads={"r1.txt", "r2.txt"}, writes={"wa.txt", "wb.txt"}))

    def test_read_after_write(self):
        # Reading back a file written earlier is not counted as a read.
        sequence = [
            action_tracer.Write("temp.txt"),
            action_tracer.Read("temp.txt"),
        ]
        self._assert_finalized(
            sequence,
            action_tracer.FSAccessSet(reads=set(), writes={"temp.txt"}))

    def test_delete(self):
        sequence = [
            action_tracer.Delete("d1.txt"),
            action_tracer.Delete("d2.txt"),
        ]
        self._assert_finalized(
            sequence, action_tracer.FSAccessSet(deletes={"d1.txt", "d2.txt"}))

    def test_delete_after_write(self):
        # Write then delete of the same path leaves nothing behind.
        sequence = [
            action_tracer.Write("temp.txt"),
            action_tracer.Delete("temp.txt"),
        ]
        self._assert_finalized(sequence, action_tracer.FSAccessSet())

    def test_write_after_delete(self):
        sequence = [
            action_tracer.Delete("temp.txt"),
            action_tracer.Write("temp.txt"),
        ]
        self._assert_finalized(
            sequence, action_tracer.FSAccessSet(writes={"temp.txt"}))

    def test_write_read_delete(self):
        sequence = [
            action_tracer.Write("temp.txt"),
            action_tracer.Read("temp.txt"),
            action_tracer.Delete("temp.txt"),
        ]
        self._assert_finalized(sequence, action_tracer.FSAccessSet())
class CheckAccessPermissionsTests(unittest.TestCase):
    """Checks which accesses check_access_permissions() reports back."""

    def _assert_violations(self, accesses, constraints, expected):
        # The function returns an FSAccessSet of the disallowed accesses.
        reported = action_tracer.check_access_permissions(
            accesses, constraints)
        self.assertEqual(reported, expected)

    def test_no_accesses(self):
        self._assert_violations(
            action_tracer.FSAccessSet(),
            action_tracer.AccessConstraints(),
            expected=action_tracer.FSAccessSet(),
        )

    def test_ok_read(self):
        self._assert_violations(
            action_tracer.FSAccessSet(reads={"readable.txt"}),
            action_tracer.AccessConstraints(allowed_reads={"readable.txt"}),
            expected=action_tracer.FSAccessSet(),
        )

    def test_forbidden_read(self):
        forbidden = "unreadable.txt"
        self._assert_violations(
            action_tracer.FSAccessSet(reads={forbidden}),
            action_tracer.AccessConstraints(),
            expected=action_tracer.FSAccessSet(reads={forbidden}),
        )

    def test_ok_write(self):
        self._assert_violations(
            action_tracer.FSAccessSet(writes={"writeable.txt"}),
            action_tracer.AccessConstraints(allowed_writes={"writeable.txt"}),
            expected=action_tracer.FSAccessSet(),
        )

    def test_forbidden_writes(self):
        # make sure multiple violations accumulate
        forbidden = {
            "unwriteable.txt",
            "you-shall-not-pass.txt",
        }
        self._assert_violations(
            action_tracer.FSAccessSet(writes=forbidden),
            action_tracer.AccessConstraints(),
            expected=action_tracer.FSAccessSet(writes=forbidden),
        )

    def test_read_from_temporary_writes_ok(self):
        # Reading a file this action also wrote is not a read violation; the
        # writes themselves are still reported.
        scratch = "__file.tmp"
        read_paths = {scratch}
        written_paths = {
            "unwriteable.txt",
            scratch,
        }
        self._assert_violations(
            action_tracer.FSAccessSet(reads=read_paths, writes=written_paths),
            action_tracer.AccessConstraints(),
            expected=action_tracer.FSAccessSet(
                reads=set(), writes=written_paths),
        )
class CheckMissingWritesTests(unittest.TestCase):
    """Checks which required writes check_missing_writes() reports missing.

    The `{}` values below are empty dicts (both as inputs and as expected
    results), kept exactly as written.
    """

    def test_no_accesses(self):
        missing = action_tracer.check_missing_writes([], {})
        self.assertEqual(missing, {})

    def test_only_reads(self):
        trace = [action_tracer.Read("newspaper.pdf")]
        missing = action_tracer.check_missing_writes(trace, {})
        self.assertEqual(missing, {})

    def test_excess_write(self):
        # A write nobody required is not this check's concern.
        trace = [action_tracer.Write("side-effect.txt")]
        missing = action_tracer.check_missing_writes(trace, {})
        self.assertEqual(missing, {})

    def test_fulfilled_write(self):
        trace = [action_tracer.Write("compiled.o")]
        missing = action_tracer.check_missing_writes(trace, {"compiled.o"})
        self.assertEqual(missing, set())

    def test_missing_write(self):
        missing = action_tracer.check_missing_writes([], {"write-me.out"})
        self.assertEqual(missing, {"write-me.out"})

    def test_missing_and_fulfilled_write(self):
        trace = [action_tracer.Write("compiled.o")]
        required = {
            "write-me.out",
            "compiled.o",
        }
        missing = action_tracer.check_missing_writes(trace, required)
        self.assertEqual(missing, {"write-me.out"})

    def test_written_then_deleted(self):
        # Deleting after writing leaves the requirement unfulfilled.
        trace = [
            action_tracer.Write("compiled.o"),
            action_tracer.Delete("compiled.o"),
        ]
        missing = action_tracer.check_missing_writes(trace, {"compiled.o"})
        self.assertEqual(missing, {"compiled.o"})

    def test_deleted_then_written(self):
        trace = [
            action_tracer.Delete("compiled.o"),
            action_tracer.Write("compiled.o"),
        ]
        missing = action_tracer.check_missing_writes(trace, {"compiled.o"})
        self.assertEqual(missing, set())
def abspaths(container: Iterable[str]) -> AbstractSet[str]:
    """Return the set of os.path.abspath() results for each path given."""
    return set(map(os.path.abspath, container))
class AccessConstraintsTests(unittest.TestCase):
    """Checks the AccessConstraints derived from an Action's declaration."""

    # Contents served by the mocked open() whenever a depfile is read.
    _DEPFILE_TEXT = "foo.o: foo.cc foo.h\n"

    def test_empty_action(self):
        constraints = action_tracer.Action(
            inputs=["script.sh"]).access_constraints()
        expected = action_tracer.AccessConstraints(
            allowed_reads=abspaths({"script.sh"}))
        self.assertEqual(constraints, expected)

    def test_have_inputs(self):
        constraints = action_tracer.Action(
            inputs=["script.sh", "input.txt", "main.cc"]).access_constraints()
        expected = action_tracer.AccessConstraints(
            allowed_reads=abspaths({"script.sh", "input.txt", "main.cc"}))
        self.assertEqual(constraints, expected)

    def test_have_outputs(self):
        # Declared outputs are readable, writeable and required.
        constraints = action_tracer.Action(
            inputs=["script.sh"], outputs=["main.o"]).access_constraints()
        expected = action_tracer.AccessConstraints(
            allowed_reads=abspaths({"script.sh", "main.o"}),
            allowed_writes=abspaths({"main.o"}),
            required_writes=abspaths({"main.o"}))
        self.assertEqual(constraints, expected)

    def test_have_depfile_writeable_inputs(self):
        action = action_tracer.Action(inputs=["script.sh"], depfile="foo.d")
        with mock.patch.object(
                os.path, 'exists', return_value=True) as exists_mock, \
                mock.patch(
                    "builtins.open",
                    mock.mock_open(read_data=self._DEPFILE_TEXT)) as open_mock:
            constraints = action.access_constraints(
                writeable_depfile_inputs=True)
        exists_mock.assert_called_once()
        open_mock.assert_called_once()
        expected = action_tracer.AccessConstraints(
            allowed_reads=abspaths(
                {"script.sh", "foo.d", "foo.o", "foo.cc", "foo.h"}),
            allowed_writes=abspaths({"foo.d", "foo.o", "foo.cc", "foo.h"}))
        self.assertEqual(constraints, expected)

    def test_have_depfile_nonwritable_inputs(self):
        action = action_tracer.Action(inputs=["script.sh"], depfile="foo.d")
        with mock.patch.object(
                os.path, 'exists', return_value=True) as exists_mock, \
                mock.patch(
                    "builtins.open",
                    mock.mock_open(read_data=self._DEPFILE_TEXT)) as open_mock:
            constraints = action.access_constraints(
                writeable_depfile_inputs=False)
        exists_mock.assert_called_once()
        open_mock.assert_called_once()
        # Only the depfile and its listed output are writeable here.
        expected = action_tracer.AccessConstraints(
            allowed_reads=abspaths(
                {"script.sh", "foo.d", "foo.o", "foo.cc", "foo.h"}),
            allowed_writes=abspaths({"foo.d", "foo.o"}))
        self.assertEqual(constraints, expected)

    def test_links_are_followed(self):

        def fake_realpath(s: str) -> str:
            return f'test/realpath/{s}'

        action = action_tracer.Action(inputs=["script.sh"], depfile="foo.d")
        with mock.patch.object(
                os.path, 'exists', return_value=True) as exists_mock, \
                mock.patch(
                    "builtins.open",
                    mock.mock_open(
                        read_data=self._DEPFILE_TEXT)) as open_mock, \
                mock.patch.object(
                    os.path, 'realpath',
                    wraps=fake_realpath) as realpath_mock:
            constraints = action.access_constraints(
                writeable_depfile_inputs=False)
        exists_mock.assert_called_once()
        open_mock.assert_called_once()
        realpath_mock.assert_called()
        # The expected reads carry the fake realpath prefix; the expected
        # writes do not.
        expected = action_tracer.AccessConstraints(
            allowed_reads=abspaths(
                {
                    "test/realpath/script.sh",
                    "test/realpath/foo.d",
                    "test/realpath/foo.o",
                    "test/realpath/foo.cc",
                    "test/realpath/foo.h",
                }),
            allowed_writes=abspaths({"foo.d", "foo.o"}))
        self.assertEqual(constraints, expected)

    def test_have_nonexistent_depfile(self):
        action = action_tracer.Action(depfile="foo.d")
        with mock.patch.object(
                os.path, 'exists', return_value=False) as exists_mock:
            constraints = action.access_constraints()
        exists_mock.assert_called()
        expected = action_tracer.AccessConstraints(
            allowed_writes=abspaths({"foo.d"}),
            allowed_reads=abspaths({"foo.d"}))
        self.assertEqual(constraints, expected)
class DiagnoseStaleOutputsTest(unittest.TestCase):
    """Checks diagnose_stale_outputs() for missing, stale and fresh outputs.

    The two "missing write" tests patch os.path.exists to return False so
    that their outcome does not depend on whether a file named "write.me"
    happens to exist in the current working directory.
    """

    @staticmethod
    def _fake_getctime(path: str):
        # Inputs ("read*") are older than outputs ("write*").
        if path.startswith("read"):
            return 100
        if path.startswith("write"):
            return 200
        return 0

    def test_no_accesses_no_constraints(self):
        output_diagnostics = action_tracer.diagnose_stale_outputs(
            accesses=[],
            access_constraints=action_tracer.AccessConstraints(),
        )
        self.assertEqual(
            output_diagnostics,
            action_tracer.StalenessDiagnostics(),
        )

    def test_missing_write_no_inputs(self):
        required_writes = {"write.me"}
        # Make the required output deterministically absent.
        with mock.patch.object(os.path, 'exists', return_value=False):
            output_diagnostics = action_tracer.diagnose_stale_outputs(
                accesses=[],
                access_constraints=action_tracer.AccessConstraints(
                    required_writes=required_writes),
            )
        self.assertEqual(
            output_diagnostics,
            action_tracer.StalenessDiagnostics(
                required_writes=required_writes,
                nonexistent_outputs={"write.me"}),
        )

    def test_missing_write_with_used_input(self):
        used_input = "read.me"
        required_writes = {"write.me"}
        # Make the required output deterministically absent.
        with mock.patch.object(os.path, 'exists', return_value=False):
            output_diagnostics = action_tracer.diagnose_stale_outputs(
                accesses=[action_tracer.Read(used_input)],
                access_constraints=action_tracer.AccessConstraints(
                    allowed_reads={used_input},
                    required_writes=required_writes,
                ),
            )
        self.assertEqual(
            output_diagnostics,
            action_tracer.StalenessDiagnostics(
                required_writes=required_writes,
                nonexistent_outputs={"write.me"}),
        )

    def test_stale_output_no_inputs(self):
        # The output exists but was not written; with no inputs used there is
        # nothing to compare its timestamp against.
        required_writes = {"write.me"}
        with mock.patch.object(os.path, 'exists',
                               return_value=True) as mock_exists:
            output_diagnostics = action_tracer.diagnose_stale_outputs(
                accesses=[],
                access_constraints=action_tracer.AccessConstraints(
                    required_writes=required_writes),
            )
        mock_exists.assert_called_once()
        self.assertEqual(
            output_diagnostics,
            action_tracer.StalenessDiagnostics(required_writes=required_writes),
        )

    def test_stale_output_with_used_input(self):

        def fake_read_ctime(path: str):
            if path.startswith("read"):
                return 200
            raise ValueError(f'Unexpected path: {path}')

        def fake_write_ctime(path: str):
            if path.startswith("write"):
                return 100
            raise ValueError(f'Unexpected path: {path}')

        used_input = "read.me"
        required_writes = {"write.me"}
        with mock.patch.object(
                os.path, 'exists', return_value=True) as mock_exists, \
                mock.patch.object(
                    os.path, 'getctime',
                    wraps=fake_read_ctime) as mock_read_ctime, \
                mock.patch.object(
                    action_tracer, 'realpath_ctime',
                    wraps=fake_write_ctime) as mock_write_ctime:
            output_diagnostics = action_tracer.diagnose_stale_outputs(
                accesses=[action_tracer.Read(used_input)],
                access_constraints=action_tracer.AccessConstraints(
                    allowed_reads={used_input},
                    required_writes=required_writes),
            )
        mock_exists.assert_called_once()
        mock_read_ctime.assert_called()
        mock_write_ctime.assert_called()
        # Input timestamp (200) is newer than the output's (100): stale.
        self.assertEqual(
            output_diagnostics,
            action_tracer.StalenessDiagnostics(
                required_writes=required_writes,
                newest_input=used_input,
                stale_outputs={"write.me"}),
        )

    def test_stale_output_with_multiple_used_inputs(self):

        def fake_read_ctime(path: str):
            if path == "read.me":
                return 200
            if path == "read.me.newer":
                return 300
            raise Exception(f'fake_read_ctime for unexpected path: {path}')

        def fake_write_ctime(path: str):
            if path.startswith("write"):
                return 250
            raise Exception(f'fake_write_ctime for unexpected path: {path}')

        used_input = "read.me"
        # Make sure the timestamp of the newest input is used for comparison.
        used_input_newer = "read.me.newer"
        required_writes = {"write.me"}
        with mock.patch.object(
                os.path, 'exists', return_value=True) as mock_exists, \
                mock.patch.object(
                    os.path, 'getctime',
                    wraps=fake_read_ctime) as mock_read_ctime, \
                mock.patch.object(
                    action_tracer, 'realpath_ctime',
                    wraps=fake_write_ctime) as mock_write_ctime:
            output_diagnostics = action_tracer.diagnose_stale_outputs(
                accesses=[
                    action_tracer.Read(used_input),
                    action_tracer.Read(used_input_newer),
                ],
                access_constraints=action_tracer.AccessConstraints(
                    allowed_reads={used_input, used_input_newer},
                    required_writes=required_writes),
            )
        mock_exists.assert_called_once()
        mock_read_ctime.assert_called()
        mock_write_ctime.assert_called()
        self.assertEqual(
            output_diagnostics,
            action_tracer.StalenessDiagnostics(
                required_writes=required_writes,
                # newer input is used for comparison
                newest_input=used_input_newer,
                stale_outputs={"write.me"}),
        )

    def test_fresh_output_with_used_input(self):
        used_input = "read.me"
        written_output = "write.me"
        with mock.patch.object(
                os.path, 'exists', return_value=True) as mock_exists, \
                mock.patch.object(
                    os.path, 'getctime',
                    wraps=self._fake_getctime) as mock_ctime:
            output_diagnostics = action_tracer.diagnose_stale_outputs(
                accesses=[
                    action_tracer.Read(used_input),
                    action_tracer.Write(written_output),
                ],
                access_constraints=action_tracer.AccessConstraints(
                    allowed_reads={used_input},
                    required_writes={written_output}),
            )
        # There are no untouched outputs, so getctime is never called.
        mock_exists.assert_not_called()
        mock_ctime.assert_not_called()
        self.assertEqual(
            output_diagnostics,
            action_tracer.StalenessDiagnostics(
                required_writes={written_output},
                # newest_input is not evaluated
                stale_outputs=set(),
            ),
        )

    def test_fresh_output_with_used_input_readable_output(self):
        used_input = "read.me"
        written_output = "write.me"
        with mock.patch.object(
                os.path, 'exists', return_value=True) as mock_exists, \
                mock.patch.object(
                    os.path, 'getctime',
                    wraps=self._fake_getctime) as mock_ctime:
            output_diagnostics = action_tracer.diagnose_stale_outputs(
                accesses=[
                    action_tracer.Read(used_input),
                    action_tracer.Read(written_output),
                    action_tracer.Write(written_output),
                ],
                access_constraints=action_tracer.AccessConstraints(
                    allowed_reads={used_input, written_output},
                    required_writes={written_output}),
            )
        # There are no untouched outputs, so getctime is never called.
        mock_exists.assert_not_called()
        mock_ctime.assert_not_called()
        self.assertEqual(
            output_diagnostics,
            action_tracer.StalenessDiagnostics(
                required_writes={written_output},
                # newest_input is not evaluated
                stale_outputs=set(),
            ),
        )
class AllParentDirsTests(unittest.TestCase):
    """Checks all_parent_dirs() on relative and absolute paths."""

    def _assert_parents(self, path, expected):
        # Compare the computed parent-directory set for one path.
        self.assertEqual(action_tracer.all_parent_dirs(path), expected)

    def test_empty_path(self):
        self._assert_parents("", set())

    def test_empty_path_slash(self):
        self._assert_parents("/", set())

    def test_one_path(self):
        self._assert_parents("foo", set())

    def test_one_path_absolute(self):
        # The root itself is not reported as a parent.
        self._assert_parents("/foo", set())

    def test_subdir(self):
        self._assert_parents("parent/child", {"parent"})

    def test_subdir_absolute(self):
        self._assert_parents("/parent/child", {"/parent"})

    def test_subsubdir(self):
        self._assert_parents(
            "matron/parent/child", {"matron", "matron/parent"})

    def test_subsubdir_absolute(self):
        self._assert_parents("/boss/parent/child", {"/boss", "/boss/parent"})
class DetectAllDirs(unittest.TestCase):
    """Checks detect_all_dirs() over collections of paths."""

    def _assert_dirs(self, paths, expected):
        # Compare the union of parent directories found for `paths`.
        self.assertEqual(action_tracer.detect_all_dirs(paths), expected)

    def test_empty(self):
        self._assert_dirs([], set())

    def test_one(self):
        self._assert_dirs(["/x/y/z"], {"/x", "/x/y"})

    def test_different_parents(self):
        self._assert_dirs(
            ["/x/y/z", "/a/b/c"], {"/x", "/x/y", "/a", "/a/b"})

    def test_common_parent(self):
        # Shared ancestors appear once in the result set.
        self._assert_dirs(["/x/w/z", "/x/w/c"], {"/x", "/x/w"})

    def test_common_grandparent(self):
        self._assert_dirs(["/x/y/z", "/x/w/c"], {"/x", "/x/y", "/x/w"})
class MainArgParserTests(unittest.TestCase):
    """Checks main_arg_parser() defaults and the access-permission flags."""

    # These args are required, and there's nothing interesting about them to test.
    required_args = "--trace-output t.out --label //pkg:tgt "

    def _parse_with(self, *extra_flags):
        # Parse the required arguments followed by any extra flags.
        argv = self.required_args.split() + list(extra_flags)
        return action_tracer.main_arg_parser().parse_args(argv)

    def test_only_required_args(self):
        parsed = self._parse_with()
        self.assertEqual(parsed.trace_output, "t.out")
        self.assertEqual(parsed.label, "//pkg:tgt")
        # Make sure some checks are enabled by default
        self.assertTrue(parsed.check_access_permissions)

    def test_check_access_permissions(self):
        parsed = self._parse_with("--check-access-permissions")
        self.assertTrue(parsed.check_access_permissions)

    def test_no_check_access_permissions(self):
        parsed = self._parse_with("--no-check-access-permissions")
        self.assertFalse(parsed.check_access_permissions)
# Run every test case in this module when the file is executed as a script.
if '__main__' == __name__:
    unittest.main()