/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/plugins
from defence360agent.contracts.plugins import MessageSink
from defence360agent.utils import (
    check_run,
    recurring_check,
    RecurringCheckStop,
)
from defence360agent.utils.resource_limits import is_lve_active, has_lvectl


class LveUtilsAutoInstaller(MessageSink):
    """
    Keep the lve-utils package installed on CL hosts that run LVE
    (see DEF-11452), so the tools used to limit CPU/IO are available.

    Tools relied upon:
      /usr/sbin/lvectl - shipped in the lve-utils package
      /bin/lve_suwrapper - shipped in the lve-wrappers package
                           (which is a dependency of lve-utils)

    CL installs lve-utils by default, yet for some reason the package
    may still be missing.
    """

    def __init__(self, *, check_period=3600):
        # Handed to recurring_check() as the period between checks
        # (presumably seconds -- confirm against recurring_check).
        self._check_period = check_period
        # Handle of the background task; None until create_sink() runs.
        self._task = None

    async def create_sink(self, loop):
        """Remember *loop* and schedule the recurring install check on it."""
        self._loop = loop
        periodic_check = recurring_check(self._check_period)(
            self._install_lve_utils_if_needed
        )
        self._task = self._loop.create_task(periodic_check())

    async def shutdown(self):
        """Cancel the background task, if one exists, and wait for it."""
        if self._task is None:
            return
        self._task.cancel()
        await self._task
        self._task = None

    async def _install_lve_utils_if_needed(self):
        """Run ``yum -y install lve-utils`` when LVE is active but
        has_lvectl() reports the tool as absent.

        Raises RecurringCheckStop when LVE is not active.
        """
        if not is_lve_active():
            # kernel doesn't support lve or it is disabled:
            # no point trying to install lve-utils
            raise RecurringCheckStop()
        # LVE is assumed to be active on CL only
        if has_lvectl():
            return
        # utilities might have been removed
        await check_run(["yum", "-y", "install", "lve-utils"])
.
Edit
..
Edit
__init__.py
Edit
__pycache__
Edit
accumulate.py
Edit
analyst_cleanup_update.py
Edit
backup_info_sender.py
Edit
cagefs.py
Edit
checkpoint.py
Edit
client.py
Edit
config_merger.py
Edit
config_watcher.py
Edit
event_hook_executor.py
Edit
event_monitor.py
Edit
event_monitor_message_processor.py
Edit
files_recurring_update.py
Edit
icontact_sender.py
Edit
idle_time_out.py
Edit
lve_utils_install.py
Edit
myimunify.py
Edit
ping.py
Edit
send_domain_list.py
Edit
send_server_config.py
Edit
wordpress.py
Edit