/usr/share/cagefs-skeleton/opt/imunify360/venv/lib/python3.11/site-packages/sentry_sdk
import os import threading from time import sleep, time from sentry_sdk._compat import check_thread_support from sentry_sdk._queue import Queue, Full from sentry_sdk.utils import logger from sentry_sdk._types import MYPY if MYPY: from typing import Any from typing import Optional from typing import Callable _TERMINATOR = object() class BackgroundWorker(object): def __init__(self, queue_size=30): # type: (int) -> None check_thread_support() self._queue = Queue(queue_size) # type: Queue self._lock = threading.Lock() self._thread = None # type: Optional[threading.Thread] self._thread_for_pid = None # type: Optional[int] @property def is_alive(self): # type: () -> bool if self._thread_for_pid != os.getpid(): return False if not self._thread: return False return self._thread.is_alive() def _ensure_thread(self): # type: () -> None if not self.is_alive: self.start() def _timed_queue_join(self, timeout): # type: (float) -> bool deadline = time() + timeout queue = self._queue queue.all_tasks_done.acquire() try: while queue.unfinished_tasks: delay = deadline - time() if delay <= 0: return False queue.all_tasks_done.wait(timeout=delay) return True finally: queue.all_tasks_done.release() def start(self): # type: () -> None with self._lock: if not self.is_alive: self._thread = threading.Thread( target=self._target, name="raven-sentry.BackgroundWorker" ) self._thread.setDaemon(True) self._thread.start() self._thread_for_pid = os.getpid() def kill(self): # type: () -> None """ Kill worker thread. Returns immediately. Not useful for waiting on shutdown for events, use `flush` for that. """ logger.debug("background worker got kill request") with self._lock: if self._thread: try: self._queue.put_nowait(_TERMINATOR) except Full: logger.debug("background worker queue full, kill failed") self._thread = None self._thread_for_pid = None def flush(self, timeout, callback=None): # type: (float, Optional[Any]) -> None logger.debug("background worker got flush request") with self._lock: if self.is_alive and timeout > 0.0: self._wait_flush(timeout, callback) logger.debug("background worker flushed") def _wait_flush(self, timeout, callback): # type: (float, Optional[Any]) -> None initial_timeout = min(0.1, timeout) if not self._timed_queue_join(initial_timeout): pending = self._queue.qsize() logger.debug("%d event(s) pending on flush", pending) if callback is not None: callback(pending, timeout) self._timed_queue_join(timeout - initial_timeout) def submit(self, callback): # type: (Callable[[], None]) -> None self._ensure_thread() try: self._queue.put_nowait(callback) except Full: logger.debug("background worker queue full, dropping event") def _target(self): # type: () -> None while True: callback = self._queue.get() try: if callback is _TERMINATOR: break try: callback() except Exception: logger.error("Failed processing job", exc_info=True) finally: self._queue.task_done() sleep(0)
.
Edit
..
Edit
__init__.py
Edit
__pycache__
Edit
_compat.py
Edit
_functools.py
Edit
_queue.py
Edit
_types.py
Edit
api.py
Edit
attachments.py
Edit
client.py
Edit
consts.py
Edit
debug.py
Edit
envelope.py
Edit
hub.py
Edit
integrations
Edit
py.typed
Edit
scope.py
Edit
serializer.py
Edit
sessions.py
Edit
tracing.py
Edit
transport.py
Edit
utils.py
Edit
worker.py
Edit