blob: fe41e296ddb3c37ab1ab9786f72ff0001ee4ddff [file] [edit]
# Copyright 2026 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import asyncio
import os
import signal
from collections.abc import Awaitable, Callable
from typing import Any, Final, TypeVar, cast, final
from async_utils.command import AsyncCommand
from ffx_cmd.lib import FfxCmd
from pydap.client import DapClient
from shared.protocol import (
AttachRequest,
BaseRequest,
GetStateRequest,
Response,
StopRequest,
deserialize_request,
serialize,
)
# TODO(https://fxbug.dev/504962182): Replace this with something more appropriate.
UDS_PATH = "/tmp/fx-debug-daemon.sock"
DEFAULT_DAP_PORT: Final[int] = 15678
class CommandHandlerRegistry:
def __init__(self) -> None:
self.handlers: dict[
str, Callable[[BaseRequest], Awaitable[Response]]
] = {}
RequestT = TypeVar("RequestT", bound=BaseRequest)
def register(
self,
command: str,
handler: Callable[[RequestT], Awaitable[Response]],
) -> None:
self.handlers[command] = cast(
Callable[[BaseRequest], Awaitable[Response]], handler
)
async def handle(self, command: str, req: BaseRequest) -> Response:
if command in self.handlers:
try:
return await self.handlers[command](req)
except Exception as e:
return Response(success=False, message=f"Handler error: {e}")
return Response(success=False, message=f"Unknown command: {command}")
@final
class Daemon:
def __init__(
self, port: int | None, connect_to_existing: bool = False
) -> None:
self.registry = CommandHandlerRegistry()
self.dap_client = DapClient()
self.background_tasks: set[asyncio.Task[None]] = set()
self.active_handlers: set[asyncio.Task[Any]] = set()
self.event_queue: asyncio.Queue[Any] = asyncio.Queue()
self.stop_event = asyncio.Event()
self.zxdb_writer: asyncio.StreamWriter | None = None
self.zxdb_reader: asyncio.StreamReader | None = None
self.port = port
self.connect_to_existing = connect_to_existing
self.dap_proc: AsyncCommand | None = None
self.registry.register("stop", self.handle_stop)
self.registry.register(
"get-state",
self.handle_get_state,
)
self.registry.register(
"attach",
self.handle_attach,
)
async def handle_stop(self, _req: StopRequest) -> Response:
self.stop_event.set()
return Response(success=True, message="Daemon stopping")
async def handle_get_state(self, _req: GetStateRequest) -> Response:
if not self.zxdb_writer:
return Response(
success=False, message="Not connected to zxdb DAP server"
)
try:
threads_resp = await self.dap_client.threads(self.zxdb_writer)
threads = []
for t in threads_resp.threads:
threads.append(
{
"id": t.id,
"name": t.name,
}
)
return Response(success=True, body={"threads": threads})
except Exception as e:
return Response(
success=False, message=f"Failed to get threads: {e}"
)
async def handle_attach(self, req: AttachRequest) -> Response:
if not self.zxdb_writer:
return Response(
success=False, message="Not connected to zxdb DAP server"
)
from pydap.models import AttachRequestArguments
attach_args = AttachRequestArguments(
_restart=None, extra_fields={"process": req.filter}
)
try:
resp = await self.dap_client.attach(self.zxdb_writer, attach_args)
return Response(success=True, body=resp)
except Exception as e:
return Response(success=False, message=f"Failed to attach: {e}")
async def run(self) -> int:
if os.path.exists(UDS_PATH):
os.unlink(UDS_PATH)
server = await asyncio.start_unix_server(
self.handle_uds_client, UDS_PATH
)
print(f"Daemon listening on {UDS_PATH}")
if not self.connect_to_existing:
import package_server
async with package_server.ensure_running():
ffx_cmd = FfxCmd()
pid = os.getpid()
args = [
"debug",
"connect",
"--",
"--enable-debug-adapter",
f"--signal-when-ready={pid}",
]
if self.port is not None:
args.extend(["--debug-adapter-port", str(self.port)])
else:
self.port = DEFAULT_DAP_PORT
self.dap_proc = await ffx_cmd.start(*args)
# Wait for signal from zxdb
loop = asyncio.get_running_loop()
signal_fut = loop.create_future()
def handle_sigusr1() -> None:
signal_fut.set_result(True)
loop.add_signal_handler(signal.SIGUSR1, handle_sigusr1)
try:
await asyncio.wait_for(signal_fut, timeout=30.0)
print("Received SIGUSR1 from zxdb.")
except asyncio.TimeoutError:
print("Timed out waiting for SIGUSR1 from zxdb.")
if self.dap_proc:
self.dap_proc.terminate()
server.close()
await server.wait_closed()
os.unlink(UDS_PATH)
return 1
finally:
loop.remove_signal_handler(signal.SIGUSR1)
return await self._run_dap_session(server)
else:
return await self._run_dap_session(server)
async def _run_dap_session(self, server: asyncio.AbstractServer) -> int:
# Poll for connection to DAP port
connected = False
for _ in range(20):
try:
(
self.zxdb_reader,
self.zxdb_writer,
) = await asyncio.open_connection("localhost", self.port)
connected = True
print("Connected to DAP server.")
break
except Exception:
await asyncio.sleep(1)
assert self.zxdb_reader is not None
assert self.zxdb_writer is not None
if not connected:
print("Failed to connect to DAP server after polling.")
if self.dap_proc:
self.dap_proc.terminate()
server.close()
await server.wait_closed()
os.unlink(UDS_PATH)
return 1
# Run DAP client
self.background_tasks.add(
asyncio.create_task(
self.dap_client.run(self.zxdb_reader, self.event_queue)
)
)
# Initialize DAP
from pydap.models import InitializeArguments
await self.dap_client.initialize(
self.zxdb_writer,
InitializeArguments(adapterID="zxdb"),
)
await self.stop_event.wait()
_done, pending = await asyncio.wait(self.active_handlers, timeout=5.0)
for task in pending:
task.cancel()
server.close()
await server.wait_closed()
if os.path.exists(UDS_PATH):
os.unlink(UDS_PATH)
if self.dap_proc:
self.dap_proc.terminate()
return 0
async def handle_uds_client(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
# This function is only called when there is a new connection. Each connection is only
# expected to send a single request so there is no looping to do here. At the start of a new
# connection we store this task so that the main task can be sure that there are no dangling
# connections during shutdown.
current_task = asyncio.current_task()
assert current_task is not None
self.active_handlers.add(current_task)
line = await reader.readline()
if not line:
return
try:
req = deserialize_request(line.decode("utf-8"))
resp = await self.registry.handle(req.command, req)
writer.write(serialize(resp).encode("utf-8"))
await writer.drain()
except Exception as e:
resp = Response(success=False, message=f"Error: {e}")
writer.write(serialize(resp).encode("utf-8"))
await writer.drain()
finally:
self.active_handlers.remove(current_task)
writer.close()
await writer.wait_closed()