/usr/share/cagefs-skeleton/opt/cloudlinux/venv/lib/python3.11/site-packages
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import grp
import os
import re
import sys
import syslog
import functools
from typing import Tuple, Optional, List  # NOQA

import cldetectlib as detect
from clcommon import ClPwd, mysql_lib
from clcommon.clproc import ProcLve
from clcommon.utils import grep
from clcommon.cpapi.plugins import cpanel, directadmin, ispmanager, plesk

BAD_CODING_ERROR_CODE = 48


def _find_option_lines(option, lines):
    # type: (str, List[str]) -> List[str]
    """
    Return the lines of a config file that match ``<option>=...``.

    Thin wrapper around ``grep`` so that every config lookup in this module
    uses exactly the same search flags.

    :param option: option name as it appears before "=" (e.g. 'PLAN')
    :param lines: config file content, one string per line
    :return: list of matching lines (may be empty)
    """
    return list(grep(
        rf'{option}\=.*$',
        fixed_string=False,
        match_any_position=False,
        multiple_search=False,
        data_from_file=lines,
    ))


def _option_value(line):
    # type: (str) -> str
    """
    Return the value part of a ``NAME=value`` config line.

    Only the first "=" separates name and value, so a value that itself
    contains "=" is returned whole instead of being cut at the second "=".

    :param line: raw config line
    :return: everything after the first "=", surrounding whitespace removed
    :raises IndexError: if the line contains no "=" at all
    """
    return line.strip().split('=', 1)[1]


class ConfigDataError(Exception):
    """
    Should be raised when something went wrong during user's config data
    loading/parsing. Supposed to be used when we want to skip some user
    with broken configuration files
    """


def detect_panelclass():
    """
    Instantiate the panel class that matches the detected control panel.

    The checks are kept as a plain ``if`` chain on purpose: only the class
    that actually matches is looked up, the other names are never evaluated.

    :return: instance of the matching panel class, ``Unknown()`` otherwise
    """
    detect.getCPName()
    if detect.CP_NAME == 'Plesk':
        return Plesk()
    if detect.CP_NAME == 'cPanel':
        return Cpanel()
    if detect.CP_NAME == 'InterWorx':
        return InterWorx()
    if detect.CP_NAME == 'ISPManager':
        return ISPManager()
    if detect.CP_NAME == 'DirectAdmin':
        return DirectAdmin()
    if detect.CP_NAME == 'HostingNG':
        return HostingNG()
    return Unknown()


class GeneralPanel:
    """
    Base class of all panel implementations.

    Methods that a concrete panel does not override only emit a
    "not implemented" warning (in debug mode) and do nothing else.
    """

    def __init__(self):
        # verifying proper use panel class
        self.users_data = ClPwd()
        self.class_name = self.__class__.__name__
        self.current_panel_name = detect.getCPName()
        if self.class_name.lower() != self.current_panel_name.lower():
            raise UserWarning(
                f'Class "{self.class_name}" not should be used on panel "{self.current_panel_name}"'
            )
        self.cpapi_plugin = None

    def list_admins(self, debug=False):
        """
        List all admins names in given control panel
        :param bool debug: Do produce debug output or don't
        :return: list of strings
        """
        return self.cpapi_plugin.admins()

    def is_admin(self, username):
        """
        Return True if username is in admin names
        :param str username: user to check
        :return: bool
        """
        return self.cpapi_plugin.is_admin(username)

    def _warning(self, debug, method_name):
        """
        Write a "not implemented" note to stderr when debug is enabled
        :param bool debug: Do produce debug output or don't
        :param str method_name: human readable name of the missing feature
        """
        if debug:
            sys.stderr.write(f"{method_name} is not implemented for {self.__class__.__name__}\n")

    @staticmethod
    def _slog_warning(msg):
        """
        Send a warning with the module prefix to syslog
        :param str msg: message text
        """
        syslog.syslog(syslog.LOG_WARNING, f'CL_CONTROLLIB: {msg}')

    def _check_and_get_user(self, uid=None, username=None, debug=False, syslog_=True):
        """
        Check that provided user is really exists in system and return it as:
         - uid if username was provided
         - username if uid was provided
         - None if user doesn't exists
        Produce syslog/debug output depending on flags if user isn't exists.

        This helper function is mostly needed because it's possible that
        user is already deleted in system but his entity is still present
        in control panel files (like DA_USERS_DIR) so we should do additional
        checks to skip such users.

        :raises ValueError: if both or none of uid/username are given
        :return: (uid | username) or None
        """
        if uid is not None and username is not None:
            raise ValueError('This function accepts either uid or username, '
                             'but not both simultaneously')
        if uid is None and username is None:
            raise ValueError('Incorrect call: uid or username is not specified')

        if username is not None:
            try:
                # will return uid
                return self.users_data.get_uid(username)
            except ClPwd.NoSuchUserException as e:
                if debug:
                    print(e)
                if syslog_:
                    self._slog_warning(
                        f'User with username="{username}" is not present in the system'
                    )
                return None

        try:
            # will return username
            return self.users_data.get_names(int(uid))[0]
        except ClPwd.NoSuchUserException as e:
            if debug:
                print(e)
            if syslog_:
                self._slog_warning(
                    f'User with uid="{uid}" is not present in the system'
                )
            return None
        except ValueError:
            # uid is not convertible to int
            print("Incorrect user id")
            return None

    @staticmethod
    def get_file_lines(file_name):
        """
        Read a UTF-8 text file and close it before returning.
        No exceptions are caught here!
        :param string file_name: Absolute or relative path to file
        :rtype: list of strings
        :return: Lines of the given file (line endings preserved)
        """
        with open(file_name, 'r', encoding='utf-8') as f:
            content = f.readlines()
        return content

    @staticmethod
    def write_file_lines(file_name, content, open_method='w'):
        """
        Write to file and close it before the function end.
        No exceptions are caught here!
        :param string file_name: Absolute or relative path to file
        :param content: What to write there (iterable of lines)
        :param string open_method: Mode passed to open(), e.g. "w", "a"
                                   or a binary variant such as "wb"
        :return: None
        """
        # open() refuses an encoding argument in binary mode, so pass it
        # only for text modes
        encoding = None if 'b' in open_method else 'utf-8'
        with open(file_name, open_method, encoding=encoding) as f:
            f.writelines(content)

    def list_all(self, debug=False):
        """
        Implements /usr/bin/getcontrolpaneluserspackages and
        /usr/bin/getcontrolpaneluserspackages --list-all commands
        Result is stdout output: uid package name
        :param bool debug: Do produce debug output or don't
        :return: None
        """
        self._warning(debug, "List all")

    def list_users_in_package(self, package, debug=False):
        """
        Implements /usr/bin/getcontrolpaneluserspackages --package=package_name
        Result is stdout output: uid
        :param string package: Package name
        :param bool debug: Do produce debug output or don't
        :return: None
        """
        self._warning(debug, "List users in given package")

    def list_users_in_reseller_package(self, package, reseller, debug=False):
        """
        Implements /usr/bin/getcontrolpaneluserspackages
            --package=package_name --reseller=reseller_name
        Result is stdout output: uid
        :param string package: Package name
        :param string reseller: Reseller name
        :param bool debug: Do produce debug output or don't
        :return: None
        """
        self._warning(debug, "List users in given reseller's package")

    def list_user_package(self, uid, debug=False):
        """
        Implements /usr/bin/getcontrolpaneluserspackages --userid=id
        Result is stdout output: uid package name
        :param int uid: User ID
        :param bool debug: Do produce debug output or don't
        :return: None
        """
        self._warning(debug, "List given user's package")

    def list_packages(self, debug=False):
        """
        Implements /usr/bin/getcontrolpaneluserspackages --list-packages
        Result is stdout output: package name
        :param bool debug: Do produce debug output or don't
        :return: None
        """
        self._warning(debug, "List all packages")

    def list_resellers_packages(self, debug=False):
        """
        Implements /usr/bin/getcontrolpaneluserspackages --list-resellers-packages
        Result is stdout output: reseller_name package name
        :param bool debug: Do produce debug output or don't
        :return: None
        """
        self._warning(debug, "List packages belong to resellers")

    def list_reseller_packages(self, reseller, debug=False):
        """
        Implements /usr/bin/getcontrolpaneluserspackages --list-reseller-packages=reseller
        Result is stdout output: package name
        :param string reseller: Reseller name
        :param bool debug: Do produce debug output or don't
        :return: None
        """
        self._warning(debug, "List packages belong to given reseller")

    def list_users(self, debug=False):
        """
        Implements /usr/bin/getcontrolpaneluserspackages --list-users
        Result is stdout output: uid,package name,reseller_name
        :param bool debug: Do produce debug output or don't
        :return: None
        """
        self._warning(debug, "List users")

    def _get_user_packages(self, debug=False, reseller_name=None):
        """
        Generate triple with uid, it's package, it's reseller
        :param bool debug: Do produce debug output or don't
        :param string reseller_name: filter by this reseller
        :rtype: generator of (int, string, string)
        :return: (uid, package name, reseller name)
        """
        self._warning(debug, "List reseler's users")
        yield 0, '', ''

    def list_reseller_users(self, debug=False, reseller=None):
        """
        Print uid and package for defined reseller
        :param bool debug: Do produce debug output or don't
        :param reseller: username of reseller
        :return: None
        """
        # the base class itself has no real data source, so it prints nothing
        if self.class_name != 'GeneralPanel':
            if reseller is None:
                if debug:
                    print('Reseller is not defined')
                return
            for uid, package, _ in self._get_user_packages(debug=debug, reseller_name=reseller):
                print(f'{uid},{package}')

    def get_reseller(self, uid, debug=False):
        """
        Determine user's reseller
        :param string uid: uid of user
        :param bool debug: Do produce debug output or don't
        :rtype: string
        :return: reseller name
        """
        self._warning(debug, "Get user's reseller")


