/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils
class LineBuffer:
    """Accumulate text and iterate over the complete lines received so far.

    Data is appended with :meth:`append`; iterating yields each chunk that
    is terminated by a newline character, without the terminator. Text
    after the last newline stays in the buffer until more data containing
    a newline is appended.
    """

    def __init__(self) -> None:
        self.buf = ""

    def append(self, data: str) -> None:
        """Add *data* to the end of the pending text."""
        self.buf += data

    def __iter__(self) -> "LineBuffer":
        return self

    def __next__(self) -> str:
        """Return the next complete line, consuming it from the buffer.

        Raises:
            StopIteration: no newline is present in the buffered text.
        """
        line, newline, rest = self.buf.partition("\n")
        if not newline:
            # No terminator yet: keep the partial line for a later append().
            raise StopIteration
        self.buf = rest
        return line

    def clean(self) -> None:
        """Discard everything that is currently buffered."""
        self.buf = ""


class SizeBuffer:
    """Accumulate bytes and iterate over complete length-prefixed frames.

    Each frame is a big-endian unsigned size of ``size_len`` bytes followed
    by that many payload bytes. Iterating yields payloads only; incomplete
    frames stay in the buffer until the missing bytes are appended.
    """

    def __init__(self, size_len: int = 2) -> None:
        self._buf = b""
        self._size_len = size_len

    def append(self, data: bytes) -> None:
        """Add *data* to the end of the pending bytes."""
        self._buf += data

    def __iter__(self) -> "SizeBuffer":
        return self

    def __next__(self) -> bytes:
        """Return the payload of the next complete frame, consuming it.

        Raises:
            StopIteration: the buffer does not yet hold a full size prefix
                plus the payload it announces.
        """
        header_len = self._size_len
        # Wait for the whole size prefix. Decoding a truncated prefix would
        # yield a wrong size (e.g. a lone zero byte decodes as an empty
        # frame) and the partial header would be thrown away.
        if len(self._buf) < header_len:
            raise StopIteration
        size = int.from_bytes(self._buf[:header_len], "big")
        frame_end = header_len + size
        if len(self._buf) < frame_end:
            # Payload not fully received yet.
            raise StopIteration
        data = self._buf[header_len:frame_end]
        self._buf = self._buf[frame_end:]
        return data
.
Edit
..
Edit
__init__.py
Edit
__pycache__
Edit
_shutil.py
Edit
antivirus_mode.py
Edit
async_utils.py
Edit
benchmark.py
Edit
buffer.py
Edit
check_db.py
Edit
check_lock.py
Edit
cli.py
Edit
common.py
Edit
completions.py
Edit
config.py
Edit
cronjob.py
Edit
doctor.py
Edit
fd_ops.py
Edit
hyperscan.py
Edit
importer.py
Edit
ipecho.py
Edit
json.py
Edit
kwconfig.py
Edit
net_transport.py
Edit
parsers.py
Edit
resource_limits.py
Edit
safe_fileops.py
Edit
safe_sequence.py
Edit
serialization.py
Edit
sshutil.py
Edit
subprocess.py
Edit
support.py
Edit
threads.py
Edit
validate.py
Edit
whmcs.py
Edit
wordpress_mu_plugin.py
Edit