/opt/imunify360/venv/lib/python3.11/site-packages/clcommon
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
"""Helpers for reading the ``list``, ``map`` and ``resellers`` files of /proc/lve."""

import os
from typing import Dict, Iterator, List, Optional, Tuple

# Default location of the lve proc filesystem.
PROC_LVE = '/proc/lve'

# Reserved id (2**32 - 1). Records carrying it are dropped by
# ``ProcLve.lines(without_limits=True)`` and never reported by
# ``ProcLve.lve_id_list()``.
# NOTE(review): presumably this is the row describing the limits of the
# container itself rather than a real LVE - confirm against the kernel module.
LIMIT_LVP_ID = 4294967295


class ProcLveError(Exception):
    """Base class for errors raised by this module."""


class NoSuchLvp(ProcLveError):
    """Raised when the ``list`` file of the requested lvp id does not exist."""


class ProcLve:
    """Reader for the files located under the lve proc directory.

    :param proc_lve: root directory to read from (``/proc/lve`` by default).
    """

    def __init__(self, proc_lve=PROC_LVE):
        self._lvp = 'lvp'  # name prefix of per-lvp directories under "resellers"
        self.list = 'list'  # file name of a records list
        self.proc_lve = proc_lve
        self._version = None  # cached result of version()

    def proc_lve_list(self) -> str:
        """
        Return path to /proc/lve/list
        """
        return os.path.join(self.proc_lve, self.list)

    def version(self) -> int:
        """
        Return /proc/lve filesystem version

        The version is the integer that precedes the first ``:`` on the
        first line of the list file. The value is read once and cached.

        :raises ValueError: if the header does not start with an integer.
        """
        if self._version is None:
            with open(self.proc_lve_list(), encoding='utf-8') as f:
                header = f.readline()
            # Take everything before the first colon instead of a fixed
            # three-character prefix, so a version of any width is parsed.
            self._version = int(header.partition(':')[0])
        return self._version

    def proc_lve_map(self) -> str:
        """
        Return path to /proc/lve/map
        """
        return os.path.join(self.proc_lve, 'map')

    def proc_lve_resellers(self) -> str:
        """
        Return path to /proc/lve/resellers
        """
        return os.path.join(self.proc_lve, 'resellers')

    def resellers_supported(self) -> bool:
        """
        Check present /proc/lve/resellers
        """
        return os.path.exists(self.proc_lve_resellers())

    def get_list_path(self, lvp_id=0) -> str:
        """
        Generate path to list file

        ``lvp_id == 0`` maps to the top-level list file; any other id maps
        to ``resellers/lvp<lvp_id>/list``.
        """
        if lvp_id == 0:
            return self.proc_lve_list()
        return os.path.join(
            self.proc_lve_resellers(),
            self._lvp + str(lvp_id),
            self.list,
        )

    def _extract_id(self, line) -> int:
        """
        Return the id stored in the first column of a list record

        The first whitespace-separated field is split on ``,`` and its last
        component is converted to int.
        NOTE(review): presumably the field looks like ``<lvp_id>,<lve_id>``
        in reseller lists - confirm.
        """
        first_field = line.split()[0]
        return int(first_field.split(',')[-1])

    def _lines(self, lvp_id=0) -> Iterator[Tuple[int, str]]:
        """
        Yield ``(id, raw_line)`` for every record of the list file

        The first line of the file (header) is skipped, blank lines are
        ignored. The file is read completely before the first pair is
        produced.

        :raises NoSuchLvp: if the list file does not exist.
        """
        path = self.get_list_path(lvp_id=lvp_id)
        try:
            with open(path, encoding='utf-8') as list_:
                content = list_.read().strip()
        except FileNotFoundError as e:
            # Only a missing file means "unknown lvp"; any other OSError
            # (permissions, I/O failure, ...) propagates unchanged.
            raise NoSuchLvp(f'No such lvp id {lvp_id}') from e
        for line in content.split('\n')[1:]:
            if not line.strip():
                # A blank line carries no record and has no id to extract.
                continue
            yield self._extract_id(line), line.rstrip(os.linesep)

    def lines(self, lvp_id=0, without_limits=True) -> Iterator[str]:
        """
        Yield raw record lines of the list file

        :param lvp_id: which list to read (see ``get_list_path``).
        :param without_limits: when true, records whose id equals
            ``LIMIT_LVP_ID`` are left out.
        :raises NoSuchLvp: if the list file does not exist.
        """
        for lve_id, line in self._lines(lvp_id=lvp_id):
            if without_limits and lve_id == LIMIT_LVP_ID:
                continue
            yield line

    def lve_id_list(self, lvp_id=0) -> Iterator[int]:
        """
        Yield ids found in the list file, except ``0`` and ``LIMIT_LVP_ID``

        :raises NoSuchLvp: if the list file does not exist.
        """
        for lve_id, _ in self._lines(lvp_id=lvp_id):
            if lve_id in (LIMIT_LVP_ID, 0):
                continue
            yield lve_id

    def lvp_id_list(self) -> Iterator[int]:
        """
        Obtain from /proc/lve/resellers lvp id list
        """
        prefix_len = len(self._lvp)
        for lvp_dir in os.listdir(self.proc_lve_resellers()):
            if lvp_dir.startswith(self._lvp):
                yield int(lvp_dir[prefix_len:])

    def check_inside_list(self, id_, lvp_id=0) -> bool:
        """
        Check whether ``id_`` is reported by ``lve_id_list(lvp_id)``

        Always returns a real ``bool`` (``False`` on a miss).

        :raises NoSuchLvp: if the list file does not exist.
        """
        return any(lve_id == id_ for lve_id in self.lve_id_list(lvp_id=lvp_id))

    def exist_lvp(self, lvp_id) -> bool:
        """
        Check present lve top container
        """
        return os.path.exists(self.get_list_path(lvp_id))

    def detect_inside_lvp(self, id_) -> Optional[int]:
        """
        Find in which lve top container

        :return: the first lvp id whose list contains ``id_``, or ``None``
            when no list contains it.
        """
        for lvp_id in self.lvp_id_list():
            if self.exist_lvp(lvp_id) and self.check_inside_list(id_, lvp_id):
                return lvp_id
        return None

    def map(self) -> Dict[int, int]:
        """
        Obtain map from /proc/lve/map as dict

        The first line (header) is skipped and blank lines are ignored;
        every other line must hold exactly two integers, stored as
        ``{first: second}``.

        :raises ValueError: if a record does not consist of two integers.
        """
        with open(self.proc_lve_map(), encoding='utf-8') as map_:
            content = map_.read().strip()
        map_dict = {}
        for line in content.split('\n')[1:]:
            fields = line.split()
            if not fields:
                continue
            lve_id_str, lvp_id_str = fields
            map_dict[int(lve_id_str)] = int(lvp_id_str)
        return map_dict

    def map_lve_id_list(self, lvp_id) -> List[int]:
        """
        Return keys of ``map()`` whose value equals ``lvp_id``
        """
        return [
            lve_id_
            for lve_id_, lvp_id_ in self.map().items()
            if lvp_id_ == lvp_id
        ]
.
Edit
..
Edit
__init__.py
Edit
__pycache__
Edit
clcagefs.py
Edit
clcaptain.py
Edit
clconfig.py
Edit
clconfpars.py
Edit
clcustomscript.py
Edit
cldebug.py
Edit
clemail.py
Edit
clexception.py
Edit
clfunc.py
Edit
clhook.py
Edit
cllog.py
Edit
cloutput.py
Edit
clproc.py
Edit
clpwd.py
Edit
clquota.py
Edit
clsec.py
Edit
clwpos_lib.py
Edit
const.py
Edit
cpapi
Edit
evr_utils.py
Edit
features.py
Edit
group_info_reader.py
Edit
lib
Edit
lock.py
Edit
mail_helper.py
Edit
mysql_lib.py
Edit
php_conf_reader.py
Edit
public_hooks
Edit
sysctl.py
Edit
ui_config.py
Edit
utils.py
Edit
utils_cmd.py
Edit