| # Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 |
| # For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt |
| |
| """ |
| A pytest plugin to record test times and then use those times to divide tests |
| into evenly balanced workloads for each xdist worker. |
| |
| Two things are hard-coded here that shouldn't be: |
| |
| - The timing data is written to the tmp directory, but should use the pytest |
| cache (https://docs.pytest.org/en/latest/how-to/cache.html). |
| |
| - The number of xdist workers is hard-coded to 8 because I couldn't figure out |
| how to find the number. Would it be crazy to read the -n argument directly? |
| |
| You can force some tests to run on the same worker by setting the |
| `balanced_clumps` setting in your pytest config file. Each line is a substring |
| of a test name. All tests with that substring (like -k) will run on the same |
| worker: |
| |
| balanced_clumps = |
| LongRunningFixture |
| some_other_test_substring |
| |
| """ |
| |
| import collections |
| import csv |
| import os |
| import shutil |
| import time |
| |
| from pathlib import Path |
| |
| import pytest |
| import xdist.scheduler |
| |
| |
def pytest_addoption(parser):
    """Declare the `balanced_clumps` ini-file setting on pytest's parser.

    The setting is a "linelist": each line is a substring of test names that
    should be kept together on one worker.
    """
    ini_name = "balanced_clumps"
    description = "Test substrings to assign to the same worker"
    parser.addini(ini_name, help=description, type="linelist")
| |
@pytest.hookimpl(tryfirst=True)
def pytest_configure(config):
    """Build the balancing plugin for this run and register it by name."""
    plugin = BalanceXdistPlugin(config)
    config.pluginmanager.register(plugin, "balance_xdist_plugin")
| |
| |
class BalanceXdistPlugin:       # pragma: debugging
    """Record per-test durations and use them to balance xdist workers.

    While the whole suite runs (no ``-k`` filter), every process appends the
    setup/call/teardown duration of each test to its own CSV file under
    ``tmp/tests_csv``.  On the next run, the process that has no
    ``PYTEST_XDIST_WORKER`` set loads those files and uses the per-test totals
    to deal tests into evenly weighted chunks for `BalancedScheduler`.
    """

    def __init__(self, config):
        """Store the pytest `config` and initialise empty timing state."""
        self.config = config
        # Timing data is only read and written when no -k filter was given.
        self.running_all = (self.config.getoption("-k") == "")
        # Total recorded seconds per test node id, loaded from the last run.
        self.times = collections.defaultdict(float)
        # xdist worker id from the environment, or "none" when it is unset.
        self.worker = os.getenv("PYTEST_XDIST_WORKER", "none")
        # CSV file this process appends to; set in pytest_sessionstart.
        self.tests_csv = None

    def pytest_sessionstart(self, session):
        """Called once before any tests are run, but in every worker.

        Chooses the CSV file this process will append to.  In the process
        without a worker id, also loads the durations recorded by the previous
        run into `self.times` and then deletes the old files.
        """
        if not self.running_all:
            return

        tests_csv_dir = session.startpath.resolve() / "tmp/tests_csv"
        self.tests_csv = tests_csv_dir / f"{self.worker}.csv"

        if self.worker != "none" or not tests_csv_dir.exists():
            return

        # Only look at the per-worker files this plugin writes.
        for csv_file in tests_csv_dir.glob("*.csv"):
            with csv_file.open(newline="") as fcsv:
                for row in csv.reader(fcsv):
                    try:
                        nodeid, duration = row[1], float(row[3])
                    except (IndexError, ValueError):
                        # An interrupted run can leave a truncated row behind.
                        # The data only feeds a balancing heuristic, so skip
                        # the row rather than abort the whole session.
                        continue
                    self.times[nodeid] += duration
        shutil.rmtree(tests_csv_dir)

    def write_duration_row(self, item, phase, duration):
        """Helper to write a row to the tracked-test csv file.

        The row is ``worker, nodeid, phase, duration``.  Nothing is written
        unless the whole suite is being run.
        """
        if not self.running_all:
            return
        self.tests_csv.parent.mkdir(parents=True, exist_ok=True)
        with self.tests_csv.open("a", newline="") as fcsv:
            csv.writer(fcsv).writerow([self.worker, item.nodeid, phase, duration])

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_setup(self, item):
        """Run once for each test: time the setup phase."""
        # perf_counter is monotonic, so clock adjustments can't skew durations.
        start = time.perf_counter()
        yield
        self.write_duration_row(item, "setup", time.perf_counter() - start)

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_call(self, item):
        """Run once for each test: time the call phase."""
        start = time.perf_counter()
        yield
        self.write_duration_row(item, "call", time.perf_counter() - start)

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_teardown(self, item):
        """Run once for each test: time the teardown phase."""
        start = time.perf_counter()
        yield
        self.write_duration_row(item, "teardown", time.perf_counter() - start)

    @pytest.hookimpl(trylast=True)
    def pytest_xdist_make_scheduler(self, config, log):
        """Create our BalancedScheduler using time data from the last run.

        Tests with recorded times are dealt into `nchunks` chunks; the mapping
        from node id to chunk id is handed to the scheduler.
        """
        # Assign tests to chunks.  The chunk count is hard-coded rather than
        # taken from the actual number of xdist workers.
        nchunks = 8
        totals = [0] * nchunks
        tests = collections.defaultdict(set)

        # First put the difficult ones all in one worker: clump number i goes
        # to chunk i % nchunks.
        clumped = set()
        clumps = config.getini("balanced_clumps")
        for i, clump_word in enumerate(clumps):
            clump_nodes = {nodeid for nodeid in self.times if clump_word in nodeid}
            i %= nchunks
            tests[i].update(clump_nodes)
            totals[i] += sum(self.times[nodeid] for nodeid in clump_nodes)
            clumped.update(clump_nodes)

        # Then assign the rest in descending order of duration, each one to
        # whichever chunk is currently lightest.
        rest = [(nodeid, t) for (nodeid, t) in self.times.items() if nodeid not in clumped]
        rest.sort(key=lambda item: item[1], reverse=True)
        for nodeid, t in rest:
            lightest = min(enumerate(totals), key=lambda pair: pair[1])[0]
            tests[lightest].add(nodeid)
            totals[lightest] += t

        # Invert chunk -> node ids into node id -> chunk for the scheduler.
        test_chunks = {}
        for chunk_id, nodeids in tests.items():
            for nodeid in nodeids:
                test_chunks[nodeid] = chunk_id

        return BalancedScheduler(config, log, clumps, test_chunks)
