blob: b9acfca989506d4cdcf77dab74a3aa9d934be535 [file] [edit]
# Copyright 2026 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import asyncio
import json
import logging
from typing import Any
from pydap.models import (
AttachRequestArguments,
ContinueArguments,
DisconnectArguments,
InitializeArguments,
MessageType,
PauseArguments,
StackTraceArguments,
StackTraceResponse,
ThreadsResponse,
dataclass_to_dict,
from_dict,
)
logger = logging.getLogger(__name__)
class DapError(Exception):
"""Base exception for DAP client errors."""
class DapClient:
"""A client for the Debug Adapter Protocol."""
def __init__(self) -> None:
self._pending_requests: dict[int, asyncio.Future[Any]] = {}
self._seq_counter = 1
async def run(
self, reader: asyncio.StreamReader, event_queue: asyncio.Queue[Any]
) -> None:
"""Runs the client's read loop, processing messages from the reader."""
while True:
try:
msg = await self._read_message(reader)
if msg is None:
break # EOF
msg_type = msg.get("type")
if msg_type == MessageType.EVENT.value:
await event_queue.put(msg)
elif msg_type == MessageType.RESPONSE.value:
req_seq = msg.get("request_seq")
if req_seq in self._pending_requests:
fut = self._pending_requests.pop(req_seq)
if not fut.done():
fut.set_result(msg)
except Exception:
logger.exception("Error in DAP client run loop")
break
async def send_request(
self,
writer: asyncio.StreamWriter,
command: str,
arguments: dict[str, Any] | None = None,
timeout: float = 5.0,
) -> dict[str, Any]:
"""Sends a request to the debug adapter and waits for the response."""
seq = self._seq_counter
self._seq_counter += 1
loop = asyncio.get_running_loop()
fut = loop.create_future()
self._pending_requests[seq] = fut
request = {
"seq": seq,
"type": MessageType.REQUEST.value,
"command": command,
}
if arguments:
request["arguments"] = arguments
await self._write_message(writer, request)
try:
return await asyncio.wait_for(fut, timeout=timeout)
except asyncio.TimeoutError:
self._pending_requests.pop(seq, None)
raise DapError(
f"Request {command} (seq={seq}) timed out after {timeout}s"
)
async def initialize(
self, writer: asyncio.StreamWriter, args: InitializeArguments
) -> dict[str, Any]:
"""Sends an initialize request."""
return await self.send_request(
writer, "initialize", dataclass_to_dict(args)
)
async def disconnect(
self, writer: asyncio.StreamWriter, args: DisconnectArguments
) -> dict[str, Any]:
"""Sends a disconnect request."""
return await self.send_request(
writer, "disconnect", dataclass_to_dict(args)
)
async def stack_trace(
self, writer: asyncio.StreamWriter, args: StackTraceArguments
) -> StackTraceResponse:
"""Sends a stackTrace request."""
resp = await self.send_request(
writer, "stackTrace", dataclass_to_dict(args)
)
return from_dict(StackTraceResponse, resp.get("body", {}))
async def continue_thread(
self, writer: asyncio.StreamWriter, args: ContinueArguments
) -> dict[str, Any]:
"""Sends a continue request."""
return await self.send_request(
writer, "continue", dataclass_to_dict(args)
)
async def pause_thread(
self, writer: asyncio.StreamWriter, args: PauseArguments
) -> dict[str, Any]:
"""Sends a pause request."""
return await self.send_request(writer, "pause", dataclass_to_dict(args))
async def threads(self, writer: asyncio.StreamWriter) -> ThreadsResponse:
"""Sends a threads request."""
resp = await self.send_request(writer, "threads")
return from_dict(ThreadsResponse, resp.get("body", {}))
async def attach(
self, writer: asyncio.StreamWriter, args: AttachRequestArguments
) -> dict[str, Any]:
"""Sends an attach request."""
data = dataclass_to_dict(args)
# Map _restart to __restart for protocol compliance
restart = data.pop("_restart", None)
if restart is not None:
data["__restart"] = restart
extra_fields = data.pop("extra_fields", None)
if extra_fields:
data.update(extra_fields)
return await self.send_request(writer, "attach", data)
async def _read_message(
self, reader: asyncio.StreamReader
) -> dict[str, Any] | None:
"""Reads a single message from the reader, handling protocol framing."""
content_length = None
while True:
line = await reader.readline()
if not line:
return None # EOF
trimmed = line.decode("utf-8").strip()
if not trimmed:
break # End of headers
if trimmed.startswith("Content-Length:"):
parts = trimmed.split(":")
if len(parts) >= 2:
try:
content_length = int(parts[1].strip())
except ValueError:
raise DapError(f"Invalid Content-Length: {trimmed}")
if content_length is None:
raise DapError("Missing Content-Length header")
body = await reader.readexactly(content_length)
return json.loads(body.decode("utf-8"))
async def _write_message(
self, writer: asyncio.StreamWriter, value: dict[str, Any]
) -> None:
"""Writes a message to the writer, handling protocol framing."""
content = json.dumps(value, separators=(",", ":")).encode("utf-8")
header = f"Content-Length: {len(content)}\r\n\r\n".encode("utf-8")
writer.write(header)
writer.write(content)
await writer.drain()