/opt/cloudlinux/venv/lib/python3.11/site-packages/_pytest
import threading
import traceback
import warnings
from types import TracebackType
from typing import Any
from typing import Callable
from typing import Generator
from typing import Optional
from typing import Type

import pytest


# Copied from cpython/Lib/test/support/threading_helper.py, with modifications.
class catch_threading_exception:
    """Context manager catching threading.Thread exception using
    threading.excepthook.

    Storing exc_value using a custom hook can create a reference cycle. The
    reference cycle is broken explicitly when the context manager exits.

    Storing thread using a custom hook can resurrect it if it is set to an
    object which is being finalized. Exiting the context manager clears the
    stored object.

    Usage:
        with threading_helper.catch_threading_exception() as cm:
            # code spawning a thread which raises an exception
            ...
            # check the thread exception: use cm.args
            ...
        # cm.args attribute no longer exists at this point
        # (to break a reference cycle)
    """

    def __init__(self) -> None:
        # Arguments of the most recent hook invocation; None until a thread
        # raises while the context manager is active.
        self.args: Optional["threading.ExceptHookArgs"] = None
        # Hook that was installed before __enter__, restored by __exit__.
        self._old_hook: Optional[Callable[["threading.ExceptHookArgs"], Any]] = None

    def _hook(self, args: "threading.ExceptHookArgs") -> None:
        """Record the arguments passed to the excepthook."""
        self.args = args

    def __enter__(self) -> "catch_threading_exception":
        """Install the recording hook, remembering the previous one."""
        self._old_hook = threading.excepthook
        threading.excepthook = self._hook
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Restore the previous hook and drop the stored hook arguments."""
        assert self._old_hook is not None
        threading.excepthook = self._old_hook
        self._old_hook = None
        # Delete (rather than reset) the attribute to break the reference
        # cycle described in the class docstring.
        del self.args


def thread_exception_runtest_hook() -> Generator[None, None, None]:
    """Yield once while capturing thread exceptions, then warn about any.

    The hook arguments are inspected inside the ``with`` block because
    ``cm.args`` is deleted when the context manager exits.
    """
    with catch_threading_exception() as cm:
        yield
        if cm.args:
            thread_name = "<unknown>" if cm.args.thread is None else cm.args.thread.name
            msg = f"Exception in thread {thread_name}\n\n"
            msg += "".join(
                traceback.format_exception(
                    cm.args.exc_type,
                    cm.args.exc_value,
                    cm.args.exc_traceback,
                )
            )
            warnings.warn(pytest.PytestUnhandledThreadExceptionWarning(msg))


@pytest.hookimpl(hookwrapper=True, trylast=True)
def pytest_runtest_setup() -> Generator[None, None, None]:
    """Hook wrapper delegating to thread_exception_runtest_hook for setup."""
    yield from thread_exception_runtest_hook()


@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_call() -> Generator[None, None, None]:
    """Hook wrapper delegating to thread_exception_runtest_hook for call."""
    yield from thread_exception_runtest_hook()


@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_teardown() -> Generator[None, None, None]:
    """Hook wrapper delegating to thread_exception_runtest_hook for teardown."""
    yield from thread_exception_runtest_hook()
.
Edit
..
Edit
__init__.py
Edit
__pycache__
Edit
_argcomplete.py
Edit
_code
Edit
_io
Edit
_py
Edit
_version.py
Edit
assertion
Edit
cacheprovider.py
Edit
capture.py
Edit
compat.py
Edit
config
Edit
debugging.py
Edit
deprecated.py
Edit
doctest.py
Edit
faulthandler.py
Edit
fixtures.py
Edit
freeze_support.py
Edit
helpconfig.py
Edit
hookspec.py
Edit
junitxml.py
Edit
legacypath.py
Edit
logging.py
Edit
main.py
Edit
mark
Edit
monkeypatch.py
Edit
nodes.py
Edit
nose.py
Edit
outcomes.py
Edit
pastebin.py
Edit
pathlib.py
Edit
py.typed
Edit
pytester.py
Edit
pytester_assertions.py
Edit
python.py
Edit
python_api.py
Edit
python_path.py
Edit
recwarn.py
Edit
reports.py
Edit
runner.py
Edit
scope.py
Edit
setuponly.py
Edit
setupplan.py
Edit
skipping.py
Edit
stash.py
Edit
stepwise.py
Edit
terminal.py
Edit
threadexception.py
Edit
timing.py
Edit
tmpdir.py
Edit
unittest.py
Edit
unraisableexception.py
Edit
warning_types.py
Edit
warnings.py
Edit