| |
| |
class BalancedScheduler(xdist.scheduler.LoadScopeScheduling):   # pylint: disable=abstract-method # pragma: debugging
    """A pytest-xdist load-scope scheduler whose scopes are balanced chunks."""

    def __init__(self, config, log, clumps, test_chunks):
        """Keep the clump substrings and the node-id -> chunk-id mapping."""
        super().__init__(config, log)
        self.test_chunks = test_chunks
        self.clumps = clumps

    def _split_scope(self, nodeid):
        """Return the scope (chunk id) that `nodeid` should be scheduled in."""
        # A precomputed chunk assignment wins when there is one.
        chunk = self.test_chunks.get(nodeid)
        if chunk is None:
            # Next choice: the first clump whose substring occurs in the node
            # id.  Failing that, the node forms a chunk of its own.
            clump_scopes = (
                f"clump{index}"
                for index, word in enumerate(self.clumps)
                if word in nodeid
            )
            chunk = next(clump_scopes, nodeid)
        return chunk
| |
| |
# Run this with:
#   python -c "from tests.balance_xdist_plugin import show_worker_times as f; f()"
def show_worker_times():                    # pragma: debugging
    """Ad-hoc utility to show data from the last tracked-test run.

    Reads every file in ``tmp/tests_csv`` (relative to the current directory)
    and prints, per worker, the number of "call" rows and the summed duration,
    followed by total/average/min/max figures.  Prints a short notice instead
    of failing when the directory is missing or holds no usable rows.
    """
    times = collections.defaultdict(float)
    tests = collections.defaultdict(int)
    tests_csv_dir = Path("tmp/tests_csv")

    if tests_csv_dir.is_dir():
        for csv_file in tests_csv_dir.iterdir():
            with csv_file.open(newline="") as fcsv:
                for row in csv.reader(fcsv):
                    try:
                        worker, phase, duration = row[0], row[2], float(row[3])
                    except (IndexError, ValueError):
                        # Skip truncated or malformed rows.
                        continue
                    times[worker] += duration
                    if phase == "call":
                        tests[worker] += 1

    if not times:
        # Without data the average and min/max below are undefined.
        print(f"no timing data found in {tests_csv_dir}")
        return

    # List every worker that contributes to the totals, even one with no
    # "call" rows (its test count shows as 0).
    for worker in sorted(times):
        print(f"{worker}: {tests[worker]:3d} {times[worker]:.2f}")

    total = sum(times.values())
    avg = total / len(times)
    print(f"total: {total:.2f}, avg: {avg:.2f}")
    lo = min(times.values())
    hi = max(times.values())
    print(f"lo = {lo:.2f}; hi = {hi:.2f}; gap = {hi - lo:.2f}; long delta = {hi - avg:.2f}")