blob: b56f57b9217e9ee5eb64b2ba83d5cc037266a6af [file]
# Copyright 2026 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
from typing import TYPE_CHECKING
from shared.protocol import Response
from shared.protocol.threads import ThreadsRequest
if TYPE_CHECKING:
from daemon.daemon import Daemon
COMMAND_NAME = "threads"
async def handle(daemon: Daemon, _req: ThreadsRequest) -> Response:
if not daemon.zxdb_writer:
return Response(
success=False, message="Not connected to zxdb DAP server"
)
try:
resp = await daemon.dap_client.threads(daemon.zxdb_writer)
body = resp.body.model_dump(by_alias=True) if resp.body else None
return Response(
success=True,
body=body,
)
except Exception as e:
return Response(success=False, message=f"Failed to get threads: {e}")