class Cpanel(GeneralPanel):
    USERDIR = '/var/cpanel/users'
    PACKAGEDIR = '/var/cpanel/packages'
    RESELLERS_PATH = '/var/cpanel/resellers'
    RESELLERS_AND_USERS = '/etc/trueuserowners'
    users_data = None

    def __init__(self):
        super().__init__()
        self.min_uid = self.users_data.get_sys_min_uid()
        self.cpapi_plugin = cpanel.PanelPlugin()

    def get_file_lines(self, file_name):
        """
        Read a UTF-8 text file and close it before returning.
        A file that is not valid UTF-8 is reported to stderr and syslog and
        terminates the process with BAD_CODING_ERROR_CODE; other exceptions
        are not caught.
        :param string file_name: Absolute or relative path to file
        :rtype: list of strings
        :return: Lines of the given file
        """
        ENCODING_LINK = "http://kb.cloudlinux.com/2017/08/how-to-fix-the-issue-" \
                        "with-non-utf-symbols-in-packages-names/"
        try:
            with open(file_name, 'r', encoding='utf-8') as f:
                content = f.readlines()
        except UnicodeDecodeError as e:
            message = f"Unable to read file '{file_name}', error is: {e}. " \
                      "It looks like you use not an UTF-8 charset."
            if file_name.startswith(self.USERDIR):
                message += f"\nPlease read the instruction: {ENCODING_LINK}"
            print(message, file=sys.stderr)
            self._slog_warning(message)
            sys.exit(BAD_CODING_ERROR_CODE)
        return content

    def _get_users_resellers_dict(self):
        """
        Get dict, where keys - user's names, values - reseller's names
        :return: dict {user1: reseller1, ...}
        """
        result = {}
        # The file has a following structure:
        # ------------
        # #userowners v1
        # asdf123: root
        # t2est2: root
        # ------------
        # We used to just skip the first line, but for some reason it wasn't
        # there on my machine, so we would skip an actual reseller info.
        # Now we're using a regular expression.
        # In cPanel a username can only contain letters and digits.
        pattern = re.compile(r"^(?P<name>\w+):\s*(?P<owner>\w+)$")
        if os.path.isfile(self.RESELLERS_AND_USERS):
            for line in self.get_file_lines(self.RESELLERS_AND_USERS):
                match = pattern.match(line.strip())
                if match is None:
                    continue
                result[match.group('name')] = match.group('owner')
        return result

    def _get_resellers(self):
        """
        Get resellers list from RESELLERS_PATH
        :rtype: list
        :return: list of resellers names
        """
        resellers = []
        if os.path.isfile(self.RESELLERS_PATH):
            for line in self.get_file_lines(self.RESELLERS_PATH):
                name = line.strip().split(':', 1)[0]
                # a blank line would otherwise produce an empty reseller
                # name, which then matches every package starting with "_"
                if name:
                    resellers.append(name)
        return resellers

    def get_package(self, path):
        """
        Get package name from user's config file
        :param string path: Path to file contains info about cPanel user
        :rtype: string
        :return: Package name, empty string if the file or the PLAN record
                 is missing
        """
        package = ''
        if os.path.isfile(path):
            matches = _find_option_lines('PLAN', self.get_file_lines(path))
            if matches:
                package = _option_value(matches[0])
        return package

    def _get_reseller(self, uid, debug=False):
        """
        Determine user's reseller based on `owner` field from user's config
        :param string uid: uid of user
        :param bool debug: Do produce debug output or don't
        :rtype: string
        :return: reseller name, empty string if the owner is unknown or
                 is not listed in RESELLERS_PATH
        """
        reseller = ''
        username = self._check_and_get_user(uid=uid, debug=debug)
        if username:
            path = os.path.join(self.USERDIR, username)
            if os.path.isfile(path):
                matches = _find_option_lines('OWNER', self.get_file_lines(path))
                if matches:
                    reseller = _option_value(matches[0])
        return reseller if reseller and reseller in self._get_resellers() else ''

    def get_reseller(self, uid, debug=False):
        """
        Print user's reseller based on `owner` field from user's config
        :param string uid: uid of user
        :param bool debug: Do produce debug output or don't
        :return: None
        """
        print(self._get_reseller(uid, debug=debug))

    def _get_packages(self):
        """
        Generate list of packages in current panel
        :rtype: generator of (string, bool)
        :return: (name of package, True) for normal files and
                 (/path/to/wrong/file, False) otherwise
        """
        if os.path.isdir(self.PACKAGEDIR):
            for filename in os.listdir(self.PACKAGEDIR):
                full_path = os.path.join(self.PACKAGEDIR, filename)
                if os.path.isfile(full_path):
                    yield (filename, True)
                else:
                    yield (full_path, False)

    def list_packages(self, debug=False):  # --list-packages
        """
        Print packages that do not belong to a reseller
        (reseller packages are named "<reseller>_<name>")
        :param bool debug: Do produce debug output or don't
        """
        resellers = self._get_resellers()
        for filename, is_file in self._get_packages():
            if is_file:
                if '_' in filename and filename.split('_', 1)[0] in resellers:
                    continue
                print(filename)
            elif debug:
                print(f"Warning: {filename} is not a file.")

    def _get_user_packages(self, debug=False, reseller_name=None):
        """
        Generate triple with uid, it's package and it's reseller
        for every user in USERDIR which uid is not less than min_uid
        :param bool debug: Do produce debug output or don't
        :param string reseller_name: filter by this reseller
        :rtype: generator of (int, string, string)
        :return: (uid, package name, reseller name)
        """
        users_resellers_dict = self._get_users_resellers_dict()
        # users_resellers_dict example:
        # {'r1': 'root', 'user1': 'res1res1', 'res1res1': 'res1res1',
        #  'r': 'root', 'res1root': 'root', 'cltest1': 'root'}
        if not users_resellers_dict:
            return
        if not os.path.isdir(self.USERDIR):
            return
        for filename in os.listdir(self.USERDIR):
            try:
                uid = self.users_data.get_uid(filename)
            except ClPwd.NoSuchUserException as e:
                if debug:
                    print(e)
                continue
            if uid < self.min_uid:
                # skip all systemusers with uid < MIN_UID
                continue
            path = os.path.join(self.USERDIR, filename)
            reseller = users_resellers_dict.get(filename)
            if reseller is None:
                # fall back to the user's config only when the owner is
                # unknown: the lookup re-reads two files, so it must not
                # run for every user
                reseller = self._get_reseller(uid, debug)
            reseller = '' if self.is_admin(reseller) else reseller
            if reseller_name is not None and reseller != reseller_name:
                continue
            yield (uid, self.get_package(path), reseller)

    def list_all(self, package=None, debug=False, reseller=None):  # pylint: disable=arguments-renamed
        """
        Print "uid package" for every user that has a package, or only
        "uid" of the users in `package` when it is given
        :param string package: print only users of this package
        :param bool debug: Do produce debug output or don't
        :param string reseller: filter by this reseller
        """
        for uid, self_package, _ in self._get_user_packages(debug, reseller_name=reseller):
            if self_package == '':
                continue
            if package is None:
                print(uid, self_package)
            elif package == self_package:
                print(uid)

    def list_user_package(self, uid, debug=False):  # --userid
        """
        Print package of every user name that maps to the given uid
        :param uid: user's id
        :param bool debug: Do produce debug output or don't
        """
        try:
            users = self.users_data.get_names(int(uid))
        except ClPwd.NoSuchUserException:
            if debug:
                print("getcontrolpaneluserspackages: User not found")
        except ValueError:
            print("Incorrect user id")
        else:
            for user in users:
                path = os.path.join(self.USERDIR, user)
                if os.path.isfile(path):
                    package = self.get_package(path)
                    if package != '':
                        print(package)

    def list_resellers_packages(self, debug=False, reseller=None):
        """
        Print "reseller package" for every reseller package
        :param bool debug: Do produce debug output or don't
        :param string reseller: print packages of this reseller only
        """
        resellers = self._get_resellers()
        for filename, is_file in self._get_packages():
            if is_file and '_' in filename:
                res_name = filename.split('_', 1)[0]
                if res_name not in resellers:
                    continue
                if reseller is not None and reseller != res_name:
                    continue
                print(f"{res_name} {filename}")
            elif not is_file and debug:
                print(f"Warning: {filename} is not a file.")

    def list_reseller_packages(self, reseller, debug=False):
        """
        Print "reseller package" for packages of the given reseller
        :param string reseller: Reseller name
        :param bool debug: Do produce debug output or don't
        """
        if reseller not in self._get_resellers():
            if debug:
                print(f"Error: {reseller} is not reseller")
            return
        self.list_resellers_packages(debug=debug, reseller=reseller)

    def list_users_in_package(self, package, debug=False):  # --package
        """
        Print uids of users in the given (non-reseller) package
        :param string package: Package name
        :param bool debug: Do produce debug output or don't
        """
        # None must be checked before any string operation on the name
        if package is None:
            if debug:
                print("getcontrolpaneluserspackages: Undefined package")
            return
        if "_" in package:
            resellers = self._get_resellers()
            parts = package.split('_')
            # any "<reseller>_..." prefix means it is a reseller package
            if any('_'.join(parts[:i]) in resellers for i in range(1, len(parts))):
                return
        self.list_all(package, debug=debug)

    def list_users_in_reseller_package(self, package, reseller, debug=False):
        """
        Print uids of users of the given reseller in the given package
        :param string package: Package name
        :param string reseller: Reseller name
        :param bool debug: Do produce debug output or don't
        """
        resellers = self._get_resellers()
        if reseller in resellers:  # and package.startswith("%s_" % reseller):
            self.list_all(package, debug=debug, reseller=reseller)
        elif debug:
            print("getcontrolpaneluserspackages: Undefined reseller package")

    def list_users(self, debug=False):
        """
        Print "uid,package,reseller" for every user
        :param bool debug: Do produce debug output or don't
        """
        for uid, self_package, reseller in self._get_user_packages(debug):
            print(f"{uid},{self_package},{reseller}")


