blob: bc2f9b2fa6d953a855051cd6aae774efe0307f58 [file]
#!/usr/bin/env python3
# allow-non-vendored-python
# Copyright 2026 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Syncs changes between Cog <=> CartFS checkouts."""
import argparse
import concurrent.futures
import enum
import hashlib
import json
import logging
import os
import re
import shlex
import shutil
import sys
import textwrap
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable
import cartfs
import logger
import preflight
import workspace
class SyncError(workspace.WorkspaceError):
    """Error raised during sync operations (e.g. a path that cannot be mapped)."""
class WorkspaceType(enum.Enum):
    """Identifies which of the two checkouts an operation refers to."""

    COG = "cog"
    CARTFS = "cartfs"
@dataclass
class SyncResult:
    """Outcome of a batch sync, bucketing each synced path by what happened."""

    # Paths newly created at the destination.
    added: set[str] = field(default_factory=set)
    # Paths whose destination contents were overwritten.
    modified: set[str] = field(default_factory=set)
    # Paths removed from the destination.
    deleted: set[str] = field(default_factory=set)
    # Paths already in sync (or deletions with nothing left to delete).
    noop: set[str] = field(default_factory=set)
    # Paths that could not be synced (directories, OS errors, failed deletes).
    failed: set[str] = field(default_factory=set)
class WorkspaceSyncService:
    """Syncs changes between Cog <=> CartFS checkouts."""

    def __init__(self) -> None:
        # Locate the enclosing Cog workspace and cache its root directory.
        self.workspace = workspace.Workspace()
        self.cog_root = self.workspace.workspace_root
        logger.log_info(f"Cog root: {self.cog_root}")
        # The CartFS checkout may not exist yet; it is created lazily by
        # `sync_cog_to_cartfs`.
        if self.workspace.has_cartfs_dir:
            logger.log_info(f"CartFS root: {self.cartfs_root}")
        else:
            logger.log_info("CartFS checkout is not initialized.")
@property
def cartfs_root(self) -> Path:
return self.workspace.cartfs_dir
@property
def _config(self) -> dict[str, Any]:
return self.workspace.config
def _git_citc(self, *command: str, cwd: Path | None = None) -> str:
return self.workspace._run(
["git-citc"] + list(command),
cwd=cwd or self.cog_root,
capture_output=True,
)
@property
def _affected_cog_repos(self) -> set[str]:
return set(
repo
for repo in self._git_citc("api.get-modified-repos")
.strip()
.split("\n")
if repo and repo not in self._config["repo"]["ignored"]
)
def _get_cog_commit(self, repo: str) -> str:
"""Returns the commit hash of a Cog repository."""
stdout = self._git_citc("api.get-repo-states", repo)
matches = re.findall(r"base_commit_hash:\s*\"?([^\s\"]+)\"?", stdout)
if len(matches) != 1:
raise workspace.WorkspaceError(
f"Expected 1 match for base_commit_hash, got {len(matches)}:\n{stdout}"
)
return matches[0]
@property
def _cog_transfer_file_hashes(self) -> dict[str, str | None]:
file = self.cartfs_root / "cog_transfer_file_hashes.json"
if not file.exists():
return {}
try:
return json.loads(file.read_text())
except (json.JSONDecodeError, OSError) as e:
logger.log_error(f"Failed to read and parse {file}: {e}")
return {}
    def affected_files(self, workspace_type: WorkspaceType) -> set[str]:
        """Returns the set of files different from the base commit in the given workspace.

        Paths are Cog-relative strings of the form "<repo>/<path>".

        Raises:
            workspace.WorkspaceError: If `workspace_type` is unsupported.
        """
        if workspace_type == WorkspaceType.COG:

            def _get_affected_files(repo: str) -> set[str]:
                # Diff the repo's base commit against its working copy ("@").
                base_commit = self._get_cog_commit(repo)
                stdout_lines = self._git_citc(
                    "cli.diff",
                    "--patch=false",
                    base_commit,
                    "@",
                    cwd=self.cog_root / repo,
                ).split("\n")
                # Diff lines look like "[X] path" where X is a single status
                # character; keep the path part, prefixed with the repo name.
                return {
                    f'{repo}/{line.split(" ", 1)[1]}'
                    for line in stdout_lines
                    if re.match(r"^\[.\] .+$", line)
                }

            affected_cog_repos = self._affected_cog_repos
            if not affected_cog_repos:
                return set()
            # Diff repos concurrently; each query shells out to git-citc.
            with concurrent.futures.ThreadPoolExecutor() as executor:
                return set.union(
                    *executor.map(_get_affected_files, affected_cog_repos)
                )
        # This technically isn't correct since we ideally want to track the files between
        # COG_BASE..CARTFS_HEAD, but instead this tracks COG_BASE..COG_HEAD from the last
        # Cog => CartFS sync, which is reflected in `cog_transfer_file_hashes.json`.
        #
        # Tangibly, this means that if `fx ...` subcommands modify files that aren't captured in
        # COG_BASE..COG_HEAD, then those files won't be synced back from CartFS => Cog.
        #
        # TODO(https://fxbug.dev/501540419): We should consider using `git` porcelain commands to
        # get the set of files changed between CARTFS_BASE..CARTFS_HEAD, as CARTFS_BASE is
        # approximately COG_HEAD. However, this will introduce extra complexity since:
        # 1. Currently, Cog-relative file paths are the source of truth and we map these to CartFS
        #    via `_cartfs_path`. Querying `git` from CartFS means we'll need some way to map
        #    CartFS-relative file paths to Cog-relative file paths.
        # 2. We'll need to account for git submodules.
        if workspace_type == WorkspaceType.CARTFS:
            return set(self._cog_transfer_file_hashes.keys())
        raise workspace.WorkspaceError(
            f"Unsupported workspace type: {workspace_type}"
        )
