| # Copyright 2026 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
import asyncio
import io
import json
import unittest
from typing import Any

from pydap.client import DapClient
from pydap.models import *
| |
| |
class MockWriter:
    """In-memory writer that records every written byte in `buffer`."""

    def __init__(self) -> None:
        """Starts with an empty byte buffer."""
        self.buffer = io.BytesIO()

    def write(self, data: bytes) -> None:
        """Appends `data` to the in-memory buffer."""
        sink = self.buffer
        sink.write(data)

    async def drain(self) -> None:
        """Does nothing; writes land in the buffer immediately."""
        return None
| |
| |
class TestDapClient(unittest.IsolatedAsyncioTestCase):
    """Tests DapClient message framing and request methods using MockWriter."""

    async def _answer_request(
        self,
        client: DapClient,
        writer: MockWriter,
        command: str,
        body: Any = None,
    ) -> Any:
        """Resolves the client's in-flight request with a success response.

        Sleeps briefly so the already-scheduled request task can write its
        message, decodes that message from `writer`, and sets a canned
        successful response on the matching entry of
        `client._pending_requests`.

        Args:
            client: The client whose pending request is resolved.
            writer: The writer the request was sent through; expected to
                hold exactly one framed message.
            command: Value placed in the response's "command" field.
            body: Value for the response's "body" field. The field is left
                out entirely when this is None.

        Returns:
            The JSON-decoded request that was read back from `writer`.
        """
        # Let the request task run far enough to write its message.
        await asyncio.sleep(0.1)

        _, payload = writer.buffer.getvalue().split(b"\r\n\r\n", 1)
        request = json.loads(payload.decode("utf-8"))
        seq = request["seq"]

        response = {
            "seq": 10,
            "type": "response",
            "request_seq": seq,
            "success": True,
            "command": command,
        }
        if body is not None:
            response["body"] = body

        # Fail loudly here: if the request was never registered, awaiting the
        # request task afterwards would block forever instead of failing.
        self.assertIn(seq, client._pending_requests)
        client._pending_requests[seq].set_result(response)
        return request

    async def test_read_message(self) -> None:
        """_read_message decodes a Content-Length framed JSON message."""
        data = b'Content-Length: 26\r\n\r\n{"seq":1,"type":"request"}'
        reader = asyncio.StreamReader()
        reader.feed_data(data)
        reader.feed_eof()

        client = DapClient()
        msg = await client._read_message(reader)
        self.assertIsNotNone(msg)
        # Plain assert as well so static type checkers narrow away None.
        assert msg is not None
        self.assertEqual(msg["seq"], 1)
        self.assertEqual(msg["type"], "request")

    async def test_write_message(self) -> None:
        """_write_message emits a Content-Length header and a JSON body."""
        client = DapClient()
        value = {"seq": 1, "type": "request"}

        writer = MockWriter()
        await client._write_message(writer, value)  # type: ignore

        buffer_val = writer.buffer.getvalue()
        headers, body = buffer_val.split(b"\r\n\r\n", 1)
        self.assertEqual(headers, b"Content-Length: 26")
        self.assertEqual(json.loads(body.decode("utf-8")), value)

    async def test__send_request(self) -> None:
        """_send_request returns the response set for its sequence number."""
        client = DapClient()
        writer = MockWriter()
        send_task = asyncio.create_task(
            client._send_request(writer, "initialize")  # type: ignore
        )

        request = await self._answer_request(client, writer, "initialize")

        resp = await send_task
        self.assertTrue(resp["success"])
        self.assertEqual(resp["request_seq"], request["seq"])

    async def test_initialize(self) -> None:
        """initialize() yields a response whose `success` is true."""
        client = DapClient()
        writer = MockWriter()
        args = InitializeArguments(adapter_id="test")
        send_task = asyncio.create_task(
            client.initialize(writer, args)  # type: ignore
        )

        await self._answer_request(client, writer, "initialize")

        resp = await send_task
        self.assertTrue(resp.success)

    async def test_disconnect(self) -> None:
        """disconnect() yields a response whose `success` is true."""
        client = DapClient()
        writer = MockWriter()
        args = DisconnectArguments(terminate_debuggee=True)
        send_task = asyncio.create_task(
            client.disconnect(writer, args)  # type: ignore
        )

        await self._answer_request(client, writer, "disconnect")

        resp = await send_task
        self.assertTrue(resp.success)

    async def test_stack_trace(self) -> None:
        """stack_trace() exposes the response's stack frames list."""
        client = DapClient()
        writer = MockWriter()
        args = StackTraceArguments(thread_id=1)
        send_task = asyncio.create_task(
            client.stack_trace(writer, args)  # type: ignore
        )

        await self._answer_request(
            client, writer, "stackTrace", {"stackFrames": []}
        )

        resp = await send_task
        self.assertEqual(resp.body.stack_frames, [])

    async def test_continue_thread(self) -> None:
        """continue_thread() yields a response whose `success` is true."""
        client = DapClient()
        writer = MockWriter()
        args = ContinueArguments(thread_id=1)
        send_task = asyncio.create_task(
            client.continue_thread(writer, args)  # type: ignore
        )

        await self._answer_request(
            client, writer, "continue", {"allThreadsContinued": True}
        )

        resp = await send_task
        self.assertTrue(resp.success)

    async def test_pause_thread(self) -> None:
        """pause_thread() yields a response whose `success` is true."""
        client = DapClient()
        writer = MockWriter()
        args = PauseArguments(thread_id=1)
        send_task = asyncio.create_task(
            client.pause_thread(writer, args)  # type: ignore
        )

        await self._answer_request(client, writer, "pause")

        resp = await send_task
        self.assertTrue(resp.success)

    async def test_threads(self) -> None:
        """threads() exposes the thread entries from the response body."""
        client = DapClient()
        writer = MockWriter()
        send_task = asyncio.create_task(client.threads(writer))  # type: ignore

        await self._answer_request(
            client,
            writer,
            "threads",
            {
                "threads": [
                    {"id": 1234, "name": "main"},
                    {"id": 5678, "name": "worker"},
                ]
            },
        )

        resp = await send_task
        self.assertEqual(len(resp.body.threads), 2)
        self.assertEqual(resp.body.threads[0].id, 1234)
        self.assertEqual(resp.body.threads[0].name, "main")

    async def test_attach(self) -> None:
        """attach() sends `__restart` and the extra fields as arguments."""
        client = DapClient()
        writer = MockWriter()
        args = AttachRequestArguments(
            restart=True, extra_fields={"process": "my_process"}
        )
        send_task = asyncio.create_task(client.attach(writer, args))  # type: ignore

        req_val = await self._answer_request(client, writer, "attach")

        resp = await send_task
        self.assertTrue(resp.success)
        self.assertTrue(req_val["arguments"]["__restart"])
        self.assertEqual(req_val["arguments"]["process"], "my_process")

    async def test_launch(self) -> None:
        """launch() sends `process` and `launchCommand` as arguments."""
        client = DapClient()
        writer = MockWriter()
        args = LaunchArguments(process="my_process", launch_command="run")
        send_task = asyncio.create_task(client.launch(writer, args))  # type: ignore

        req_val = await self._answer_request(client, writer, "launch")

        resp = await send_task
        self.assertTrue(resp.success)
        self.assertEqual(req_val["arguments"]["process"], "my_process")
        self.assertEqual(req_val["arguments"]["launchCommand"], "run")

    async def test_evaluate(self) -> None:
        """evaluate() sends expression/context and returns the result body."""
        client = DapClient()
        writer = MockWriter()
        args = EvaluateArguments(expression="1 + 1", context="repl")
        send_task = asyncio.create_task(client.evaluate(writer, args))  # type: ignore

        req_val = await self._answer_request(
            client, writer, "evaluate", {"result": "2"}
        )

        resp = await send_task
        self.assertTrue(resp.success)
        self.assertEqual((resp.body or {}).get("result"), "2")
        self.assertEqual(req_val["arguments"]["expression"], "1 + 1")
        self.assertEqual(req_val["arguments"]["context"], "repl")