class DirectAdmin(GeneralPanel):
    DA_USERS_DIR = '/usr/local/directadmin/data/users/'
    DA_CONF = '/usr/local/directadmin/conf/directadmin.conf'
    DA_ADMIN_LIST = '/usr/local/directadmin/data/admin/admin.list'
    DA_RESELLERS_LIST = '/usr/local/directadmin/data/admin/reseller.list'
    DA_ADMINS_PACKAGES = '/usr/local/directadmin/data/admin/packages.list'
    DA_ADMIN = ""
    ENCODING = ""
    users_data = None
    proc_lve = ProcLve()

    def __init__(self):
        super().__init__()
        self.DA_ADMIN = detect.detect_DA_admin()
        self.cpapi_plugin = directadmin.PanelPlugin()
        # Detect DA native encoding (see LU-1334)
        self.ENCODING = self.cpapi_plugin.get_encoding_name()

    def get_file_lines(self, file_name):
        """
        Read a text file using the panel encoding (self.ENCODING)
        :param string file_name: Absolute or relative path to file
        :rtype: list of strings
        :return: Lines of the given file
        :raises RuntimeError: if the file can't be decoded
        """
        try:
            with open(file_name, 'r', encoding=self.ENCODING) as f:
                content = f.readlines()
            return content
        except UnicodeDecodeError as e:
            message = f"Unable to read file '{file_name}'. " \
                      f"It looks like you use not an {self.ENCODING} charset."
            self._slog_warning(message + f" Error is: {e}.")
            raise RuntimeError(message) from e

    def _get_user_info(self, username):
        # type: (str) -> Tuple[int, str, str]
        """
        Just a wrapper around _get_user_info_inner to send errors to syslog
        :raises ConfigDataError: on any error during loading user's data
        """
        try:
            return self._get_user_info_inner(username)
        # We catch all possible errors because anything can happen during
        # user's data loading and/or parsing/processing and all that we can do
        # about this is just log error and skip this user on upper level:
        except Exception as e:
            tpl = "Skipping user '{}' with bad configuration files due to '{}'"
            self._slog_warning(tpl.format(username, e))
            # Should be used on higher levels to skip user if it's applicable:
            raise ConfigDataError() from e

    def _get_user_info_inner(self, username):
        # type: (str) -> Tuple[int, str, str]
        """
        Return uid, package name and owner of user (reseller)
        :raises ValueError: if user.conf is unreadable, empty or lacks
                            a required record
        """
        userprofile = os.path.join(self.DA_USERS_DIR, username, 'user.conf')
        try:
            userprofile_content = self.get_file_lines(userprofile)
        except Exception:
            userprofile_content = None

        if not userprofile_content:  # not exists or empty
            raise ValueError('Unable to read any data from user.conf')

        # TODO: maybe it is better to parse config file instead of grep?
        result_usertype = _find_option_lines('usertype', userprofile_content)
        result_creator = _find_option_lines('creator', userprofile_content)

        try:
            usertype = _option_value(result_usertype[0])
        except IndexError as e:
            raise ValueError('No "usertype" record in user config') from e
        try:
            reseller = _option_value(result_creator[0])
        except IndexError as e:
            raise ValueError('No "creator" record in user config') from e

        package = self._get_user_package(username, userprofile_content)

        # DA stores 'creator' in config but we need
        # 'owner' or 'vendor', so additional checks needed
        if usertype == 'reseller':
            # reseller has another kind of package
            # so we mark owner as 'root'
            reseller = ''
            # when reseller has active limits his user ignores package limits
            # otherwise we have problems with limits inheritance
            # (yay, backwards compatibility!)
            lvp_id = self.users_data.get_uid(username)
            if self._is_reseller_limits_enabled(lvp_id):
                package = 'DEFAULT'
                reseller = username

        # and finally get userid from passwd file
        uid = self.users_data.get_uid(username)
        return uid, package, reseller

    def _is_reseller_limits_enabled(self, lvp_id):
        """
        Tell whether proc_lve reports an existing LVP with the given id
        """
        return self.proc_lve.exist_lvp(lvp_id)

    def _get_user_package(self, username: str, userprofile_content: Optional[List[str]] = None) -> str:
        """
        Reads user's package just as in the `_get_user_info_inner` function,
        but without masking to DEFAULT package
        Be aware that it may return different package
        than the `_get_user_info_inner` function
        :raises ValueError: if user.conf is unreadable, empty or has
                            no "package" record
        """
        if not userprofile_content:
            userprofile = os.path.join(self.DA_USERS_DIR, username, 'user.conf')
            try:
                userprofile_content = self.get_file_lines(userprofile)
            except Exception:
                userprofile_content = None

        if not userprofile_content:  # not exists or empty
            raise ValueError('Unable to read any data from user.conf')

        result_package = _find_option_lines('package', userprofile_content)
        result_original_package = _find_option_lines('original_package', userprofile_content)

        # yay, sometimes directadmin stores package using other option name
        # https://www.directadmin.com/features.php?id=1180
        try:
            package = _option_value(result_package[0])
        except IndexError as e:
            raise ValueError('No "package" record in user config') from e
        if package == 'custom' and result_original_package:
            try:
                package = _option_value(result_original_package[0])
            except IndexError as e:
                raise ValueError('No "original_package" record in user config') from e
        return package

    def _get_user_packages(self, debug=False, reseller_name=None):
        """
        Generate triple with uid, it's package, it's reseller
        :param bool debug: Do produce debug output or don't
        :param string reseller_name: filter by this reseller
        :rtype: generator of (int, string, string)
        :return: (uid, package name, reseller name)
        """
        list_admins = set(self.list_admins())
        for username in os.listdir(self.DA_USERS_DIR):
            # skip all DA admins, check added long-long time ago
            # you can find some details in LU-290
            if username in list_admins:
                continue

            if self._check_and_get_user(username=username, debug=debug) is None:
                continue

            try:
                uid, package, reseller = self._get_user_info(username)
            except ConfigDataError:
                continue

            # filter users by reseller
            if reseller_name is not None and reseller != reseller_name:
                continue

            yield uid, package, reseller

    def list_reseller_users(self, debug=False, reseller=None):
        # type: (bool, str) -> None
        """
        Print uid and package for users of a specified reseller
        :param debug: whether to produce debug output
        :param reseller: username of a reseller
        :return: None
        """
        # the default value can't be used to build a path, so treat it
        # the same way the base class does
        if reseller is None:
            if debug:
                print('Reseller is not defined')
            return

        path = os.path.join(self.DA_USERS_DIR, reseller, 'users.list')
        if not os.path.exists(path):
            return

        for line in self.get_file_lines(path):
            username = line.rstrip('\n')
            if not username:
                # a blank line is not a user, don't log it as a broken one
                continue
            try:
                uid, package, _ = self._get_user_info(username)
            except ConfigDataError:
                continue
            print(f'{uid},{package}')

        # do not forget about reseller's own user
        if not self.is_admin(reseller):
            try:
                uid, package, _ = self._get_user_info(reseller)
            except ConfigDataError:
                return
            print(f'{uid},{package}')

    def get_reseller(self, uid, debug=False):
        """
        Print user's reseller determined from user's config
        :param string uid: uid of user
        :param bool debug: Do produce debug output or don't
        :return: None
        """
        username = self._check_and_get_user(uid=uid, debug=debug)
        if username:
            try:
                _, _, reseller = self._get_user_info(username)
            except ConfigDataError:
                return
            print(reseller)

    def is_da_admin(self, login_name):
        """
        Check login_name is da admin.
        Backward compatibility wrapper
        :param login_name: Admin name
        :return: bool
        """
        return self.is_admin(login_name)

    def is_da_admin_for_hook(self, login_name):
        """
        Check login_name is da admin for using in hook
        return True - if login_name is admin
            False - otherwise
        """
        # Attention!!!
        # In hook we can't search login_name in list, which returns by self.admins(),
        # because DA calls hook before (!) updating /usr/local/directadmin/data/admin/admin.list
        # So in hook we never find login_name in this file. And we should to determine
        # user type from his user.conf file. This is a universal method.
        # TODO this functionality partially duplicates self._get_user_info
        path = os.path.join(self.DA_USERS_DIR, login_name, 'user.conf')
        if not os.path.isfile(path):
            return False
        result_usertype = _find_option_lines('usertype', self.get_file_lines(path))
        if not result_usertype:
            return False
        return _option_value(result_usertype[0]) == 'admin'

    def add_all_da_admins_to_sudoers(self, s_cmd):
        """
        Add all DA admins to sudoers. Now calls only from CageFS plugin installer
        :param s_cmd: String such as "Clsudo.add_cagefs_user(user)"
        :return:
        """
        try:
            # Do not remove Clsudo from import list
            # NOQA - local disable pyflakes unused import checks
            from clsudo import (Clsudo, NoSuchUser, UnableToReadFile, UnableToWriteFile)  # NOQA
        except ImportError:
            print("Cannot import Clsudo. Check if alt-python27-cllib package installed\n")
            sys.exit(1)

        da_admins = self.list_admins()
        # NOTE: the loop variable must stay named `user` and Clsudo must stay
        # imported above, because s_cmd refers to both names.
        for user in da_admins:
            try:
                # SECURITY: eval() executes arbitrary code; s_cmd must only
                # ever come from trusted, hard-coded callers.
                eval(s_cmd)
            except (NoSuchUser, UnableToReadFile, UnableToWriteFile) as e:
                print(f'{e}\n')

    def get_all_da_admins(self, debug=False):
        """
        Wrapper for compatibility with old code
        """
        return self.list_admins(debug)

    def list_admins(self, debug=False):
        """
        Get all DA admins list
        :rtype: list of strings
        :return: List of DA admins names
        """
        da_admins = []
        if os.path.isfile(self.DA_ADMIN_LIST):
            file_content = self.get_file_lines(self.DA_ADMIN_LIST)
            da_admins = [line.rstrip('\n') for line in file_content]
        return da_admins

    def _get_packages(self):
        """
        Generate package names listed in DA_ADMINS_PACKAGES
        """
        for line in self.get_file_lines(self.DA_ADMINS_PACKAGES):
            yield line.rstrip('\n')

    def list_packages(self, debug=False):
        """
        Print packages listed in DA_ADMINS_PACKAGES
        """
        for line in self._get_packages():
            print(line)

    def list_all(self, debug=False):
        """
        Print "uid package" for every user
        :param bool debug: Do produce debug output or don't
        """
        for uid, package, _ in self._get_user_packages(debug=debug):
            print(uid, package)

    def list_users(self, debug=False):
        """
        Print "uid,package,reseller" for every user
        :param bool debug: Do produce debug output or don't
        """
        for uid, package, reseller in self._get_user_packages(debug=debug):
            print(f"{uid},{package},{reseller}")

    def list_user_package(self, uid, debug=False):
        """
        Print package of the user with the given uid
        :param uid: user's id
        :param bool debug: Do produce debug output or don't
        """
        username = self._check_and_get_user(uid=uid, debug=debug)
        if username:
            try:
                _, package, _ = self._get_user_info(username)
            except ConfigDataError:
                return
            print(package)

    def list_users_in_package(self, package, debug=False):  # --package
        """
        Print uids of users in the given root package
        """
        # here we work only with root packages
        for uid, package_, _ in self._get_user_packages(debug=debug, reseller_name=''):
            if package_ == package:
                print(uid)

    def list_users_in_reseller_package(self, package, reseller, debug=False):  # --package --reseller
        """
        Print uids of users of the given reseller in the given package
        """
        # here we work only with end user's packages
        for uid, package_, _ in self._get_user_packages(debug=debug, reseller_name=reseller):
            if package_ == package:
                print(uid)

    def _get_resellers(self):
        """
        Return list of resellers names (admins included, no duplicates)
        """
        resellers_list = []
        if os.path.isfile(self.DA_RESELLERS_LIST):
            file_content = self.get_file_lines(self.DA_RESELLERS_LIST)
            resellers_list = [line.rstrip() for line in file_content if line.rstrip()]
        resellers_list.extend(self.list_admins())
        return list(set(resellers_list))

    def _get_reseller_packages(self, reseller):
        """
        Get all packages for given reseller
        :param string reseller: Reseller name
        :rtype: list of strings
        :return: List of packages names that belongs to given reseller
        """
        packages_list = []
        package_path = os.path.join(self.DA_USERS_DIR, reseller, "packages.list")
        if os.path.isfile(package_path):
            file_content = self.get_file_lines(package_path)
            packages_list = [line.rstrip() for line in file_content if line.rstrip()]
        return packages_list

    def list_resellers_packages(self, debug=False):  # --list-resellers-packages
        """
        Print "reseller package" lines for every reseller.
        Resellers whose package list can't be read are skipped; the first
        such error is re-raised after all other resellers were printed.
        """
        first_exception = None
        for reseller in self._get_resellers():
            try:
                packages = self._get_reseller_packages(reseller)
            except Exception as e:
                if first_exception is None:
                    first_exception = e
                continue
            # out_string = 'res1 pkg1\nres1 pkg2\nres1 pkg3'
            out_string = '\n'.join(f'{reseller} {pkg}' for pkg in packages).strip()
            print(out_string)
        if first_exception is not None:
            raise first_exception

    def list_reseller_packages(self, reseller, debug=False):  # --list-reseller-packages
        """
        Print packages of the given reseller, one per line
        """
        print('\n'.join(self._get_reseller_packages(reseller)))