def _md5hash(self, path: Path) -> str | None:
try:
if path.is_symlink():
return hashlib.md5(os.readlink(path).encode()).hexdigest()
if not path.is_file():
return None
with open(path, "rb") as f:
file_hash = hashlib.md5()
while chunk := f.read(8192):
file_hash.update(chunk)
return file_hash.hexdigest()
except OSError as e:
logger.log_error(f"Failed to calculate hash for {path}: {e}")
return None
def _cog_path(self, path: str) -> Path:
return self.cog_root / path
def cartfs_path(self, path: str) -> Path:
# Map Cog fuchsia dir to CartFS fuchsia dir.
assert self._config["repo"]["fuchsia"]
fuchsia_prefix = f'{self._config["repo"]["fuchsia"]}/'
if path.startswith(fuchsia_prefix):
path = path[len(fuchsia_prefix) :].lstrip("/")
return self.cartfs_root / "fuchsia" / path
# Map Cog integration dir (if it exists) to CartFS integration dir.
integration_prefix = (
self._config["repo"]["integration"]
and f'{self._config["repo"]["integration"]}/'
)
if integration_prefix and path.startswith(integration_prefix):
path = path[len(integration_prefix) :].lstrip("/")
return self.cartfs_root / "integration" / path
# Strip the repo-configured prefix and nest within the configured destination subdirectory.
strip_prefix = self._config["repo"]["stripSrcPrefix"]
if not path.startswith(strip_prefix):
raise SyncError(
f"Expected path {path} to start with strip prefix '{strip_prefix}'"
)
return (
self.cartfs_root
/ self._config["repo"]["destSubdir"]
/ path[len(strip_prefix) :].lstrip("/")
)
def ensure_cartfs_cwd(self) -> Path:
assert self._config["repo"]["fuchsia"]
# Don't resolve the CWD since it might be a symlinked directory
# (e.g. `//vendor/company` => `//../vendor/company`) for superprojects.
cog_cwd = Path(os.environ.get("PWD") or Path.cwd())
# Only map CWDs in the Cog fuchsia dir to the CartFS fuchsia dir.
cog_fuchsia_dir = self.cog_root / self._config["repo"]["fuchsia"]
if not cog_cwd.is_relative_to(cog_fuchsia_dir):
logger.log_warn(
f'Please cd into "{os.path.relpath(cog_fuchsia_dir, cog_cwd)}" before running fx.'
)
return self.cartfs_root / "fuchsia"
# Don't resolve the CWD since it might be a symlinked directory
# (e.g. `//integration` => `//../integration`) for superprojects.
cartfs_cwd = (
self.cartfs_root / "fuchsia" / cog_cwd.relative_to(cog_fuchsia_dir)
)
# New empty directories aren't reported by `git-citc cli.diff`, so account for the rare edge
# case where the user's CWD is in a new empty subdir of the Cog fuchsia checkout.
if not cartfs_cwd.is_symlink() and not cartfs_cwd.exists():
cartfs_cwd.mkdir(parents=True, exist_ok=True)
return cartfs_cwd
    def sync_batch(
        self,
        src_func: Callable[[str], Path],
        dest_func: Callable[[str], Path],
        paths: set[str],
        hash_func: Callable[[Path], str | None],
    ) -> SyncResult:
        """Copies each path from its source location to its destination.

        Args:
            src_func: Maps a Cog-relative path string to its source Path.
            dest_func: Maps a Cog-relative path string to its destination Path.
            paths: Cog-relative path strings to sync.
            hash_func: Content hash used to skip unchanged files; a None result
                is treated as "no hashable content".

        Returns:
            A SyncResult bucketing every input path by outcome.

        Raises:
            SyncError: If a path cannot be mapped by src_func/dest_func.
        """
        result = SyncResult()
        for path in paths:
            try:
                src = src_func(path)
                dest = dest_func(path)
            except Exception as e:
                raise SyncError(f"Failed to resolve path '{path}'") from e
            logger.log_debug(f"Syncing path '{path}': {src} -> {dest}")
            # `git citc cli.diff` only reports files, not directories.
            if (src.is_dir() and not src.is_symlink()) or (
                dest.is_dir() and not dest.is_symlink()
            ):
                result.failed.add(path)
                continue
            # Handle deleted files. The extra is_symlink() checks matter because
            # exists() follows symlinks and reports False for dangling links.
            if not src.exists() and not src.is_symlink():
                if dest.exists() or dest.is_symlink():
                    # Attempt to delete the file. Note that deleting files from a Cog destination
                    # currently fails and will be caught below.
                    # We don't expect sync file deletions initiated from CartFS often.
                    try:
                        dest.unlink()
                        result.deleted.add(path)
                    except OSError as e:
                        logger.log_error(
                            f"Failed to apply deletion of '{dest.name}': {e}"
                        )
                        result.failed.add(path)
                else:
                    result.noop.add(path)
                continue
            # Handle added and modified files.
            # Only write to CartFS destinations if the file contents have changed to avoid updating
            # the file's `mtime` which unnecessarily invalidates incremental rebuilds.
            if hash_func(src) == hash_func(dest):
                result.noop.add(path)
                continue
            try:
                dest.parent.mkdir(parents=True, exist_ok=True)
                # Classify before the unlink below so "modified" reflects
                # whether the destination existed prior to this sync.
                result_field = (
                    result.modified if dest.exists() else result.added
                )
                # Remove a destination symlink (or a file being replaced by a
                # symlink) so the copy creates a fresh entry rather than
                # writing through the link or failing.
                if dest.is_symlink() or (src.is_symlink() and dest.exists()):
                    dest.unlink()
                shutil.copy2(src, dest, follow_symlinks=False)
                result_field.add(path)
            except OSError as e:
                logger.log_error(f"Failed to sync '{dest.name}': {e}")
                result.failed.add(path)
        return result
    @workspace.lock
    def sync_cog_to_cartfs(self) -> SyncResult:
        """Syncs changes from Cog to CartFS.

        Lazily initializes and updates the CartFS checkout if needed, copies
        all affected files, then records a hash snapshot of the synced CartFS
        files for the next CartFS => Cog sync.
        """
        if not self.workspace.has_cartfs_dir:
            # Ensure INFO messages are visible during the (potentially slow)
            # one-time initialization, regardless of the configured level.
            with logger.set_level(min(logger.get_log_level(), logging.INFO)):
                logger.log_info(
                    "CartFS workspace is not initialized yet, initializing now..."
                )
                self.workspace.init_cartfs_workspace()
                logger.log_info("CartFS workspace initialized.")
        if not self.workspace.is_checkout_uptodate():
            with logger.set_level(min(logger.get_log_level(), logging.INFO)):
                logger.log_info(
                    "CartFS checkout is not up to date, updating CartFS checkout..."
                )
                self.workspace.checkout_cartfs_to_cog_revisions()
                logger.log_info(
                    "CartFS checkout updated, resuming execution..."
                )
        # Sync the union of both sides' affected files so files reverted in Cog
        # (present only in the previous snapshot) are restored in CartFS too.
        cog_affected_files = self.affected_files(WorkspaceType.COG)
        cartfs_affected_files = self.affected_files(WorkspaceType.CARTFS)
        all_affected_files = cog_affected_files | cartfs_affected_files
        sync_result = self.sync_batch(
            self._cog_path, self.cartfs_path, all_affected_files, self._md5hash
        )
        # Keep track of the checkout files in CartFS that differ from the Cog base.
        # This helps apply any reverted Cog file modifications that are omitted by
        # `git citc cli.diff` in the next Cog => CartFS sync.
        #
        # Note: This approach is less accurate than using `git` since it cannot account for `fx`
        # commands that modify CartFS checkout files that aren't part of the Cog changes.
        # However, it is faster and significantly less complex when accounting for
        # reversing Cog => CartFS file path transformations and git submodules.
        (self.cartfs_root / "cog_transfer_file_hashes.json").write_text(
            json.dumps(
                {
                    path: self._md5hash(self.cartfs_path(path))
                    for path in cog_affected_files | sync_result.failed
                }
            )
        )
        return sync_result
    @workspace.lock
    def sync_cartfs_to_cog(
        self, diff_against_previous_cog_to_cartfs_sync: bool
    ) -> SyncResult:
        """Syncs changes from CartFS to Cog.

        Args:
            diff_against_previous_cog_to_cartfs_sync: When True, Cog files are
                compared against the hash snapshot recorded by the previous
                Cog => CartFS sync instead of their current contents, so edits
                made in Cider since that sync are not overwritten.
        """
        # Coherence check: We should not be running `sync_workspace.py --from-cartfs-to-cog` with an
        # uninitialized CartFS workspace.
        if not self.workspace.has_cartfs_dir:
            raise workspace.WorkspaceError(
                "No associated CartFS workspace found. "
                "Please initiate a Sync in the opposite direction "
                "or run `//scripts/cog/setup_cog_workspace.py` first."
            )
        if diff_against_previous_cog_to_cartfs_sync:
            # Snapshot keyed by absolute Cog paths, captured once and closed
            # over by `_hash_func` below.
            previous_transfer_file_hashes = {
                self._cog_path(path): hash_val
                for path, hash_val in self._cog_transfer_file_hashes.items()
            }

            def _hash_func(path: Path) -> str | None:
                actual_hash = self._md5hash(path)
                # Only previously synced Cog paths will have their hashes overridden. CartFS paths
                # will purposely miss the dictionary lookup and return their actual_hash.
                try_cache_hash = previous_transfer_file_hashes.get(
                    path, actual_hash
                )
                if actual_hash != try_cache_hash:
                    logger.log_info(
                        f"Detected Cider edits to '{path.name}'. Treating its base hash as "
                        f"'{try_cache_hash}' to prevent accidental overwrite."
                    )
                return try_cache_hash

        else:
            _hash_func = self._md5hash
        return self.sync_batch(
            self.cartfs_path,
            self._cog_path,
            self.affected_files(WorkspaceType.CARTFS),
            _hash_func,
        )
