| # Copyright 2026 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import asyncio |
| import json |
| import logging |
| from typing import Any |
| |
| from .dap_types import DapBaseModel |
| from .models import ( |
| AttachRequestArguments, |
| ContinueArguments, |
| DisconnectArguments, |
| EvaluateArguments, |
| InitializeArguments, |
| LaunchArguments, |
| MessageType, |
| PauseArguments, |
| Response, |
| StackTraceArguments, |
| StackTraceResponse, |
| ThreadsResponse, |
| ) |
| |
# Module-level logger; DapClient.run reports read-loop failures through it.
logger = logging.getLogger(__name__)
| |
| |
class DapError(Exception):
    """Root exception type for errors raised by the DAP client.

    The client raises it for request timeouts and for protocol framing
    problems (for example a missing or invalid Content-Length header).
    """
| |
| |
class DapClient:
    """A client for the Debug Adapter Protocol.

    The client does not own the connection: callers pass the stream reader to
    `run` and the stream writer to each request method. Responses are matched
    to requests by sequence number inside `run`, so `run` must be executing
    concurrently (e.g. as a task) for any request method to complete.
    """

    def __init__(self) -> None:
        """Initializes the DAP client."""
        # Maps the "seq" of each in-flight request to the future that its
        # sender is awaiting; resolved by `run` when the response arrives.
        self._pending_requests: dict[int, asyncio.Future[Any]] = {}
        self._seq_counter = 1

    async def run(
        self, reader: asyncio.StreamReader, event_queue: asyncio.Queue[Any]
    ) -> None:
        """Runs the client's read loop, processing messages from the reader.

        Events are forwarded to `event_queue`; responses resolve the future of
        the matching pending request. The loop ends on EOF or on the first
        error (which is logged, not raised). Whenever the loop ends, every
        request still waiting for a response is failed with DapError so its
        caller does not have to sit out the full request timeout.

        Args:
            reader: Stream reader to receive messages from the debug adapter.
            event_queue: Queue to put received DAP events into.
        """
        try:
            while True:
                msg = await self._read_message(reader)
                if msg is None:
                    break  # EOF

                msg_type = msg.get("type")
                if msg_type == MessageType.EVENT.value:
                    await event_queue.put(msg)
                elif msg_type == MessageType.RESPONSE.value:
                    req_seq = msg.get("request_seq")
                    fut = self._pending_requests.pop(req_seq, None)
                    if fut is None:
                        # Typically a response that arrived after its request
                        # already timed out.
                        logger.debug(
                            "Ignoring response for unknown request_seq %r",
                            req_seq,
                        )
                    elif not fut.done():
                        fut.set_result(msg)
                else:
                    logger.debug(
                        "Ignoring DAP message of type %r", msg_type
                    )
        except Exception:
            logger.exception("Error in DAP client run loop")
        finally:
            # Nothing will read responses any more (EOF, error or
            # cancellation), so unblock everyone who is still waiting.
            self._fail_pending_requests(
                "DAP read loop stopped before a response was received"
            )

    def _fail_pending_requests(self, reason: str) -> None:
        """Fails every in-flight request with a DapError carrying `reason`."""
        pending, self._pending_requests = self._pending_requests, {}
        for fut in pending.values():
            if not fut.done():
                fut.set_exception(DapError(reason))

    async def _send_request(
        self,
        writer: asyncio.StreamWriter,
        command: str,
        arguments: DapBaseModel | None = None,
        timeout: float = 5.0,
    ) -> dict[str, Any]:
        """Sends a request to the debug adapter and waits for the response.

        Args:
            writer: Stream writer to send the request to.
            command: The DAP command name.
            arguments: Optional arguments for the command.
            timeout: Maximum time to wait for response in seconds.

        Returns:
            The response message dictionary from the adapter.

        Raises:
            DapError: If the request times out, or the read loop stops before
                the response arrives.
            TypeError: If `arguments` is given but is not a DapBaseModel.
        """
        # Validate before touching any state so a bad call leaves no trace.
        if arguments is not None and not isinstance(arguments, DapBaseModel):
            raise TypeError(
                f"arguments must be a DapBaseModel, got {type(arguments)}"
            )

        seq = self._seq_counter
        self._seq_counter += 1

        request: dict[str, Any] = {
            "seq": seq,
            "type": MessageType.REQUEST.value,
            "command": command,
        }
        if arguments is not None:
            request["arguments"] = arguments.dump_dap()

        fut: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
        # Register before writing so a fast response cannot be missed.
        self._pending_requests[seq] = fut
        try:
            await self._write_message(writer, request)
            # Only the wait for the response is translated into a DapError;
            # I/O errors from the write above propagate unchanged.
            try:
                return await asyncio.wait_for(fut, timeout=timeout)
            except asyncio.TimeoutError as err:
                raise DapError(
                    f"Request {command} (seq={seq}) timed out after {timeout}s"
                ) from err
        finally:
            # Covers timeout, write failure and cancellation alike; a no-op
            # when `run` already removed the entry on a normal response.
            self._pending_requests.pop(seq, None)

    async def initialize(
        self, writer: asyncio.StreamWriter, args: InitializeArguments
    ) -> Response:
        """Sends an initialize request.

        Args:
            writer: Stream writer to send the request to.
            args: Arguments for the initialize request.

        Returns:
            The response model.
        """
        resp = await self._send_request(writer, "initialize", args)
        return Response.model_validate(resp)

    async def disconnect(
        self, writer: asyncio.StreamWriter, args: DisconnectArguments
    ) -> Response:
        """Sends a disconnect request.

        Args:
            writer: Stream writer to send the request to.
            args: Arguments for the disconnect request.

        Returns:
            The response model.
        """
        resp = await self._send_request(writer, "disconnect", args)
        return Response.model_validate(resp)

    async def stack_trace(
        self, writer: asyncio.StreamWriter, args: StackTraceArguments
    ) -> StackTraceResponse:
        """Sends a stackTrace request.

        Args:
            writer: Stream writer to send the request to.
            args: Arguments for the stackTrace request.

        Returns:
            The stackTrace response model.
        """
        resp = await self._send_request(writer, "stackTrace", args)
        return StackTraceResponse.model_validate(resp)

    async def continue_thread(
        self, writer: asyncio.StreamWriter, args: ContinueArguments
    ) -> Response:
        """Sends a continue request.

        Args:
            writer: Stream writer to send the request to.
            args: Arguments for the continue request.

        Returns:
            The response model.
        """
        resp = await self._send_request(writer, "continue", args)
        return Response.model_validate(resp)

    async def pause_thread(
        self, writer: asyncio.StreamWriter, args: PauseArguments
    ) -> Response:
        """Sends a pause request.

        Args:
            writer: Stream writer to send the request to.
            args: Arguments for the pause request.

        Returns:
            The response model.
        """
        resp = await self._send_request(writer, "pause", args)
        return Response.model_validate(resp)

    async def threads(self, writer: asyncio.StreamWriter) -> ThreadsResponse:
        """Sends a threads request.

        Args:
            writer: Stream writer to send the request to.

        Returns:
            The threads response model.
        """
        resp = await self._send_request(writer, "threads")
        return ThreadsResponse.model_validate(resp)

    async def attach(
        self, writer: asyncio.StreamWriter, args: AttachRequestArguments
    ) -> Response:
        """Sends an attach request.

        Args:
            writer: Stream writer to send the request to.
            args: Arguments for the attach request.

        Returns:
            The response model.
        """
        resp = await self._send_request(writer, "attach", args)
        return Response.model_validate(resp)

    async def launch(
        self, writer: asyncio.StreamWriter, args: LaunchArguments
    ) -> Response:
        """Sends a launch request.

        Args:
            writer: Stream writer to send the request to.
            args: Arguments for the launch request.

        Returns:
            The response model.
        """
        resp = await self._send_request(writer, "launch", args)
        return Response.model_validate(resp)

    async def evaluate(
        self, writer: asyncio.StreamWriter, args: EvaluateArguments
    ) -> Response:
        """Sends an evaluate request.

        Args:
            writer: Stream writer to send the request to.
            args: Arguments for the evaluate request.

        Returns:
            The response model.
        """
        resp = await self._send_request(writer, "evaluate", args)
        return Response.model_validate(resp)

    async def _read_message(
        self, reader: asyncio.StreamReader
    ) -> dict[str, Any] | None:
        """Reads a single message from the reader, handling protocol framing.

        A message is a block of header lines terminated by an empty line,
        followed by a JSON body of exactly Content-Length bytes. Headers other
        than Content-Length are ignored; the header name is matched
        case-insensitively.

        Args:
            reader: Stream reader to read from.

        Returns:
            The parsed message dictionary, or None on EOF while reading
            headers.

        Raises:
            DapError: If the headers are invalid or Content-Length is missing,
                if the stream ends in the middle of a body, or if the body is
                not a UTF-8 encoded JSON object.
        """
        content_length: int | None = None
        while True:
            line = await reader.readline()
            if not line:
                return None  # EOF
            try:
                header = line.decode("utf-8").strip()
            except UnicodeDecodeError as err:
                raise DapError(f"Undecodable header line: {line!r}") from err
            if not header:
                break  # End of headers

            name, sep, value = header.partition(":")
            if sep and name.strip().lower() == "content-length":
                try:
                    content_length = int(value.strip())
                except ValueError as err:
                    raise DapError(
                        f"Invalid Content-Length: {header}"
                    ) from err
                # readexactly() rejects negative sizes with a bare ValueError;
                # report it as the framing error it is.
                if content_length < 0:
                    raise DapError(f"Invalid Content-Length: {header}")

        if content_length is None:
            raise DapError("Missing Content-Length header")

        try:
            body = await reader.readexactly(content_length)
        except asyncio.IncompleteReadError as err:
            raise DapError(
                f"Stream ended after {len(err.partial)} of "
                f"{content_length} body bytes"
            ) from err

        try:
            message = json.loads(body.decode("utf-8"))
        except ValueError as err:
            # UnicodeDecodeError and json.JSONDecodeError both derive from
            # ValueError.
            raise DapError(f"Malformed message body: {err}") from err
        if not isinstance(message, dict):
            raise DapError(
                f"Expected a JSON object, got {type(message).__name__}"
            )
        return message

    async def _write_message(
        self, writer: asyncio.StreamWriter, value: dict[str, Any]
    ) -> None:
        """Writes a message to the writer, handling protocol framing.

        Args:
            writer: Stream writer to write to.
            value: The message dictionary to serialize and send.
        """
        content = json.dumps(value, separators=(",", ":")).encode("utf-8")
        header = f"Content-Length: {len(content)}\r\n\r\n".encode("utf-8")
        # Both writes are queued before the first await, so frames from
        # concurrent senders on the same event loop cannot interleave.
        writer.write(header)
        writer.write(content)
        await writer.drain()