/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/internals
import asyncio


class DeadlockError(Exception):
    """Error raised if DeadlockDetectingLock detects deadlock"""


class DeadlockDetectingLock:
    """
    Lock that detects deadlock when it is about to be acquired by the
    same task that already holds it.

    Intended to be used as an async context manager (``async with``).
    Re-entering it from the task that currently holds it raises
    DeadlockError instead of waiting forever on the wrapped
    asyncio.Lock.
    """

    def __init__(self):
        self._lock = asyncio.Lock()
        # Task that currently holds the lock. None while the lock is
        # free, and also when it was acquired outside of any task.
        self._owner = None

    def locked(self):
        """Return True if the wrapped lock is currently held."""
        return self._lock.locked()

    async def __aenter__(self):
        """
        Acquire the lock and return this object.

        Raises DeadlockError if the current task already holds the lock.
        """
        curr_task = asyncio.current_task()
        # current_task() is None when no task is running; a free lock
        # also has a None owner, so a plain equality check would report
        # a deadlock on a lock that nobody holds. Only a real task that
        # is the recorded owner counts as a re-entrant acquisition.
        if curr_task is not None and self._owner is curr_task:
            raise DeadlockError()
        await self._lock.acquire()
        self._owner = curr_task
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Forget the owner and release the wrapped lock."""
        self._owner = None
        self._lock.release()
.
Edit
..
Edit
__init__.py
Edit
__pycache__
Edit
auth_protocol.py
Edit
cln.py
Edit
deadlock_detecting_lock.py
Edit
global_scope.py
Edit
iaid.py
Edit
lazy_load.py
Edit
logger.py
Edit
logging_protocol.py
Edit
persistent_message.py
Edit
the_sink.py
Edit