diff --git a/pysnmp/smi/builder.py b/pysnmp/smi/builder.py index 6dc39e546..c5662e18c 100644 --- a/pysnmp/smi/builder.py +++ b/pysnmp/smi/builder.py @@ -13,6 +13,7 @@ try: import importlib + import importlib.util try: PY_MAGIC_NUMBER = importlib.util.MAGIC_NUMBER @@ -26,10 +27,8 @@ import imp PY_MAGIC_NUMBER = imp.get_magic() - SOURCE_SUFFIXES = [s[0] for s in imp.get_suffixes() - if s[2] == imp.PY_SOURCE] - BYTECODE_SUFFIXES = [s[0] for s in imp.get_suffixes() - if s[2] == imp.PY_COMPILED] + SOURCE_SUFFIXES = [s[0] for s in imp.get_suffixes() if s[2] == imp.PY_SOURCE] + BYTECODE_SUFFIXES = [s[0] for s in imp.get_suffixes() if s[2] == imp.PY_COMPILED] PY_SUFFIXES = SOURCE_SUFFIXES + BYTECODE_SUFFIXES @@ -49,26 +48,26 @@ class __AbstractMibSource: def __init__(self, srcName): self._srcName = srcName self.__inited = None - debug.logger & debug.flagBld and debug.logger('trying %s' % self) + debug.logger & debug.flagBld and debug.logger("trying %s" % self) def __repr__(self): - return f'{self.__class__.__name__}({self._srcName!r})' + return f"{self.__class__.__name__}({self._srcName!r})" def _uniqNames(self, files): u = set() for f in files: - if f.startswith('__init__.'): + if f.startswith("__init__."): continue - u.update(f[:-len(sfx)] for sfx in PY_SUFFIXES if f.endswith(sfx)) + u.update(f[: -len(sfx)] for sfx in PY_SUFFIXES if f.endswith(sfx)) return tuple(u) # MibSource API follows - def fullPath(self, f='', sfx=''): - return self._srcName + (f and (os.sep + f + sfx) or '') + def fullPath(self, f="", sfx=""): + return self._srcName + (f and (os.sep + f + sfx) or "") def init(self): if self.__inited is None: @@ -88,35 +87,37 @@ def read(self, f): pycTime = pyTime = -1 for pycSfx in BYTECODE_SUFFIXES: - try: - pycData, pycPath = self._getData(f + pycSfx, 'rb') + pycData, pycPath = self._getData(f + pycSfx, "rb") except OSError: why = sys.exc_info()[1] if ENOENT == -1 or why.errno == ENOENT: debug.logger & debug.flagBld and debug.logger( - f'file {f + pycSfx} access error: {why}' + f"file {f + pycSfx} access error: {why}" ) else: - raise error.MibLoadError(f'MIB file {f + pycSfx} access error: {why}') + raise error.MibLoadError( + f"MIB file {f + pycSfx} access error: {why}" + ) else: if PY_MAGIC_NUMBER == pycData[:4]: pycData = pycData[4:] - pycTime = struct.unpack('= pyTime: return marshal.loads(pycData), pycSfx if pyTime != -1: - modData, pyPath = self._getData(f + pySfx, 'r') - return compile(modData, pyPath, 'exec'), pyPath + modData, pyPath = self._getData(f + pySfx, "r") + return compile(modData, pyPath, "exec"), pyPath - raise OSError(ENOENT, 'No suitable module found', f) + raise OSError(ENOENT, "No suitable module found", f) # Interfaces for subclasses def _init(self): @@ -160,16 +165,16 @@ def _getData(self, f, mode): class ZipMibSource(__AbstractMibSource): def _init(self): try: - p = __import__(self._srcName, globals(), locals(), ['__init__']) - if hasattr(p, '__loader__') and hasattr(p.__loader__, '_files'): + p = __import__(self._srcName, globals(), locals(), ["__init__"]) + if hasattr(p, "__loader__") and hasattr(p.__loader__, "_files"): self.__loader = p.__loader__ - self._srcName = self._srcName.replace('.', os.sep) + self._srcName = self._srcName.replace(".", os.sep) return self - elif hasattr(p, '__file__'): + elif hasattr(p, "__file__"): # Dir relative to PYTHONPATH return DirMibSource(os.path.split(p.__file__)[0]).init() else: - raise error.MibLoadError(f'{p} access error') + raise error.MibLoadError(f"{p} access error") except ImportError: # Dir relative to CWD @@ -177,15 +182,17 @@ def _init(self): @staticmethod def _parseDosTime(dosdate, dostime): - t = (((dosdate >> 9) & 0x7f) + 1980, # year - ((dosdate >> 5) & 0x0f), # month - dosdate & 0x1f, # mday - (dostime >> 11) & 0x1f, # hour - (dostime >> 5) & 0x3f, # min - (dostime & 0x1f) * 2, # sec - -1, # wday - -1, # yday - -1) # dst + t = ( + ((dosdate >> 9) & 0x7F) + 1980, # year + ((dosdate >> 5) & 0x0F), # month + dosdate & 0x1F, # mday + (dostime >> 11) & 0x1F, # hour + (dostime >> 5) & 0x3F, # min + (dostime & 0x1F) * 2, # sec + -1, # wday + -1, # yday + -1, + ) # dst return time.mktime(t) def _listdir(self): @@ -206,7 +213,7 @@ def _getTimestamp(self, f): self.__loader._files[p][6], self.__loader._files[p][5] ) else: - raise OSError(ENOENT, 'No such file in ZIP archive', p) + raise OSError(ENOENT, "No such file in ZIP archive", p) def _getData(self, f, mode=None): p = os.path.join(self._srcName, f) @@ -215,7 +222,7 @@ def _getData(self, f, mode=None): except Exception: # ZIP code seems to return all kinds of errors why = sys.exc_info() - raise OSError(ENOENT, f'File or ZIP archive {p} access error: {why[1]}') + raise OSError(ENOENT, f"File or ZIP archive {p} access error: {why[1]}") class DirMibSource(__AbstractMibSource): @@ -229,7 +236,8 @@ def _listdir(self): except OSError: why = sys.exc_info() debug.logger & debug.flagBld and debug.logger( - f'listdir() failed for {self._srcName}: {why[1]}') + f"listdir() failed for {self._srcName}: {why[1]}" + ) return () def _getTimestamp(self, f): @@ -237,10 +245,10 @@ def _getTimestamp(self, f): try: return os.stat(p)[8] except OSError: - raise OSError(ENOENT, 'No such file: %s' % sys.exc_info()[1], p) + raise OSError(ENOENT, "No such file: %s" % sys.exc_info()[1], p) def _getData(self, f, mode): - p = os.path.join(self._srcName, '*') + p = os.path.join(self._srcName, "*") try: if f in os.listdir(self._srcName): # make FS case-sensitive p = os.path.join(self._srcName, f) @@ -251,20 +259,19 @@ def _getData(self, f, mode): except OSError: why = sys.exc_info() - msg = f'File or directory {p} access error: {why[1]}' + msg = f"File or directory {p} access error: {why[1]}" else: - msg = 'No such file or directory: %s' % p + msg = "No such file or directory: %s" % p raise OSError(ENOENT, msg) + class MibBuilder: - defaultCoreMibs = os.pathsep.join( - ('pysnmp.smi.mibs.instances', 'pysnmp.smi.mibs') - ) - defaultMiscMibs = 'pysnmp_mibs' + defaultCoreMibs = os.pathsep.join(("pysnmp.smi.mibs.instances", "pysnmp.smi.mibs")) + defaultMiscMibs = "pysnmp_mibs" - moduleID = 'PYSNMP_MODULE_ID' + moduleID = "PYSNMP_MODULE_ID" loadTexts = False @@ -274,7 +281,7 @@ class MibBuilder: def __init__(self): self.lastBuildId = self._autoName = 0 sources = [] - for ev in 'PYSNMP_MIB_PKGS', 'PYSNMP_MIB_DIRS', 'PYSNMP_MIB_DIR': + for ev in "PYSNMP_MIB_PKGS", "PYSNMP_MIB_DIRS", "PYSNMP_MIB_DIR": if ev in os.environ: for m in os.environ[ev].split(os.pathsep): sources.append(ZipMibSource(m)) @@ -304,11 +311,15 @@ def setMibCompiler(self, mibCompiler, destDir): def addMibSources(self, *mibSources): self.__mibSources.extend([s.init() for s in mibSources]) - debug.logger & debug.flagBld and debug.logger(f'addMibSources: new MIB sources {self.__mibSources}') + debug.logger & debug.flagBld and debug.logger( + f"addMibSources: new MIB sources {self.__mibSources}" + ) def setMibSources(self, *mibSources): self.__mibSources = [s.init() for s in mibSources] - debug.logger & debug.flagBld and debug.logger(f'setMibSources: new MIB sources {self.__mibSources}') + debug.logger & debug.flagBld and debug.logger( + f"setMibSources: new MIB sources {self.__mibSources}" + ) def getMibSources(self): return tuple(self.__mibSources) @@ -324,34 +335,41 @@ def getMibPath(self): paths += (mibSource.fullPath(),) else: raise error.MibLoadError( - f'MIB source is not a plain directory: {mibSource}' + f"MIB source is not a plain directory: {mibSource}" ) return paths def loadModule(self, modName, **userCtx): """Load and execute MIB modules as Python code""" for mibSource in self.__mibSources: - debug.logger & debug.flagBld and debug.logger(f'loadModule: trying {modName} at {mibSource}') + debug.logger & debug.flagBld and debug.logger( + f"loadModule: trying {modName} at {mibSource}" + ) try: codeObj, sfx = mibSource.read(modName) except OSError: debug.logger & debug.flagBld and debug.logger( - f'loadModule: read {modName} from {mibSource} failed: {sys.exc_info()[1]}') + f"loadModule: read {modName} from {mibSource} failed: {sys.exc_info()[1]}" + ) continue modPath = mibSource.fullPath(modName, sfx) if modPath in self.__modPathsSeen: - debug.logger & debug.flagBld and debug.logger('loadModule: seen %s' % modPath) + debug.logger & debug.flagBld and debug.logger( + "loadModule: seen %s" % modPath + ) break else: self.__modPathsSeen.add(modPath) - debug.logger & debug.flagBld and debug.logger('loadModule: evaluating %s' % modPath) + debug.logger & debug.flagBld and debug.logger( + "loadModule: evaluating %s" % modPath + ) - g = {'mibBuilder': self, 'userCtx': userCtx} + g = {"mibBuilder": self, "userCtx": userCtx} try: exec(codeObj, g) @@ -359,19 +377,23 @@ def loadModule(self, modName, **userCtx): except Exception: self.__modPathsSeen.remove(modPath) raise error.MibLoadError( - f'MIB module \'{modPath}\' load error: {traceback.format_exception(*sys.exc_info())}' + f"MIB module '{modPath}' load error: {traceback.format_exception(*sys.exc_info())}" ) self.__modSeen[modName] = modPath - debug.logger & debug.flagBld and debug.logger('loadModule: loaded %s' % modPath) + debug.logger & debug.flagBld and debug.logger( + "loadModule: loaded %s" % modPath + ) break if modName not in self.__modSeen: raise error.MibNotFoundError( - 'MIB file \"{}\" not found in search path ({})'.format( - modName and modName + ".py[co]", ', '.join([str(x) for x in self.__mibSources])) + 'MIB file "{}" not found in search path ({})'.format( + modName and modName + ".py[co]", + ", ".join([str(x) for x in self.__mibSources]), + ) ) return self @@ -387,9 +409,7 @@ def loadModules(self, *modNames, **userCtx): modNames = list(modNames) if not modNames: - raise error.MibNotFoundError( - f'No MIB module to load at {self}' - ) + raise error.MibNotFoundError(f"No MIB module to load at {self}") for modName in modNames: try: @@ -397,12 +417,23 @@ def loadModules(self, *modNames, **userCtx): except error.MibNotFoundError: if self.__mibCompiler: - debug.logger & debug.flagBld and debug.logger('loadModules: calling MIB compiler for %s' % modName) - status = self.__mibCompiler.compile(modName, genTexts=self.loadTexts) - errs = '; '.join([hasattr(x, 'error') and str(x.error) or x for x in status.values() if - x in ('failed', 'missing')]) + debug.logger & debug.flagBld and debug.logger( + "loadModules: calling MIB compiler for %s" % modName + ) + status = self.__mibCompiler.compile( + modName, genTexts=self.loadTexts + ) + errs = "; ".join( + [ + hasattr(x, "error") and str(x.error) or x + for x in status.values() + if x in ("failed", "missing") + ] + ) if errs: - raise error.MibNotFoundError(f'{modName} compilation error(s): {errs}') + raise error.MibNotFoundError( + f"{modName} compilation error(s): {errs}" + ) # compilation succeeded, MIB might load now self.loadModule(modName, **userCtx) @@ -414,34 +445,26 @@ def unloadModules(self, *modNames): modNames = list(self.mibSymbols.keys()) for modName in modNames: if modName not in self.mibSymbols: - raise error.MibNotFoundError( - f'No module {modName} at {self}' - ) + raise error.MibNotFoundError(f"No module {modName} at {self}") self.unexportSymbols(modName) self.__modPathsSeen.remove(self.__modSeen[modName]) del self.__modSeen[modName] - debug.logger & debug.flagBld and debug.logger('unloadModules: %s' % modName) + debug.logger & debug.flagBld and debug.logger("unloadModules: %s" % modName) return self def importSymbols(self, modName, *symNames, **userCtx): if not modName: - raise error.SmiError( - 'importSymbols: empty MIB module name' - ) + raise error.SmiError("importSymbols: empty MIB module name") r = () for symName in symNames: if modName not in self.mibSymbols: self.loadModules(modName, **userCtx) if modName not in self.mibSymbols: - raise error.MibNotFoundError( - f'No module {modName} loaded at {self}' - ) + raise error.MibNotFoundError(f"No module {modName} loaded at {self}") if symName not in self.mibSymbols[modName]: - raise error.SmiError( - f'No symbol {modName}::{symName} at {self}' - ) + raise error.SmiError(f"No symbol {modName}::{symName} at {self}") r = r + (self.mibSymbols[modName][symName],) return r @@ -452,17 +475,16 @@ def exportSymbols(self, modName, *anonymousSyms, **namedSyms): for symObj in anonymousSyms: debug.logger & debug.flagBld and debug.logger( - 'exportSymbols: anonymous symbol %s::__pysnmp_%ld' % (modName, self._autoName)) - mibSymbols['__pysnmp_%ld' % self._autoName] = symObj + "exportSymbols: anonymous symbol %s::__pysnmp_%ld" + % (modName, self._autoName) + ) + mibSymbols["__pysnmp_%ld" % self._autoName] = symObj self._autoName += 1 for symName, symObj in namedSyms.items(): if symName in mibSymbols: - raise error.SmiError( - f'Symbol {symName} already exported at {modName}' - ) + raise error.SmiError(f"Symbol {symName} already exported at {modName}") - if symName != self.moduleID and \ - not isinstance(symObj, classTypes): + if symName != self.moduleID and not isinstance(symObj, classTypes): label = symObj.getLabel() if label: symName = label @@ -471,24 +493,26 @@ def exportSymbols(self, modName, *anonymousSyms, **namedSyms): mibSymbols[symName] = symObj - debug.logger & debug.flagBld and debug.logger(f'exportSymbols: symbol {modName}::{symName}') + debug.logger & debug.flagBld and debug.logger( + f"exportSymbols: symbol {modName}::{symName}" + ) self.lastBuildId += 1 def unexportSymbols(self, modName, *symNames): if modName not in self.mibSymbols: - raise error.SmiError(f'No module {modName} at {self}') + raise error.SmiError(f"No module {modName} at {self}") mibSymbols = self.mibSymbols[modName] if not symNames: symNames = list(mibSymbols.keys()) for symName in symNames: if symName not in mibSymbols: - raise error.SmiError( - f'No symbol {modName}::{symName} at {self}' - ) + raise error.SmiError(f"No symbol {modName}::{symName} at {self}") del mibSymbols[symName] - debug.logger & debug.flagBld and debug.logger(f'unexportSymbols: symbol {modName}::{symName}') + debug.logger & debug.flagBld and debug.logger( + f"unexportSymbols: symbol {modName}::{symName}" + ) if not self.mibSymbols[modName]: del self.mibSymbols[modName]