blob: 480670a6f31db0d735131558fad2e05f01319eac [file] [log] [blame]
# Copyright 2024 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import asyncio
import typing
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
import build_dir
from async_utils.command import (
AsyncCommand,
CommandEvent,
CommandOutput,
StderrEvent,
StdoutEvent,
)
class ExecutableCommand(ABC):
"""Abstract base class for wrappers that can execute commands."""
@abstractmethod
async def start(self, *args: str) -> AsyncCommand:
"""Start this command with the given arguments.
Returns:
AsyncCommand: Wrapper for the started command.
"""
def sync(
self,
*args: str,
stdout_callback: typing.Callable[[StdoutEvent], None] | None = None,
stderr_callback: typing.Callable[[StderrEvent], None] | None = None,
) -> CommandOutput:
"""Run this command to completion synchronously.
Note that this method creates its own asyncio loop and will fail if
it is called in the context of an existing asyncio loop. For async,
use start() to get an AsyncCommand directly.
Returns:
CommandOutput: The result of running the command to completion.
"""
def local_callback(event: CommandEvent) -> None:
if stdout_callback is not None and isinstance(event, StdoutEvent):
stdout_callback(event)
elif stderr_callback is not None and isinstance(event, StderrEvent):
stderr_callback(event)
async def operation() -> CommandOutput:
running_command = await self.start(*args)
return await running_command.run_to_completion(
callback=local_callback
)
return asyncio.run(operation())
class FxCmd(ExecutableCommand):
"""Wrapper for executing `fx` commands through Python.
Usage (async):
execution = FxCmd(timeout=30.0).start("build")
result = await execution.run_to_completion()
Usage (sync):
def callback(line: StdoutEvent):
print(line.text)
FxCmd(timeout=30.0).sync("build", stdout_callback=callback)
"""
def __init__(
self,
build_directory: str | Path | None = None,
timeout: float | None = None,
):
"""Entry-point for running `fx` commands.
This creates and configures a wrapper for running any fx command.
Args:
build_directory (Path | None, optional): If set, use
this as the build directory. Otherwise, determine build
directory from environment.
timeout (float | None, optional): Timeout for the command
in seconds. Default is no timeout.
"""
if build_directory is not None:
build_directory = str(build_directory)
self._build_directory: str | None = build_directory
self._timeout: float | None = timeout
def command_line(self, *args: str) -> list[str]:
"""The formatted command line this command will execute for the given args."""
build_directory = self._build_directory
if build_directory is None:
build_directory = str(build_dir.get_build_directory())
return ["fx", "--dir", build_directory] + list(args)
async def start(self, *args: str) -> AsyncCommand:
"""Start an invocation of fx asynchronously.
The returned command can be iterated over for output of the
command, or run_to_completion can be used to get the final
result of the called process.
Returns:
AsyncCommand: An asynchronously running invocation of fx.
"""
new_args = self.command_line(*args)
return await AsyncCommand.create(
new_args[0], *new_args[1:], timeout=self._timeout
)
EventType = typing.TypeVar("EventType")
ReturnType = typing.TypeVar("ReturnType")
class QueueFinished:
"""Sentinel value to determine a queue is finished."""
@dataclass
class CommandFailed(Exception):
"""Exception for when a command fails to execute."""
# The return code of the command
return_code: int
@dataclass
class CommandTransformFailed(CommandFailed):
"""Exception for when a transformation operation failed."""
# The exception raised by the transformer.
inner: Exception
class CommandTimeout(CommandFailed):
"""Exception for when a command fails due to a timeout."""
def __init__(self, return_code: int):
super().__init__(return_code=return_code)
@dataclass
class RunningCommand(typing.Generic[EventType, ReturnType]):
"""Container for the output of a running command."""
# The command being executed.
command: AsyncCommand
# Task computing the result of the command execution.
# This must be awaited.
result: asyncio.Task[ReturnType | CommandFailed]
# Queue of events during the execution of the command.
events: asyncio.Queue[EventType | QueueFinished]
class CommandTransformer(typing.Generic[EventType, ReturnType], ABC):
"""Generic transformer of command output."""
def __init__(self, *args: str, inner: ExecutableCommand):
"""Create a transformer that executes a command with the given args
and configured executor.
Args:
inner (ExecutableCommand): A configured wrapper around running
a command.
"""
self._args: list[str] = list(args)
self._inner: ExecutableCommand = inner
async def start(self) -> RunningCommand[EventType, ReturnType]:
"""Start the command asynchronously.
Returns:
RunningCommand[EventType, ReturnType]: Control object for the command.
"""
cmd = await self._inner.start(*self._args)
events = asyncio.Queue[EventType | QueueFinished]()
async def task() -> ReturnType | CommandFailed:
event_exception: Exception | None = None
def do_event_add(event: EventType) -> None:
events.put_nowait(event)
def event_callback(event: CommandEvent) -> None:
nonlocal event_exception
try:
self._handle_event(event, do_event_add)
except Exception as e:
event_exception = e
final = await cmd.run_to_completion(callback=event_callback)
events.put_nowait(QueueFinished())
if final.was_timeout:
return CommandTimeout(final.return_code)
if final.return_code != 0:
return CommandFailed(final.return_code)
if event_exception is not None:
return CommandTransformFailed(
final.return_code, event_exception
)
try:
return self._to_output(final)
except Exception as e:
return CommandTransformFailed(final.return_code, e)
t = asyncio.create_task(task())
return RunningCommand(cmd, t, events)
def sync(
self, event_callback: typing.Callable[[EventType], None] | None = None
) -> ReturnType:
"""Run the command to completion synchronously.
Args:
event_callback (typing.Callable[[EventType], None] | None, optional):
Optional receiver for bespoke command events.
Raises:
CommandError: If the command's return code is not 0
CommandTimeout: If the command reached a specified timeout and was cancelled.
Returns:
ReturnType: The return value for the wrapped command.
"""
async def task() -> ReturnType:
running_command = await self.start()
async def drain_events() -> None:
while event := await running_command.events.get():
if isinstance(event, QueueFinished):
return
if event_callback is not None:
event_callback(event)
results = await asyncio.gather(
running_command.result, drain_events()
)
result = results[0]
if isinstance(result, CommandFailed):
raise result
return result
return asyncio.run(task())
def _handle_event(
self, event: CommandEvent, callback: typing.Callable[[EventType], None]
) -> None:
"""Optional method to override to convert command output into events.
Args:
event (CommandEvent): The incoming event.
callback (typing.Callable[[EventType], None]): A callback
to publish a new event in the bespoke event type for
this transformer.
"""
@abstractmethod
def _to_output(
self,
output: CommandOutput,
) -> ReturnType:
"""Abstract method that must be implemented to turn command
output into the declared return type.
Args:
output (CommandOutput): Output of the command.
Returns:
ReturnType: The output of the command.
"""