/usr/share/cagefs-skeleton/opt/imunify360/venv/lib/python3.11/site-packages/im360/subsys
import asyncio import json import logging from collections import OrderedDict from pathlib import Path from typing import Dict, Set, Tuple from defence360agent.model.simplification import run_in_executor from defence360agent.utils import CheckRunError, atomic_rewrite, check_run from im360.contracts.config import WebServices as WebServicesConfig from im360.contracts.config import Webshield from im360.model.country import CountryList from im360.model.firewall import RemoteProxy from im360.subsys.panels.hosting_panel import HostingPanel from im360.subsys.webshield_mode import Mode as WebshieldMode from im360.utils import is_apache2nginx_enabled from im360.subsys.int_config import is_force_use_coraza logger = logging.getLogger(__name__) _HEADER = "# AUTOGENERATED, DO NOT EDIT\n" _CONFIG = "/etc/imunify360-webshield/agent-proxies.conf" _CONFIG_COUNTRY_BLACKLIST = ( "/etc/imunify360-webshield/blocked_country_codes.conf" ) _COMPOSE_LISTS_SCRIPT = "/usr/sbin/imunify360-webshield-compose-lists" #: if file exists then no traffic should be redirected to webshield _WS_NO_REDIRECTION_FLAG_PATH = Path("/var/imunify360/webshield_broken") class Error(Exception): """Base exception for the module.""" async def is_ssl_cache_configured() -> bool: """ Return True if ssl cache is configured in csf :return: bool """ cmd = ["im360-ssl-cache", "--json"] return bool(json.loads((await check_run(cmd)).decode())) def expects_traffic(): """Whether webshield expects traffic.""" return not _WS_NO_REDIRECTION_FLAG_PATH.exists() async def is_running() -> bool: """Whether webshield is running.""" if Path("/usr/bin/imunify360-wsctl").is_file(): try: stdout = await check_run(["imunify360-wsctl", "status"]) except CheckRunError as e: raise Error( "failed to find out whether webshield is running" ) from e status = json.loads(stdout.decode("ascii") or "{}") unit = status.get("units", {}).get("imunify360-webshield", {}) # TODO: DEF-40500 return unit.get("active", False) else: try: await check_run( ["/usr/share/imunify360-webshield/webshieldctl", "is-active"] ) except CheckRunError as e: if e.returncode == 1: return False raise Error( "failed to find out whether webshield is running" ) from e return True async def service_reload(): if Path("/usr/bin/imunify360-wsctl").is_file(): args = ["imunify360-wsctl", "reload"] else: args = ["/usr/share/imunify360-webshield/webshieldctl", "reload"] try: await check_run(args) except CheckRunError as e: raise Error("Unable to reload settings.") from e async def update_internal_whitelist(_, is_updated): """Updates whitelists for webshield and reloads service. Should be run after static whitelist updates.""" if is_updated: logger.info( "Updating webshield internal whitelist using " "imunify360-webshield-compose-lists script" ) await compose_lists([]) async def compose_lists(script_args): await check_run([_COMPOSE_LISTS_SCRIPT] + script_args) if Webshield.ENABLE: await service_reload() async def _rewrite_webshield_config(gather_items, config_file): """Rewrite config file with autogenerated data from database.""" items = await run_in_executor(asyncio.get_event_loop(), gather_items) content = [_HEADER] for item in sorted(items): content.append("{} 1;\n".format(item)) atomic_rewrite(config_file, "".join(content), backup=False) if Webshield.ENABLE: await service_reload() async def update_country_blacklist_config(): """Fill webshield config file with blacklisted country (from CountryList) Should be called after add/delete record from CountryList. """ await _rewrite_webshield_config( lambda: CountryList.country_codes(CountryList.BLACK), _CONFIG_COUNTRY_BLACKLIST, ) async def update_remote_proxy_config(): """Fill webshield config file with remote proxies (from RemoteProxy)""" await _rewrite_webshield_config( lambda: set( item["network"] for item in RemoteProxy.list(None, None, True) ), _CONFIG, ) def port_range() -> Tuple[int, int]: """Return a range of ports used by webshield to provide captcha and assist in remote proxy processing, as a tuple (first, last). (last value is not included in that range).""" return (52220, 52240) def destination_webshield_ports() -> Set[int]: """Return a set of Webshield ports which could be redirected to.""" redirection_map = port_redirect_map() current_mode = WebshieldMode.get() from_ports = redirected_to_webshield_ports(current_mode) unexpected_from_ports = from_ports - redirection_map.keys() if unexpected_from_ports: logger.warning( "Got unexpected ports to redirect from: %s" ", which are not present in the redirect map: %s", unexpected_from_ports, redirection_map, ) dest_ports = { redirection_map[port] for port in from_ports & redirection_map.keys() } return dest_ports def redirected_to_webshield_ports( mode: WebshieldMode = WebshieldMode.STANDALONE, ) -> Set[int]: """Return a set of TCP destination ports that can be redirected to Webshield.""" # Apache mode implies that apache uses our wafd module and works like # webshield. So no redirection from ports 80/443 is required. # Same for apache2nginx mode - standard ports should NOT be redirected. # Same when Coraza usage is forced - no ports should be redirected. ports = ( set() if mode == WebshieldMode.APACHE or is_force_use_coraza() or is_apache2nginx_enabled() else {80, 443} ) ports |= set(WebServicesConfig.HTTP_PORTS) | set( WebServicesConfig.HTTPS_PORTS ) ports |= HostingPanel().http_ports() | HostingPanel().https_ports() return ports def port_redirect_map() -> Dict[int, int]: """Return a mapping of destination TCP ports to their redirect target port of Webshield.""" m = OrderedDict() # type: Dict[int, int] m[80] = 52224 m[443] = 52223 m[2082] = 52230 m[2086] = 52228 m[2083] = 52229 m[2087] = 52227 m[2095] = 52232 m[2096] = 52231 m[2222] = 52235 m[8443] = 52233 m[8880] = 52234 return m
.
Edit
..
Edit
__init__.py
Edit
__pycache__
Edit
csf.py
Edit
fail2ban.py
Edit
features
Edit
int_config.py
Edit
modsec_app_version_detector.py
Edit
modsec_audit_log.py
Edit
modsec_cache_dir.py
Edit
ossec.py
Edit
pam.py
Edit
panels
Edit
proactive.py
Edit
remoteip.py
Edit
running_ids.py
Edit
shared_disabled_rules.py
Edit
smtp_blocking.py
Edit
waf_rules_configurator.py
Edit
webshield.py
Edit
webshield_mode.py
Edit
whitelist_rbl.py
Edit