# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
"""
A pytest plugin to record test times and then use those times to divide tests
into evenly balanced workloads for each xdist worker.
Two things are hard-coded here that shouldn't be:
- The timing data is written to the tmp directory, but should use the pytest
cache (https://docs.pytest.org/en/latest/how-to/cache.html).
- The number of xdist workers is hard-coded to 8 because I couldn't figure out
how to find the number. Would it be crazy to read the -n argument directly?
You can force some tests to run on the same worker by setting the
`balanced_clumps` setting in your pytest config file. Each line is a substring
of a test name. All tests with that substring (like -k) will run on the
worker:
balanced_clumps =
LongRunningFixture
some_other_test_substring
"""
import collections
import csv
import os
import shutil
import time
from pathlib import Path

import pytest
import xdist.scheduler


def pytest_addoption(parser):
"""Auto-called to define ini-file settings."""
parser.addini(
"balanced_clumps",
type="linelist",
help="Test substrings to assign to the same worker",
)


@pytest.hookimpl(tryfirst=True)
def pytest_configure(config):
"""Registers our pytest plugin."""
config.pluginmanager.register(BalanceXdistPlugin(config), "balance_xdist_plugin")


class BalanceXdistPlugin: # pragma: debugging
    """The plugin: balances xdist workloads using timings from the last run."""
def __init__(self, config):
self.config = config
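        # running_all is True when no -k filter is given; timing data is only
        # recorded and used for balancing on full-suite runs.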
self.running_all = (self.config.getoption("-k") == "")
self.times = collections.defaultdict(float)
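        # pytest-xdist sets PYTEST_XDIST_WORKER to the worker id (gw0, gw1, ...)
        # in each worker process; it is unset in the controller, hence "none".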
self.worker = os.getenv("PYTEST_XDIST_WORKER", "none")
self.tests_csv = None
def pytest_sessionstart(self, session):
"""Called once before any tests are run, but in every worker."""
if not self.running_all:
return
tests_csv_dir = session.startpath.resolve() / "tmp/tests_csv"
self.tests_csv = tests_csv_dir / f"{self.worker}.csv"
if self.worker == "none":
if tests_csv_dir.exists():
for csv_file in tests_csv_dir.iterdir():
with csv_file.open(newline="") as fcsv:
reader = csv.reader(fcsv)
for row in reader:
self.times[row[1]] += float(row[3])
shutil.rmtree(tests_csv_dir)
def write_duration_row(self, item, phase, duration):
"""Helper to write a row to the tracked-test csv file."""
if self.running_all:
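            # Each row mirrors what pytest_sessionstart reads back later:
            # [worker, nodeid, phase, duration].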
self.tests_csv.parent.mkdir(parents=True, exist_ok=True)
with self.tests_csv.open("a", newline="") as fcsv:
csv.writer(fcsv).writerow([self.worker, item.nodeid, phase, duration])
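
    # The three hook wrappers below time each phase of every test: the yield
    # runs the real setup/call/teardown, and we record the elapsed wall-clock time.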
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_setup(self, item):
"""Run once for each test."""
start = time.time()
yield
self.write_duration_row(item, "setup", time.time() - start)
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_call(self, item):
"""Run once for each test."""
start = time.time()
yield
self.write_duration_row(item, "call", time.time() - start)
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_teardown(self, item):
"""Run once for each test."""
start = time.time()
yield
self.write_duration_row(item, "teardown", time.time() - start)
@pytest.hookimpl(trylast=True)
def pytest_xdist_make_scheduler(self, config, log):
"""Create our BalancedScheduler using time data from the last run."""
# Assign tests to chunks
nchunks = 8
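        # Hard-coded (see the module docstring); the -n value is probably
        # available as config.getoption("numprocesses"), but that's untried here.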
totals = [0] * nchunks
tests = collections.defaultdict(set)
        # First put each clump of difficult tests on a single chunk so they all
        # share one worker.
clumped = set()
clumps = config.getini("balanced_clumps")
for i, clump_word in enumerate(clumps):
clump_nodes = {nodeid for nodeid in self.times.keys() if clump_word in nodeid}
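            # Round-robin clumps over the chunks in case there are more clumps
            # than workers.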
i %= nchunks
tests[i].update(clump_nodes)
totals[i] += sum(self.times[nodeid] for nodeid in clump_nodes)
clumped.update(clump_nodes)
# Then assign the rest in descending order
rest = [(nodeid, t) for (nodeid, t) in self.times.items() if nodeid not in clumped]
rest.sort(key=lambda item: item[1], reverse=True)
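        # Greedy longest-processing-time-first: repeatedly give the slowest
        # remaining test to the chunk with the smallest total so far.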
for nodeid, t in rest:
lightest = min(enumerate(totals), key=lambda pair: pair[1])[0]
tests[lightest].add(nodeid)
totals[lightest] += t
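        # Invert the mapping: the scheduler needs nodeid -> chunk id.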
test_chunks = {}
for chunk_id, nodeids in tests.items():
for nodeid in nodeids:
test_chunks[nodeid] = chunk_id
return BalancedScheduler(config, log, clumps, test_chunks)


class BalancedScheduler(xdist.scheduler.LoadScopeScheduling): # pylint: disable=abstract-method # pragma: debugging
"""A balanced-chunk test scheduler for pytest-xdist."""
def __init__(self, config, log, clumps, test_chunks):
super().__init__(config, log)
self.clumps = clumps
self.test_chunks = test_chunks
def _split_scope(self, nodeid):
"""Assign a chunk id to a test node."""
# If we have a chunk assignment for this node, return it.
scope = self.test_chunks.get(nodeid)
if scope is not None:
return scope
# If this is a node that should be clumped, clump it.
for i, clump_word in enumerate(self.clumps):
if clump_word in nodeid:
return f"clump{i}"
# Otherwise every node is a separate chunk.
return nodeid


# Run this with:
# python -c "from tests.balance_xdist_plugin import show_worker_times as f; f()"
def show_worker_times(): # pragma: debugging
"""Ad-hoc utility to show data from the last tracked-test run."""
times = collections.defaultdict(float)
tests = collections.defaultdict(int)
tests_csv_dir = Path("tmp/tests_csv")
for csv_file in tests_csv_dir.iterdir():
with csv_file.open(newline="") as fcsv:
reader = csv.reader(fcsv)
for row in reader:
worker = row[0]
duration = float(row[3])
times[worker] += duration
if row[2] == "call":
tests[worker] += 1
for worker in sorted(tests.keys()):
print(f"{worker}: {tests[worker]:3d} {times[worker]:.2f}")
total = sum(times.values())
avg = total / len(times)
print(f"total: {total:.2f}, avg: {avg:.2f}")
lo = min(times.values())
hi = max(times.values())
print(f"lo = {lo:.2f}; hi = {hi:.2f}; gap = {hi - lo:.2f}; long delta = {hi - avg:.2f}")