class SyncDirection(enum.Enum):
    """Direction of a sync operation, as selected on the command line."""

    FROM_COG_TO_CARTFS = "from cog to cartfs"
    FROM_CARTFS_TO_COG = "from cartfs to cog"
def _parse_args() -> argparse.Namespace:
    """Parses command-line arguments."""
    parser = argparse.ArgumentParser(
        description="Syncs changes between Cog and CartFS checkouts."
    )
    # Exactly one sync direction must be chosen; both flags store into the
    # shared `sync_direction` destination.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "--from-cog-to-cartfs",
        action="store_const",
        const=SyncDirection.FROM_COG_TO_CARTFS,
        dest="sync_direction",
        help="Sync source file changes from Cog to CartFS.",
    )
    group.add_argument(
        "--from-cartfs-to-cog",
        action="store_const",
        const=SyncDirection.FROM_CARTFS_TO_COG,
        dest="sync_direction",
        help="Sync source file changes from CartFS to Cog.",
    )
    parser.add_argument(
        "--unsafe-overwrite-cog-changes-since-last-sync",
        action="store_true",
        help=(
            "By default, when syncing from CartFS to Cog, we compare CartFS file changes against "
            "the snapshot of Cog file hashes generated from the last Cog => CartFS sync instead of "
            "the current Cog file contents. This helps us avoid destroying file edits made in "
            "Cider during a build or test, etc.\n"
            "Setting this flag will force the sync to compare CartFS file changes against the "
            "current Cog file contents instead.\n"
            "Only compatible with syncing `--from-cartfs-to-cog`."
        ),
    )
    parser.add_argument(
        "--report",
        type=Path,
        help="Path to write a bash sourceable report of the sync operation results.",
    )
    # Adds the shared logging flags (log level, color, status updates).
    logger.add_args(parser, default_log_level=logging.WARNING)
    args = parser.parse_args()
    # Cross-flag validation that argparse can't express declaratively.
    if (
        args.unsafe_overwrite_cog_changes_since_last_sync
        and args.sync_direction != SyncDirection.FROM_CARTFS_TO_COG
    ):
        parser.error(
            "`--unsafe-overwrite-cog-changes-since-last-sync` is only compatible with "
            "`--from-cartfs-to-cog`."
        )
    return args
