From 30b0c09d7ca3a2888bcdf256b18adf65f4a69438 Mon Sep 17 00:00:00 2001 From: Liam DeVoe Date: Mon, 2 Feb 2026 14:35:17 -0500 Subject: [PATCH 1/3] Initial structure to support threaded execution --- src/_pytest/fixtures.py | 25 ++- src/_pytest/main.py | 36 ++++- src/_pytest/runner.py | 60 ++++++- testing/test_runner.py | 5 +- testing/threading/test_current_test.py | 39 +++++ testing/threading/test_threading.py | 206 +++++++++++++++++++++++++ testing/threading/utils.py | 46 ++++++ 7 files changed, 409 insertions(+), 8 deletions(-) create mode 100644 testing/threading/test_current_test.py create mode 100644 testing/threading/test_threading.py create mode 100644 testing/threading/utils.py diff --git a/src/_pytest/fixtures.py b/src/_pytest/fixtures.py index 84f90f946be..f0dbc258db9 100644 --- a/src/_pytest/fixtures.py +++ b/src/_pytest/fixtures.py @@ -56,6 +56,8 @@ from _pytest.deprecated import check_ispytest from _pytest.deprecated import YIELD_FIXTURE from _pytest.main import Session +from _pytest.runner import _pytest_thread_id +from _pytest.runner import PytestThreadId from _pytest.mark import ParameterSet from _pytest.mark.structures import MarkDecorator from _pytest.outcomes import fail @@ -1013,12 +1015,31 @@ def __init__( self.argnames: Final = getfuncargnames(func, name=argname) # If the fixture was executed, the current value of the fixture. # Can change if the fixture is executed with different parameters. - self.cached_result: _FixtureCachedResult[FixtureValue] | None = None - self._finalizers: Final[list[Callable[[], object]]] = [] + self._cached_results_by_thread: dict[ + PytestThreadId | None, _FixtureCachedResult[FixtureValue] | None + ] = defaultdict(lambda: None) + self._finalizers_by_thread: dict[ + PytestThreadId | None, list[Callable[[], object]] + ] = defaultdict(list) # only used to emit a deprecationwarning, can be removed in pytest9 self._autouse = _autouse + @property + def cached_result(self) -> _FixtureCachedResult[FixtureValue] | None: + key = _pytest_thread_id.get() + return self._cached_results_by_thread[key] + + @cached_result.setter + def cached_result(self, value: _FixtureCachedResult[FixtureValue] | None) -> None: + key = _pytest_thread_id.get() + self._cached_results_by_thread[key] = value + + @property + def _finalizers(self) -> list[Callable[[], object]]: + key = _pytest_thread_id.get() + return self._finalizers_by_thread[key] + @property def scope(self) -> _ScopeName: """Scope string, one of "function", "class", "module", "package", "session".""" diff --git a/src/_pytest/main.py b/src/_pytest/main.py index 02c7fb373fd..d02d1bffad2 100644 --- a/src/_pytest/main.py +++ b/src/_pytest/main.py @@ -3,6 +3,7 @@ from __future__ import annotations import argparse +from collections import defaultdict from collections.abc import Callable from collections.abc import Iterable from collections.abc import Iterator @@ -16,6 +17,7 @@ import os from pathlib import Path import sys +from threading import Thread from typing import final from typing import Literal from typing import overload @@ -43,7 +45,9 @@ from _pytest.pathlib import scandir from _pytest.reports import CollectReport from _pytest.reports import TestReport +from _pytest.runner import _pytest_thread_id from _pytest.runner import collect_one_node +from _pytest.runner import PytestThreadId from _pytest.runner import SetupState from _pytest.warning_types import PytestWarning @@ -520,6 +524,14 @@ def __missing__(self, path: Path) -> str: return r +@dataclasses.dataclass +class ThreadInfo: + thread: Thread + # note that this is the id from pytest-thread-n, not get_ident(). + pytest_thread_id: PytestThreadId + current_test_var: str | None = None + + @final class Dir(nodes.Directory): """Collector of files in a file system directory. @@ -581,8 +593,6 @@ class Session(nodes.Collector): Interrupted = Interrupted Failed = Failed - # Set on the session by runner.pytest_sessionstart. - _setupstate: SetupState # Set on the session by fixtures.pytest_sessionstart. _fixturemanager: FixtureManager exitstatus: int | ExitCode @@ -608,11 +618,33 @@ def __init__(self, config: Config) -> None: self._initial_parts: list[CollectionArgument] = [] self._collection_cache: dict[nodes.Collector, CollectReport] = {} self.items: list[nodes.Item] = [] + # info about each thread that is running pytest items + self._thread_info: dict[Thread, ThreadInfo] = {} self._bestrelpathcache: dict[Path, str] = _bestrelpath_cache(config.rootpath) self.config.pluginmanager.register(self, name="session") + self._setupstates: dict[PytestThreadId | None, SetupState] = defaultdict( + SetupState + ) + + def _thread_started(self, thread: Thread, pytest_thread_id: PytestThreadId) -> None: + if thread not in self._thread_info: + self._thread_info[thread] = ThreadInfo( + thread=thread, pytest_thread_id=pytest_thread_id + ) + + @property + def _setupstate(self) -> SetupState: + key = _pytest_thread_id.get() + return self._setupstates[key] + + @_setupstate.setter + def _setupstate(self, setupstate: SetupState) -> None: + key = _pytest_thread_id.get() + self._setupstates[key] = setupstate + @classmethod def from_config(cls, config: Config) -> Session: session: Session = cls._create(config=config) diff --git a/src/_pytest/runner.py b/src/_pytest/runner.py index d1090aace89..de0585d1661 100644 --- a/src/_pytest/runner.py +++ b/src/_pytest/runner.py @@ -5,14 +5,18 @@ import bdb from collections.abc import Callable +from contextvars import ContextVar import dataclasses import os import sys +import threading +import re import types from typing import cast from typing import final from typing import Generic from typing import Literal +from typing import NewType from typing import TYPE_CHECKING from typing import TypeVar @@ -44,6 +48,12 @@ from _pytest.main import Session from _pytest.terminal import TerminalReporter +PytestThreadId = NewType("PytestThreadId", int) + +_pytest_thread_id: ContextVar[PytestThreadId | None] = ContextVar( + "_pytest_thread_id", default=None +) + # # pytest plugin hooks. @@ -128,6 +138,20 @@ def runtestprotocol( # This only happens if the item is re-run, as is done by # pytest-rerunfailures. item._initrequest() # type: ignore[attr-defined] + + thread = threading.current_thread() + if match := re.match(r"^pytest-thread-(\d+)$", thread.name): + thread_id = PytestThreadId(match.group(1)) + _pytest_thread_id.set(thread_id) + item.session._thread_started(thread, thread_id) + elif item.session._thread_info: + raise RuntimeError( + f"pytest threads must follow the naming convention pytest-thread-{{n}}, " + f"but got thread named {thread.name!r}, which does not. " + "(Currently registered thread names: " + f"{[thread.name for thread in item.session._thread_info]!r})" + ) + rep = call_and_report(item, "setup", log) reports = [rep] if rep.passed: @@ -203,10 +227,40 @@ def _update_current_test_var( If ``when`` is None, delete ``PYTEST_CURRENT_TEST`` from the environment. """ var_name = "PYTEST_CURRENT_TEST" + value = f"{item.nodeid} ({when})" + # don't allow null bytes on environment variables (see #2644, #2957) + value = value.replace("\x00", "(null)") + + # We are not synchronizing with other threads that may be registering into + # _thread_info. Copy to avoid races. + thread_info = item.session._thread_info.copy() + if thread_info: + if when is None: + value = None + + thread = threading.current_thread() + # validated by our rejection of invalidly-named threads in runtestprotocol. + assert thread in thread_info + thread_info[thread].current_test_var = value + if any(info.current_test_var is not None for info in thread_info.values()): + os.environ[var_name] = "; ".join( + f"{info.pytest_thread_id}={info.current_test_var}" + for info in thread_info.values() + if info.current_test_var is not None + ) + else: + # all items across all threads have finished. + # + # If python switches threads right after the else, we might have a + # race with two threads executing this code. Guard against this with + # try/except. + try: + os.environ.pop(var_name) + except KeyError: + pass + return + if when: - value = f"{item.nodeid} ({when})" - # don't allow null bytes on environment variables (see #2644, #2957) - value = value.replace("\x00", "(null)") os.environ[var_name] = value else: os.environ.pop(var_name) diff --git a/testing/test_runner.py b/testing/test_runner.py index 0245438a47d..91db45cf921 100644 --- a/testing/test_runner.py +++ b/testing/test_runner.py @@ -1009,9 +1009,12 @@ def test_fix(foo): result.stdout.fnmatch_lines(["*test_fix*", "*fixture*'missing'*not found*"]) -def test_store_except_info_on_error() -> None: +def test_store_except_info_on_error(monkeypatch: MonkeyPatch) -> None: """Test that upon test failure, the exception info is stored on sys.last_traceback and friends.""" + # pytest_runtest_call calls _update_current_test_var which in turn accesses + # item.session, which doesn't exist on our mocked ItemMightRaise. + monkeypatch.setattr(runner, "_update_current_test_var", lambda *args: None) # Simulate item that might raise a specific exception, depending on `raise_error` class var class ItemMightRaise: diff --git a/testing/threading/test_current_test.py b/testing/threading/test_current_test.py new file mode 100644 index 00000000000..7128270cf2f --- /dev/null +++ b/testing/threading/test_current_test.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from testing.threading.utils import threaded_conftest, n_conftest_threads + + +def test_current_test_envvar_shows_all_threads(pytester): + pytester.makeconftest(threaded_conftest) + + # PYTEST_CURRENT_TEST should be popped from os.environ after pytest finishes + pytester.makeconftest( + """ + import os + import pytest + + def pytest_sessionfinish(): + assert "PYTEST_CURRENT_TEST" not in os.environ + """ + ) + + pytester.makepyfile( + f""" + import os + import pytest + from threading import Barrier + + barrier = Barrier({n_conftest_threads}) + + def test_1(): + assert "test_1 (call)" in os.environ["PYTEST_CURRENT_TEST"] + + def test_2(): + assert "test_2 (call)" in os.environ["PYTEST_CURRENT_TEST"] + + def test_3(): + assert "test_3 (call)" in os.environ["PYTEST_CURRENT_TEST"] + """ + ) + result = pytester.runpytest() + result.assert_outcomes(passed=3) diff --git a/testing/threading/test_threading.py b/testing/threading/test_threading.py new file mode 100644 index 00000000000..9018f39f7b6 --- /dev/null +++ b/testing/threading/test_threading.py @@ -0,0 +1,206 @@ +from __future__ import annotations + +import sys + +import pytest +from _pytest.pytester import Pytester +from testing.threading.utils import threaded_conftest + + +def test_invalid_thread_name_raises(pytester: Pytester) -> None: + # note that there's a race condition if a thread switch happens at + # exactly the right time in the runtestprotocol thread registration / error + # checking that would make this test flaky. We force thread 0 to finish before + # starting thread 1 to deflake this. + # + # This race is not a practical concern since this warning is a proactive one + # for misguided users. + + pytester.makeconftest( + """ +import threading +from threading import Event, Barrier +from concurrent.futures import ThreadPoolExecutor + +def pytest_runtestloop(session): + thread_0_ran = Event() + barrier = Barrier(2, timeout=5) + + def worker(worker_id, item): + threading.current_thread().name = ( + f"pytest-thread-{worker_id}" + if worker_id == 0 + else "invalid_thread_name" + ) + + barrier.wait() + if worker_id == 1: + thread_0_ran.wait(timeout=5) + + item.config.hook.pytest_runtest_protocol(item=item, nextitem=None) + + if worker_id == 0: + thread_0_ran.set() + + with ThreadPoolExecutor(max_workers=2) as executor: + futures = [ + executor.submit(worker, worker_id=i, item=session.items[i]) + for i in range(2) + ] + for future in futures: + future.result() + return True +""" + ) + + pytester.makepyfile( + """ +def test_1(): pass +def test_2(): pass +""" + ) + result = pytester.runpytest() + result.stdout.fnmatch_lines( + ["*pytest threads must follow the naming convention pytest-thread-*"] + ) + + +def test_concurrent(pytester: Pytester) -> None: + pytester.makeconftest(threaded_conftest) + pytester.makepyfile( + """ + import pytest + + def do_work(): + # arbitrary moderately-expensive work + for x in range(500): + _y = x**x + + def test1(): do_work() + def test2(): do_work() + def test3(): do_work() + def test4(): do_work() + + @pytest.fixture + def a_fixture(): + yield + + def test_fixture1(a_fixture): do_work() + def test_fixture2(a_fixture): do_work() + def test_fixture3(a_fixture): do_work() + def test_fixture4(a_fixture): do_work() + + """ + ) + result = pytester.runpytest() + result.assert_outcomes(passed=8) + + +def test_each_thread_gets_fresh_fixture_value(pytester: Pytester) -> None: + pytester.makeconftest(threaded_conftest) + pytester.makepyfile( + """ + import itertools + import pytest + import threading + + seen_values = set() + seen_values_lock = threading.Lock() + counter = itertools.count() + + @pytest.fixture + def a_fixture(): + value = next(counter) + with seen_values_lock: + assert value not in seen_values + seen_values.add(value) + return value + + def test1(a_fixture): pass + def test2(a_fixture): pass + def test3(a_fixture): pass + def test4(a_fixture): pass + """ + ) + result = pytester.runpytest() + result.assert_outcomes(passed=4) + + +def test_child_thread_gets_cached_fixture(pytester: Pytester) -> None: + pytester.makepyfile( + """ + import threading + import pytest + import itertools + + counter = itertools.count() + @pytest.fixture + def my_fixture(): + return next(counter) + + def test_child_thread_shares_fixture(request): + main_value = request.getfixturevalue("my_fixture") + child_value = None + + def child_work(): + nonlocal child_value + child_value = request.getfixturevalue("my_fixture") + + child = threading.Thread(target=child_work) + child.start() + child.join() + + assert child_value is main_value + """ + ) + result = pytester.runpytest("-v") + result.assert_outcomes(passed=1) + + +@pytest.mark.skipif( + # - thread_inherit_context is new in 3.14 + # - thread_inherit_context defaults to 1 on 3.14t+ if not passed + # - thread_inherit_context defaults to 0 on 3.14+ if not passed + ( + sys.version_info < (3, 14) + or ( + sys._xoptions["thread_inherit_context"] != "1" + if "thread_inherit_context" in sys._xoptions + else sys._is_gil_enabled() + ) + ), + reason="requires thread_inherit_context=1", +) +def test_child_thread_gets_cached_value_threaded(pytester: Pytester) -> None: + # Threads spawned under threaded execution should still inherit the fixture + # caching of the parent thread. + pytester.makeconftest(threaded_conftest) + pytester.makepyfile( + """ + import threading + import pytest + import itertools + + counter = itertools.count() + @pytest.fixture + def my_fixture(): + return next(counter) + + def test_child_thread_shares_fixture(request): + main_value = request.getfixturevalue("my_fixture") + + child_value = None + + def child_work(): + nonlocal child_value + child_value = request.getfixturevalue("my_fixture") + + child = threading.Thread(target=child_work) + child.start() + child.join() + + assert child_value is main_value + """ + ) + result = pytester.runpytest("-v") + result.assert_outcomes(passed=1) diff --git a/testing/threading/utils.py b/testing/threading/utils.py new file mode 100644 index 00000000000..c61af8c2e66 --- /dev/null +++ b/testing/threading/utils.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +n_conftest_threads = 3 + +# a conftest that runs tests in a threadpool. You can think of this as "like pytest-xdist, +# but using threads instead of processes". This is a nice conftest for flushing out +# concurrency bugs in pytest itself. +threaded_conftest = f""" +import sys +import threading +from queue import Queue, Empty +from concurrent.futures import ThreadPoolExecutor + +# make thread switches more common +sys.setswitchinterval(0.000001) +n = {n_conftest_threads} + +def pytest_runtestloop(session): + queue = Queue() + for item in session.items: + queue.put(item) + + def worker(n): + threading.current_thread().name = f"pytest-thread-{{n}}" + try: + item = queue.get_nowait() + except Empty: + return + + while item is not None: + try: + next_item = queue.get_nowait() + except Empty: + next_item = None + + item.config.hook.pytest_runtest_protocol( + item=item, nextitem=next_item + ) + item = next_item + + with ThreadPoolExecutor(max_workers=n) as executor: + futures = [executor.submit(worker, n=i) for i in range(n)] + for future in futures: + future.result() + return True +""" From 8fc0192f55e0d2a683934d487223919865ed0d01 Mon Sep 17 00:00:00 2001 From: Liam DeVoe Date: Mon, 2 Feb 2026 15:00:24 -0500 Subject: [PATCH 2/3] Initial structure to support threaded execution --- changelog/14158.misc.rst | 1 + src/_pytest/fixtures.py | 4 ++-- src/_pytest/runner.py | 9 +++------ testing/threading/test_current_test.py | 7 +++++-- testing/threading/test_threading.py | 5 +++-- testing/threading/utils.py | 1 + 6 files changed, 15 insertions(+), 12 deletions(-) create mode 100644 changelog/14158.misc.rst diff --git a/changelog/14158.misc.rst b/changelog/14158.misc.rst new file mode 100644 index 00000000000..50ea7e941ad --- /dev/null +++ b/changelog/14158.misc.rst @@ -0,0 +1 @@ +Initial restructuring of internals to support threaded execution (:issue:`13768`). diff --git a/src/_pytest/fixtures.py b/src/_pytest/fixtures.py index f0dbc258db9..e3d8fe73346 100644 --- a/src/_pytest/fixtures.py +++ b/src/_pytest/fixtures.py @@ -56,8 +56,6 @@ from _pytest.deprecated import check_ispytest from _pytest.deprecated import YIELD_FIXTURE from _pytest.main import Session -from _pytest.runner import _pytest_thread_id -from _pytest.runner import PytestThreadId from _pytest.mark import ParameterSet from _pytest.mark.structures import MarkDecorator from _pytest.outcomes import fail @@ -65,6 +63,8 @@ from _pytest.outcomes import TEST_OUTCOME from _pytest.pathlib import absolutepath from _pytest.pathlib import bestrelpath +from _pytest.runner import _pytest_thread_id +from _pytest.runner import PytestThreadId from _pytest.scope import _ScopeName from _pytest.scope import HIGH_SCOPES from _pytest.scope import Scope diff --git a/src/_pytest/runner.py b/src/_pytest/runner.py index de0585d1661..c957605ac93 100644 --- a/src/_pytest/runner.py +++ b/src/_pytest/runner.py @@ -8,9 +8,9 @@ from contextvars import ContextVar import dataclasses import os +import re import sys import threading -import re import types from typing import cast from typing import final @@ -141,7 +141,7 @@ def runtestprotocol( thread = threading.current_thread() if match := re.match(r"^pytest-thread-(\d+)$", thread.name): - thread_id = PytestThreadId(match.group(1)) + thread_id = PytestThreadId(int(match.group(1))) _pytest_thread_id.set(thread_id) item.session._thread_started(thread, thread_id) elif item.session._thread_info: @@ -235,13 +235,10 @@ def _update_current_test_var( # _thread_info. Copy to avoid races. thread_info = item.session._thread_info.copy() if thread_info: - if when is None: - value = None - thread = threading.current_thread() # validated by our rejection of invalidly-named threads in runtestprotocol. assert thread in thread_info - thread_info[thread].current_test_var = value + thread_info[thread].current_test_var = None if when is None else value if any(info.current_test_var is not None for info in thread_info.values()): os.environ[var_name] = "; ".join( f"{info.pytest_thread_id}={info.current_test_var}" diff --git a/testing/threading/test_current_test.py b/testing/threading/test_current_test.py index 7128270cf2f..edc26d8200a 100644 --- a/testing/threading/test_current_test.py +++ b/testing/threading/test_current_test.py @@ -1,9 +1,12 @@ from __future__ import annotations -from testing.threading.utils import threaded_conftest, n_conftest_threads +from testing.threading.utils import n_conftest_threads +from testing.threading.utils import threaded_conftest +from _pytest.pytester import Pytester -def test_current_test_envvar_shows_all_threads(pytester): + +def test_current_test_envvar_shows_all_threads(pytester: Pytester) -> None: pytester.makeconftest(threaded_conftest) # PYTEST_CURRENT_TEST should be popped from os.environ after pytest finishes diff --git a/testing/threading/test_threading.py b/testing/threading/test_threading.py index 9018f39f7b6..e6b8cfa7296 100644 --- a/testing/threading/test_threading.py +++ b/testing/threading/test_threading.py @@ -2,10 +2,11 @@ import sys -import pytest -from _pytest.pytester import Pytester from testing.threading.utils import threaded_conftest +from _pytest.pytester import Pytester +import pytest + def test_invalid_thread_name_raises(pytester: Pytester) -> None: # note that there's a race condition if a thread switch happens at diff --git a/testing/threading/utils.py b/testing/threading/utils.py index c61af8c2e66..41fb8205a6e 100644 --- a/testing/threading/utils.py +++ b/testing/threading/utils.py @@ -1,5 +1,6 @@ from __future__ import annotations + n_conftest_threads = 3 # a conftest that runs tests in a threadpool. You can think of this as "like pytest-xdist, From cf9a880e406a80016985a2e909a5723f997487e5 Mon Sep 17 00:00:00 2001 From: Liam DeVoe Date: Mon, 2 Feb 2026 15:00:39 -0500 Subject: [PATCH 3/3] fix mypy not being able to disambiguate identically-named files in different modules --- testing/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 testing/__init__.py diff --git a/testing/__init__.py b/testing/__init__.py new file mode 100644 index 00000000000..e69de29bb2d