blob: ad043bf1c297fe867dea98089a8c7cecb7425140 [file] [edit]
# Copyright 2026 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import asyncio
import sys
from typing import Final
from cli.commands import (
attach,
continue_cmd,
detach,
get_state,
pause,
schema,
stack_trace,
start,
stop,
threads,
variables,
wait_for_event,
)
from cli.commands.base import BaseCommand
from daemon_manager.manager import UDS_PATH
from pydantic import ValidationError
from shared.protocol import (
BaseRequest,
deserialize_request,
make_request,
serialize,
)
# The maximum number of seconds that we will wait for the daemon to start.
# In particular, this is how long we will wait for the daemon to write into
# the pipe FD that we pass to it when we start the new process.
DAEMON_STARTUP_TIMEOUT_SECS: Final[float] = 10.0
def request_to_namespace(req: BaseRequest) -> argparse.Namespace:
"""Converts a BaseRequest to argparse.Namespace for command executors."""
return argparse.Namespace(**req.model_dump())
async def main(args: list[str]) -> int:
parser = argparse.ArgumentParser(description="fx debug cli")
parser.add_argument("--json", help="JSON request string")
parser.add_argument(
"--ack-seq",
type=int,
help="Acknowledge events up to this sequence number",
)
subparsers = parser.add_subparsers(dest="command", required=False)
# Statically register commands.
commands: dict[str, type[BaseCommand]] = {}
command_classes = [
attach.Command,
continue_cmd.Command,
detach.Command,
get_state.Command,
pause.Command,
schema.Command,
stack_trace.Command,
start.Command,
stop.Command,
threads.Command,
variables.Command,
wait_for_event.Command,
]
for cmd_class in command_classes:
existing_choices = set(subparsers.choices.keys())
cmd_class.register_cli(subparsers)
new_choices = set(subparsers.choices.keys()) - existing_choices
for choice in new_choices:
commands[choice] = cmd_class
parsed_args = parser.parse_args(args)
if parsed_args.json and parsed_args.command:
print(
"Error: --json and command are mutually exclusive", file=sys.stderr
)
return 1
if not parsed_args.json and not parsed_args.command:
print(
"Error: Either --json or a command must be provided",
file=sys.stderr,
)
return 1
req: BaseRequest | None = None
if parsed_args.json:
try:
req = deserialize_request(parsed_args.json)
cmd_cls: type[BaseCommand] | None = commands.get(req.command)
if cmd_cls is not None:
exit_code = await cmd_cls.execute(request_to_namespace(req))
if exit_code is not None:
return exit_code
except (ValueError, ValidationError) as e:
print(f"Error: {e}", file=sys.stderr)
return 1
elif parsed_args.command:
cmd_cls = commands.get(parsed_args.command)
if cmd_cls is not None:
exit_code = await cmd_cls.execute(parsed_args)
if exit_code is not None:
return exit_code
try:
args_dict = vars(parsed_args)
if cmd_cls is not None and cmd_cls.COMMAND_NAME:
args_dict["command"] = cmd_cls.COMMAND_NAME
req = make_request(args_dict)
except (ValueError, ValidationError) as e:
print(f"Error: {e}", file=sys.stderr)
return 1
assert req is not None
assert isinstance(req, BaseRequest)
if parsed_args.ack_seq is not None:
req.ack_seq = parsed_args.ack_seq
return await send_command(req)
async def send_command(req: BaseRequest) -> int:
if not UDS_PATH.exists():
print(
f"Daemon socket not found at {UDS_PATH}.\n\n"
"To start the daemon, run:\n"
" fx debug cli start\n\n"
"Or run your tests with debugging enabled:\n"
" fx test <test_name> --agent-debugging-mode",
file=sys.stderr,
)
return 1
try:
reader, writer = await asyncio.open_unix_connection(UDS_PATH)
except Exception as e:
print(f"Error communicating with daemon: {e}", file=sys.stderr)
return 1
try:
writer.write(serialize(req).encode("utf-8"))
await writer.drain()
try:
response_line = await asyncio.wait_for(
reader.readline(), timeout=5.0
)
except asyncio.TimeoutError:
print(
"Timed out waiting for response from daemon.", file=sys.stderr
)
return 1
if response_line:
print(response_line.decode("utf-8").strip())
else:
print("No response received from daemon.", file=sys.stderr)
return 1
return 0
except Exception as e:
print(f"Error communicating with daemon: {e}", file=sys.stderr)
return 1
finally:
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
if __name__ == "__main__":
sys.exit(asyncio.run(main(sys.argv[1:])))