/usr/share/cagefs-skeleton/opt/imunify360/venv/lib/python3.11/site-packages/im360/simple_rpc
import asyncio
import json

from defence360agent.contracts.config import (
    GENERIC_SENSOR_SOCKET_PATH,
)


async def send_to_socket(msg, timeout=15, wait_for_response=True):
    """Send *msg* as one JSON line to the generic sensor unix socket.

    The message is serialized with ``json.dumps``, terminated with a
    newline and written to ``GENERIC_SENSOR_SOCKET_PATH``.  When a response
    is requested, data is read in chunks until EOF or until a chunk
    containing a newline arrives, and the accumulated bytes are parsed as
    JSON.

    Args:
        msg: JSON-serializable object to send.
        timeout: Seconds to wait for *each* read of the response (the
            limit is applied per chunk, not to the whole exchange).
        wait_for_response: When false, return ``{}`` right after the
            message has been flushed, without reading anything.

    Returns:
        The decoded JSON response on success, ``{}`` when no response was
        requested, or a human-readable ``str`` describing the failure.
        If a transport error interrupts reading after some data has
        arrived, the partial data is returned as text instead of the
        generic failure message.
    """
    result = b""
    try:
        reader, writer = await asyncio.open_unix_connection(
            path=GENERIC_SENSOR_SOCKET_PATH
        )
    except ConnectionRefusedError:
        return "Failed to send to socket, check your socket active"
    except FileNotFoundError:
        return "Failed to send to socket, socket file not found"
    except OSError as e:
        return f"OSError when connectiong to {GENERIC_SENSOR_SOCKET_PATH}: {e}"

    try:
        writer.write(json.dumps(msg).encode() + b"\n")
        await writer.drain()
        if not wait_for_response:
            return {}
        while True:
            data = await asyncio.wait_for(reader.read(8192), timeout=timeout)
            if not data:
                # EOF: the peer closed the connection.
                break
            result += data
            if b"\n" in data:
                # The response is newline-terminated; stop at the first
                # chunk that carries the terminator.
                break
        return json.loads(result.decode())
    except (json.JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError is not a JSONDecodeError; without catching it
        # here a non-UTF-8 answer would escape as an exception instead of
        # producing the documented failure message.
        return "Failed to decode json answer from a plugin"
    except (
        asyncio.TimeoutError,
        asyncio.IncompleteReadError,
        # NOTE(review): catching CancelledError turns task cancellation
        # into a normal return value. Kept for backward compatibility;
        # confirm that callers really depend on it.
        asyncio.CancelledError,
        BrokenPipeError,
        ConnectionResetError,
        OSError,
    ) as e:
        return (
            # errors="replace" keeps this handler from raising on partial
            # or invalid UTF-8; valid data decodes exactly as before.
            result.decode(errors="replace")
            if result
            else (
                "Failed to process operation with socket. Reason:"
                f" {type(e).__name__}"
            )
        )
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # The transport may already be broken (e.g. reset by the
            # peer). Closing is best-effort and must not override the
            # value being returned above.
            pass
.
Edit
..
Edit
__init__.py
Edit
__pycache__
Edit
configuration_management.py
Edit
conflicts.py
Edit
control_panel.py
Edit
countries.py
Edit
csf_imports.py
Edit
custom_lists.py
Edit
disabled_rules.py
Edit
feature.py
Edit
health.py
Edit
hosting_panel.py
Edit
incidents.py
Edit
kcarectl.py
Edit
lists.py
Edit
malware.py
Edit
middleware.py
Edit
proactive.py
Edit
remote_proxy.py
Edit
resident_socket.py
Edit
schema
Edit
schema.py
Edit
schema_responses
Edit
smart_advice.py
Edit
smtp_blocking.py
Edit
unavailable_on_freemium.py
Edit
uninstall_cleanup.py
Edit
validate.py
Edit
whitelist_rbl.py
Edit
whitelisted_crawlers.py
Edit
whitelisted_domains.py
Edit