def _main() -> None:
    """Parses arguments, performs the requested sync, and writes the report."""
    args = _parse_args()
    if not args.color:
        # Propagate the no-color preference to child processes via the
        # NO_COLOR convention.
        os.environ["NO_COLOR"] = "1"
    logger.init_logger(
        log_level=args.log_level,
        colors=args.color,
        enable_status_updates=args.enable_status_updates,
    )
    sync_service = WorkspaceSyncService()
    if args.sync_direction == SyncDirection.FROM_COG_TO_CARTFS:
        logger.log_info("Syncing changes from Cog to CartFS...")
        sync_result = sync_service.sync_cog_to_cartfs()
        # Sync failures are non-fatal: warn loudly and continue so the report
        # below is still produced.
        if sync_result.failed:
            logger.log_warn(
                f"Failed to sync {len(sync_result.failed)} files from Cog to CartFS:\n"
                f"{textwrap.indent(chr(10).join(sync_result.failed), ' ')}\n"
                "Some source file changes made in Cider won't be made effective for this "
                "`fx`/`ffx` command invocation!"
            )
    else:
        logger.log_info("Syncing changes from CartFS to Cog...")
        sync_result = sync_service.sync_cartfs_to_cog(
            not args.unsafe_overwrite_cog_changes_since_last_sync
        )
        if sync_result.failed:
            logger.log_warn(
                f"Failed to sync {len(sync_result.failed)} files from CartFS to Cog:\n"
                f"{textwrap.indent(chr(10).join(sync_result.failed), ' ')}\n"
                "Some source file changes made by `fx`/`ffx` tooling won't be reflected back "
                "to Cider!"
            )
    logger.log_info(
        "Sync complete.\n"
        f"Added files ({len(sync_result.added)}): {sorted(sync_result.added)}\n"
        f"Modified files ({len(sync_result.modified)}): {sorted(sync_result.modified)}\n"
        f"Deleted files ({len(sync_result.deleted)}): {sorted(sync_result.deleted)}\n"
        f"No-op files ({len(sync_result.noop)}): {sorted(sync_result.noop)}\n"
        f"Failed files ({len(sync_result.failed)}): {sorted(sync_result.failed)}"
    )
    if args.report:
        # Emit a bash-sourceable report; values are shell-quoted.
        args.report.write_text(
            f"CARTFS_CWD={shlex.quote(str(sync_service.ensure_cartfs_cwd()))}\n"
            f"CARTFS_FUCHSIA_DIR={shlex.quote(str(sync_service.cartfs_root / 'fuchsia'))}\n"
        )
