# blob: b383fc9dc1454ba0253b2fd00f65b51aa9f33bb1 [file] [log] [blame]
"""Basic introspection of modules."""
from __future__ import annotations
import importlib
import inspect
import os
import pkgutil
import queue
import sys
from multiprocessing import Process, Queue
from types import ModuleType
class ModuleProperties:
    """Plain record of what introspection found out about one module/package.

    Each constructor argument is stored under an attribute of the same name;
    a falsy ``subpackages`` value is stored as a new empty list.
    """

    # Note that all __init__ args must have default values
    def __init__(
        self,
        name: str = "",
        file: str | None = None,
        path: list[str] | None = None,
        all: list[str] | None = None,
        is_c_module: bool = False,
        subpackages: list[str] | None = None,
    ) -> None:
        # Values mirroring the module's own dunder attributes.
        self.name = name  # __name__ attribute
        self.file = file  # __file__ attribute
        self.path = path  # __path__ attribute
        self.all = all  # __all__ attribute
        # Values derived by the inspecting code.
        self.is_c_module = is_c_module
        self.subpackages = subpackages if subpackages else []
def is_c_module(module: ModuleType) -> bool:
    """Return True if *module* has no ``__file__`` or its file is a binary extension.

    The file name is read from the module's ``__dict__``; the extensions
    treated as binary are ``.so``, ``.pyd`` and ``.dll``.
    """
    filename = module.__dict__.get("__file__")
    if filename is None:
        # Could be a namespace package. These must be handled through
        # introspection, since there is no source file.
        return True
    _, extension = os.path.splitext(filename)
    return extension in (".so", ".pyd", ".dll")
class InspectError(Exception):
    """Signals that runtime introspection of a module failed."""
def get_package_properties(package_id: str) -> ModuleProperties:
    """Import *package_id* and describe it as a ModuleProperties record.

    Raises:
        InspectError: if the import raises anything at all (the original
            exception is chained as the cause).
    """
    try:
        module = importlib.import_module(package_id)
    except BaseException as exc:
        raise InspectError(str(exc)) from exc

    # Read the dunder attributes, tolerating modules that lack them.
    mod_name = getattr(module, "__name__", package_id)
    mod_file = getattr(module, "__file__", None)
    search_path: list[str] | None = getattr(module, "__path__", None)
    if not isinstance(search_path, list):
        # Only a genuine list is accepted as a package search path.
        search_path = None
    exported = getattr(module, "__all__", None)
    if exported is not None:
        try:
            exported = list(exported)
        except Exception:
            # An __all__ that cannot be turned into a list is treated as absent.
            exported = None
    c_module = is_c_module(module)

    children: list[str]
    if search_path is not None:
        # A package with a path: let pkgutil enumerate everything below it,
        # ignoring errors raised while it walks.
        walker = pkgutil.walk_packages(
            search_path, prefix=module.__name__ + ".", onerror=lambda r: None
        )
        children = [entry[1] for entry in walker]
    elif c_module:
        # No path, so this is either a module inside a package or a C extension
        # package. For the C extension case, find sub-packages with the inspect
        # module: members that are modules named "<this module>.<member>".
        children = []
        for attr, value in inspect.getmembers(module):
            if inspect.ismodule(value) and value.__name__ == module.__name__ + "." + attr:
                children.append(module.__name__ + "." + attr)
    else:
        # It's a module inside a package. There's nothing else to walk/yield.
        children = []

    return ModuleProperties(
        name=mod_name,
        file=mod_file,
        path=search_path,
        all=exported,
        is_c_module=c_module,
        subpackages=children,
    )
def worker(tasks: Queue[str], results: Queue[str | ModuleProperties], sys_path: list[str]) -> None:
    """Serve introspection requests forever; entry point of the worker process.

    Each module name taken from *tasks* is answered with exactly one item on
    *results*: a ModuleProperties on success, or the error text if
    get_package_properties raised InspectError.
    """
    # Replace this process's import path with the one handed in by the caller.
    sys.path = sys_path
    while True:
        module_name = tasks.get()
        outcome: str | ModuleProperties
        try:
            outcome = get_package_properties(module_name)
        except InspectError as err:
            outcome = str(err)
        results.put(outcome)
class ModuleInspect:
    """Perform runtime introspection of modules in a separate process.

    Reuse the process for multiple modules for efficiency. However, if there is an
    error, retry using a fresh process to avoid cross-contamination of state between
    modules.

    We use a separate process to isolate us from many side effects. For example, the
    import of a module may kill the current process, and we want to recover from that.

    Always use in a with statement for proper clean-up:

      with ModuleInspect() as m:
          p = m.get_package_properties('urllib.parse')
    """

    # A request is abandoned after _MAX_POLLS waits of _POLL_INTERVAL seconds
    # each (roughly 30 seconds in total).
    _POLL_INTERVAL = 0.05
    _MAX_POLLS = 600

    def __init__(self) -> None:
        self._start()

    def _start(self) -> None:
        """Create fresh task/result queues and launch a new worker process."""
        self.tasks: Queue[str] = Queue()
        self.results: Queue[ModuleProperties | str] = Queue()
        self.proc = Process(target=worker, args=(self.tasks, self.results, sys.path))
        self.proc.start()
        self.counter = 0  # Number of successful roundtrips

    def close(self) -> None:
        """Free any resources used."""
        self.proc.terminate()

    def get_package_properties(self, package_id: str) -> ModuleProperties:
        """Return some properties of a module/package using runtime introspection.

        Raise InspectError if the target couldn't be imported, or if the worker
        process died while importing it. Raise RuntimeError if the worker did
        not answer within the polling budget; in that case the worker is
        replaced first, so this object remains usable afterwards.
        """
        self.tasks.put(package_id)
        try:
            res = self._get_from_queue()
        except RuntimeError:
            # Timed out: the worker is still busy with this request. Replace it
            # (and both queues) so that its late answer can never be mistaken
            # for the answer to a later request.
            self.close()
            self._start()
            raise
        if res is None:
            # The process died; recover and report error.
            self._start()
            raise InspectError(f"Process died when importing {package_id!r}")
        if isinstance(res, str):
            # Error importing module
            if self.counter > 0:
                # Also try with a fresh process. Maybe one of the previous imports has
                # corrupted some global state. _start() resets the counter, so the
                # retry happens at most once.
                self.close()
                self._start()
                return self.get_package_properties(package_id)
            raise InspectError(res)
        self.counter += 1
        return res

    def _get_from_queue(self) -> ModuleProperties | str | None:
        """Get value from the queue.

        Return the value read from the queue, or None if the process unexpectedly died.
        Raise RuntimeError if nothing arrived within the polling budget.
        """
        for _ in range(self._MAX_POLLS):
            try:
                return self.results.get(timeout=self._POLL_INTERVAL)
            except queue.Empty:
                # Nothing yet; a dead worker will never answer, so stop waiting.
                if not self.proc.is_alive():
                    return None
        raise RuntimeError("Timeout waiting for subprocess")

    def __enter__(self) -> ModuleInspect:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()