blob: 59ba6a3b97d668fa5ebf52de9cf6f3fe0fc3b234 [file] [edit]
#!/usr/bin/env python
"""
Stress test pyjson5 from many concurrent OS threads.
Designed primarily to exercise the free-threaded CPython build (PEP 703).
On a GIL build it still runs as a soak test for re-entrancy / refcount safety,
just without true parallelism.
Exits 0 on success, 1 on result mismatch, 2 on exception.
"""
from __future__ import annotations
import io
import os
import random
import sys
import threading
import time
from argparse import ArgumentParser
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any
import pyjson5
def _make_payload(rng: random.Random, depth: int) -> Any:
    """Return a random value that should survive a JSON5 encode/decode cycle.

    A positive *depth* produces a container (a list, or a dict keyed
    ``"k0"``, ``"k1"``, ...) holding 0-5 children generated one level
    shallower. A *depth* of zero or less produces a scalar leaf. Every random
    choice is drawn from *rng*, so the same seed reproduces the same value.
    """
    if depth > 0:
        width = rng.randint(0, 5)
        as_list = rng.random() < 0.5
        children = (_make_payload(rng, depth - 1) for _ in range(width))
        if as_list:
            return list(children)
        return {f"k{index}": child for index, child in enumerate(children)}

    def random_text() -> str:
        # Mix of ASCII, characters that need escaping, and non-ASCII codepoints.
        length = rng.randint(0, 12)
        return "".join(
            chr(rng.choice((0x20, 0x22, 0x5C, 0x7F, 0xE9, 0x2603, 0x1F600)))
            for _ in range(length)
        )

    # Indexed by a single rng.randint(0, 5) draw; one factory per leaf kind.
    leaf_factories = (
        lambda: None,
        lambda: rng.choice((True, False)),
        lambda: rng.randint(-(2**40), 2**40),
        # Finite floats only: NaN/Inf would break the equality checks.
        lambda: rng.uniform(-1e9, 1e9),
        random_text,
        lambda: rng.choice(("hello", "wörld", "snow☃man", "", "quote\"and\\back")),
    )
    return leaf_factories[rng.randint(0, 5)]()
def _worker(worker_id: int, iterations: int, seed: int, barrier: threading.Barrier) -> tuple[int, int]:
    """Push random payloads through every pyjson5 encode/decode pair.

    Each iteration builds a payload from a private ``random.Random(seed)``
    and checks, in this order, ``encode``/``decode``,
    ``encode_bytes``/``decode_utf8`` and ``encode_io``/``decode_io`` (through
    an in-memory ``io.StringIO``). The first path whose decoded value differs
    from the payload raises ``AssertionError``.

    Returns ``(worker_id, number_of_iterations_completed)``.
    """
    rng = random.Random(seed)

    def via_text(obj: Any) -> tuple[Any, Any]:
        text = pyjson5.encode(obj)
        return text, pyjson5.decode(text)

    def via_bytes(obj: Any) -> tuple[Any, Any]:
        blob = pyjson5.encode_bytes(obj)
        return blob, pyjson5.decode_utf8(blob)

    def via_stream(obj: Any) -> tuple[Any, Any]:
        stream = io.StringIO()
        pyjson5.encode_io(obj, stream, supply_bytes=False)
        stream.seek(0)
        result = pyjson5.decode_io(stream)
        return stream.getvalue(), result

    # (message label, field-name suffix, round-trip function), checked in order.
    codec_paths = (
        ("", "", via_text),
        ("bytes ", "_b", via_bytes),
        ("io ", "_io", via_stream),
    )

    # Hold every worker here so they all enter pyjson5 at the same moment.
    barrier.wait()

    completed = 0
    for _ in range(iterations):
        payload = _make_payload(rng, depth=rng.randint(0, 4))
        for label, suffix, round_trip in codec_paths:
            encoded, decoded = round_trip(payload)
            if decoded != payload:
                raise AssertionError(
                    f"worker {worker_id}: {label}round-trip mismatch:\n"
                    f" payload={payload!r}\n encoded{suffix}={encoded!r}\n decoded{suffix}={decoded!r}"
                )
        completed += 1
    return worker_id, completed
def _shared_object_worker(shared: Any, iterations: int, barrier: threading.Barrier) -> int:
    """Repeatedly encode and decode one object that every thread shares.

    All callers receive the same *shared* object, so the encodes below read
    the same container from many threads at once. Once *barrier* releases, a
    reference encoding is taken. Each of the *iterations* passes then
    re-encodes *shared* and decodes the reference text, raising
    ``AssertionError`` if either result disagrees. Returns *iterations*.
    """
    barrier.wait()
    reference = pyjson5.encode(shared)
    for _ in range(iterations):
        current = pyjson5.encode(shared)
        if current != reference:
            raise AssertionError("shared-object encode produced inconsistent output")
        # The reference text must also decode back to an equal object.
        roundtripped = pyjson5.decode(reference)
        if roundtripped != shared:
            raise AssertionError("shared-object decode produced inconsistent output")
    return iterations