def main() -> int:
    """Main function to sync the Cog <=> CartFS workspaces.

    Returns a process exit code: 0 on success, 130 on user interrupt, and 1
    after running preflight diagnostics for any other failure.
    """
    try:
        _main()
        return 0
    except (workspace.NotInCogWorkspaceError, cartfs.CartfsNotRunningError):
        # Let `preflight.check_all()` surface a better error message.
        pass
    except Exception:
        logger.log_exception("An unexpected error occurred:")
    except KeyboardInterrupt:
        # KeyboardInterrupt derives from BaseException, so the `except
        # Exception` clause above doesn't swallow it. Skip diagnostics here.
        logger.log_error("Sync cancelled by user (KeyboardInterrupt).")
        return 130
    # Run preflight checks to diagnose errors and provide helpful error messages.
    #
    # Since this script is run twice per `fx`/`ffx` user invocation, opportunistically assume the
    # user has a working environment and run preflight checks otherwise to reduce this script's
    # overhead.
    with logger.set_level(min(logger.get_log_level(), logging.INFO)):
        logger.log_info("Running environment diagnostics...")
        if preflight.check_all():
            logger.log_warn("No obvious environmental issues found.")
            maybe_add_env_var = (
                "rerun the command with FUCHSIA_COG_DEBUG=1 and "
                if logger.get_log_level() > logging.DEBUG
                else ""
            )
            logger.log_warn(
                f"To file a bug, please {maybe_add_env_var}upload command output to "
                "http://go/fuchsia-cog-bug"
            )
    return 1
# Script entry point: exit with the code returned by main().
if __name__ == "__main__":
    sys.exit(main())