| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| import os |
| import shlex |
| import signal |
| import subprocess |
| import sys |
| import time |
| from threading import Thread |
| |
# True when running on Windows, where process-group handling and killing
# need a different implementation than on POSIX systems.
_on_windows = sys.platform == "win32"


class ProcessError(Exception):
    """Raised when an invalid operation is attempted on a Process."""


class Process:
    """A Process object used to run various commands.

    The command is executed on a background "listening" thread, and its
    combined stdout/stderr is forwarded to a callback by a second
    "redirection" thread.

    Attributes:
        _command: The initial command to run.
        _use_shell: Whether shell=True was passed through to Popen. When it
            is falsy, string commands are split with shlex.
        _subprocess_kwargs: The kwargs to send to Popen for more control over
            execution.
        _process: The subprocess.Popen object currently executing a process.
        _listening_thread: The thread that is listening for the process to
            stop.
        _redirection_thread: The thread that is redirecting process output.
        _on_output_callback: The callback to call when output is received.
        _binary_output: Whether output is handed to the callback as raw bytes
            chunks instead of decoded lines.
        _on_terminate_callback: The callback to call when the process
            terminates without stop() being called first.
        _started: Whether or not start() was called.
        _stopped: Whether or not stop() was called.
    """

    def __init__(self, command, **kwargs):
        """Creates a Process object.

        Note that this constructor does not begin the process. To start the
        process, use Process.start().

        Args:
            command: The command to run, as a string or an argument list. A
                string is split with shlex unless shell=True is given.
            **kwargs: Extra keyword arguments forwarded to subprocess.Popen.
        """
        # Split command string into list if shell=True is not specified
        self._use_shell = kwargs.get("shell", False)
        if not self._use_shell and isinstance(command, str):
            command = shlex.split(command)
        self._command = command
        self._subprocess_kwargs = kwargs
        if _on_windows:
            # Add the process-group flag without discarding any creation
            # flags the caller asked for.
            self._subprocess_kwargs["creationflags"] = (
                kwargs.get("creationflags", 0)
                | subprocess.CREATE_NEW_PROCESS_GROUP
            )
        else:
            # A new session gives the child its own process group, which is
            # what signal() targets.
            self._subprocess_kwargs["start_new_session"] = True
        self._process = None

        self._listening_thread = None
        self._redirection_thread = None
        self._on_output_callback = lambda *args, **kw: None
        self._binary_output = False
        self._on_terminate_callback = lambda *args, **kw: ""

        self._started = False
        self._stopped = False

    def set_on_output_callback(self, on_output_callback, binary=False):
        """Sets the on_output_callback function.

        Args:
            on_output_callback: The function to be called when output is sent
                to the output. The output callback has the following
                signature:

                >>> def on_output_callback(output_line):
                >>>     return None

            binary: If True, read the process output as raw binary.
        Returns:
            self
        """
        self._on_output_callback = on_output_callback
        self._binary_output = binary
        return self

    def set_on_terminate_callback(self, on_terminate_callback):
        """Sets the on_self_terminate callback function.

        Args:
            on_terminate_callback: The function to be called when the process
                has terminated on its own. The callback has the following
                signature:

                >>> def on_self_terminate_callback(popen_process):
                >>>     return 'command to run' or None

                If a string is returned, the string returned will be the
                command line used to run the command again. If None is
                returned, the process will end without restarting.

        Returns:
            self
        """
        self._on_terminate_callback = on_terminate_callback
        return self

    def start(self):
        """Starts the process's execution.

        Raises:
            ProcessError: If the process has already been started.
            OSError: If the process could not be opened within one second.
        """
        if self._started:
            raise ProcessError("Process has already started.")
        self._started = True
        self._process = None
        # Clear the stop flag BEFORE the listening thread exists. Otherwise a
        # restarted process that exits quickly could still observe the flag
        # left behind by the previous wait()/stop() and skip the terminate
        # callback.
        self._stopped = False

        self._listening_thread = Thread(target=self._exec_loop)
        self._listening_thread.start()

        # Monotonic clock: the deadline must not move if the wall clock does.
        deadline = time.monotonic() + 1

        while self._process is None:
            if time.monotonic() > deadline:
                if not self._listening_thread.is_alive():
                    # Popen raised inside the thread, so nothing is running.
                    # Reset the state so the caller may try start() again.
                    self._listening_thread = None
                    self._started = False
                raise OSError("Unable to open process!")
            # Yield instead of spinning so the launching thread can run.
            time.sleep(0.01)

    @staticmethod
    def _get_timeout_left(timeout, start_time):
        """Returns the time remaining of `timeout`, never less than 0.1.

        Args:
            timeout: The total timeout, in seconds.
            start_time: The time.time() value at which the timeout began.
        """
        return max(0.1, timeout - (time.time() - start_time))

    def is_running(self):
        """Checks that the underlying Popen process is still running

        Returns:
            True if the process is running.
        """
        return self._process is not None and self._process.poll() is None

    def _join_threads(self):
        """Waits for the threads associated with the process to terminate."""
        if self._listening_thread is not None:
            self._listening_thread.join()
            self._listening_thread = None

        if self._redirection_thread is not None:
            self._redirection_thread.join()
            self._redirection_thread = None

    def _kill_process(self):
        """Kills the underlying process/process group. Implementation is
        platform-dependent."""
        if _on_windows:
            subprocess.check_call(
                ["taskkill", "/F", "/T", "/PID", str(self._process.pid)]
            )
            return

        try:
            self.signal(signal.SIGKILL)
        except ProcessLookupError:
            # The process exited and was reaped by the listening thread
            # between the timeout expiring and this kill; nothing is left to
            # signal.
            pass

    def wait(self, kill_timeout=60.0):
        """Waits for the process to finish execution.

        If the process has reached the kill_timeout, the process will be
        killed instead.

        Note: the on_self_terminate callback will NOT be called when calling
        this function.

        Args:
            kill_timeout: The amount of time to wait until killing the
                process.

        Raises:
            ProcessError: If the process is already being stopped, or was
                never started.
        """
        if self._stopped:
            raise ProcessError("Process is already being stopped.")
        if self._process is None:
            # Fail before touching any state so a later start() still works.
            raise ProcessError("Process has not been started.")
        # Set before waiting so the listening thread treats the exit as a
        # requested stop rather than a self-termination.
        self._stopped = True

        try:
            self._process.wait(kill_timeout)
        except subprocess.TimeoutExpired:
            self._kill_process()
        finally:
            self._join_threads()
            self._started = False

    def signal(self, sig):
        """Sends a signal to the process.

        The signal is delivered to the whole process group of the child.

        Args:
            sig: The signal to be sent.

        Raises:
            ProcessError: If called on Windows, or before the process has
                been started.
        """
        if _on_windows:
            raise ProcessError("Unable to call Process.signal on windows.")
        if self._process is None:
            raise ProcessError("Process has not been started.")

        pgid = os.getpgid(self._process.pid)
        os.killpg(pgid, sig)

    def stop(self):
        """Stops the process.

        This command is effectively equivalent to kill, but gives time to
        clean up any related work on the process, such as output redirection.

        Note: the on_self_terminate callback will NOT be called when calling
        this function.
        """
        self.wait(0)

    def _redirect_output(self):
        """Redirects the output from the command into the on_output_callback.

        In binary mode the callback receives chunks of up to 1024 bytes; in
        the default mode it receives decoded lines with trailing whitespace
        removed. Returns once the stream reaches end-of-file.
        """
        stream = self._process.stdout
        if self._binary_output:
            while True:
                data = stream.read(1024)
                if not data:
                    return
                self._on_output_callback(data)

        while True:
            line = stream.readline().decode("utf-8", errors="replace")
            if not line:
                return
            # Output the line without trailing \n and whitespace.
            self._on_output_callback(line.rstrip())

    @staticmethod
    def __start_process(command, **kwargs):
        """A convenient wrapper function for starting the process."""
        acts_logger = logging.getLogger()
        acts_logger.debug(
            'Starting command "%s" with kwargs %s', command, kwargs
        )
        return subprocess.Popen(command, **kwargs)

    def _exec_loop(self):
        """Executes Popen in a loop.

        When Popen terminates without stop() being called,
        self._on_terminate_callback() will be called. The returned value from
        _on_terminate_callback will then be used to determine if the loop
        should continue and start up the process again. See
        set_on_terminate_callback() for more information.
        """
        command = self._command
        while True:
            self._process = self.__start_process(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                bufsize=1,
                **self._subprocess_kwargs,
            )
            self._redirection_thread = Thread(target=self._redirect_output)
            self._redirection_thread.start()
            self._process.wait()

            if self._stopped:
                logging.debug(
                    "The process for command %s was stopped.", command
                )
                break

            logging.debug("The process for command %s terminated.", command)
            # Wait for all output to be processed before sending
            # _on_terminate_callback()
            self._redirection_thread.join()
            logging.debug("Beginning on_terminate_callback for %s.", command)
            retry_value = self._on_terminate_callback(self._process)
            if not retry_value:
                break
            # Apply the same splitting rule as the constructor to the
            # replacement command.
            if not self._use_shell and isinstance(retry_value, str):
                retry_value = shlex.split(retry_value)
            command = retry_value