/usr/share/cagefs-skeleton/opt/alt/python-internal/lib64/python3.11/asyncio
"""Event loop mixins."""

import threading
from . import events

# Single module-wide lock that guards the first assignment of ``_loop`` on
# any instance of the mixin.
_global_lock = threading.Lock()


class _LoopBoundMixin:
    """Mixin that remembers one event loop per object and rejects any other."""

    # Loop recorded for this object; stays None until _get_loop() stores one.
    _loop = None

    def _get_loop(self):
        """Return the loop reported by ``events._get_running_loop()``.

        If no loop has been recorded on this object yet, the reported loop
        is stored in ``self._loop`` first.

        Raises:
            RuntimeError: if the reported loop is not the one already
                recorded on this object.
        """
        loop = events._get_running_loop()

        if self._loop is None:
            with _global_lock:
                # Re-check while holding the lock: another thread may have
                # stored a loop between the test above and the acquisition.
                if self._loop is None:
                    self._loop = loop
        if loop is not self._loop:
            raise RuntimeError(f'{self!r} is bound to a different event loop')
        return loop
.
Edit
..
Edit
__init__.py
Edit
__main__.py
Edit
__pycache__
Edit
base_events.py
Edit
base_futures.py
Edit
base_subprocess.py
Edit
base_tasks.py
Edit
constants.py
Edit
coroutines.py
Edit
events.py
Edit
exceptions.py
Edit
format_helpers.py
Edit
futures.py
Edit
locks.py
Edit
log.py
Edit
mixins.py
Edit
proactor_events.py
Edit
protocols.py
Edit
queues.py
Edit
runners.py
Edit
selector_events.py
Edit
sslproto.py
Edit
staggered.py
Edit
streams.py
Edit
subprocess.py
Edit
taskgroups.py
Edit
tasks.py
Edit
threads.py
Edit
timeouts.py
Edit
transports.py
Edit
trsock.py
Edit
unix_events.py
Edit
windows_events.py
Edit
windows_utils.py
Edit