blob: 88cc733a399f7e396ca2d180371800ba95fa7a03 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import queue
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
class AssemblyLine(object):
"""A class for passing data through a chain of threads or processes,
assembly-line style.
Attributes:
nodes: A list of AssemblyLine.Nodes that pass data from one node to the
next.
"""
class Node(object):
"""A Node in an AssemblyLine.
Each node is composed of the following:
input_stream output_stream
==============> [ transformer ] ===============>
Attributes:
transformer: The Transformer that takes input from the input
stream, transforms the data, and sends it to the output stream.
input_stream: The stream of data to be taken in as input to this
transformer. This stream is the stream to be registered as the
previous node's output stream.
Properties:
output_stream: The stream of data to be passed to the next node.
"""
def __init__(self, transformer=None, input_stream=None):
self.transformer = transformer
self.input_stream = input_stream
@property
def output_stream(self):
return self.transformer.output_stream
@output_stream.setter
def output_stream(self, value):
self.transformer.output_stream = value
def __init__(self, nodes):
"""Initializes an AssemblyLine class.
nodes:
A list of AssemblyLine.Node objects.
"""
self.nodes = nodes
def run(self):
"""Runs the AssemblyLine, passing the data between each work node."""
raise NotImplementedError()
class ProcessAssemblyLine(AssemblyLine):
"""An AssemblyLine that uses processes to schedule work on nodes."""
def run(self):
"""Runs the AssemblyLine within a process pool."""
if not self.nodes:
# If self.nodes is empty, it will create a multiprocessing.Pool of
# 0 nodes, which raises a ValueError.
return
process_pool = multiprocessing.Pool(processes=len(self.nodes))
for node in self.nodes:
process_pool.apply_async(node.transformer.transform,
[node.input_stream])
process_pool.close()
process_pool.join()
class ThreadAssemblyLine(AssemblyLine):
"""An AssemblyLine that uses threading to schedule work on nodes."""
def run(self):
"""Runs the AssemblyLine within a thread pool."""
with ThreadPoolExecutor(max_workers=len(self.nodes)) as thread_pool:
for node in self.nodes:
thread_pool.submit(node.transformer.transform,
node.input_stream)
class AssemblyLineBuilder(object):
"""An abstract class that builds an AssemblyLine object.
Attributes:
_assembly_line_generator: The callable that creates the AssemblyLine.
Should be in the form of:
Args:
A list of AssemblyLine.Node objects.
Returns:
An AssemblyLine object.
_queue_generator: The callable that creates new queues to be used for
BufferStreams. Should be in the form of:
Args:
None.
Returns:
A Queue object.
"""
def __init__(self, queue_generator, assembly_line_generator):
"""Creates an AssemblyLineBuilder.
Args:
queue_generator: A callable of type lambda: Queue().
assembly_line_generator: A callable of type
lambda list<AssemblyLine.Node>: AssemblyLine.
"""
super().__init__()
self._assembly_line_generator = assembly_line_generator
self._queue_generator = queue_generator
self.nodes = []
self._built = False
@property
def built(self):
return self._built
def __generate_queue(self):
"""Returns a new Queue object for passing information between nodes."""
return self._queue_generator()
@property
def queue_generator(self):
"""Returns the callable used for generating queues."""
return self._queue_generator
def source(self, transformer, input_stream=None):
"""Adds a SourceTransformer to the AssemblyLine.
Must be the first function call on the AssemblyLineBuilder.
Args:
transformer: The SourceTransformer that generates data for the
AssemblyLine to process.
input_stream: The input stream to use, if necessary.
Raises:
ValueError if source is not the first transformer to be added to
the AssemblyLine, or the AssemblyLine has been built.
"""
if self.nodes:
raise ValueError('AssemblyLines can only have a single source.')
if input_stream is None:
input_stream = DevNullBufferStream()
self.nodes.append(AssemblyLine.Node(transformer, input_stream))
return self
def into(self, transformer):
"""Adds the given transformer next in the AssemblyLine.
Args:
transformer: The transformer next in the AssemblyLine.
Raises:
ValueError if no source node is set, or the AssemblyLine has been
built.
"""
if not self.nodes:
raise ValueError('The source transformer must be set first.')
if self.built:
raise ValueError('Cannot add additional nodes after the '
'AssemblyLine has been built.')
stream = BufferStream(self.__generate_queue())
self.nodes[-1].transformer.set_output_stream(stream)
self.nodes.append(AssemblyLine.Node(transformer, stream))
return self
def build(self, output_stream=None):
"""Builds the AssemblyLine object.
Note that after this function is called this AssemblyLineBuilder cannot
be used again, as it is already marked as built.
"""
if self.built:
raise ValueError('The AssemblyLine is already built.')
if not self.nodes:
raise ValueError('Cannot create an empty assembly line.')
self._built = True
if output_stream is None:
output_stream = DevNullBufferStream()
self.nodes[-1].output_stream = output_stream
return self._assembly_line_generator(self.nodes)
class ThreadAssemblyLineBuilder(AssemblyLineBuilder):
"""An AssemblyLineBuilder for generating ThreadAssemblyLines."""
def __init__(self, queue_generator=queue.Queue):
super().__init__(queue_generator, ThreadAssemblyLine)
class ProcessAssemblyLineBuilder(AssemblyLineBuilder):
"""An AssemblyLineBuilder for ProcessAssemblyLines.
Attributes:
manager: The multiprocessing.Manager used for having queues communicate
with one another over multiple processes.
"""
def __init__(self):
self.manager = multiprocessing.Manager()
super().__init__(self.manager.Queue, ProcessAssemblyLine)
class IndexedBuffer(object):
"""A buffer indexed with the order it was generated in."""
def __init__(self, index, size_or_buffer):
"""Creates an IndexedBuffer.
Args:
index: The integer index associated with the buffer.
size_or_buffer:
either:
An integer specifying the number of slots in the buffer OR
A list to be used as a buffer.
"""
self.index = index
if isinstance(size_or_buffer, int):
self.buffer = [None] * size_or_buffer
else:
self.buffer = size_or_buffer
class BufferList(list):
"""A list of Buffers.
This type is useful for differentiating when a buffer has been returned
from a transformer, vs when a list of buffers has been returned from a
transformer.
"""
class BufferStream(object):
"""An object that acts as a stream between two transformers."""
# The object passed to the buffer queue to signal the end-of-stream.
END = None
def __init__(self, buffer_queue):
"""Creates a new BufferStream.
Args:
buffer_queue: A Queue object used to pass data along the
BufferStream.
"""
self._buffer_queue = buffer_queue
def initialize(self):
"""Initializes the stream.
When running BufferStreams through multiprocessing, initialize must
only be called on the process using the BufferStream.
"""
# Here we need to make any call to the stream to initialize it. This
# makes read and write times for the first buffer faster, preventing
# the data at the beginning from being dropped.
self._buffer_queue.qsize()
def end_stream(self):
"""Closes the stream.
By convention, a None object is used, mirroring file reads returning
an empty string when the end of file is reached.
"""
self._buffer_queue.put(None, block=False)
def add_indexed_buffer(self, buffer):
"""Adds the given buffer to the buffer stream."""
self._buffer_queue.put(buffer, block=False)
def remove_indexed_buffer(self):
"""Removes an indexed buffer from the array.
This operation blocks until data is received.
Returns:
an IndexedBuffer.
"""
return self._buffer_queue.get()
class DevNullBufferStream(BufferStream):
"""A BufferStream that is always empty."""
def __init__(self, *_):
super().__init__(None)
def initialize(self):
"""Does nothing. Nothing to initialize."""
def end_stream(self):
"""Does nothing. The stream always returns end-of-stream when read."""
def add_indexed_buffer(self, buffer):
"""Imitating /dev/null, nothing will be written to the stream."""
def remove_indexed_buffer(self):
"""Always returns the end-of-stream marker."""
return None