#!/usr/bin/env python3
# Copyright 2022 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
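"""Runs a workflow described by a JSON manifest.

A workflow is an ordered sequence of tasks and nested workflows. This script
builds the entity tree from the manifest, routes user-supplied arguments to
the matching entities, and either executes the sequence or dumps it as a tree.
"""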
import json
import re
import shutil
import subprocess
import sys
import tempfile
from abc import ABC, abstractmethod
from collections import namedtuple
from functools import reduce
from operator import add
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from fuchsia_task_lib import FuchsiaTask, TaskExecutionException, Terminal
Arguments = namedtuple('Arguments', [
'manifest',
'dump',
'dry_run',
'verbose',
'help',
'workflow_arguments',
])
ARGUMENTS: Optional[Arguments] = None
class WorkflowArgumentException(Exception):
pass
class WorkflowArguments:
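    """Splits user arguments into global arguments and per-entity arguments.

    Arguments of the form `mnemonic=value` are scoped to the named entity;
    everything else is treated as a global argument.
    """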
def __init__(self, arguments: List[str]) -> None:
self._global_args = []
self._entity_args = {}
for i, arg in enumerate(arguments):
if '=' in arg:
k, v = arg.split('=', 1)
args = self._entity_args.get(k, {}).get('args', []) + [(i, v)]
self._entity_args[k] = {
'args': args,
'consumed': False,
}
else:
self._global_args.append((i, arg))
@property
def global_arguments(self) -> List[Tuple[float, str]]:
return self._global_args
    def get_entity_arguments(self, *mnemonics: str, type: Optional[str] = None) -> List[Tuple[float, str]]:
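        """Returns the arguments addressed to any of `mnemonics` or to
        `entity.{type}`, marking the matching mnemonics as consumed."""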
args = []
for mnemonic in (*mnemonics, f'entity.{type}'):
if mnemonic in self._entity_args:
self._entity_args[mnemonic]['consumed'] = True
args.extend(self._entity_args[mnemonic]['args'])
return args
@property
    def unused_mnemonics(self) -> List[str]:
return [k for k, v in self._entity_args.items() if not v['consumed']]
def parse_workflow_args() -> None:
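    """Parses command-line arguments into the global ARGUMENTS namedtuple.

    Unrecognized arguments are forwarded to the workflow entities.
    """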
global ARGUMENTS
ARG = 0
BOOL_ARG = 1 << 0
FORWARD_ARG = 1 << 1
arg_opts = {
'--workflow-manifest': ('manifest', ARG),
'--workflow-dump': ('dump', BOOL_ARG),
'--workflow-dry-run': ('dry_run', BOOL_ARG),
'--workflow-verbose': ('verbose', BOOL_ARG),
'--help': ('help', BOOL_ARG | FORWARD_ARG),
}
cla = sys.argv[1:]
args = {
'manifest': None,
'dump': None,
'dry_run': None,
'verbose': None,
'help': None,
}
workflow_arguments = []
while cla:
arg = cla.pop(0)
if arg in arg_opts:
target, opt = arg_opts[arg]
if args[target] is None:
args[target] = True if opt & BOOL_ARG else cla.pop(0)
if opt & FORWARD_ARG:
forward_args = [arg] + ([] if opt & BOOL_ARG else [args[target]])
workflow_arguments.extend([f'entity.task={arg}' for arg in forward_args])
else:
raise Exception(f'Duplicate argument {arg}')
else:
workflow_arguments.append(arg)
args['help'] = bool(args['help'])
args['dry_run'] = bool(args['dry_run'])
args['verbose'] = args['verbose'] or args['dry_run']
args['dump'] = bool(args['dump'])
if args['manifest']:
args['manifest'] = Path(args['manifest'])
else:
        raise Exception('Expected argument --workflow-manifest')
ARGUMENTS = Arguments(
workflow_arguments=WorkflowArguments(workflow_arguments),
**args
)
def run(*command: Union[Path, str], **kwargs: Any) -> None:
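    """Runs a command, honoring the --workflow-verbose and --workflow-dry-run flags."""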
if ARGUMENTS.verbose:
        print('Executing command:', ' '.join(str(c) for c in command))
if ARGUMENTS.dry_run:
return
subprocess.check_call(command, **kwargs)
class Entity(ABC):
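    """Base class for the entities of a workflow: tasks and nested workflows."""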
def __init__(
self,
*,
entity_collection: 'EntityCollection',
label: str,
task_num: int = 1,
mnemonic: str,
mnemonic_suffix: str,
explicit_args: List[Tuple[float, str]],
propagated_args: List[Tuple[float, str]],
global_args: List[Tuple[float, str]],
) -> None:
self._entity_collection = entity_collection
self._label = label
self._task_num = task_num
self._mnemonic = mnemonic
self._mnemonic_suffix = mnemonic_suffix
self._explicit_args = explicit_args
self._propagated_args = propagated_args
self._global_args = global_args
@property
def name(self) -> str:
# Uses the mnemonic, with the suffix if necessary.
return self.mnemonic + (
self.mnemonic_suffix if self.needs_suffix() else ''
)
@property
def mnemonic(self) -> str:
return self._mnemonic
@property
def mnemonic_suffix(self) -> str:
return self._mnemonic_suffix
def needs_suffix(self) -> bool:
return self._entity_collection.is_ambiguous(self._mnemonic)
@property
def label(self) -> str:
return self._label
def sort_arguments(self, args: List[Tuple[float, str]]) -> List[str]:
        # Order arguments by their original position and drop the indices.
        return [arg for _, arg in sorted(args, key=lambda arg: arg[0])]
@property
def explicit_args(self) -> List[str]:
return self.sort_arguments(self._explicit_args)
@property
def propagated_args(self) -> List[str]:
return self.sort_arguments(self._propagated_args)
@property
def global_args(self) -> List[str]:
return self.sort_arguments(self._global_args)
@property
def task_num(self) -> int:
return self._task_num
@property
@abstractmethod
def task_count(self) -> int:
        raise NotImplementedError('Entity.task_count is not implemented.')
    @classmethod
    def create(cls, *_, **kwargs) -> 'Entity':
        raise NotImplementedError('Entity.create is not implemented.')
    @abstractmethod
    def run(self, total_tasks: int) -> None:
        raise NotImplementedError('Entity.run is not implemented.')
def __str__(self) -> str:
return self.label
class Task(Entity):
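    """A single executable step of a workflow, backed by an external task runner."""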
def __init__(self, *, task_runner: Path, default_argument_scope: str, **entity_kwargs) -> None:
super().__init__(**entity_kwargs)
self._task_runner = task_runner.resolve()
self._default_argument_scope = default_argument_scope
self._previous_state_file = self._entity_collection.last_task and self._entity_collection.last_task.incremental_workflow_state_file
@property
def task_count(self) -> int:
return 1
@property
def incremental_workflow_state_file(self) -> Path:
return self._entity_collection.global_working_dir / f'{self.mnemonic}{self.mnemonic_suffix}.state'
@property
def task_runner(self) -> Path:
return self._task_runner
@property
def default_argument_scope(self) -> str:
return 'global' if self._entity_collection.entrypoint.task_count == 1 else self._default_argument_scope
@classmethod
def create(
cls,
entity: Dict[str, Any],
explicit_args: List[Tuple[float, str]] = [],
**entity_kwargs,
) -> 'Task':
return Task(
task_runner=Path(entity['task_runner']),
default_argument_scope=entity['default_argument_scope'],
explicit_args=[(i - len(entity['args']), arg) for i, arg in enumerate(entity['args'])] + explicit_args,
**entity_kwargs,
)
@property
def effective_args(self) -> List[str]:
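        """Builds the full argument list passed to the task runner.

        Meta arguments come first, followed by global, propagated (workflow)
        and explicit (task) arguments merged in their original relative order.
        """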
meta_args = [
arg
for meta_arg in [
'--workflow_task_name',
self.name,
*(['--workflow_final_task'] if self._entity_collection.last_task == self else []),
*(['--workflow_previous_state', self._previous_state_file] if self._previous_state_file else []),
'--workflow_next_state',
self.incremental_workflow_state_file,
'--default_argument_scope',
self.default_argument_scope,
]
for arg in ['__META_ARGUMENT__', meta_arg]
]
global_args = [
(idx, arg)
for orig_idx, orig_arg in self._global_args
for idx, arg in [(orig_idx - .5, '__GLOBAL_ARGUMENT__'), (orig_idx, orig_arg)]
]
workflow_args = [
(idx, arg)
for orig_idx, orig_arg in self._propagated_args
for idx, arg in [(orig_idx - .5, '__WORKFLOW_ARGUMENT__'), (orig_idx, orig_arg)]
]
return [
*meta_args,
*self.sort_arguments(global_args + workflow_args + self._explicit_args)
]
def run(self, total_tasks: int = 1) -> None:
task_description = f'{Terminal.purple(self.name)} (step {self.task_num}/{total_tasks})'
print(Terminal.if_no_color('<progress> ') + f'{Terminal.bold("Running task:")} {task_description}')
try:
run(self.task_runner, *self.effective_args)
except subprocess.SubprocessError:
raise TaskExecutionException(f'{Terminal.red("Error:")} Task {task_description} failed.')
except KeyboardInterrupt:
raise TaskExecutionException(f'{Terminal.red("Task Interrupted")}: {task_description}.')
if FuchsiaTask.read_workflow_state(self.incremental_workflow_state_file)['workflow']['halt_execution']:
raise TaskExecutionException('Workflow execution halted.')
class Workflow(Entity):
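    """An ordered sequence of tasks and nested workflows."""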
def __init__(self, *, sequence: List[Entity], **entity_kwargs) -> None:
super().__init__(**entity_kwargs)
self._sequence = sequence
@property
def task_count(self) -> int:
return sum([dep.task_count for dep in self.sequence])
@classmethod
def create(
cls,
entity_collection: 'EntityCollection',
entity: Dict[str, Any],
explicit_args: List[Tuple[float, str]],
propagated_args: List[Tuple[float, str]],
task_num: int = 1,
**entity_kwargs,
) -> 'Workflow':
explicit_args = [(i - len(entity['args']), arg) for i, arg in enumerate(entity['args'])] + explicit_args
child_task_num = task_num
sequence = []
for step in entity['sequence']:
sequence.append(entity_collection.create(
label=step,
task_num=child_task_num,
propagated_args=propagated_args + explicit_args,
))
child_task_num += sequence[-1].task_count
return Workflow(
entity_collection=entity_collection,
sequence=sequence,
explicit_args=explicit_args,
propagated_args=propagated_args,
task_num=task_num,
**entity_kwargs,
)
@property
def sequence(self) -> List[Entity]:
return self._sequence
def run(self, total_tasks: Optional[int] = None) -> None:
print(Terminal.if_no_color('<progress> ') + f'{Terminal.bold("Running workflow:")} {Terminal.green(self.name)}')
for entity in self.sequence:
entity.run(total_tasks=total_tasks or self.task_count)
class EntityCollection:
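    """Builds and tracks all entities reachable from the manifest entrypoint."""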
def __init__(
self,
entities: Dict[str, Dict[str, Any]],
entrypoint: str,
args: WorkflowArguments,
global_working_dir: Path,
) -> None:
self._data_entities = entities
self._args = args
self._global_working_dir = global_working_dir
self._mnemonics_by_label: Dict[str, List[str]] = {}
self._entity_by_mnemonic: Dict[str, Entity] = {}
self._last_task = None
self._entrypoint = self.create(
label=entrypoint,
task_num=1,
propagated_args=[],
)
def is_ambiguous(self, mnemonic: str) -> bool:
return len(self._mnemonics_by_label.get(mnemonic, [])) > 1
def _determine_mnemonics(self, label: str) -> Tuple[str, str]:
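        """Derives the entity's mnemonic from its label, plus a numeric suffix
        used to disambiguate repeated mnemonics."""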
mnemonic = re.search(r'[\w\.\-_]+$', label)[0]
associated_mnemonics = self._mnemonics_by_label.get(mnemonic, [])
suffix = f'.{len(associated_mnemonics) + 1}'
self._mnemonics_by_label[mnemonic] = [*associated_mnemonics, mnemonic + suffix]
return mnemonic, suffix
def create(
self,
label: str,
task_num: int,
propagated_args: List[Tuple[float, str]],
) -> Union[Workflow, Task]:
mnemonic, suffix = self._determine_mnemonics(label)
entity_type = self._data_entities[label]['type']
entity = {
'task': Task,
'workflow': Workflow,
}[entity_type].create(
entity_collection=self,
label=label,
mnemonic=mnemonic,
mnemonic_suffix=suffix,
explicit_args=self._args.get_entity_arguments(mnemonic, mnemonic + suffix, type=entity_type),
propagated_args=propagated_args,
global_args=self._args.global_arguments,
entity=self._data_entities[label],
task_num=task_num,
)
if entity_type == 'task':
self._last_task = entity
self._entity_by_mnemonic[mnemonic + suffix] = entity
return entity
@property
def last_task(self) -> Optional[Task]:
return self._last_task
@property
def global_working_dir(self) -> Path:
return self._global_working_dir
@property
def entrypoint(self) -> Entity:
return self._entrypoint
    @staticmethod
    def from_manifest(
entities: Dict[str, Dict[str, Any]],
entrypoint: str,
args: WorkflowArguments,
global_working_dir: Path,
) -> Entity:
return EntityCollection(entities, entrypoint, args, global_working_dir).entrypoint
class WorkflowRunner:
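    """Top-level driver: builds the entity tree, validates user arguments, and runs or dumps the workflow."""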
def __init__(
self,
entities: Dict[str, Dict[str, Any]],
entrypoint: str,
args: WorkflowArguments,
) -> None:
self._args = args
self._working_directory = Path(tempfile.gettempdir()) / (
'workflow.%s' % re.search(r'[\w\.\-_]+$', entrypoint)[0]
)
self.entrypoint = EntityCollection.from_manifest(
entities,
entrypoint,
args,
self._working_directory
)
def validate_arguments(self):
if self._args.unused_mnemonics:
raise WorkflowArgumentException(
f'{Terminal.bold(Terminal.red("Error:"))} Unrecognized workflow or task: {self._args.unused_mnemonics}'
)
def run(self) -> None:
if self._working_directory.exists():
shutil.rmtree(self._working_directory)
self._working_directory.mkdir()
self.entrypoint.run()
def dump_workflow_tree(self) -> 'WorkflowRunner':
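        """Prints the workflow as a tree, including the arguments each entity receives."""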
CHILD = Terminal.bold('├── ')
NESTED = Terminal.bold('│ ')
LAST_CHILD = Terminal.bold('└── ')
LAST_NESTED = Terminal.bold(' ')
root = self.entrypoint
def preorder_traversal(
node: Entity,
prefixes: List[str] = [],
is_last_child: bool = False,
) -> None:
def print_entry(first_line: Any, *additional_lines: Any):
print(reduce(add, prefixes[:-1], '') + (
'' if not prefixes else LAST_CHILD if is_last_child else CHILD
) + str(first_line))
for line in additional_lines:
print(reduce(add, prefixes, '') + str(line))
suffix = Terminal.cyan(node.mnemonic_suffix) if node.needs_suffix() else ""
if isinstance(node, Workflow):
print_entry(
f'{Terminal.bold("Workflow:")} {Terminal.green(node.mnemonic) + suffix}',
f'{Terminal.bold("Label:")} {Terminal.underline(node.label)}',
f'{Terminal.bold("Subtask Arguments:")} {node.explicit_args}',
)
for child, is_last in zip(node.sequence, (len(node.sequence) - 1) * [False] + [True]):
preorder_traversal(
child,
[*prefixes, LAST_NESTED if is_last else NESTED],
is_last,
)
else:
print_entry(
f'{Terminal.bold("Task:")} {Terminal.purple(node.mnemonic) + suffix} (step {node.task_num}/{root.task_count})',
f'{Terminal.bold("Label:")} {Terminal.underline(node.label)}',
f'{Terminal.bold("Runner:")} {Terminal.underline(node.task_runner)}',
f'{Terminal.bold("Workflow Arguments:")} {node.propagated_args}',
f'{Terminal.bold("Default Argument Scope:")} {node.default_argument_scope}',
f'{Terminal.bold("Task Arguments:")} {node.explicit_args}',
f'Effective Arguments: {node.effective_args}',
)
print(Terminal.bold('Execution Sequence:'))
print(f'{Terminal.bold("Global Arguments:")} {self.entrypoint.global_args}')
print(f'{Terminal.bold("Working Directory:")} {self._working_directory}')
preorder_traversal(root)
return self
def main():
# Parse arguments.
parse_workflow_args()
# Parse the workflow.
workflow_spec = json.loads(Path(ARGUMENTS.manifest).read_text())
# Run the workflow.
runner = WorkflowRunner(
workflow_spec['entities'],
workflow_spec['entrypoint'],
ARGUMENTS.workflow_arguments,
)
# Validate user arguments.
try:
runner.validate_arguments()
except WorkflowArgumentException as e:
print(e)
runner.dump_workflow_tree()
return 2
if ARGUMENTS.dump or ARGUMENTS.verbose or ARGUMENTS.help:
runner.dump_workflow_tree()
if ARGUMENTS.dump:
return 0
try:
runner.run()
except TaskExecutionException as e:
print(e)
return 1
if __name__ == '__main__':
sys.exit(main())