/opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins
"""PAM module management plugin. Changes PAM module state (enabled/disabled) to match imunify360 config. """ import asyncio import logging from defence360agent.contracts import config from defence360agent.contracts.config import SystemConfig from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import MessageSink, expect from defence360agent.utils import recurring_check, safe_cancel_task from im360.subsys import ossec, pam logger = logging.getLogger(__name__) class PAMManager(MessageSink): _CONFIG_PERIODIC_CHECK = 3600 # seconds _SSHD_ENABLED = config.FromConfig("PAM", "enable") _DOVECOT_PROTECTION_ENABLED = config.FromConfig( "PAM", "exim_dovecot_protection" ) _DOVECOT_NATIVE_ENABLED = config.FromConfig("PAM", "exim_dovecot_native") _FTP_ENABLED = config.FromConfig("PAM", "ftp_protection") def __init__(self): self._tasks = [] self._status_check_required = asyncio.Event() self._loop = None async def create_sink(self, loop) -> None: self._loop = loop self._tasks.append(loop.create_task(self._status_checker())) self._tasks.append(loop.create_task(self._initiate_status_check())) async def shutdown(self) -> None: for task in self._tasks: if task is not None: await safe_cancel_task(task) async def _ensure_status(self) -> None: status = await pam.get_status() await self._ensure_status_for_dovecot( desired_dovecot_status=pam.DovecotStatus.DISABLED if not self._DOVECOT_PROTECTION_ENABLED else pam.DovecotStatus.PAM if not self._DOVECOT_NATIVE_ENABLED else pam.DovecotStatus.NATIVE, pam_status=status, ) await self._ensure_status_for_service( self._FTP_ENABLED, status, pam.PamService.FTP ) await self._ensure_status_for_service( self._SSHD_ENABLED, status, pam.PamService.SSHD ) # ensure OSSEC status status = dict(await pam.get_status()) # . merge dovecot status for ossec status[ossec.DOVECOT] = ( pam.PamServiceStatusValue.disabled if status[pam.PamService.DOVECOT_NATIVE] == status[pam.PamService.DOVECOT_PAM] == pam.PamServiceStatusValue.disabled else pam.PamServiceStatusValue.enabled ) del status[pam.PamService.DOVECOT_NATIVE] del status[pam.PamService.DOVECOT_PAM] try: await ossec.configure_for_pam(status) except ossec.OssecRulesError as exc: logger.error("Failed to update OSSEC configuration: %s", exc) async def _ensure_status_for_dovecot( self, desired_dovecot_status: pam.DovecotStatus, pam_status: dict ) -> bool: """Ensure pam status corresponds to the desired dovecot status. Special handling for 3 states. Return whether pam/native modules were enabled. """ if desired_dovecot_status is pam.DovecotStatus.DISABLED: if not ( pam_status[pam.PamService.DOVECOT_NATIVE] == pam_status[pam.PamService.DOVECOT_PAM] == pam.PamServiceStatusValue.disabled ): # something is enabled # disable dovecot # note: either pam/native will do here; both should be disabled await pam.disable(pam.PamService.DOVECOT_NATIVE) logger.info("PAM module has been disabled for dovecot") elif desired_dovecot_status is pam.DovecotStatus.PAM: if ( pam_status[pam.PamService.DOVECOT_PAM] == pam.PamServiceStatusValue.enabled ): # already enabled if ( pam_status[pam.PamService.DOVECOT_NATIVE] == pam.PamServiceStatusValue.enabled ): # pragma: no cover # shouldn't happen, report to Sentry logger.error( "Unexpected PAM state: both pam/native are enabled." " Status: %s", pam_status, ) else: # enable dovecot pam pam_service = pam.PamService.DOVECOT_PAM await pam.enable(pam_service) logger.info("PAM module has been enabled for %s", pam_service) return True elif desired_dovecot_status is pam.DovecotStatus.NATIVE: if ( pam_status[pam.PamService.DOVECOT_NATIVE] == pam.PamServiceStatusValue.enabled ): # already enabled if ( pam_status[pam.PamService.DOVECOT_PAM] == pam.PamServiceStatusValue.enabled ): # pragma: no cover # shouldn't happen, report to Sentry logger.error( "Unexpected PAM state: both pam/native are enabled." " Status: %s", pam_status, ) else: # enable dovecot native pam_service = pam.PamService.DOVECOT_NATIVE await pam.enable(pam_service) logger.info("PAM module has been enabled for %s", pam_service) return True else: # pragma: no cover assert 0, "can't happen" return False # nothing has been enabled async def _ensure_status_for_service( self, should_be_enabled, status, pam_service ): expected_service_status = ( pam.PamServiceStatusValue.enabled if should_be_enabled else pam.PamServiceStatusValue.disabled ) if expected_service_status != status[pam_service]: if should_be_enabled: await pam.enable(pam_service) logger.info("PAM module has been enabled for %s", pam_service) return True await pam.disable(pam_service) logger.info("PAM module has been disabled for %s", pam_service) return False @recurring_check(0) async def _status_checker(self): await self._status_check_required.wait() self._status_check_required.clear() await self._ensure_status() @recurring_check(_CONFIG_PERIODIC_CHECK) async def _initiate_status_check(self): self._status_check_required.set() @expect(MessageType.ConfigUpdate) async def on_config_update(self, message: MessageType.ConfigUpdate): if isinstance(message["conf"], SystemConfig): self._status_check_required.set()
.
Edit
..
Edit
__init__.py
Edit
__pycache__
Edit
cpanel_uploader.py
Edit
export_wblist.py
Edit
fgw.py
Edit
lfd.py
Edit
modsec_ruleset_checker.py
Edit
ossec_rules_checker.py
Edit
pam_manager.py
Edit
php_immunity.py
Edit
remoteip_install.py
Edit
repeater.py
Edit
send_server_config.py
Edit
service_manager.py
Edit
startup_actions.py
Edit
strategy_getter.py
Edit
waf_rules_configurator.py
Edit
whitelist_current_user.py
Edit