class ISPManagerGetPackagesException(Exception):
    def __init__(self, message):
        super().__init__(message)


class ISPManager(GeneralPanel):
    ISP4_CONF = '/usr/local/ispmgr/etc/ispmgr.conf'
    users_data = None
    ISP5_GRP = 'mgrsecure'

    def __init__(self):
        super().__init__()
        detect.getCP()
        ver_parts = detect.CP_VERSION.split('.')
        self.isp_major_ver = int(ver_parts[0])
        # ISP5 type - master/slave
        self.isp5_is_master = detect.ispmanager5_is_master()
        self.cpapi_plugin = ispmanager.PanelPlugin()

    def _get_user_packages_dict(self):
        """
        Retrieves pairs "uid - package"
        :return: Dictionary: uid -> package_name
            Example: {512: 'custom', 513: 'test1'}
        :raises ISPManagerGetPackagesException: if the ISPManager 4 config
            is missing or can't be read
        """
        dict_uid_package = {}
        if self.isp_major_ver == 4:
            # ISPManager 4
            if not os.path.isfile(self.ISP4_CONF):
                raise ISPManagerGetPackagesException(f"ERROR: Can't read {self.ISP4_CONF}")
            try:
                with open(self.ISP4_CONF, 'r', encoding='utf-8') as f:
                    conf_text = f.read()
            except (OSError, IOError) as e:
                raise ISPManagerGetPackagesException(
                    f"ERROR: Can't read {self.ISP4_CONF} - {str(e)}"
                ) from e

            # everything before the first "Account" keyword is not an account
            for section in conf_text.split('Account')[1:]:
                tokens = section.split()
                if not tokens:
                    continue
                try:
                    uid = int(self.users_data.get_uid(tokens[0].replace("\"", "")))
                except ClPwd.NoSuchUserException:
                    # skip user without UID, same as for ISPManager 5 below
                    continue
                for conf_line in section.split('\n'):
                    stripped = conf_line.strip()
                    if not stripped.startswith("Preset"):
                        continue
                    pkg_name = ' '.join(stripped.split()[1:])
                    if uid in dict_uid_package:
                        dict_uid_package[uid] = dict_uid_package[uid] + ' ' + str(pkg_name)
                    else:
                        dict_uid_package[uid] = str(pkg_name)
        else:
            # ISP Manager 5
            # We emulate it and don't retrieve packages from isp 5. All users have default package
            # see more https://cloudlinux.atlassian.net/browse/LU-256
            # Isp5 use lvectl and THIS script when DB is locked. So we can't get packages from isp 5 db.
            try:
                panel_users = grp.getgrnam(self.ISP5_GRP).gr_mem
                for user in panel_users:
                    try:
                        uid = self.users_data.get_uid(user)
                        dict_uid_package[uid] = 'default'
                    except ClPwd.NoSuchUserException:
                        # skip user without UID
                        pass
            except KeyError:
                # group self.ISP5_GRP not found
                pass
        return dict_uid_package

    @classmethod
    def get_reseller(cls, uid, debug=False):
        """
        Print reseller name, which is always 'admin' for this panel
        :param string uid: uid of user
        :param bool debug: Do produce debug output or don't
        :return: None
        """
        reseller = 'admin'
        print(reseller)

    def list_packages(self, debug=False):  # --list-packages
        """
        Print every distinct package name; exit with code 1 on read error
        """
        try:
            dict_uid_package = self._get_user_packages_dict()
            for package in set(dict_uid_package.values()):
                print(package)
        except ISPManagerGetPackagesException as e:
            print(e)
            sys.exit(1)

    def list_all(self, debug=False):  # list all
        """
        Print "uid package" for every user; exit with code 1 on read error
        """
        try:
            dict_uid_package = self._get_user_packages_dict()
            for uid, package in dict_uid_package.items():
                print(str(uid) + ' ' + package)
        except ISPManagerGetPackagesException as e:
            print(e)
            sys.exit(1)

    def list_user_package(self, uid, debug=False):  # --userid
        """
        Print package of the given uid; exit with code 1 on read error
        """
        try:
            uid = int(uid)
            dict_uid_package = self._get_user_packages_dict()
            if uid in dict_uid_package:
                print(dict_uid_package[uid])
        except ISPManagerGetPackagesException as e:
            print(e)
            sys.exit(1)
        except ValueError:
            print("Incorrect user id")

    def list_users_in_package(self, package, debug=False):  # --package
        """
        Print uids of users in the given package; exit with code 1 on read error
        """
        try:
            dict_uid_package = self._get_user_packages_dict()
            for uid, package_ in dict_uid_package.items():
                if package == package_:
                    print(str(uid))
        except ISPManagerGetPackagesException as e:
            print(e)
            sys.exit(1)

    def list_users(self, debug=False):  # --list-users
        """
        Print "uid,package,admin" for every user; exit with code 1 on read error
        """
        try:
            dict_uid_package = self._get_user_packages_dict()
            for uid, package in dict_uid_package.items():
                print(f"{uid},{package},admin")
        except ISPManagerGetPackagesException as e:
            print(e)
            sys.exit(1)


