/opt/imunify360/venv/lib/python3.11/site-packages/im360/subsys
from __future__ import annotations from asyncio import AbstractEventLoop, Event from collections.abc import Callable from logging import getLogger from pathlib import Path from typing import Any from imav.malwarelib.subsys.ainotify import Event as IEvent from imav.malwarelib.subsys.ainotify import Inotify, Watcher from defence360agent.utils import recurring_check log = getLogger(__name__) _PLUGIN_NAMES = ( "cphulk", "lfd", "modsec", "ossec", ) _DEFAULT_PATH = Path("/etc/imunify360/rules/disabled-rules") _WAIT_DIR_TIMEOUT = 10 class _RuleParsingError(Exception): pass def _parse_rule(line: str) -> tuple[str, int]: if ":" not in line: raise _RuleParsingError("Delimiter ':' is not found in rule:") fields = line.split(":", maxsplit=2) if len(fields) != 3: raise _RuleParsingError( f"Wrong amount of fields, 3 expected but {len(fields)} found:" ) plugin_id = fields[0].strip().lower() if plugin_id not in _PLUGIN_NAMES: raise _RuleParsingError(f"Unknown plugin ID value '{plugin_id!s}':") rule_value = fields[1] try: rule_id = int(rule_value) except ValueError as error: raise _RuleParsingError( f"Invalid rule ID value '{rule_value!s}':" ) from error return plugin_id, rule_id def _load_rules(path: Path) -> dict[str, set[int]]: if not path.is_file(): log.debug( "Config '%s' with shared disabled rules is not found.", path, ) return {} result = {} with path.open(mode="rt") as rules_file: for line_no, raw_line in enumerate(rules_file, start=1): if not (line := raw_line.strip()): continue try: plugin_id, rule_id = _parse_rule(line) except _RuleParsingError as error: log.warning( "%s:%d: %s.", path, line_no, str(error), ) except Exception: log.exception("%s:%d", path, line_no) else: result.setdefault(plugin_id, set()).add(rule_id) return result def get_shared_disabled_modsec_rules_ids( *, path: Path | None = None ) -> set[int]: return _load_rules(path or _DEFAULT_PATH).get("modsec", set()) def get_shared_disabled_rules_list( *, path: Path | None = None ) -> list[dict[str, Any]]: """ Returns list of the rules, extracted from "disabled-rules" file in the format, like {"plugin": "modsec", "rule_id": 1234} """ rules: list[dict[str, Any]] = [] for plugin_name, plugin_rules in _load_rules( path or _DEFAULT_PATH ).items(): rules.extend( {"plugin": plugin_name, "rule_id": rule_id} for rule_id in plugin_rules ) return rules class DisabledRulesWatcher: def __init__( self, loop: AbstractEventLoop, *, path: Path = None, on_change_cb: Callable[..., None] = None, ): self.__cb = on_change_cb self.__event = Event() self.__path = path or _DEFAULT_PATH self.__name = self.__path.name.encode("ascii") self.__rules = {} self.__watcher = None self.__task = None self.__start(loop) def __start(self, loop: AbstractEventLoop): if not (dir_path := self.__path.parent).is_dir(): log.error( "Shared disabled rules directory '%s' does not exist.", dir_path, ) return self.__rules = _load_rules(self.__path) self.__watcher = Watcher(loop, coro_callback=self.__on_io_notify) self.__watcher.watch( str(dir_path).encode("ascii"), Inotify.CLOSE_WRITE | Inotify.MOVED_TO | Inotify.DELETE, ) self.__task = loop.create_task(self.__process_events()) async def __on_io_notify(self, io_event: IEvent): # Squash many inotify events into one asyncio event. # It allows to prevent too fast rules reloading. if io_event.name == self.__name: self.__event.set() @recurring_check(0) async def __process_events(self): try: await self.__event.wait() finally: self.__event.clear() self.__rules = _load_rules(self.__path) if self.__cb is not None: self.__cb() def close(self): if self.__task is not None: self.__task.cancel() if self.__watcher is not None: self.__watcher.close() def match(self, plugin_id: str, rule_id: int) -> bool: return rule_id in self.__rules.get(plugin_id, set()) def count(self) -> int: return sum(map(len, self.__rules.values()))
.
Edit
..
Edit
__init__.py
Edit
__pycache__
Edit
csf.py
Edit
fail2ban.py
Edit
features
Edit
int_config.py
Edit
modsec_app_version_detector.py
Edit
modsec_audit_log.py
Edit
modsec_cache_dir.py
Edit
ossec.py
Edit
pam.py
Edit
panels
Edit
proactive.py
Edit
remoteip.py
Edit
running_ids.py
Edit
shared_disabled_rules.py
Edit
smtp_blocking.py
Edit
waf_rules_configurator.py
Edit
webshield.py
Edit
webshield_mode.py
Edit
whitelist_rbl.py
Edit