blob: b86d941d9389fe7cd60cb338a12e00b4e513c3f6 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import shlex
import signal
import subprocess
import sys
import time
from threading import Thread
_on_windows = sys.platform == "win32"
class ProcessError(Exception):
"""Raised when invalid operations are run on a Process."""
class Process(object):
"""A Process object used to run various commands.
Attributes:
_command: The initial command to run.
_subprocess_kwargs: The kwargs to send to Popen for more control over
execution.
_process: The subprocess.Popen object currently executing a process.
_listening_thread: The thread that is listening for the process to stop.
_redirection_thread: The thread that is redirecting process output.
_on_output_callback: The callback to call when output is received.
_on_terminate_callback: The callback to call when the process terminates
without stop() being called first.
_started: Whether or not start() was called.
_stopped: Whether or not stop() was called.
"""
def __init__(self, command, **kwargs):
"""Creates a Process object.
Note that this constructor does not begin the process. To start the
process, use Process.start().
"""
# Split command string into list if shell=True is not specified
self._use_shell = kwargs.get("shell", False)
if not self._use_shell and isinstance(command, str):
command = shlex.split(command)
self._command = command
self._subprocess_kwargs = kwargs
if _on_windows:
self._subprocess_kwargs[
"creationflags"
] = subprocess.CREATE_NEW_PROCESS_GROUP
else:
self._subprocess_kwargs["start_new_session"] = True
self._process = None
self._listening_thread = None
self._redirection_thread = None
self._on_output_callback = lambda *args, **kw: None
self._binary_output = False
self._on_terminate_callback = lambda *args, **kw: ""
self._started = False
self._stopped = False
def set_on_output_callback(self, on_output_callback, binary=False):
"""Sets the on_output_callback function.
Args:
on_output_callback: The function to be called when output is sent to
the output. The output callback has the following signature:
>>> def on_output_callback(output_line):
>>> return None
binary: If True, read the process output as raw binary.
Returns:
self
"""
self._on_output_callback = on_output_callback
self._binary_output = binary
return self
def set_on_terminate_callback(self, on_terminate_callback):
"""Sets the on_self_terminate callback function.
Args:
on_terminate_callback: The function to be called when the process
has terminated on its own. The callback has the following
signature:
>>> def on_self_terminate_callback(popen_process):
>>> return 'command to run' or None
If a string is returned, the string returned will be the command
line used to run the command again. If None is returned, the
process will end without restarting.
Returns:
self
"""
self._on_terminate_callback = on_terminate_callback
return self
def start(self):
"""Starts the process's execution."""
if self._started:
raise ProcessError("Process has already started.")
self._started = True
self._process = None
self._listening_thread = Thread(target=self._exec_loop)
self._listening_thread.start()
time_up_at = time.time() + 1
while self._process is None:
if time.time() > time_up_at:
raise OSError("Unable to open process!")
self._stopped = False
@staticmethod
def _get_timeout_left(timeout, start_time):
return max(0.1, timeout - (time.time() - start_time))
def is_running(self):
"""Checks that the underlying Popen process is still running
Returns:
True if the process is running.
"""
return self._process is not None and self._process.poll() is None
def _join_threads(self):
"""Waits for the threads associated with the process to terminate."""
if self._listening_thread is not None:
self._listening_thread.join()
self._listening_thread = None
if self._redirection_thread is not None:
self._redirection_thread.join()
self._redirection_thread = None
def _kill_process(self):
"""Kills the underlying process/process group. Implementation is
platform-dependent."""
if _on_windows:
subprocess.check_call(f"taskkill /F /T /PID {self._process.pid}")
else:
self.signal(signal.SIGKILL)
def wait(self, kill_timeout=60.0):
"""Waits for the process to finish execution.
If the process has reached the kill_timeout, the process will be killed
instead.
Note: the on_self_terminate callback will NOT be called when calling
this function.
Args:
kill_timeout: The amount of time to wait until killing the process.
"""
if self._stopped:
raise ProcessError("Process is already being stopped.")
self._stopped = True
try:
self._process.wait(kill_timeout)
except subprocess.TimeoutExpired:
self._kill_process()
finally:
self._join_threads()
self._started = False
def signal(self, sig):
"""Sends a signal to the process.
Args:
sig: The signal to be sent.
"""
if _on_windows:
raise ProcessError("Unable to call Process.signal on windows.")
pgid = os.getpgid(self._process.pid)
os.killpg(pgid, sig)
def stop(self):
"""Stops the process.
This command is effectively equivalent to kill, but gives time to clean
up any related work on the process, such as output redirection.
Note: the on_self_terminate callback will NOT be called when calling
this function.
"""
self.wait(0)
def _redirect_output(self):
"""Redirects the output from the command into the on_output_callback."""
if self._binary_output:
while True:
data = self._process.stdout.read(1024)
if not data:
return
else:
self._on_output_callback(data)
else:
while True:
line = self._process.stdout.readline().decode("utf-8", errors="replace")
if not line:
return
else:
# Output the line without trailing \n and whitespace.
self._on_output_callback(line.rstrip())
@staticmethod
def __start_process(command, **kwargs):
"""A convenient wrapper function for starting the process."""
acts_logger = logging.getLogger()
acts_logger.debug('Starting command "%s" with kwargs %s', command, kwargs)
return subprocess.Popen(command, **kwargs)
def _exec_loop(self):
"""Executes Popen in a loop.
When Popen terminates without stop() being called,
self._on_terminate_callback() will be called. The returned value from
_on_terminate_callback will then be used to determine if the loop should
continue and start up the process again. See set_on_terminate_callback()
for more information.
"""
command = self._command
while True:
self._process = self.__start_process(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
**self._subprocess_kwargs,
)
self._redirection_thread = Thread(target=self._redirect_output)
self._redirection_thread.start()
self._process.wait()
if self._stopped:
logging.debug("The process for command %s was stopped.", command)
break
else:
logging.debug("The process for command %s terminated.", command)
# Wait for all output to be processed before sending
# _on_terminate_callback()
self._redirection_thread.join()
logging.debug("Beginning on_terminate_callback for %s.", command)
retry_value = self._on_terminate_callback(self._process)
if retry_value:
if not self._use_shell and isinstance(retry_value, str):
retry_value = shlex.split(retry_value)
command = retry_value
else:
break