blob: afbca872465918e03210c06604c230cf37f9588b [file] [log] [blame] [edit]
# Copyright 2025 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, call, patch
from git import (
ArgsCollection,
Context,
LsFilesCommand,
RevParseCommand,
StatusCommand,
get_repo_root_for_repo,
get_submodule_paths,
get_target_repository_at_path,
verify_repository_root_is_cog,
)
class TestRevParseCommand(unittest.TestCase):
def setUp(self) -> None:
self.mock_context = MagicMock(spec=Context)
self.mock_context.args = MagicMock()
self.command = RevParseCommand()
def test_show_toplevel(self) -> None:
self.mock_context.get_repository_root.return_value = Path("/repo")
self.mock_context.args.remaining_args = ["--show-toplevel"]
# We need to mock get_relative_path because execute calls it
self.mock_context.get_relative_path.return_value = ""
self.assertEqual(self.command.execute(self.mock_context), 0)
self.mock_context.print.assert_called_with(str(Path("/repo")))
@patch("git.get_target_repository_at_path")
def test_show_toplevel_in_submodule(
self, mock_get_target_repo: MagicMock
) -> None:
self.mock_context.get_repository_root.return_value = Path("/repo")
self.mock_context.args.remaining_args = ["--show-toplevel"]
# We are deep inside a submodule
self.mock_context.get_relative_path.return_value = "sub/module/dir"
# The target repo detection logic says we are in 'sub/module'
mock_get_target_repo.return_value = "sub/module"
self.assertEqual(self.command.execute(self.mock_context), 0)
self.mock_context.print.assert_called_with(
str(Path("/repo/sub/module"))
)
@patch("git.get_workspace_id_and_snapshot_version")
def test_head_revision(self, mock_get_workspace_info: MagicMock) -> None:
self.mock_context.get_repository_root.return_value = Path("/repo")
self.mock_context.get_relative_path.return_value = ""
self.mock_context.args.remaining_args = ["HEAD"]
mock_get_workspace_info.return_value = ("workspace-id", 123)
# Mock run_real_git response
self.mock_context.run_real_git.return_value = (
'result {\n commit_hash: "abcdef123456"\n}'
)
self.assertEqual(self.command.execute(self.mock_context), 0)
self.mock_context.print.assert_called_with("abcdef123456")
@patch("git.get_workspace_id_and_snapshot_version")
def test_mixed_args_order(self, mock_get_workspace_info: MagicMock) -> None:
self.mock_context.get_repository_root.return_value = Path("/repo")
self.mock_context.get_relative_path.return_value = ""
mock_get_workspace_info.return_value = ("workspace-id", 123)
self.mock_context.run_real_git.return_value = (
'result {\n commit_hash: "abcdef123456"\n}'
)
# Test order: --show-toplevel, HEAD
self.mock_context.args.remaining_args = ["--show-toplevel", "HEAD"]
self.assertEqual(self.command.execute(self.mock_context), 0)
self.mock_context.print.assert_has_calls(
[call(str(Path("/repo"))), call("abcdef123456")]
)
self.mock_context.print.reset_mock()
# Test order: HEAD, --show-toplevel
self.mock_context.args.remaining_args = ["HEAD", "--show-toplevel"]
self.assertEqual(self.command.execute(self.mock_context), 0)
self.mock_context.print.assert_has_calls(
[call("abcdef123456"), call(str(Path("/repo")))]
)
def test_unsupported_revision(self) -> None:
self.mock_context.get_repository_root.return_value = Path("/repo")
self.mock_context.get_relative_path.return_value = ""
self.mock_context.args.remaining_args = ["main"]
self.assertEqual(self.command.execute(self.mock_context), 1)
self.mock_context.error.assert_called_with(
"cog workspaces only support 'HEAD' revisions at this time"
)
@patch("git.get_workspace_id_and_snapshot_version")
def test_head_revision_not_found(
self, mock_get_workspace_info: MagicMock
) -> None:
self.mock_context.get_repository_root.return_value = Path("/repo")
self.mock_context.get_relative_path.return_value = ""
self.mock_context.args.remaining_args = ["HEAD"]
mock_get_workspace_info.return_value = ("workspace-id", 123)
# Mock run_real_git response with no hash
self.mock_context.run_real_git.return_value = (
'result {\n some_other_field: "value"\n}'
)
# Expect return code 1
self.assertEqual(self.command.execute(self.mock_context), 1)
# Expect an error message
self.mock_context.error.assert_called()
@patch("git.get_workspace_id_and_snapshot_version")
def test_cannot_determine_workspace_id(
self, mock_get_workspace_info: MagicMock
) -> None:
self.mock_context.get_repository_root.return_value = Path("/repo")
self.mock_context.get_relative_path.return_value = ""
self.mock_context.args.remaining_args = ["HEAD"]
mock_get_workspace_info.return_value = ("", 0)
self.assertEqual(self.command.execute(self.mock_context), 1)
self.mock_context.error.assert_called_with(
"Cannot determine workspace id or snapshot version"
)
class TestStatusCommand(unittest.TestCase):
def setUp(self) -> None:
self.mock_context = MagicMock(spec=Context)
self.mock_context.args = MagicMock()
self.mock_context.args.global_git_args = []
self.mock_context.git_subcommand_args = MagicMock()
self.command = StatusCommand()
def test_execution(self) -> None:
self.assertEqual(self.command.execute(self.mock_context), 0)
self.mock_context.print.assert_called_with("not implemented yet")
class TestLsFilesCommand(unittest.TestCase):
def setUp(self) -> None:
self.mock_context = MagicMock(spec=Context)
self.mock_context.args = MagicMock()
self.mock_context.args.global_git_args = []
self.mock_context.args.remaining_args = []
self.mock_context.invoker_cwd = "/cwd"
self.command = LsFilesCommand()
def test_basic_execution(self) -> None:
self.command.run(self.mock_context)
self.mock_context.run_real_git.assert_called_with(
["ls-files"], cwd="/cwd"
)
def test_with_arguments(self) -> None:
self.mock_context.args.remaining_args = ["-z", "-c", "file.txt"]
self.command.run(self.mock_context)
self.mock_context.run_real_git.assert_called_with(
["ls-files", "-z", "-c", "file.txt"], cwd="/cwd"
)
def test_with_global_args(self) -> None:
self.mock_context.args.global_git_args = ["-C", "/foo"]
self.command.run(self.mock_context)
self.mock_context.run_real_git.assert_called_with(
["-C", "/foo", "ls-files"], cwd="/cwd"
)
def test_output_formatting_z(self) -> None:
self.mock_context.args.remaining_args = ["-z"]
self.mock_context.run_real_git.return_value = "file1\0file2\0"
self.command.run(self.mock_context)
self.mock_context.output.assert_called_with("file1\0file2\0", end="\0")
def test_output_formatting_newline(self) -> None:
self.mock_context.args.remaining_args = []
self.mock_context.run_real_git.return_value = "file1\nfile2\n"
self.command.run(self.mock_context)
self.mock_context.output.assert_called_with("file1\nfile2\n", end="\n")
class TestGetRelativePathFromArgs(unittest.TestCase):
def test_c_and_git_dir_raises(self) -> None:
args = [
"--real-git",
"git",
"--invoker-cwd",
"/cwd",
"--repository-root",
"/repo",
"--",
"-C",
"foo",
"--git-dir",
"bar",
"status",
]
context = Context(ArgsCollection(args))
with self.assertRaises(ValueError):
context.get_relative_path()
def test_c_absolute(self) -> None:
args = [
"--real-git",
"git",
"--invoker-cwd",
"/cwd",
"--repository-root",
"/repo",
"--",
"-C",
"/repo/foo",
"status",
]
context = Context(ArgsCollection(args))
self.assertEqual(context.get_relative_path(), "foo")
def test_c_relative(self) -> None:
args = [
"--real-git",
"git",
"--invoker-cwd",
"/cwd",
"--repository-root",
"/cwd",
"--",
"-C",
"foo",
"status",
]
context = Context(ArgsCollection(args))
self.assertEqual(context.get_relative_path(), "foo")
def test_git_dir_absolute(self) -> None:
args = [
"--real-git",
"git",
"--invoker-cwd",
"/cwd",
"--repository-root",
"/repo",
"--",
"--git-dir",
"/repo/foo/.git",
"status",
]
context = Context(ArgsCollection(args))
self.assertEqual(context.get_relative_path(), "foo")
def test_git_dir_relative(self) -> None:
args = [
"--real-git",
"git",
"--invoker-cwd",
"/cwd",
"--repository-root",
"/cwd",
"--",
"--git-dir",
"foo/.git",
"status",
]
context = Context(ArgsCollection(args))
self.assertEqual(context.get_relative_path(), "foo")
def test_git_dir_no_dot_git(self) -> None:
args = [
"--real-git",
"git",
"--invoker-cwd",
"/cwd",
"--repository-root",
"/repo",
"--",
"--git-dir",
"/repo/foo",
"status",
]
context = Context(ArgsCollection(args))
with self.assertRaises(ValueError):
context.get_relative_path()
def test_invoker_cwd(self) -> None:
args = [
"--real-git",
"git",
"--invoker-cwd",
"/repo/foo",
"--repository-root",
"/repo",
"--",
"status",
]
context = Context(ArgsCollection(args))
self.assertEqual(context.get_relative_path(), "foo")
class TestGetRepoRootForRepo(unittest.TestCase):
def test_empty_repo(self) -> None:
self.assertEqual(get_repo_root_for_repo(""), "fuchsia")
def test_submodule_repo(self) -> None:
self.assertEqual(get_repo_root_for_repo("foo/bar"), "fuchsia/foo/bar")
class TestGetSubmodulePaths(unittest.TestCase):
def test_no_gitmodules(self) -> None:
with tempfile.TemporaryDirectory() as tmp_dir_str:
tmp_dir = Path(tmp_dir_str)
self.assertEqual(get_submodule_paths(tmp_dir), [])
def test_with_nested_submodules(self) -> None:
with tempfile.TemporaryDirectory() as tmp_dir_str:
tmp_dir = Path(tmp_dir_str)
# Top-level .gitmodules
gitmodules = tmp_dir / ".gitmodules"
gitmodules.write_text('[submodule "foo"]\n\tpath = foo\n')
# Nested submodule .gitmodules
foo_dir = tmp_dir / "foo"
foo_dir.mkdir()
foo_gitmodules = foo_dir / ".gitmodules"
foo_gitmodules.write_text('[submodule "bar"]\n\tpath = bar\n')
self.assertEqual(get_submodule_paths(tmp_dir), ["foo"])
self.assertEqual(get_submodule_paths(foo_dir), ["bar"])
def test_with_submodules(self) -> None:
with tempfile.TemporaryDirectory() as tmp_dir_str:
tmp_dir = Path(tmp_dir_str)
gitmodules = tmp_dir / ".gitmodules"
gitmodules.write_text(
'[submodule "submodule1"]\n\tpath = path/to/sub1\n'
'[submodule "submodule2"]\n\tpath = path/to/sub2\n'
)
self.assertEqual(
get_submodule_paths(tmp_dir), ["path/to/sub1", "path/to/sub2"]
)
def test_malformed_gitmodules(self) -> None:
with tempfile.TemporaryDirectory() as tmp_dir_str:
tmp_dir = Path(tmp_dir_str)
gitmodules = tmp_dir / ".gitmodules"
gitmodules.write_text("not a valid ini file")
self.assertEqual(get_submodule_paths(tmp_dir), [])
class TestGetTargetRepositoryAtPath(unittest.TestCase):
@patch("git.get_submodule_paths")
def test_default_repo_root(
self, mock_get_submodule_paths: MagicMock
) -> None:
mock_get_submodule_paths.return_value = []
repository_root = Path("/foo/fuchsia")
self.assertEqual(get_target_repository_at_path("", repository_root), "")
@patch("git.get_submodule_paths")
def test_default_repo_root_with_subdirectory(
self, mock_get_submodule_paths: MagicMock
) -> None:
mock_get_submodule_paths.return_value = []
repository_root = Path("/foo/fuchsia")
self.assertEqual(
get_target_repository_at_path("bar", repository_root), ""
)
@patch("git.get_submodule_paths")
def test_repo_root_with_valid_submodule(
self, mock_get_submodule_paths: MagicMock
) -> None:
mock_get_submodule_paths.return_value = ["subdir"]
repository_root = Path("/foo/fuchsia")
self.assertEqual(
get_target_repository_at_path("subdir/bar", repository_root),
"subdir",
)
@patch("git.get_submodule_paths")
def test_repo_root_with_invalid_submodule(
self, mock_get_submodule_paths: MagicMock
) -> None:
mock_get_submodule_paths.return_value = ["otherdir"]
repository_root = Path("/foo/fuchsia")
self.assertEqual(
get_target_repository_at_path("subdir", repository_root), ""
)
def test_repo_root_with_nested_submodules(self) -> None:
with tempfile.TemporaryDirectory() as tmp_dir_str:
repository_root = Path(tmp_dir_str)
(repository_root / ".gitmodules").write_text(
'[submodule "with_submodule"]\n\tpath = with_submodule\n'
'[submodule "without_submodule"]\n\tpath = without_submodule\n'
)
(repository_root / "with_submodule").mkdir()
(repository_root / "with_submodule" / ".gitmodules").write_text(
'[submodule "submodule"]\n\tpath = submodule\n'
)
(repository_root / "with_submodule" / "submodule").mkdir()
self.assertEqual(
get_target_repository_at_path(
"with_submodule/submodule/baz", repository_root
),
"with_submodule/submodule",
)
def test_repo_root_with_deeply_nested_submodules(self) -> None:
with tempfile.TemporaryDirectory() as tmp_dir_str:
repository_root = Path(tmp_dir_str)
# Level 1
(repository_root / ".gitmodules").write_text(
'[submodule "level1"]\n\tpath = level1\n'
)
(repository_root / "level1").mkdir()
# Level 2
(repository_root / "level1" / ".gitmodules").write_text(
'[submodule "level2"]\n\tpath = level2\n'
)
(repository_root / "level1" / "level2").mkdir()
# Level 3
(repository_root / "level1" / "level2" / ".gitmodules").write_text(
'[submodule "level3"]\n\tpath = level3\n'
)
(repository_root / "level1" / "level2" / "level3").mkdir()
self.assertEqual(
get_target_repository_at_path(
"level1/level2/level3/foo", repository_root
),
"level1/level2/level3",
)
class TestVerifyRepositoryRootIsCog(unittest.TestCase):
def test_is_cog(self) -> None:
with tempfile.TemporaryDirectory() as tmp_dir_str:
tmp_dir = Path(tmp_dir_str)
repo_root = tmp_dir / "fuchsia"
repo_root.mkdir()
citc_dir = tmp_dir / ".citc"
citc_dir.mkdir()
self.assertTrue(verify_repository_root_is_cog(repo_root))
def test_is_not_cog(self) -> None:
with tempfile.TemporaryDirectory() as tmp_dir_str:
tmp_dir = Path(tmp_dir_str)
repo_root = tmp_dir / "fuchsia"
repo_root.mkdir()
self.assertFalse(verify_repository_root_is_cog(repo_root))
class TestArgsCollection(unittest.TestCase):
def test_parse_simple_command(self) -> None:
args = [
"--real-git",
"/usr/bin/git",
"--invoker-cwd",
"/tmp",
"--",
"status",
]
collection = ArgsCollection(args)
self.assertEqual(
collection.polyfill_args,
["--real-git", "/usr/bin/git", "--invoker-cwd", "/tmp"],
)
self.assertEqual(collection.global_git_args, [])
self.assertEqual(collection.command_name, "status")
self.assertEqual(collection.remaining_args, [])
def test_parse_command_with_args(self) -> None:
args = [
"--real-git",
"/usr/bin/git",
"--invoker-cwd",
"/tmp",
"--",
"ls-files",
"-z",
]
collection = ArgsCollection(args)
self.assertEqual(collection.command_name, "ls-files")
self.assertEqual(collection.remaining_args, ["-z"])
def test_parse_with_global_args(self) -> None:
args = [
"--real-git",
"/usr/bin/git",
"--invoker-cwd",
"/tmp",
"--",
"-C",
"/foo",
"status",
]
collection = ArgsCollection(args)
self.assertEqual(collection.global_git_args, ["-C", "/foo"])
self.assertEqual(collection.command_name, "status")
def test_raises_error_without_separator(self) -> None:
args = ["status"]
with self.assertRaises(ValueError):
ArgsCollection(args)
def test_raises_error_without_command(self) -> None:
args = ["--real-git", "/usr/bin/git", "--invoker-cwd", "/tmp", "--"]
with self.assertRaises(ValueError):
ArgsCollection(args)
if __name__ == "__main__":
unittest.main()