blob: 537419f39866a843add7fb9b902e9042b807b508 [file] [log] [blame] [edit]
#!/usr/bin/env python3
# Copyright 2025 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import typing as T
from concurrent.futures import ThreadPoolExecutor
U = T.TypeVar("U")
V = T.TypeVar("V")
def filter_threaded(
predicate: T.Callable[[U], bool], values: T.Iterable[U]
) -> list[U]:
"""An version of filter() that runs the filter operations in a threadpool
This asynchronously calls the predicate function for each value, and then returns the filtered
list.
"""
def _wrapper(
value: U,
) -> tuple[U, bool]:
return value, predicate(value)
with ThreadPoolExecutor() as pool:
results = pool.map(_wrapper, values)
return [value for value, result in results if result]
def map_threaded(
func: T.Callable[..., V], values: T.Iterable[T.Any]
) -> T.Iterable[V]:
"""A version of map() that executes the operations using a threadpool.
This uses a threadpool to asynchronously call the given function on the given values,
returning the results after all calls have completed.
"""
with ThreadPoolExecutor() as pool:
return pool.map(func, values)
def starmap_threaded(
func: T.Callable[..., V], args_list: T.Iterable[T.Iterable[T.Any]]
) -> T.Iterable[V]:
"""An version of map() that executes the operations using a threadpool, for functions that take multiple arguments.
This uses a threadpool to asynchronously call the given function on the given values,
returning the results after all calls have completed.
"""
def _wrapper(args: T.Iterable[T.Any]) -> V:
return func(*args)
with ThreadPoolExecutor() as pool:
return pool.map(_wrapper, args_list)