def _run_concurrently(
    barrier: threading.Barrier,
    fn: Any,
    arg_tuples: list[tuple[Any, ...]],
) -> list[Any]:
    """Run ``fn(*args)`` for each tuple in *arg_tuples* on its own pool thread.

    The workers rendezvous on *barrier*, whose party count also sizes the
    pool. If submitting a task fails part-way (for example, the OS refuses
    to start another thread), the barrier is aborted. Workers already
    waiting on it then raise ``threading.BrokenBarrierError`` instead of
    blocking forever. Without that, leaving the ``with`` block would hang
    in ``ThreadPoolExecutor.shutdown(wait=True)``.

    Returns the workers' results in completion order. The first worker
    exception encountered while collecting results is re-raised unchanged.
    """
    with ThreadPoolExecutor(max_workers=barrier.parties) as pool:
        try:
            futures = [pool.submit(fn, *args) for args in arg_tuples]
        except BaseException:
            barrier.abort()
            raise
        return [fut.result() for fut in as_completed(futures)]


def main() -> int:
    """Run the two-round pyjson5 thread stress test and return an exit code.

    Round 1: every worker round-trips independently generated payloads,
    seeded with ``--seed + worker_index``. Round 2: every worker encodes and
    decodes one shared object.

    Returns 0 on success, 1 when a worker reports a mismatch
    (``AssertionError``), and 2 on any other exception. A ``--threads``
    value below 1 is rejected via ``parser.error``, which exits with
    status 2.
    """
    parser = ArgumentParser(description=__doc__)
    parser.add_argument(
        "--threads",
        type=int,
        default=max(4, (os.cpu_count() or 4) * 2),
        help="number of worker threads (default: 2 * CPU count, min 4)",
    )
    parser.add_argument(
        "--iterations",
        type=int,
        default=2000,
        help="round-trip iterations per worker (default: 2000)",
    )
    parser.add_argument(
        "--seed",
        type=int,
        default=0xDEADBEEF,
        help="base RNG seed (default: 0xDEADBEEF)",
    )
    args = parser.parse_args()
    # ThreadPoolExecutor rejects max_workers < 1 with ValueError, and a
    # zero-party barrier is meaningless. Report it as a usage error instead
    # of an unhandled traceback.
    if args.threads < 1:
        parser.error("--threads must be at least 1")

    print(f"Python: {sys.version}")
    # Where sys._is_gil_enabled() is missing, assume the GIL is enabled.
    gil_disabled = getattr(sys, "_is_gil_enabled", lambda: True)() is False
    print(f"GIL disabled: {gil_disabled}")
    print(f"pyjson5: {pyjson5.__version__}")
    print(f"Workers: {args.threads} Iterations/worker: {args.iterations}")

    start = time.perf_counter()
    try:
        # Round 1: each worker has its own RNG and builds independent payloads.
        barrier = threading.Barrier(args.threads)
        round1 = _run_concurrently(
            barrier,
            _worker,
            [(i, args.iterations, args.seed + i, barrier) for i in range(args.threads)],
        )
        total_ok = sum(ok for _wid, ok in round1)
        # Timed on its own so the ops/s figure covers only the round it counts.
        round1_elapsed = time.perf_counter() - start

        # Round 2: all workers hammer the SAME shared object concurrently.
        shared = {
            "list": list(range(50)),
            "nested": {"a": [1, 2, {"b": "héllo ☃"}], "c": None, "d": True},
            "string": "the quick brown fox jumps over the lazy dog " * 10,
        }
        barrier2 = threading.Barrier(args.threads)
        shared_ok = sum(
            _run_concurrently(
                barrier2,
                _shared_object_worker,
                [(shared, args.iterations // 2, barrier2)] * args.threads,
            )
        )
    except AssertionError as exc:
        print(f"\nFAIL: {exc}", file=sys.stderr)
        return 1
    except Exception:
        import traceback

        traceback.print_exc()
        return 2
    elapsed = time.perf_counter() - start
    print(
        f"\nOK: {total_ok} round-trips across {args.threads} threads in {round1_elapsed:.2f}s "
        f"({total_ok / max(round1_elapsed, 1e-9):.0f} ops/s); "
        f"{shared_ok} shared-object checks; total {elapsed:.2f}s"
    )
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())