/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/contracts
import os import pwd import stat import uuid from pathlib import Path from typing import Dict, List, Optional from defence360agent.contracts.permissions import logger from defence360agent.model import instance from defence360agent.myimunify.model import MyImunify, update_users_protection from defence360agent.subsys.panels.hosting_panel import HostingPanel from defence360agent.utils import safe_fileops MYIMUNIFY_ID_FILE_NAME = ".myimunify_id" _BANNER = ( "# DO NOT EDIT\n# This file contains MyImunify id unique to this user\n\n" ) _ID_LEN = 32 _HEX = frozenset("0123456789abcdef") class MyImunifyIdError(Exception): """Exception representing issues related to MyImunify id""" async def add_myimunify_user( sink, user: str, protection: bool ) -> Optional[str]: """Save subscription type to the DB and generate id file""" myimunify, _ = MyImunify.get_or_create(user=user) myimunify.save() await update_users_protection(sink, [user], protection) logger.info("Applied setting MyImunify=%s for user %s", protection, user) try: myimunify_id = await _get_or_generate_id(user) except MyImunifyIdError: # User no longer exists return None return myimunify_id async def get_myimunify_users() -> List[Dict]: """ Get a list of MyImunify users, their subscription types and unique ids """ users = [] user_details = await HostingPanel().get_user_details() myimunify_user_to_id = await _myimunify_user_to_id() with instance.db.transaction(): for user, myimunify_uid in sorted(myimunify_user_to_id.items()): record, _ = MyImunify.get_or_create(user=user) users.append( { "email": user_details.get(user, {}).get("email", ""), "username": user, "myimunify_id": myimunify_uid, "protection": record.protection, "locale": user_details.get(user, {}).get("locale", ""), } ) return users async def _myimunify_user_to_id() -> Dict[str, str]: """Get a list of users and their MyImunify ids""" user_to_id = {} for user in await HostingPanel().get_users(): try: user_to_id[user] = await _get_or_generate_id(user) except MyImunifyIdError: # User does not exist continue except safe_fileops.UnsafeFileOperation as e: logger.error( "Unable to generate id for user=%s, error=%s", user, str(e) ) continue return user_to_id async def _get_or_generate_id(user: str) -> str: """ Read MyImunify id if exists and valid, or generate a new one and write into the file. Malformed files are regenerated. """ id_file = await _get_myimunify_id_file(user) try: return _read_id(id_file) except (FileNotFoundError, MyImunifyIdError): myimunify_id = uuid.uuid1().hex return await _write_id(myimunify_id, id_file) async def _write_id(myimunify_id: str, id_file: Path) -> str: """Write MyImunify id to file""" text = _BANNER + myimunify_id + "\n" try: await safe_fileops.write_text(str(id_file), text) except (OSError, PermissionError) as e: logger.error("Unable to write myimunify_id in user home dir: %s", e) raise MyImunifyIdError from e return myimunify_id def _read_id(id_file: Path) -> str: """Read and validate MyImunify id from file. Raises MyImunifyIdError if malformed. Opens with O_RDONLY | O_NONBLOCK and verifies via fstat() that the fd refers to a regular file before reading. This eliminates the TOCTOU window between a path-level type check and the actual read (e.g. an attacker replacing the file with a FIFO between the two). """ try: fd = os.open(str(id_file), os.O_RDONLY | os.O_NONBLOCK) except FileNotFoundError: raise except OSError: raise MyImunifyIdError try: if not stat.S_ISREG(os.fstat(fd).st_mode): raise MyImunifyIdError data = os.read(fd, 8192) text = data.decode("utf-8") except UnicodeDecodeError: raise MyImunifyIdError finally: os.close(fd) return _parse_id(text) def _parse_id(text: str) -> str: """Read line by line: skip comments (#). First non-comment line must be valid id; nothing after it.""" id_line = None for line in text.splitlines(): s = line.strip() if not s: continue if s.startswith("#"): continue if id_line is not None: raise MyImunifyIdError if len(s) != _ID_LEN or not all(c in _HEX for c in s): raise MyImunifyIdError id_line = s if id_line is None: raise MyImunifyIdError return id_line async def _get_myimunify_id_file(user: str) -> Path: """Get a file with MyImunify id and create it if does not exist""" try: user_pwd = pwd.getpwnam(user) except KeyError as e: logger.error("No such user: %s", user) raise MyImunifyIdError from e else: id_file = Path(user_pwd.pw_dir) / MYIMUNIFY_ID_FILE_NAME try: safe_fileops.ensure_regular_file(str(id_file)) except FileNotFoundError: if not id_file.parent.exists(): logger.error("No such user homedir: %s", user) raise MyImunifyIdError try: await safe_fileops.touch(str(id_file)) except (PermissionError, OSError) as e: logger.error( "Unable to put myimunify_id in user home dir: %s", e ) raise MyImunifyIdError from e except OSError: logger.error("Cannot access identity file: %s", id_file) raise MyImunifyIdError return id_file
.
Edit
..
Edit
__init__.py
Edit
__pycache__
Edit
config.py
Edit
config_provider.py
Edit
eula.py
Edit
hook_events.py
Edit
hooks.py
Edit
license.py
Edit
messages.py
Edit
myimunify_id.py
Edit
permissions.py
Edit
plugins.py
Edit
sentry.py
Edit