blob: f139109a4d5622447a8096d87ab4f0c7b32104db [file] [edit]
# Copyright 2026 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import asyncio
import unittest
from unittest.mock import AsyncMock, Mock, patch
from daemon.daemon import CommandHandlerRegistry, Daemon
from shared.protocol import (
AttachRequest,
BaseRequest,
ContinueRequest,
PauseRequest,
Response,
ThreadsRequest,
)
class TestCommandHandlerRegistry(unittest.IsolatedAsyncioTestCase):
async def test_register_and_handle(self) -> None:
registry = CommandHandlerRegistry()
async def mock_handler(_req: BaseRequest) -> Response:
return Response(success=True, body={"data": "handled"})
registry.register("test_cmd", mock_handler)
resp = await registry.handle(
"test_cmd", BaseRequest(command="test_cmd")
)
self.assertTrue(resp.success)
self.assertEqual(resp.body, {"data": "handled"})
async def test_unknown_command(self) -> None:
registry = CommandHandlerRegistry()
resp = await registry.handle("unknown", BaseRequest(command="unknown"))
self.assertFalse(resp.success)
self.assertIsNotNone(resp.message)
self.assertIn("Unknown command", resp.message or "")
async def test_attach_registration(self) -> None:
daemon = Daemon(port=15678)
self.assertIn("attach", daemon.registry.handlers)
async def test_handle_attach_success(self) -> None:
daemon = Daemon(port=15678)
daemon.zxdb_writer = Mock()
with patch.object(
daemon.dap_client, "attach", new_callable=AsyncMock
) as mock_attach:
mock_attach_resp = Mock()
mock_attach_resp.dump_dap.return_value = {"success": True}
mock_attach.return_value = mock_attach_resp
req = AttachRequest(filter="my_process")
resp = await daemon.handle_attach(req)
self.assertTrue(resp.success)
self.assertEqual(resp.body, {"success": True})
mock_attach.assert_called_once()
async def test_handle_attach_failure(self) -> None:
daemon = Daemon(port=15678)
daemon.zxdb_writer = Mock()
with patch.object(
daemon.dap_client, "attach", new_callable=AsyncMock
) as mock_attach:
mock_attach.side_effect = Exception("Failed to attach")
req = AttachRequest(filter="my_process")
resp = await daemon.handle_attach(req)
self.assertFalse(resp.success)
self.assertIn("Failed to attach", resp.message or "")
async def test_handle_attach_not_connected(self) -> None:
daemon = Daemon(port=15678)
daemon.zxdb_writer = None
req = AttachRequest(filter="my_process")
resp = await daemon.handle_attach(req)
self.assertFalse(resp.success)
self.assertIn("Not connected", resp.message or "")
@patch("daemon.daemon.ZxdbDapClient")
async def test_handle_continue(self, mock_dap_client_class: Mock) -> None:
mock_dap_client = mock_dap_client_class.return_value
mock_continue_resp = Mock()
mock_continue_resp.dump_dap.return_value = {"success": True}
mock_dap_client.continue_thread = AsyncMock(
return_value=mock_continue_resp
)
daemon = Daemon(port=15678)
daemon.zxdb_writer = Mock()
resp = await daemon.handle_continue(ContinueRequest(thread_id=1))
self.assertTrue(resp.success)
mock_dap_client.continue_thread.assert_called_once()
@patch("daemon.daemon.ZxdbDapClient")
async def test_handle_pause_sync(self, mock_dap_client_class: Mock) -> None:
mock_dap_client = mock_dap_client_class.return_value
mock_dap_client.pause_thread = AsyncMock(return_value={"success": True})
daemon = Daemon(port=15678)
daemon.zxdb_writer = Mock()
async def trigger_stopped() -> None:
await asyncio.sleep(0.1)
daemon.event_waiter.notify_thread_stop(
1,
{
"type": "event",
"event": "stopped",
"body": {"reason": "pause"},
},
)
loop = asyncio.get_event_loop()
loop.create_task(trigger_stopped())
resp = await daemon.handle_pause(PauseRequest(thread_id=1))
self.assertTrue(resp.success)
mock_dap_client.pause_thread.assert_called_once()
def test_threads_registration(self) -> None:
daemon = Daemon(port=15678)
self.assertIn("threads", daemon.registry.handlers)
@patch("daemon.daemon.ZxdbDapClient")
async def test_handle_threads(self, mock_dap_client_class: Mock) -> None:
mock_dap_client = mock_dap_client_class.return_value
mock_threads_resp = Mock()
mock_body = Mock()
mock_thread1 = Mock()
mock_thread1.id = 1
mock_thread1.name = "main"
mock_thread2 = Mock()
mock_thread2.id = 2
mock_thread2.name = "worker"
mock_body.threads = [mock_thread1, mock_thread2]
mock_body.model_dump.return_value = {
"threads": [
{"id": 1, "name": "main"},
{"id": 2, "name": "worker"},
]
}
mock_threads_resp.body = mock_body
mock_dap_client.threads = AsyncMock(return_value=mock_threads_resp)
daemon = Daemon(port=15678)
daemon.zxdb_writer = Mock()
resp = await daemon.handle_threads(ThreadsRequest())
if not resp.success:
print(f"Test failed with message: {resp.message}")
self.assertTrue(resp.success)
assert resp.body is not None
from shared.protocol import GetStateResponse
# Double-compatibility check:
# Pydantic v2 union coercion automatically parses the dictionary returned by handle_threads
# (which matches GetStateResponse's fields) into a typed GetStateResponse object at runtime.
# We check the type to support both strongly-typed GetStateResponse objects and raw dictionaries
# in mock testing.
if isinstance(resp.body, GetStateResponse):
threads = resp.body.threads
self.assertEqual(len(threads), 2)
self.assertEqual(threads[0].id, 1)
self.assertEqual(threads[0].name, "main")
self.assertEqual(threads[1].id, 2)
self.assertEqual(threads[1].name, "worker")
else:
threads = resp.body["threads"]
self.assertEqual(len(threads), 2)
self.assertEqual(threads[0]["id"], 1)
self.assertEqual(threads[0]["name"], "main")
self.assertEqual(threads[1]["id"], 2)
self.assertEqual(threads[1]["name"], "worker")
@patch("daemon.daemon.ZxdbDapClient")
async def test_handle_get_state(self, mock_dap_client_class: Mock) -> None:
"""Verifies handle_get_state successfully queries threads and returns GetStateResponse."""
mock_dap_client = mock_dap_client_class.return_value
mock_threads_resp = Mock()
mock_body = Mock()
mock_thread1 = Mock()
mock_thread1.id = 1
mock_thread1.name = "main"
mock_body.threads = [mock_thread1]
mock_threads_resp.body = mock_body
mock_dap_client.threads = AsyncMock(return_value=mock_threads_resp)
daemon = Daemon(port=15678)
daemon.zxdb_writer = Mock()
daemon.active_processes = {1234: "test_process"}
from shared.protocol import GetStateRequest, GetStateResponse
resp = await daemon.handle_get_state(GetStateRequest())
self.assertTrue(resp.success)
state_resp = resp.body
assert isinstance(state_resp, GetStateResponse)
self.assertEqual(len(state_resp.threads), 1)
self.assertEqual(state_resp.threads[0].id, 1)
self.assertEqual(state_resp.threads[0].name, "main")
self.assertEqual(state_resp.processes, {1234: "test_process"})
@patch("daemon.daemon.ZxdbDapClient")
async def test_handle_get_state_defensive(
self, mock_dap_client_class: Mock
) -> None:
"""Verifies handle_get_state gracefully handles None threads response body."""
mock_dap_client = mock_dap_client_class.return_value
mock_threads_resp = Mock()
mock_threads_resp.body = None # Simulate missing DAP body
mock_dap_client.threads = AsyncMock(return_value=mock_threads_resp)
daemon = Daemon(port=15678)
daemon.zxdb_writer = Mock()
daemon.active_processes = {1234: "test_process"}
from shared.protocol import GetStateRequest, GetStateResponse
resp = await daemon.handle_get_state(GetStateRequest())
self.assertTrue(resp.success)
state_resp = resp.body
assert isinstance(state_resp, GetStateResponse)
self.assertEqual(
len(state_resp.threads), 0
) # Successfully defaulted to empty list without crashing
self.assertEqual(state_resp.processes, {1234: "test_process"})
@patch("daemon.daemon.asyncio.start_unix_server")
@patch("daemon.daemon.ZxdbDapClient")
async def test_run_cleanup_detach_on_existing_session(
self, mock_dap_client_class: Mock, mock_start_unix_server: Mock
) -> None:
mock_dap_client = mock_dap_client_class.return_value
mock_dap_client.zxdb_detach = AsyncMock()
daemon = Daemon(port=15678)
daemon.connect_to_existing = True
daemon.zxdb_writer = Mock() # Simulate active connection
# Mock the unix server
mock_server = AsyncMock()
mock_server.close = Mock() # close is synchronous
mock_start_unix_server.return_value = mock_server
# Start run() in a task
run_task = asyncio.create_task(daemon.run())
# Let it run and reach the wait
await asyncio.sleep(0.05)
# Trigger stop
daemon.stop_event.set()
# Wait for run to complete
await run_task
# Verify zxdb_detach was called with all=True
mock_dap_client.zxdb_detach.assert_called_once()
args, kwargs = mock_dap_client.zxdb_detach.call_args
self.assertTrue(args[1].detach_all)
if __name__ == "__main__":
unittest.main()