class Plesk(GeneralPanel):
    """
    See following link for information about database:
    https://github.com/plesk/db-schemas/blob/master/psadb.xml
    """
PSA_SHADOW = '/etc/psa/.psa.shadow' ADMIN_ID = '1' NO_PACKAGE = 'None' users_data = None def __init__(self): super().__init__() self.cpapi_plugin = plesk.PanelPlugin() def fetch_data_from_db(self, sql, data=None): if not os.path.isfile(self.PSA_SHADOW): return False with open(self.PSA_SHADOW, 'r', encoding='utf-8') as f: passwd = f.read().strip() connector = mysql_lib.MySQLConnector(host='localhost', user='admin', passwd=str(passwd), db='psa', use_unicode=True, charset='utf8') with connector.connect() as db: return db.execute_query(sql, args=data) def _get_user_packages(self, debug=False, reseller_name=None, username=None): """ Generate triple with uid, it's package, it's reseller :param bool debug: Do produce debug output or don't :param string reseller_name: filter by this reseller :param string username: filter by this username (ignored if reseller_name set) :rtype: generator of (int, string, string) :return: (uid, package name, reseller name) """ # users in plesk can have no package! # please, be careful when you change this SQL query # always run tests from QA repository! 
query = """ SELECT sys_user.login, template.name, reseller.login FROM sys_users AS sys_user JOIN hosting AS hosting ON hosting.sys_user_id = sys_user.id JOIN domains AS domain ON hosting.dom_id = domain.id AND domain.webspace_id = 0 JOIN clients AS reseller ON domain.vendor_id = reseller.id LEFT JOIN Subscriptions AS subscription ON subscription.object_id = domain.id AND subscription.object_type = 'domain' LEFT JOIN PlansSubscriptions AS plan ON plan.subscription_id = subscription.id LEFT JOIN Templates AS template ON plan.plan_id = template.id WHERE sys_user.mapped_to IS NULL AND (template.type = 'domain' OR template.type IS NULL) """ if reseller_name is not None: query = f"{query} AND reseller.login = %s" result = self.fetch_data_from_db(query, [reseller_name]) elif username is not None: query = f"{query} AND sys_user.login = %s" result = self.fetch_data_from_db(query, [username]) else: result = self.fetch_data_from_db(query) for username_, package, reseller in result: package = package or self.NO_PACKAGE reseller = '' if reseller == 'admin' else reseller uid = self._check_and_get_user(username=username_, debug=debug) if username_ and uid is not None: yield uid, package, reseller def list_packages(self, debug=False): # --list-packages """ Print packages (exclude reseller's) :param bool debug: Do produce debug output or don't :return: packages names """ query = f""" SELECT name FROM psa.Templates WHERE owner_id = {self.ADMIN_ID} AND type = 'domain'; """ for line in self.fetch_data_from_db(query): print(line[0]) def get_reseller(self, uid: str, debug: bool = False): # --get-user-reseller """ Get reseller name from DB :param string uid: uid of user :param bool debug: Do produce debug output or don't :rtype: string :return: reseller name """ reseller = '' usernames = self.users_data.get_names(int(uid)) if usernames: format_strings = ','.join(['%s'] * len(usernames)) query = f"""SELECT reseller.login FROM sys_users AS sys_user JOIN hosting AS hosting ON 
hosting.sys_user_id = sys_user.id JOIN domains AS domain ON hosting.dom_id = domain.id JOIN clients AS reseller ON domain.vendor_id = reseller.id WHERE sys_user.login IN ({format_strings})""" result = self.fetch_data_from_db(query, tuple(usernames)) if result: reseller = result[0][0] print('' if reseller == 'admin' else reseller) def list_all(self, debug=False): # --list-all """ Print info about user's packages (include resellers) :param bool debug: Do produce debug output or don't :return: pairs "uid package", where uid - unique id of user package - name of user's package """ users = self._get_user_packages(debug=debug) for (uid, package, _) in users: print(f'{uid} {package}') def list_users(self, debug=False): # --list-users """ Print info about user's packages (include resellers) and thouse resellers :param bool debug: Do produce debug output or don't :return: triples "uid,package,reseller", where uid - unique id of user package - name of user's package reseller - name of package's reseller """ users = self._get_user_packages(debug=debug) for (uid, package, reseller) in users: print(f'{uid},{package},{reseller}') def list_user_package(self, uid, debug=False): # --userid """ Print all packages for user's uid :param uid: user's unique id :param bool debug: Do produce debug output or don't :return: package name for user's uid """ packages = [] try: names = self.users_data.get_names(int(uid)) except ClPwd.NoSuchUserException as e: if debug: print(e) except ValueError: print("Incorrect user id") else: for name in names: result = self._get_user_packages(debug=debug, username=name) packages.extend([line[1] for line in result]) for package in packages: print(package) def list_users_in_package(self, package, debug=False): # --package """ Print all users in package :param package: name of package :param bool debug: Do produce debug output or don't :return: user's uid """ query = f""" SELECT t5.login FROM psa.Templates AS t1 JOIN psa.PlansSubscriptions AS t2 ON t2.plan_id 
= t1.id JOIN psa.Subscriptions AS t3 ON t3.id = t2.subscription_id JOIN psa.domains AS t4 ON t4.id = t3.object_id JOIN psa.hosting AS t6 ON t6.dom_id = t4.id RIGHT JOIN psa.sys_users AS t5 ON t6.sys_user_id = t5.id WHERE t1.name = '{package}' AND t1.owner_id = {self.ADMIN_ID} AND t1.type = 'domain'; """ for login in self.fetch_data_from_db(query): print(self.users_data.get_uid(login[0])) def list_users_in_reseller_package(self, package, reseller, debug=False): # --package --reseller """ Print info about users in resellers package :param package: package name :param reseller: name of package's reseller :param bool debug: Do produce debug output or don't :return: users uids """ reseller_query = f"""SELECT id FROM psa.clients WHERE type = 'reseller' AND login = '{reseller}'""" reseller_id = self.fetch_data_from_db(reseller_query) if len(reseller_id) != 1: return reseller_id = int(reseller_id[0][0]) query = f""" SELECT t5.login FROM psa.Templates AS t1 JOIN psa.PlansSubscriptions AS t2 ON t2.plan_id = t1.id JOIN psa.Subscriptions AS t3 ON t3.id = t2.subscription_id JOIN psa.domains AS t4 ON t4.id = t3.object_id JOIN psa.hosting AS t6 ON t6.dom_id = t4.id RIGHT JOIN psa.sys_users AS t5 ON t6.sys_user_id = t5.id WHERE t1.type = 'domain' AND t1.name = '{package}' AND t1.owner_id = '{reseller_id}'; """ result = self.fetch_data_from_db(query) for line in result: try: luid = self.users_data.get_uid(line[0]) except ClPwd.NoSuchUserException as e: if debug: print(e) else: print(str(luid)) def list_resellers_packages(self, debug=False): # --list-resellers-packages """ Print list of resellers packages :param bool debug: Do produce debug output or don't :return: pairs "reseller package", where package - package name reseller - package's reseller """ query = f"""SELECT t2.login, t1.name FROM (SELECT name, owner_id, type FROM psa.Templates) AS t1 JOIN psa.clients AS t2 ON t1.owner_id = t2.id WHERE t1.owner_id != {self.ADMIN_ID} AND t1.type = 'domain';""" for line in 
self.fetch_data_from_db(query): package = line[1] reseller = 'root' if line[0] == 'admin' else line[0] print(f'{reseller} {package}') def list_domain_packages_with_id(self) -> List[Tuple[Optional[str], str, int]]: """ Return list of non-reseller packages with the plesk DB id [(reseller, package, plesk_id), (None, package, plesk_id)] """ res = [] query = """SELECT t2.login, t1.name, t1.id FROM (SELECT id, name, owner_id, type FROM psa.Templates) AS t1 JOIN psa.clients AS t2 ON t1.owner_id = t2.id WHERE t1.type = 'domain';""" for line in self.fetch_data_from_db(query): package = line[1] reseller = 'root' if line[0] == 'admin' else line[0] _id = line[2] res.append((reseller, package, int(_id))) return res def get_package_name_by_id_from_plesk_db(self, package_id: int) -> str: """ Return package name by the plesk DB id """ query = "SELECT id, name, type FROM psa.Templates WHERE id = %s;" data = self.fetch_data_from_db(query, (package_id,)) return data[0][1] def get_package_id_by_name_from_plesk_db(self, package_name: str) -> int: """ Return package id by the plesk DB name """ query = "SELECT id, name, type FROM psa.Templates WHERE name = %s;" data = self.fetch_data_from_db(query, (package_name,)) return data[0][0] def list_reseller_packages(self, reseller, debug=False): # --list-reseller-packages """ Print list reseller's packages :param reseller: name of reseller :param bool debug: Do produce debug output or don't :return: packages names """ if reseller == "root": reseller = "admin" if not re.match(r"^[\w_]*$", reseller): return None query = f"""SELECT t1.name FROM (SELECT name, owner_id, type FROM psa.Templates) as t1 JOIN psa.clients as t2 ON t1.owner_id = t2.id AND t2.login = '{reseller}' WHERE t1.type = 'domain' AND t1.owner_id != {self.ADMIN_ID};""" for line in self.fetch_data_from_db(query): print(line[0]) def list_admins(self, debug=False): return ["admin"] class InterWorx(GeneralPanel): def _warning(self, debug, method_name): if debug: 
sys.stderr.write("Doesn't support InterWorx anymore\n") sys.exit(0) class Unknown(GeneralPanel): def _warning(self, debug, method_name): raise NotImplementedError(f"{method_name} wasn't implemented for Unknown panel") class HostingNG(GeneralPanel): def _warning(self, debug, method_name): raise NotImplementedError(f"{method_name} wasn't implemented for HostingNG panel")
.
Edit
..
Edit
GitPython-3.1.32.dist-info
Edit
Jinja2-3.0.3.dist-info
Edit
Mako-1.2.4.dist-info
Edit
MarkupSafe-2.1.3.dist-info
Edit
PyJWT-2.8.0.dist-info
Edit
PyMySQL-1.1.0.dist-info
Edit
PyVirtualDisplay-3.0.dist-info
Edit
PyYAML-6.0.1.dist-info
Edit
__pycache__
Edit
_cffi_backend.cpython-311-x86_64-linux-gnu.so
Edit
_distutils_hack
Edit
_pyrsistent_version.py
Edit
_pytest
Edit
_yaml
Edit
aiohttp
Edit
aiohttp-3.9.2.dist-info
Edit
aiohttp_jinja2
Edit
aiohttp_jinja2-1.5.dist-info
Edit
aiohttp_security
Edit
aiohttp_security-0.4.0.dist-info
Edit
aiohttp_session
Edit
aiohttp_session-2.9.0.dist-info
Edit
aiosignal
Edit
aiosignal-1.3.1.dist-info
Edit
alembic
Edit
alembic-1.11.1.dist-info
Edit
astroid
Edit
astroid-2.15.6.dist-info
Edit
attr
Edit
attrs
Edit
attrs-23.1.0.dist-info
Edit
backports
Edit
certifi
Edit
certifi-2023.7.22.dist-info
Edit
cffi
Edit
cffi-1.15.1.dist-info
Edit
chardet
Edit
chardet-5.2.0.dist-info
Edit
charset_normalizer
Edit
charset_normalizer-2.1.1.dist-info
Edit
cl_dom_collector
Edit
cl_proc_hidepid.py
Edit
cl_website_collector
Edit
clcagefslib
Edit
clcommon
Edit
clconfig
Edit
clconfigure
Edit
clcontrollib.py
Edit
cldashboard
Edit
cldetectlib.py
Edit
cldiaglib.py
Edit
clevents
Edit
clflags
Edit
clhooklib.py
Edit
cli_utils.py
Edit
cllicense
Edit
cllicenselib.py
Edit
cllimits
Edit
cllimits_validator
Edit
cllimitslib_v2
Edit
cllvectl
Edit
clpackages
Edit
clquota
Edit
clselect
Edit
clselector
Edit
clsentry
Edit
clsetuplib.py
Edit
clsudo.py
Edit
clsummary
Edit
clveconfig
Edit
clwizard
Edit
clwpos
Edit
configparser-5.0.2.dist-info
Edit
configparser.py
Edit
contextlib2
Edit
contextlib2-21.6.0.dist-info
Edit
coverage
Edit
coverage-7.2.7.dist-info
Edit
cryptography
Edit
cryptography-41.0.2.dist-info
Edit
ddt-1.4.4.dist-info
Edit
ddt.py
Edit
dill
Edit
dill-0.3.7.dist-info
Edit
distlib
Edit
distlib-0.3.8.dist-info
Edit
distutils-precedence.pth
Edit
docopt-0.6.2.dist-info
Edit
docopt.py
Edit
dodgy
Edit
dodgy-0.2.1.dist-info
Edit
filelock
Edit
filelock-3.13.1.dist-info
Edit
flake8
Edit
flake8-5.0.4.dist-info
Edit
flake8_polyfill
Edit
flake8_polyfill-1.0.2.dist-info
Edit
frozenlist
Edit
frozenlist-1.4.0.dist-info
Edit
future
Edit
future-0.18.3.dist-info
Edit
git
Edit
gitdb
Edit
gitdb-4.0.10.dist-info
Edit
guppy
Edit
guppy3-3.1.3.dist-info
Edit
idna
Edit
idna-3.4.dist-info
Edit
iniconfig
Edit
iniconfig-2.0.0.dist-info
Edit
isort
Edit
isort-5.12.0.dist-info
Edit
jinja2
Edit
jsonschema
Edit
jsonschema-3.2.0.dist-info
Edit
jwt
Edit
lazy_object_proxy
Edit
lazy_object_proxy-1.9.0.dist-info
Edit
libfuturize
Edit
libpasteurize
Edit
lve_stats-2.0.dist-info
Edit
lve_utils
Edit
lveapi.py
Edit
lvectllib.py
Edit
lvemanager
Edit
lvestat.py
Edit
lvestats
Edit
lxml
Edit
lxml-4.9.2.dist-info
Edit
mako
Edit
markupsafe
Edit
mccabe-0.7.0.dist-info
Edit
mccabe.py
Edit
mock
Edit
mock-5.1.0.dist-info
Edit
multidict
Edit
multidict-6.0.4.dist-info
Edit
numpy
Edit
numpy-1.25.1.dist-info
Edit
numpy.libs
Edit
packaging
Edit
packaging-23.1.dist-info
Edit
pam.py
Edit
past
Edit
pep8_naming-0.10.0.dist-info
Edit
pep8ext_naming.py
Edit
pip
Edit
pip-25.0.1.dist-info
Edit
pkg_resources
Edit
platformdirs
Edit
platformdirs-3.11.0.dist-info
Edit
pluggy
Edit
pluggy-1.2.0.dist-info
Edit
prettytable
Edit
prettytable-3.8.0.dist-info
Edit
prometheus_client
Edit
prometheus_client-0.8.0.dist-info
Edit
prospector
Edit
prospector-1.10.2.dist-info
Edit
psutil
Edit
psutil-5.9.5.dist-info
Edit
psycopg2
Edit
psycopg2_binary-2.9.6.dist-info
Edit
psycopg2_binary.libs
Edit
py.py
Edit
pycodestyle-2.9.1.dist-info
Edit
pycodestyle.py
Edit
pycparser
Edit
pycparser-2.21.dist-info
Edit
pydocstyle
Edit
pydocstyle-6.3.0.dist-info
Edit
pyfakefs
Edit
pyfakefs-5.2.3.dist-info
Edit
pyflakes
Edit
pyflakes-2.5.0.dist-info
Edit
pylint
Edit
pylint-2.17.4.dist-info
Edit
pylint_celery
Edit
pylint_celery-0.3.dist-info
Edit
pylint_django
Edit
pylint_django-2.5.3.dist-info
Edit
pylint_flask
Edit
pylint_flask-0.6.dist-info
Edit
pylint_plugin_utils
Edit
pylint_plugin_utils-0.7.dist-info
Edit
pylve-2.1-py3.11.egg-info
Edit
pylve.cpython-311-x86_64-linux-gnu.so
Edit
pymysql
Edit
pyparsing
Edit
pyparsing-3.0.9.dist-info
Edit
pyrsistent
Edit
pyrsistent-0.19.3.dist-info
Edit
pytest
Edit
pytest-7.4.0.dist-info
Edit
pytest_check
Edit
pytest_check-2.5.3.dist-info
Edit
pytest_snapshot
Edit
pytest_snapshot-0.9.0.dist-info
Edit
pytest_subprocess
Edit
pytest_subprocess-1.5.0.dist-info
Edit
pytest_tap
Edit
pytest_tap-3.5.dist-info
Edit
python_pam-1.8.4.dist-info
Edit
pyvirtualdisplay
Edit
raven
Edit
raven-6.10.0.dist-info
Edit
remove_ubc.py
Edit
requests
Edit
requests-2.31.0.dist-info
Edit
requirements_detector
Edit
requirements_detector-1.2.2.dist-info
Edit
schema-0.7.5.dist-info
Edit
schema.py
Edit
secureio.py
Edit
semver
Edit
semver-3.0.1.dist-info
Edit
sentry_sdk
Edit
sentry_sdk-1.29.2.dist-info
Edit
setoptconf
Edit
setoptconf_tmp-0.3.1.dist-info
Edit
setuptools
Edit
setuptools-78.1.0.dist-info
Edit
simple_rpm.so
Edit
simplejson
Edit
simplejson-3.19.1.dist-info
Edit
six-1.16.0.dist-info
Edit
six.py
Edit
smmap
Edit
smmap-5.0.0.dist-info
Edit
snowballstemmer
Edit
snowballstemmer-2.2.0.dist-info
Edit
sqlalchemy
Edit
sqlalchemy-1.3.24.dist-info
Edit
ssa
Edit
svgwrite
Edit
svgwrite-1.4.3.dist-info
Edit
tap
Edit
tap_py-3.2.1.dist-info
Edit
testfixtures
Edit
testfixtures-7.1.0.dist-info
Edit
toml
Edit
toml-0.10.2.dist-info
Edit
tomlkit
Edit
tomlkit-0.11.8.dist-info
Edit
typing_extensions-4.7.1.dist-info
Edit
typing_extensions.py
Edit
unshare-0.22.dist-info
Edit
unshare.cpython-311-x86_64-linux-gnu.so
Edit
urllib3
Edit
urllib3-2.0.4.dist-info
Edit
vendors_api
Edit
virtualenv
Edit
virtualenv-20.21.1.dist-info
Edit
wcwidth
Edit
wcwidth-0.2.6.dist-info
Edit
wmt
Edit
wrapt
Edit
wrapt-1.15.0.dist-info
Edit
xray
Edit
yaml
Edit
yarl
Edit
yarl-1.9.2.dist-info
Edit