blob: d460abfc95c6f13a4af25cb64bf3f570c3af9739 [file] [log] [blame]
# Copyright 2025 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helper class to capture and process stdio streams from Python."""
import os
import pty
import sys
import threading
import typing as T
class OutputSink:
    """Base interface for redirection sinks.

    Subclasses must override write(); open() and close() are optional
    hooks that do nothing by default. Instances are context managers:
    entering calls open() and leaving calls close().
    """

    def __init__(self) -> None:
        """Create the sink; the base class holds no state."""

    def open(self) -> None:
        """Hook invoked to open the sink, before anything is written to it."""

    def write(self, data: bytes) -> bool:
        """Send one chunk of data to the sink; may be called repeatedly.

        Args:
            data: The bytes to write to the sink.

        Returns:
            True on success, False on error. A False result tells the
            caller to stop writing immediately.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def close(self) -> None:
        """Hook invoked to close the sink, once writing has completed."""

    def __enter__(self) -> "OutputSink":
        """Open the sink and hand it back as the `with` target."""
        self.open()
        return self

    def __exit__(
        self, exc_type: T.Any, exc_val: T.Any, exc_tb: T.Any
    ) -> T.Literal[False]:
        """Close the sink on leaving the `with` block."""
        self.close()
        # False means an exception raised inside the block keeps propagating.
        return False
class BytesOutputSink(OutputSink):
    """An OutputSink that accumulates everything written to it in memory.

    The collected output is available through the `data` attribute.
    """

    def __init__(self) -> None:
        """Start with an empty buffer."""
        # Everything written so far, in write order.
        self.data: bytes = b""

    def write(self, data: bytes) -> bool:
        """Append a chunk to the in-memory buffer.

        Args:
            data: The bytes to append to `self.data`.

        Returns:
            Always True.
        """
        self.data += data
        return True
class FileOutputSink(OutputSink):
    """An OutputSink that writes all output directly to a file.

    The file is opened in binary write mode by open(), which also
    creates any missing parent directories.
    """

    def __init__(self, output_path: str) -> None:
        """Create instance.

        Args:
            output_path: Path of the file to write. Nothing is created
                on disk until open() is called.
        """
        self._output_path = output_path
        # The open file while the sink is open, None otherwise.
        self._output_file: T.BinaryIO | None = None

    def open(self) -> None:
        """Create missing parent directories and open the output file."""
        parent_dir = os.path.dirname(self._output_path)
        # A bare file name has an empty dirname, and os.makedirs("")
        # raises FileNotFoundError, so only create a parent when the
        # path actually names one.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self._output_file = open(self._output_path, "wb")

    def write(self, data: bytes) -> bool:
        """Write a chunk of data to the file.

        Must only be called between open() and close().

        Args:
            data: The bytes to write to the file.

        Returns:
            Always True; I/O errors propagate as exceptions.
        """
        assert self._output_file
        self._output_file.write(data)
        return True

    def close(self) -> None:
        """Close the file if it is open; safe to call more than once."""
        # Drop the reference before closing, so that a failing close()
        # does not leave a stale handle behind for the next call.
        output_file, self._output_file = self._output_file, None
        if output_file is not None:
            output_file.close()
class StdoutOutputSink(OutputSink):
    """An OutputSink that forwards every chunk to sys.stdout."""

    def write(self, data: bytes) -> bool:
        """Write raw bytes to stdout's binary buffer, then flush.

        Args:
            data: The bytes to emit on stdout.

        Returns:
            Always True.
        """
        stdout = sys.stdout
        # Bytes must go through the underlying binary buffer; flushing
        # right away makes the output visible without delay.
        stdout.buffer.write(data)
        stdout.flush()
        return True
class StderrOutputSink(OutputSink):
    """An OutputSink that forwards every chunk to sys.stderr."""

    def write(self, data: bytes) -> bool:
        """Write raw bytes to stderr's binary buffer, then flush.

        Args:
            data: The bytes to emit on stderr.

        Returns:
            Always True.
        """
        stderr = sys.stderr
        # Bytes must go through the underlying binary buffer; flushing
        # right away makes the output visible without delay.
        stderr.buffer.write(data)
        stderr.flush()
        return True
class PipeOutputSink(OutputSink):
    """An OutputSink that wraps a pipe or pty to write to another sink.

    This is mostly useful when launching a subprocess and redirecting
    its stdout or stderr to the pipe's write end (or the pty slave fd).

    Usage is the following:

      1) Create an instance, passing a final output sink.

      2) Call open(), or enter the instance as a context manager. This
         opens the output sink, creates a pipe or a pty, and starts a
         background thread that reads from it and sends the results
         to the output sink.

      3) Call get_write_fd() to retrieve the write end of the pipe
         (or the slave pty descriptor). This can be passed to other
         functions that need to write to it directly.

      3b) Alternatively, use this as a regular sink by calling its
         write() method directly.

      4) The background thread is stopped automatically on close().

    For example, here's how to launch a subprocess and have its stdout
    redirected to a pty while recording all output to a byte buffer:

        byte_sink = BytesOutputSink()
        with PipeOutputSink(byte_sink, use_pty=True) as pty_sink:
            subprocess.run([...], stdout=pty_sink.get_write_fd(), check=True)

        ... The collected output will be in byte_sink.data
    """

    def __init__(self, output_sink: OutputSink, use_pty: bool = False) -> None:
        """Create instance.

        Args:
            output_sink: Output sink that will receive data read from
                the read end of the pipe. Its write() method will be
                called from a background thread.
            use_pty: Set to True to use a pty instead of a regular pipe.
        """
        self._output_sink = output_sink
        self._thread: None | threading.Thread = None
        # Both descriptors are -1 whenever the pipe is not open.
        self._pipe_read_fd: int = -1
        self._pipe_write_fd: int = -1
        self._use_pty = use_pty

    def open(self) -> None:
        """Open the output sink, create the pipe/pty and start the reader.

        If creating the pipe or starting the thread fails, everything
        opened so far is closed again before the exception propagates.
        """
        self._output_sink.open()
        try:
            self._pipe_read_fd, self._pipe_write_fd = (
                pty.openpty() if self._use_pty else os.pipe()
            )
            reader = threading.Thread(target=self._reader_thread, args=())
            reader.start()
            # Only record the thread once it is running, so that close()
            # never tries to join a thread that was not started.
            self._thread = reader
        except BaseException:
            self.close()
            raise

    def get_read_fd(self) -> int:
        """Return the read descriptor, or -1 when the pipe is not open."""
        return self._pipe_read_fd

    def get_write_fd(self) -> int:
        """Return the write descriptor, or -1 when the pipe is not open."""
        return self._pipe_write_fd

    def write(self, data: bytes) -> bool:
        """Write data to the write end of the pipe.

        Args:
            data: The bytes to send through the pipe.

        Returns:
            True once all of data has been written, False if the
            descriptor could not be written to.
        """
        remaining = memoryview(data)
        try:
            while True:
                # os.write() reports how many bytes it accepted, which
                # may be fewer than requested; keep going until the
                # whole chunk has been handed over.
                written = os.write(self._pipe_write_fd, remaining)
                remaining = remaining[written:]
                if not remaining:
                    return True
        except OSError:
            return False

    def _reader_thread(self) -> None:
        """Forward data from the read descriptor to the output sink.

        Runs in the background thread started by open(). It stops on
        end-of-file, when the output sink reports a failure, or when
        an OSError is raised.
        """
        # NOTE(review): once this loop exits early, nothing drains the
        # pipe anymore, so writers can presumably block when it fills
        # up - confirm whether that is acceptable.
        try:
            while True:
                data = os.read(self._pipe_read_fd, 8192)
                if not data:
                    return
                if not self._output_sink.write(data):
                    return
        except OSError:
            # Presumably also how a closed pty is reported on platforms
            # that fail the read instead of signalling end-of-file.
            return

    def close(self) -> None:
        """Stop the reader thread, close the pipe, then the output sink.

        Each descriptor is marked as closed as soon as it is released,
        so calling close() again never closes a descriptor twice.
        """
        try:
            if self._pipe_write_fd >= 0:
                write_fd, self._pipe_write_fd = self._pipe_write_fd, -1
                os.close(write_fd)
            # NOTE(https://fxbug.dev/466166329): Wait for thread to finish
            # reading all pipe inputs before closing the read descriptor.
            if self._thread is not None:
                self._thread.join()
                self._thread = None
            if self._pipe_read_fd >= 0:
                read_fd, self._pipe_read_fd = self._pipe_read_fd, -1
                os.close(read_fd)
        finally:
            # Close the downstream sink even if tearing down the pipe failed.
            self._output_sink.close()