Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/14158.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Initial restructuring of internals to support threaded execution (:issue:`13768`).
25 changes: 23 additions & 2 deletions src/_pytest/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
from _pytest.outcomes import TEST_OUTCOME
from _pytest.pathlib import absolutepath
from _pytest.pathlib import bestrelpath
from _pytest.runner import _pytest_thread_id
from _pytest.runner import PytestThreadId
from _pytest.scope import _ScopeName
from _pytest.scope import HIGH_SCOPES
from _pytest.scope import Scope
Expand Down Expand Up @@ -1013,12 +1015,31 @@ def __init__(
self.argnames: Final = getfuncargnames(func, name=argname)
# If the fixture was executed, the current value of the fixture.
# Can change if the fixture is executed with different parameters.
self.cached_result: _FixtureCachedResult[FixtureValue] | None = None
self._finalizers: Final[list[Callable[[], object]]] = []
self._cached_results_by_thread: dict[
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We really want this to move to setupstate

PytestThreadId | None, _FixtureCachedResult[FixtureValue] | None
] = defaultdict(lambda: None)
self._finalizers_by_thread: dict[
PytestThreadId | None, list[Callable[[], object]]
] = defaultdict(list)

# only used to emit a deprecationwarning, can be removed in pytest9
self._autouse = _autouse

@property
def cached_result(self) -> _FixtureCachedResult[FixtureValue] | None:
key = _pytest_thread_id.get()
return self._cached_results_by_thread[key]

@cached_result.setter
def cached_result(self, value: _FixtureCachedResult[FixtureValue] | None) -> None:
key = _pytest_thread_id.get()
self._cached_results_by_thread[key] = value

@property
def _finalizers(self) -> list[Callable[[], object]]:
key = _pytest_thread_id.get()
return self._finalizers_by_thread[key]

@property
def scope(self) -> _ScopeName:
"""Scope string, one of "function", "class", "module", "package", "session"."""
Expand Down
36 changes: 34 additions & 2 deletions src/_pytest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import argparse
from collections import defaultdict
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
Expand All @@ -16,6 +17,7 @@
import os
from pathlib import Path
import sys
from threading import Thread
from typing import final
from typing import Literal
from typing import overload
Expand Down Expand Up @@ -43,7 +45,9 @@
from _pytest.pathlib import scandir
from _pytest.reports import CollectReport
from _pytest.reports import TestReport
from _pytest.runner import _pytest_thread_id
from _pytest.runner import collect_one_node
from _pytest.runner import PytestThreadId
from _pytest.runner import SetupState
from _pytest.warning_types import PytestWarning

Expand Down Expand Up @@ -520,6 +524,14 @@ def __missing__(self, path: Path) -> str:
return r


@dataclasses.dataclass
class ThreadInfo:
thread: Thread
# note that this is the id from pytest-thread-n, not get_ident().
pytest_thread_id: PytestThreadId
current_test_var: str | None = None


@final
class Dir(nodes.Directory):
"""Collector of files in a file system directory.
Expand Down Expand Up @@ -581,8 +593,6 @@ class Session(nodes.Collector):

Interrupted = Interrupted
Failed = Failed
# Set on the session by runner.pytest_sessionstart.
_setupstate: SetupState
# Set on the session by fixtures.pytest_sessionstart.
_fixturemanager: FixtureManager
exitstatus: int | ExitCode
Expand All @@ -608,11 +618,33 @@ def __init__(self, config: Config) -> None:
self._initial_parts: list[CollectionArgument] = []
self._collection_cache: dict[nodes.Collector, CollectReport] = {}
self.items: list[nodes.Item] = []
# info about each thread that is running pytest items
self._thread_info: dict[Thread, ThreadInfo] = {}

self._bestrelpathcache: dict[Path, str] = _bestrelpath_cache(config.rootpath)

self.config.pluginmanager.register(self, name="session")

self._setupstates: dict[PytestThreadId | None, SetupState] = defaultdict(
SetupState
)

def _thread_started(self, thread: Thread, pytest_thread_id: PytestThreadId) -> None:
if thread not in self._thread_info:
self._thread_info[thread] = ThreadInfo(
thread=thread, pytest_thread_id=pytest_thread_id
)

@property
def _setupstate(self) -> SetupState:
key = _pytest_thread_id.get()
return self._setupstates[key]

@_setupstate.setter
def _setupstate(self, setupstate: SetupState) -> None:
key = _pytest_thread_id.get()
self._setupstates[key] = setupstate

@classmethod
def from_config(cls, config: Config) -> Session:
session: Session = cls._create(config=config)
Expand Down
57 changes: 54 additions & 3 deletions src/_pytest/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@

import bdb
from collections.abc import Callable
from contextvars import ContextVar
import dataclasses
import os
import re
import sys
import threading
import types
from typing import cast
from typing import final
from typing import Generic
from typing import Literal
from typing import NewType
from typing import TYPE_CHECKING
from typing import TypeVar

Expand Down Expand Up @@ -44,6 +48,12 @@
from _pytest.main import Session
from _pytest.terminal import TerminalReporter

PytestThreadId = NewType("PytestThreadId", int)

_pytest_thread_id: ContextVar[PytestThreadId | None] = ContextVar(
"_pytest_thread_id", default=None
)

#
# pytest plugin hooks.

Expand Down Expand Up @@ -128,6 +138,20 @@ def runtestprotocol(
# This only happens if the item is re-run, as is done by
# pytest-rerunfailures.
item._initrequest() # type: ignore[attr-defined]

thread = threading.current_thread()
if match := re.match(r"^pytest-thread-(\d+)$", thread.name):
thread_id = PytestThreadId(int(match.group(1)))
_pytest_thread_id.set(thread_id)
item.session._thread_started(thread, thread_id)
elif item.session._thread_info:
raise RuntimeError(
f"pytest threads must follow the naming convention pytest-thread-{{n}}, "
f"but got thread named {thread.name!r}, which does not. "
"(Currently registered thread names: "
f"{[thread.name for thread in item.session._thread_info]!r})"
)

rep = call_and_report(item, "setup", log)
reports = [rep]
if rep.passed:
Expand Down Expand Up @@ -203,10 +227,37 @@ def _update_current_test_var(
If ``when`` is None, delete ``PYTEST_CURRENT_TEST`` from the environment.
"""
var_name = "PYTEST_CURRENT_TEST"
value = f"{item.nodeid} ({when})"
# don't allow null bytes on environment variables (see #2644, #2957)
value = value.replace("\x00", "(null)")

# We are not synchronizing with other threads that may be registering into
# _thread_info. Copy to avoid races.
thread_info = item.session._thread_info.copy()
if thread_info:
thread = threading.current_thread()
# validated by our rejection of invalidly-named threads in runtestprotocol.
assert thread in thread_info
thread_info[thread].current_test_var = None if when is None else value
if any(info.current_test_var is not None for info in thread_info.values()):
os.environ[var_name] = "; ".join(
f"{info.pytest_thread_id}={info.current_test_var}"
for info in thread_info.values()
if info.current_test_var is not None
)
else:
# all items across all threads have finished.
#
# If python switches threads right after the else, we might have a
# race with two threads executing this code. Guard against this with
# try/except.
try:
os.environ.pop(var_name)
except KeyError:
pass
return

if when:
value = f"{item.nodeid} ({when})"
# don't allow null bytes on environment variables (see #2644, #2957)
value = value.replace("\x00", "(null)")
os.environ[var_name] = value
else:
os.environ.pop(var_name)
Expand Down
Empty file added testing/__init__.py
Empty file.
5 changes: 4 additions & 1 deletion testing/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1009,9 +1009,12 @@ def test_fix(foo):
result.stdout.fnmatch_lines(["*test_fix*", "*fixture*'missing'*not found*"])


def test_store_except_info_on_error() -> None:
def test_store_except_info_on_error(monkeypatch: MonkeyPatch) -> None:
"""Test that upon test failure, the exception info is stored on
sys.last_traceback and friends."""
# pytest_runtest_call calls _update_current_test_var which in turn accesses
# item.session, which doesn't exist on our mocked ItemMightRaise.
monkeypatch.setattr(runner, "_update_current_test_var", lambda *args: None)

# Simulate item that might raise a specific exception, depending on `raise_error` class var
class ItemMightRaise:
Expand Down
42 changes: 42 additions & 0 deletions testing/threading/test_current_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from __future__ import annotations

from testing.threading.utils import n_conftest_threads
from testing.threading.utils import threaded_conftest

from _pytest.pytester import Pytester


def test_current_test_envvar_shows_all_threads(pytester: Pytester) -> None:
pytester.makeconftest(threaded_conftest)

# PYTEST_CURRENT_TEST should be popped from os.environ after pytest finishes
pytester.makeconftest(
"""
import os
import pytest

def pytest_sessionfinish():
assert "PYTEST_CURRENT_TEST" not in os.environ
"""
)

pytester.makepyfile(
f"""
import os
import pytest
from threading import Barrier

barrier = Barrier({n_conftest_threads})

def test_1():
assert "test_1 (call)" in os.environ["PYTEST_CURRENT_TEST"]

def test_2():
assert "test_2 (call)" in os.environ["PYTEST_CURRENT_TEST"]

def test_3():
assert "test_3 (call)" in os.environ["PYTEST_CURRENT_TEST"]
"""
)
result = pytester.runpytest()
result.assert_outcomes(passed=3)
Loading
Loading