From 4c308694666d3e4649545153b3f02488ce0b3d47 Mon Sep 17 00:00:00 2001 From: Lex Li Date: Tue, 13 Feb 2024 02:53:35 -0500 Subject: [PATCH] Added and revised annotations. --- pysnmp/entity/engine.py | 12 +- pysnmp/entity/rfc3413/cmdgen.py | 674 +++++++++++++++++++++--------- pysnmp/entity/rfc3413/config.py | 261 +++++++----- pysnmp/hlapi/asyncio/cmdgen.py | 169 +++++--- pysnmp/hlapi/asyncio/slim.py | 51 ++- pysnmp/hlapi/asyncio/transport.py | 8 +- pysnmp/hlapi/transport.py | 8 +- pysnmp/proto/rfc3412.py | 543 +++++++++++++++--------- 8 files changed, 1152 insertions(+), 574 deletions(-) diff --git a/pysnmp/entity/engine.py b/pysnmp/entity/engine.py index eb272ce11..535eeff7d 100644 --- a/pysnmp/entity/engine.py +++ b/pysnmp/entity/engine.py @@ -8,7 +8,7 @@ import shutil import sys import tempfile -from typing import Tuple +from typing import Any, Dict, Tuple from pyasn1.compat.octets import str2octs from pysnmp.carrier.base import AbstractTransportAddress, AbstractTransportDispatcher from pysnmp.proto.rfc3412 import MsgAndPduDispatcher @@ -58,7 +58,7 @@ class SnmpEngine: """ - transportDispatcher: AbstractTransportDispatcher + transportDispatcher: AbstractTransportDispatcher | None def __init__( self, snmpEngineID=None, maxMessageSize: int = 65507, msgAndPduDsp=None @@ -92,7 +92,7 @@ def __init__( raise error.PySnmpError("MIB instrumentation does not yet exist") ( snmpEngineMaxMessageSize, - ) = self.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + ) = self.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( # type: ignore "__SNMP-FRAMEWORK-MIB", "snmpEngineMaxMessageSize" ) snmpEngineMaxMessageSize.syntax = snmpEngineMaxMessageSize.syntax.clone( @@ -100,13 +100,13 @@ def __init__( ) ( snmpEngineBoots, - ) = self.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + ) = self.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( # type: ignore "__SNMP-FRAMEWORK-MIB", "snmpEngineBoots" ) snmpEngineBoots.syntax += 1 ( origSnmpEngineID, - ) = self.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + ) = self.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( # type: ignore "__SNMP-FRAMEWORK-MIB", "snmpEngineID" ) @@ -215,7 +215,7 @@ def getMibBuilder(self): def setUserContext(self, **kwargs): self.cache.update({"__%s" % k: kwargs[k] for k in kwargs}) - def getUserContext(self, arg): + def getUserContext(self, arg) -> Dict[str, Any] | None: # TODO: fix this type check return self.cache.get("__%s" % arg) def delUserContext(self, arg): diff --git a/pysnmp/entity/rfc3413/cmdgen.py b/pysnmp/entity/rfc3413/cmdgen.py index 38c0c1dc2..70f0999ad 100644 --- a/pysnmp/entity/rfc3413/cmdgen.py +++ b/pysnmp/entity/rfc3413/cmdgen.py @@ -5,6 +5,7 @@ # License: https://www.pysnmp.com/pysnmp/license.html # import sys +from pysnmp.entity.engine import SnmpEngine from pysnmp.entity.rfc3413 import config from pysnmp.proto import rfc1905, errind from pysnmp.proto.api import v2c @@ -13,9 +14,9 @@ from pysnmp.proto.error import StatusInformation from pyasn1.type import univ -getNextHandle = nextid.Integer(0x7fffffff) +getNextHandle = nextid.Integer(0x7FFFFFFF) -__null = univ.Null('') +__null = univ.Null("") def getNextVarBinds(varBinds, origVarBinds=None): @@ -24,12 +25,17 @@ def getNextVarBinds(varBinds, origVarBinds=None): rspVarBinds = [] while idx: idx -= 1 - if varBinds[idx][1].tagSet in (rfc1905.NoSuchObject.tagSet, - rfc1905.NoSuchInstance.tagSet, - rfc1905.EndOfMibView.tagSet): + if varBinds[idx][1].tagSet in ( + rfc1905.NoSuchObject.tagSet, + rfc1905.NoSuchInstance.tagSet, + rfc1905.EndOfMibView.tagSet, + ): nonNulls -= 1 elif origVarBinds is not None: - if v2c.ObjectIdentifier(origVarBinds[idx][0]).asTuple() >= varBinds[idx][0].asTuple(): + if ( + v2c.ObjectIdentifier(origVarBinds[idx][0]).asTuple() + >= varBinds[idx][0].asTuple() + ): errorIndication = errind.oidNotIncreasing rspVarBinds.insert(0, (varBinds[idx][0], __null)) @@ -41,37 +47,59 @@ def getNextVarBinds(varBinds, origVarBinds=None): class CommandGenerator: - _null = univ.Null('') + _null = univ.Null("") def __init__(self, **options): self.__options = options self.__pendingReqs = {} - def processResponsePdu(self, snmpEngine, messageProcessingModel, - securityModel, securityName, securityLevel, - contextEngineId, contextName, pduVersion, - PDU, statusInformation, sendPduHandle, cbCtx): + def processResponsePdu( + self, + snmpEngine, + messageProcessingModel, + securityModel, + securityName, + securityLevel, + contextEngineId, + contextName, + pduVersion, + PDU, + statusInformation, + sendPduHandle, + cbCtx, + ): origSendRequestHandle, cbFun, cbCtx = cbCtx # 3.1.1 if sendPduHandle not in self.__pendingReqs: - raise error.PySnmpError('Missing sendPduHandle %s' % sendPduHandle) - - (origTransportDomain, origTransportAddress, - origMessageProcessingModel, origSecurityModel, - origSecurityName, origSecurityLevel, origContextEngineId, - origContextName, origPduVersion, origPdu, - origTimeout, origRetryCount, - origRetries, origDiscoveryRetries) = self.__pendingReqs.pop(sendPduHandle) + raise error.PySnmpError("Missing sendPduHandle %s" % sendPduHandle) + + ( + origTransportDomain, + origTransportAddress, + origMessageProcessingModel, + origSecurityModel, + origSecurityName, + origSecurityLevel, + origContextEngineId, + origContextName, + origPduVersion, + origPdu, + origTimeout, + origRetryCount, + origRetries, + origDiscoveryRetries, + ) = self.__pendingReqs.pop(sendPduHandle) snmpEngine.transportDispatcher.jobFinished(id(self)) # 3.1.3 if statusInformation: debug.logger & debug.flagApp and debug.logger( - f'processResponsePdu: sendPduHandle {sendPduHandle}, statusInformation {statusInformation}') + f"processResponsePdu: sendPduHandle {sendPduHandle}, statusInformation {statusInformation}" + ) - errorIndication = statusInformation['errorIndication'] + errorIndication = statusInformation["errorIndication"] if errorIndication in (errind.notInTimeWindow, errind.unknownEngineID): origDiscoveryRetries += 1 @@ -80,9 +108,14 @@ def processResponsePdu(self, snmpEngine, messageProcessingModel, origDiscoveryRetries = 0 origRetries += 1 - if origRetries > origRetryCount or origDiscoveryRetries > self.__options.get('discoveryRetries', 4): + if ( + origRetries > origRetryCount + or origDiscoveryRetries > self.__options.get("discoveryRetries", 4) + ): debug.logger & debug.flagApp and debug.logger( - 'processResponsePdu: sendPduHandle %s, retry count %d exceeded' % (sendPduHandle, origRetries)) + "processResponsePdu: sendPduHandle %s, retry count %d exceeded" + % (sendPduHandle, origRetries) + ) cbFun(snmpEngine, origSendRequestHandle, errorIndication, None, cbCtx) return @@ -96,45 +129,75 @@ def processResponsePdu(self, snmpEngine, messageProcessingModel, try: sendPduHandle = snmpEngine.msgAndPduDsp.sendPdu( - snmpEngine, origTransportDomain, origTransportAddress, - origMessageProcessingModel, origSecurityModel, - origSecurityName, origSecurityLevel, origContextEngineId, - origContextName, pduVersion, reqPDU, - True, origTimeout, self.processResponsePdu, - (origSendRequestHandle, cbFun, cbCtx)) + snmpEngine, + origTransportDomain, + origTransportAddress, + origMessageProcessingModel, + origSecurityModel, + origSecurityName, + origSecurityLevel, + origContextEngineId, + origContextName, + pduVersion, + reqPDU, + True, + origTimeout, + self.processResponsePdu, + (origSendRequestHandle, cbFun, cbCtx), + ) snmpEngine.transportDispatcher.jobStarted(id(self)) self.__pendingReqs[sendPduHandle] = ( - origTransportDomain, origTransportAddress, - origMessageProcessingModel, origSecurityModel, - origSecurityName, origSecurityLevel, origContextEngineId, - origContextName, origPduVersion, origPdu, origTimeout, - origRetryCount, origRetries, origDiscoveryRetries + origTransportDomain, + origTransportAddress, + origMessageProcessingModel, + origSecurityModel, + origSecurityName, + origSecurityLevel, + origContextEngineId, + origContextName, + origPduVersion, + origPdu, + origTimeout, + origRetryCount, + origRetries, + origDiscoveryRetries, ) return except StatusInformation: statusInformation = sys.exc_info()[1] debug.logger & debug.flagApp and debug.logger( - 'processResponsePdu: origSendRequestHandle {}, _sendPdu() failed with {!r}'.format( - sendPduHandle, statusInformation)) - cbFun(snmpEngine, origSendRequestHandle, - statusInformation['errorIndication'], - None, cbCtx) + "processResponsePdu: origSendRequestHandle {}, _sendPdu() failed with {!r}".format( + sendPduHandle, statusInformation + ) + ) + cbFun( + snmpEngine, + origSendRequestHandle, + statusInformation["errorIndication"], # type: ignore + None, + cbCtx, + ) return - if (origMessageProcessingModel != messageProcessingModel or - origSecurityModel != securityModel or - origSecurityName != origSecurityName or - origContextEngineId and origContextEngineId != contextEngineId or - origContextName and origContextName != contextName or - origPduVersion != pduVersion): + if ( + origMessageProcessingModel != messageProcessingModel + or origSecurityModel != securityModel + or origSecurityName != origSecurityName + or origContextEngineId + and origContextEngineId != contextEngineId + or origContextName + and origContextName != contextName + or origPduVersion != pduVersion + ): debug.logger & debug.flagApp and debug.logger( - 'processResponsePdu: sendPduHandle %s, request/response data mismatch' % sendPduHandle) + "processResponsePdu: sendPduHandle %s, request/response data mismatch" + % sendPduHandle + ) - cbFun(snmpEngine, origSendRequestHandle, - 'badResponse', None, cbCtx) + cbFun(snmpEngine, origSendRequestHandle, "badResponse", None, cbCtx) return # User-side API assumes SMIv2 @@ -144,25 +207,45 @@ def processResponsePdu(self, snmpEngine, messageProcessingModel, # 3.1.2 if v2c.apiPDU.getRequestID(PDU) != v2c.apiPDU.getRequestID(origPdu): debug.logger & debug.flagApp and debug.logger( - 'processResponsePdu: sendPduHandle %s, request-id/response-id mismatch' % sendPduHandle) - cbFun(snmpEngine, origSendRequestHandle, - 'badResponse', None, cbCtx) + "processResponsePdu: sendPduHandle %s, request-id/response-id mismatch" + % sendPduHandle + ) + cbFun(snmpEngine, origSendRequestHandle, "badResponse", None, cbCtx) return cbFun(snmpEngine, origSendRequestHandle, None, PDU, cbCtx) - def sendPdu(self, snmpEngine, targetName, contextEngineId, - contextName, PDU, cbFun, cbCtx): - (transportDomain, transportAddress, timeout, - retryCount, messageProcessingModel, securityModel, - securityName, - securityLevel) = config.getTargetInfo(snmpEngine, targetName) - + def sendPdu( + self, + snmpEngine: SnmpEngine, + targetName, + contextEngineId, + contextName, + PDU, + cbFun, + cbCtx, + ): + ( + transportDomain, + transportAddress, + timeout, + retryCount, + messageProcessingModel, + securityModel, + securityName, + securityLevel, + ) = config.getTargetInfo(snmpEngine, targetName) + + if snmpEngine.transportDispatcher is None: + raise error.PySnmpError("No transport dispatcher available") # Convert timeout in seconds into timeout in timer ticks - timeoutInTicks = float(timeout) / 100 / snmpEngine.transportDispatcher.getTimerResolution() + timeoutInTicks = ( + float(timeout) / 100 / snmpEngine.transportDispatcher.getTimerResolution() + ) - SnmpEngineID, SnmpAdminString = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( - 'SNMP-FRAMEWORK-MIB', 'SnmpEngineID', 'SnmpAdminString') + SnmpEngineID, SnmpAdminString = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( # type: ignore + "SNMP-FRAMEWORK-MIB", "SnmpEngineID", "SnmpAdminString" + ) # Cast possible strings into bytes if contextEngineId: @@ -182,25 +265,46 @@ def sendPdu(self, snmpEngine, targetName, contextEngineId, # 3.1 sendPduHandle = snmpEngine.msgAndPduDsp.sendPdu( - snmpEngine, transportDomain, transportAddress, - messageProcessingModel, securityModel, securityName, - securityLevel, contextEngineId, contextName, - pduVersion, PDU, True, timeoutInTicks, self.processResponsePdu, - (sendRequestHandle, cbFun, cbCtx) + snmpEngine, + transportDomain, + transportAddress, + messageProcessingModel, + securityModel, + securityName, + securityLevel, + contextEngineId, + contextName, + pduVersion, + PDU, + True, + timeoutInTicks, + self.processResponsePdu, + (sendRequestHandle, cbFun, cbCtx), ) snmpEngine.transportDispatcher.jobStarted(id(self)) self.__pendingReqs[sendPduHandle] = ( - transportDomain, transportAddress, messageProcessingModel, - securityModel, securityName, securityLevel, contextEngineId, - contextName, pduVersion, origPDU, timeoutInTicks, - retryCount, 0, 0 + transportDomain, + transportAddress, + messageProcessingModel, + securityModel, + securityName, + securityLevel, + contextEngineId, + contextName, + pduVersion, + origPDU, + timeoutInTicks, + retryCount, + 0, + 0, ) debug.logger & debug.flagApp and debug.logger( - 'sendPdu: sendPduHandle %s, timeout %d*10 ms/%d ticks, retry 0 of %d' % ( - sendPduHandle, timeout, timeoutInTicks, retryCount)) + "sendPdu: sendPduHandle %s, timeout %d*10 ms/%d ticks, retry 0 of %d" + % (sendPduHandle, timeout, timeoutInTicks, retryCount) + ) return sendRequestHandle @@ -210,80 +314,139 @@ def sendPdu(self, snmpEngine, targetName, contextEngineId, class GetCommandGenerator(CommandGenerator): - def processResponseVarBinds(self, snmpEngine, sendRequestHandle, - errorIndication, PDU, cbCtx): + def processResponseVarBinds( + self, snmpEngine, sendRequestHandle, errorIndication, PDU, cbCtx + ): cbFun, cbCtx = cbCtx - cbFun(snmpEngine, sendRequestHandle, errorIndication, - PDU and v2c.apiPDU.getErrorStatus(PDU) or 0, - PDU and v2c.apiPDU.getErrorIndex(PDU, muteErrors=True) or 0, - PDU and v2c.apiPDU.getVarBinds(PDU) or (), cbCtx) + cbFun( + snmpEngine, + sendRequestHandle, + errorIndication, + PDU and v2c.apiPDU.getErrorStatus(PDU) or 0, + PDU and v2c.apiPDU.getErrorIndex(PDU, muteErrors=True) or 0, + PDU and v2c.apiPDU.getVarBinds(PDU) or (), + cbCtx, + ) - def sendVarBinds(self, snmpEngine, targetName, contextEngineId, - contextName, varBinds, cbFun, cbCtx=None): + def sendVarBinds( + self, + snmpEngine, + targetName, + contextEngineId, + contextName, + varBinds, + cbFun, + cbCtx=None, + ): reqPDU = v2c.GetRequestPDU() v2c.apiPDU.setDefaults(reqPDU) v2c.apiPDU.setVarBinds(reqPDU, varBinds) - return self.sendPdu(snmpEngine, targetName, contextEngineId, - contextName, reqPDU, self.processResponseVarBinds, - (cbFun, cbCtx)) + return self.sendPdu( + snmpEngine, + targetName, + contextEngineId, + contextName, + reqPDU, + self.processResponseVarBinds, + (cbFun, cbCtx), + ) class SetCommandGenerator(CommandGenerator): - def processResponseVarBinds(self, snmpEngine, sendRequestHandle, - errorIndication, PDU, cbCtx): + def processResponseVarBinds( + self, snmpEngine, sendRequestHandle, errorIndication, PDU, cbCtx + ): cbFun, cbCtx = cbCtx - cbFun(snmpEngine, sendRequestHandle, errorIndication, - PDU and v2c.apiPDU.getErrorStatus(PDU) or 0, - PDU and v2c.apiPDU.getErrorIndex(PDU, muteErrors=True) or 0, - PDU and v2c.apiPDU.getVarBinds(PDU) or (), cbCtx) + cbFun( + snmpEngine, + sendRequestHandle, + errorIndication, + PDU and v2c.apiPDU.getErrorStatus(PDU) or 0, + PDU and v2c.apiPDU.getErrorIndex(PDU, muteErrors=True) or 0, + PDU and v2c.apiPDU.getVarBinds(PDU) or (), + cbCtx, + ) - def sendVarBinds(self, snmpEngine, targetName, contextEngineId, - contextName, varBinds, cbFun, cbCtx=None): + def sendVarBinds( + self, + snmpEngine, + targetName, + contextEngineId, + contextName, + varBinds, + cbFun, + cbCtx=None, + ): reqPDU = v2c.SetRequestPDU() v2c.apiPDU.setDefaults(reqPDU) v2c.apiPDU.setVarBinds(reqPDU, varBinds) - return self.sendPdu(snmpEngine, targetName, contextEngineId, - contextName, reqPDU, - self.processResponseVarBinds, (cbFun, cbCtx)) + return self.sendPdu( + snmpEngine, + targetName, + contextEngineId, + contextName, + reqPDU, + self.processResponseVarBinds, + (cbFun, cbCtx), + ) class NextCommandGeneratorSingleRun(CommandGenerator): - def processResponseVarBinds(self, snmpEngine, sendRequestHandle, - errorIndication, PDU, cbCtx): + def processResponseVarBinds( + self, snmpEngine, sendRequestHandle, errorIndication, PDU, cbCtx + ): targetName, contextEngineId, contextName, reqPDU, cbFun, cbCtx = cbCtx - cbFun(snmpEngine, sendRequestHandle, errorIndication, - PDU and v2c.apiPDU.getErrorStatus(PDU) or 0, - PDU and v2c.apiPDU.getErrorIndex(PDU, muteErrors=True) or 0, - PDU and v2c.apiPDU.getVarBinds(PDU) or (), cbCtx) + cbFun( + snmpEngine, + sendRequestHandle, + errorIndication, + PDU and v2c.apiPDU.getErrorStatus(PDU) or 0, + PDU and v2c.apiPDU.getErrorIndex(PDU, muteErrors=True) or 0, + PDU and v2c.apiPDU.getVarBinds(PDU) or (), + cbCtx, + ) - def sendVarBinds(self, snmpEngine, targetName, contextEngineId, - contextName, varBinds, cbFun, cbCtx=None): + def sendVarBinds( + self, + snmpEngine, + targetName, + contextEngineId, + contextName, + varBinds, + cbFun, + cbCtx=None, + ): reqPDU = v2c.GetNextRequestPDU() v2c.apiPDU.setDefaults(reqPDU) v2c.apiPDU.setVarBinds(reqPDU, varBinds) - return self.sendPdu(snmpEngine, targetName, contextEngineId, - contextName, reqPDU, self.processResponseVarBinds, - (targetName, contextEngineId, contextName, - reqPDU, cbFun, cbCtx)) + return self.sendPdu( + snmpEngine, + targetName, + contextEngineId, + contextName, + reqPDU, + self.processResponseVarBinds, + (targetName, contextEngineId, contextName, reqPDU, cbFun, cbCtx), + ) class NextCommandGenerator(NextCommandGeneratorSingleRun): - def processResponseVarBinds(self, snmpEngine, sendRequestHandle, - errorIndication, PDU, cbCtx): + def processResponseVarBinds( + self, snmpEngine, sendRequestHandle, errorIndication, PDU, cbCtx + ): targetName, contextEngineId, contextName, reqPDU, cbFun, cbCtx = cbCtx if errorIndication: - cbFun(snmpEngine, sendRequestHandle, errorIndication, - 0, 0, (), cbCtx) + cbFun(snmpEngine, sendRequestHandle, errorIndication, 0, 0, (), cbCtx) return varBindTable = v2c.apiPDU.getVarBindTable(reqPDU, PDU) @@ -297,12 +460,19 @@ def processResponseVarBinds(self, snmpEngine, sendRequestHandle, varBindTable[-1], v2c.apiPDU.getVarBinds(reqPDU) ) - if not cbFun(snmpEngine, sendRequestHandle, errorIndication, - v2c.apiPDU.getErrorStatus(PDU), - v2c.apiPDU.getErrorIndex(PDU, muteErrors=True), - varBindTable, cbCtx): + if not cbFun( + snmpEngine, + sendRequestHandle, + errorIndication, + v2c.apiPDU.getErrorStatus(PDU), + v2c.apiPDU.getErrorIndex(PDU, muteErrors=True), + varBindTable, + cbCtx, + ): debug.logger & debug.flagApp and debug.logger( - 'processResponseVarBinds: sendRequestHandle %s, app says to stop walking' % sendRequestHandle) + "processResponseVarBinds: sendRequestHandle %s, app says to stop walking" + % sendRequestHandle + ) return # app says enough if not varBinds: @@ -312,35 +482,69 @@ def processResponseVarBinds(self, snmpEngine, sendRequestHandle, v2c.apiPDU.setVarBinds(reqPDU, varBinds) try: - self.sendPdu(snmpEngine, targetName, contextEngineId, - contextName, reqPDU, - self.processResponseVarBinds, - (targetName, contextEngineId, contextName, - reqPDU, cbFun, cbCtx)) + self.sendPdu( + snmpEngine, + targetName, + contextEngineId, + contextName, + reqPDU, + self.processResponseVarBinds, + (targetName, contextEngineId, contextName, reqPDU, cbFun, cbCtx), + ) except StatusInformation: statusInformation = sys.exc_info()[1] debug.logger & debug.flagApp and debug.logger( - f'sendVarBinds: sendPduHandle {sendRequestHandle}: sendPdu() failed with {statusInformation!r}') - cbFun(snmpEngine, sendRequestHandle, - statusInformation['errorIndication'], - 0, 0, (), cbCtx) + f"sendVarBinds: sendPduHandle {sendRequestHandle}: sendPdu() failed with {statusInformation!r}" + ) + cbFun( + snmpEngine, + sendRequestHandle, + statusInformation["errorIndication"], # type: ignore + 0, + 0, + (), + cbCtx, + ) class BulkCommandGeneratorSingleRun(CommandGenerator): - def processResponseVarBinds(self, snmpEngine, sendRequestHandle, - errorIndication, PDU, cbCtx): - (targetName, nonRepeaters, maxRepetitions, - contextEngineId, contextName, reqPDU, cbFun, cbCtx) = cbCtx - - cbFun(snmpEngine, sendRequestHandle, errorIndication, - PDU and v2c.apiPDU.getErrorStatus(PDU) or 0, - PDU and v2c.apiPDU.getErrorIndex(PDU, muteErrors=True) or 0, - PDU and v2c.apiPDU.getVarBinds(PDU) or (), cbCtx) - - def sendVarBinds(self, snmpEngine, targetName, contextEngineId, - contextName, nonRepeaters, maxRepetitions, - varBinds, cbFun, cbCtx=None): + def processResponseVarBinds( + self, snmpEngine, sendRequestHandle, errorIndication, PDU, cbCtx + ): + ( + targetName, + nonRepeaters, + maxRepetitions, + contextEngineId, + contextName, + reqPDU, + cbFun, + cbCtx, + ) = cbCtx + + cbFun( + snmpEngine, + sendRequestHandle, + errorIndication, + PDU and v2c.apiPDU.getErrorStatus(PDU) or 0, + PDU and v2c.apiPDU.getErrorIndex(PDU, muteErrors=True) or 0, + PDU and v2c.apiPDU.getVarBinds(PDU) or (), + cbCtx, + ) + + def sendVarBinds( + self, + snmpEngine, + targetName, + contextEngineId, + contextName, + nonRepeaters, + maxRepetitions, + varBinds, + cbFun, + cbCtx=None, + ): reqPDU = v2c.GetBulkRequestPDU() v2c.apiBulkPDU.setDefaults(reqPDU) @@ -349,23 +553,43 @@ def sendVarBinds(self, snmpEngine, targetName, contextEngineId, v2c.apiBulkPDU.setVarBinds(reqPDU, varBinds) - return self.sendPdu(snmpEngine, targetName, contextEngineId, - contextName, reqPDU, - self.processResponseVarBinds, - (targetName, nonRepeaters, maxRepetitions, - contextEngineId, contextName, reqPDU, - cbFun, cbCtx)) + return self.sendPdu( + snmpEngine, + targetName, + contextEngineId, + contextName, + reqPDU, + self.processResponseVarBinds, + ( + targetName, + nonRepeaters, + maxRepetitions, + contextEngineId, + contextName, + reqPDU, + cbFun, + cbCtx, + ), + ) class BulkCommandGenerator(BulkCommandGeneratorSingleRun): - def processResponseVarBinds(self, snmpEngine, sendRequestHandle, - errorIndication, PDU, cbCtx): - (targetName, nonRepeaters, maxRepetitions, - contextEngineId, contextName, reqPDU, cbFun, cbCtx) = cbCtx + def processResponseVarBinds( + self, snmpEngine, sendRequestHandle, errorIndication, PDU, cbCtx + ): + ( + targetName, + nonRepeaters, + maxRepetitions, + contextEngineId, + contextName, + reqPDU, + cbFun, + cbCtx, + ) = cbCtx if errorIndication: - cbFun(snmpEngine, sendRequestHandle, errorIndication, - 0, 0, (), cbCtx) + cbFun(snmpEngine, sendRequestHandle, errorIndication, 0, 0, (), cbCtx) return varBindTable = v2c.apiBulkPDU.getVarBindTable(reqPDU, PDU) @@ -380,14 +604,24 @@ def processResponseVarBinds(self, snmpEngine, sendRequestHandle, ) nonRepeaters = v2c.apiBulkPDU.getNonRepeaters(reqPDU) if nonRepeaters: - varBinds = v2c.apiBulkPDU.getVarBinds(reqPDU)[:int(nonRepeaters)] + varBinds[int(nonRepeaters):] + varBinds = ( + v2c.apiBulkPDU.getVarBinds(reqPDU)[: int(nonRepeaters)] + + varBinds[int(nonRepeaters) :] + ) - if not cbFun(snmpEngine, sendRequestHandle, errorIndication, - v2c.apiBulkPDU.getErrorStatus(PDU), - v2c.apiBulkPDU.getErrorIndex(PDU, muteErrors=True), - varBindTable, cbCtx): + if not cbFun( + snmpEngine, + sendRequestHandle, + errorIndication, + v2c.apiBulkPDU.getErrorStatus(PDU), + v2c.apiBulkPDU.getErrorIndex(PDU, muteErrors=True), + varBindTable, + cbCtx, + ): debug.logger & debug.flagApp and debug.logger( - 'processResponseVarBinds: sendRequestHandle %s, app says to stop walking' % sendRequestHandle) + "processResponseVarBinds: sendRequestHandle %s, app says to stop walking" + % sendRequestHandle + ) return # app says enough if not varBinds: @@ -397,51 +631,113 @@ def processResponseVarBinds(self, snmpEngine, sendRequestHandle, v2c.apiBulkPDU.setVarBinds(reqPDU, varBinds) try: - self.sendPdu(snmpEngine, targetName, contextEngineId, - contextName, reqPDU, - self.processResponseVarBinds, - (targetName, nonRepeaters, maxRepetitions, - contextEngineId, contextName, reqPDU, cbFun, cbCtx)) + self.sendPdu( + snmpEngine, + targetName, + contextEngineId, + contextName, + reqPDU, + self.processResponseVarBinds, + ( + targetName, + nonRepeaters, + maxRepetitions, + contextEngineId, + contextName, + reqPDU, + cbFun, + cbCtx, + ), + ) except StatusInformation: statusInformation = sys.exc_info()[1] debug.logger & debug.flagApp and debug.logger( - 'processResponseVarBinds: sendPduHandle {}: _sendPdu() failed with {!r}'.format( - sendRequestHandle, statusInformation)) - cbFun(snmpEngine, sendRequestHandle, - statusInformation['errorIndication'], 0, 0, (), cbCtx) + "processResponseVarBinds: sendPduHandle {}: _sendPdu() failed with {!r}".format( + sendRequestHandle, statusInformation + ) + ) + cbFun( + snmpEngine, + sendRequestHandle, + statusInformation["errorIndication"], + 0, + 0, + (), + cbCtx, + ) # type: ignore # # Obsolete, compatibility interfaces. # -def __sendReqCbFun(snmpEngine, sendRequestHandle, errorIndication, - errorStatus, errorIndex, varBinds, cbCtx): - cbFun, cbCtx = cbCtx - return cbFun(sendRequestHandle, errorIndication, errorStatus, - errorIndex, varBinds, cbCtx) - - -def _sendReq(self, snmpEngine, targetName, varBinds, cbFun, - cbCtx=None, contextEngineId=None, contextName=''): - return self.sendVarBinds(snmpEngine, targetName, contextEngineId, - contextName, varBinds, __sendReqCbFun, - (cbFun, cbCtx)) - -def _sendBulkReq(self, snmpEngine, targetName, nonRepeaters, maxRepetitions, - varBinds, cbFun, cbCtx=None, contextEngineId=None, - contextName=''): - return self.sendVarBinds(snmpEngine, targetName, contextEngineId, - contextName, nonRepeaters, maxRepetitions, - varBinds, __sendReqCbFun, (cbFun, cbCtx)) +def __sendReqCbFun( + snmpEngine, + sendRequestHandle, + errorIndication, + errorStatus, + errorIndex, + varBinds, + cbCtx, +): + cbFun, cbCtx = cbCtx + return cbFun( + sendRequestHandle, errorIndication, errorStatus, errorIndex, varBinds, cbCtx + ) + + +def _sendReq( + self, + snmpEngine, + targetName, + varBinds, + cbFun, + cbCtx=None, + contextEngineId=None, + contextName="", +): + return self.sendVarBinds( + snmpEngine, + targetName, + contextEngineId, + contextName, + varBinds, + __sendReqCbFun, + (cbFun, cbCtx), + ) + + +def _sendBulkReq( + self, + snmpEngine, + targetName, + nonRepeaters, + maxRepetitions, + varBinds, + cbFun, + cbCtx=None, + contextEngineId=None, + contextName="", +): + return self.sendVarBinds( + snmpEngine, + targetName, + contextEngineId, + contextName, + nonRepeaters, + maxRepetitions, + varBinds, + __sendReqCbFun, + (cbFun, cbCtx), + ) # install compatibility wrappers -GetCommandGenerator.sendReq = _sendReq -SetCommandGenerator.sendReq = _sendReq -NextCommandGenerator.sendReq = _sendReq -NextCommandGeneratorSingleRun.sendReq = _sendReq -BulkCommandGenerator.sendReq = _sendBulkReq -BulkCommandGeneratorSingleRun.sendReq = _sendBulkReq +GetCommandGenerator.sendReq = _sendReq # type: ignore +SetCommandGenerator.sendReq = _sendReq # type: ignore +NextCommandGenerator.sendReq = _sendReq # type: ignore +NextCommandGeneratorSingleRun.sendReq = _sendReq # type: ignore +BulkCommandGenerator.sendReq = _sendBulkReq # type: ignore +BulkCommandGeneratorSingleRun.sendReq = _sendBulkReq # type: ignore diff --git a/pysnmp/entity/rfc3413/config.py b/pysnmp/entity/rfc3413/config.py index f683c7834..778192b7e 100644 --- a/pysnmp/entity/rfc3413/config.py +++ b/pysnmp/entity/rfc3413/config.py @@ -4,38 +4,46 @@ # Copyright (c) 2005-2019, Ilya Etingof # License: https://www.pysnmp.com/pysnmp/license.html # +from typing import Any, Dict +from pysnmp.entity.engine import SnmpEngine +from pysnmp.error import PySnmpError from pysnmp.smi.error import SmiError, NoSuchInstanceError from pysnmp.entity import config -def getTargetAddr(snmpEngine, snmpTargetAddrName): +def getTargetAddr(snmpEngine: SnmpEngine, snmpTargetAddrName): mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder - snmpTargetAddrEntry, = mibBuilder.importSymbols( - 'SNMP-TARGET-MIB', 'snmpTargetAddrEntry' + (snmpTargetAddrEntry,) = mibBuilder.importSymbols( # type: ignore + "SNMP-TARGET-MIB", "snmpTargetAddrEntry" ) - cache = snmpEngine.getUserContext('getTargetAddr') + cache = snmpEngine.getUserContext("getTargetAddr") if cache is None: - cache = {'id': -1} + cache: Dict[str, Any] | None = {"id": -1} snmpEngine.setUserContext(getTargetAddr=cache) - if cache['id'] != snmpTargetAddrEntry.branchVersionId: - cache['nameToTargetMap'] = {} + if cache["id"] != snmpTargetAddrEntry.branchVersionId: + cache["nameToTargetMap"] = {} - nameToTargetMap = cache['nameToTargetMap'] + nameToTargetMap = cache["nameToTargetMap"] if snmpTargetAddrName not in nameToTargetMap: - (snmpTargetAddrTDomain, - snmpTargetAddrTAddress, - snmpTargetAddrTimeout, - snmpTargetAddrRetryCount, - snmpTargetAddrParams) = mibBuilder.importSymbols( - 'SNMP-TARGET-MIB', 'snmpTargetAddrTDomain', - 'snmpTargetAddrTAddress', 'snmpTargetAddrTimeout', - 'snmpTargetAddrRetryCount', 'snmpTargetAddrParams' + ( + snmpTargetAddrTDomain, + snmpTargetAddrTAddress, + snmpTargetAddrTimeout, + snmpTargetAddrRetryCount, + snmpTargetAddrParams, + ) = mibBuilder.importSymbols( # type: ignore + "SNMP-TARGET-MIB", + "snmpTargetAddrTDomain", + "snmpTargetAddrTAddress", + "snmpTargetAddrTimeout", + "snmpTargetAddrRetryCount", + "snmpTargetAddrParams", ) - snmpSourceAddrTAddress, = mibBuilder.importSymbols('PYSNMP-SOURCE-MIB', 'snmpSourceAddrTAddress') + (snmpSourceAddrTAddress,) = mibBuilder.importSymbols("PYSNMP-SOURCE-MIB", "snmpSourceAddrTAddress") # type: ignore tblIdx = snmpTargetAddrEntry.getInstIdFromIndices(snmpTargetAddrName) @@ -59,64 +67,78 @@ def getTargetAddr(snmpEngine, snmpTargetAddrName): snmpSourceAddrTAddress.name + tblIdx ).syntax except NoSuchInstanceError: - raise SmiError('Target %s not configured to LCD' % snmpTargetAddrName) + raise SmiError("Target %s not configured to LCD" % snmpTargetAddrName) + if snmpEngine.transportDispatcher is None: + raise PySnmpError("TransportDispatcher not set") transport = snmpEngine.transportDispatcher.getTransport(snmpTargetAddrTDomain) - if snmpTargetAddrTDomain[:len(config.snmpUDPDomain)] == config.snmpUDPDomain: - SnmpUDPAddress, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMPv2-TM', - 'SnmpUDPAddress') - snmpTargetAddrTAddress = transport.addressType( + if snmpTargetAddrTDomain[: len(config.snmpUDPDomain)] == config.snmpUDPDomain: + ( + SnmpUDPAddress, + ) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( # type: ignore + "SNMPv2-TM", "SnmpUDPAddress" + ) + addr = transport.addressType( # type: ignore SnmpUDPAddress(snmpTargetAddrTAddress) ).setLocalAddress(SnmpUDPAddress(snmpSourceAddrTAddress)) - elif snmpTargetAddrTDomain[:len(config.snmpUDP6Domain)] == config.snmpUDP6Domain: - TransportAddressIPv6, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( - 'TRANSPORT-ADDRESS-MIB', 'TransportAddressIPv6') - snmpTargetAddrTAddress = transport.addressType( + elif ( + snmpTargetAddrTDomain[: len(config.snmpUDP6Domain)] == config.snmpUDP6Domain + ): + (TransportAddressIPv6,) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( # type: ignore + "TRANSPORT-ADDRESS-MIB", "TransportAddressIPv6" + ) + addr = transport.addressType( # type: ignore TransportAddressIPv6(snmpTargetAddrTAddress) ).setLocalAddress(TransportAddressIPv6(snmpSourceAddrTAddress)) - elif snmpTargetAddrTDomain[:len(config.snmpLocalDomain)] == config.snmpLocalDomain: - snmpTargetAddrTAddress = transport.addressType( - snmpTargetAddrTAddress - ) + elif ( + snmpTargetAddrTDomain[: len(config.snmpLocalDomain)] + == config.snmpLocalDomain + ): + addr = transport.addressType(snmpTargetAddrTAddress) # type: ignore nameToTargetMap[snmpTargetAddrName] = ( snmpTargetAddrTDomain, - snmpTargetAddrTAddress, + addr, snmpTargetAddrTimeout, snmpTargetAddrRetryCount, - snmpTargetAddrParams + snmpTargetAddrParams, ) - cache['id'] = snmpTargetAddrEntry.branchVersionId + cache["id"] = snmpTargetAddrEntry.branchVersionId return nameToTargetMap[snmpTargetAddrName] -def getTargetParams(snmpEngine, paramsName): +def getTargetParams(snmpEngine: SnmpEngine, paramsName): mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder - snmpTargetParamsEntry, = mibBuilder.importSymbols( - 'SNMP-TARGET-MIB', 'snmpTargetParamsEntry' + (snmpTargetParamsEntry,) = mibBuilder.importSymbols( # type: ignore + "SNMP-TARGET-MIB", "snmpTargetParamsEntry" ) - cache = snmpEngine.getUserContext('getTargetParams') + cache = snmpEngine.getUserContext("getTargetParams") if cache is None: - cache = {'id': -1} + cache: Dict[str, Any] | None = {"id": -1} snmpEngine.setUserContext(getTargetParams=cache) - if cache['id'] != snmpTargetParamsEntry.branchVersionId: - cache['nameToParamsMap'] = {} + if cache["id"] != snmpTargetParamsEntry.branchVersionId: + cache["nameToParamsMap"] = {} - nameToParamsMap = cache['nameToParamsMap'] + nameToParamsMap = cache["nameToParamsMap"] if paramsName not in nameToParamsMap: - (snmpTargetParamsMPModel, snmpTargetParamsSecurityModel, - snmpTargetParamsSecurityName, - snmpTargetParamsSecurityLevel) = mibBuilder.importSymbols( - 'SNMP-TARGET-MIB', 'snmpTargetParamsMPModel', - 'snmpTargetParamsSecurityModel', 'snmpTargetParamsSecurityName', - 'snmpTargetParamsSecurityLevel' + ( + snmpTargetParamsMPModel, + snmpTargetParamsSecurityModel, + snmpTargetParamsSecurityName, + snmpTargetParamsSecurityLevel, + ) = mibBuilder.importSymbols( # type: ignore + "SNMP-TARGET-MIB", + "snmpTargetParamsMPModel", + "snmpTargetParamsSecurityModel", + "snmpTargetParamsSecurityName", + "snmpTargetParamsSecurityLevel", ) tblIdx = snmpTargetParamsEntry.getInstIdFromIndices(paramsName) @@ -135,105 +157,115 @@ def getTargetParams(snmpEngine, paramsName): snmpTargetParamsSecurityLevel.name + tblIdx ).syntax except NoSuchInstanceError: - raise SmiError('Parameters %s not configured at LCD' % paramsName) + raise SmiError("Parameters %s not configured at LCD" % paramsName) - nameToParamsMap[paramsName] = (snmpTargetParamsMPModel, - snmpTargetParamsSecurityModel, - snmpTargetParamsSecurityName, - snmpTargetParamsSecurityLevel) + nameToParamsMap[paramsName] = ( + snmpTargetParamsMPModel, + snmpTargetParamsSecurityModel, + snmpTargetParamsSecurityName, + snmpTargetParamsSecurityLevel, + ) - cache['id'] = snmpTargetParamsEntry.branchVersionId + cache["id"] = snmpTargetParamsEntry.branchVersionId return nameToParamsMap[paramsName] -def getTargetInfo(snmpEngine, snmpTargetAddrName): +def getTargetInfo(snmpEngine: SnmpEngine, snmpTargetAddrName): # Transport endpoint - (snmpTargetAddrTDomain, - snmpTargetAddrTAddress, - snmpTargetAddrTimeout, - snmpTargetAddrRetryCount, - snmpTargetAddrParams) = getTargetAddr(snmpEngine, snmpTargetAddrName) - - (snmpTargetParamsMPModel, - snmpTargetParamsSecurityModel, - snmpTargetParamsSecurityName, - snmpTargetParamsSecurityLevel) = getTargetParams(snmpEngine, - snmpTargetAddrParams) - - return (snmpTargetAddrTDomain, snmpTargetAddrTAddress, - snmpTargetAddrTimeout, snmpTargetAddrRetryCount, - snmpTargetParamsMPModel, snmpTargetParamsSecurityModel, - snmpTargetParamsSecurityName, snmpTargetParamsSecurityLevel) + ( + snmpTargetAddrTDomain, + snmpTargetAddrTAddress, + snmpTargetAddrTimeout, + snmpTargetAddrRetryCount, + snmpTargetAddrParams, + ) = getTargetAddr(snmpEngine, snmpTargetAddrName) + + ( + snmpTargetParamsMPModel, + snmpTargetParamsSecurityModel, + snmpTargetParamsSecurityName, + snmpTargetParamsSecurityLevel, + ) = getTargetParams(snmpEngine, snmpTargetAddrParams) + + return ( + snmpTargetAddrTDomain, + snmpTargetAddrTAddress, + snmpTargetAddrTimeout, + snmpTargetAddrRetryCount, + snmpTargetParamsMPModel, + snmpTargetParamsSecurityModel, + snmpTargetParamsSecurityName, + snmpTargetParamsSecurityLevel, + ) -def getNotificationInfo(snmpEngine, notificationTarget): +def getNotificationInfo(snmpEngine: SnmpEngine, notificationTarget): mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder - snmpNotifyEntry, = mibBuilder.importSymbols('SNMP-NOTIFICATION-MIB', - 'snmpNotifyEntry') + (snmpNotifyEntry,) = mibBuilder.importSymbols( # type: ignore + "SNMP-NOTIFICATION-MIB", "snmpNotifyEntry" + ) - cache = snmpEngine.getUserContext('getNotificationInfo') + cache = snmpEngine.getUserContext("getNotificationInfo") if cache is None: - cache = {'id': -1} + cache: Dict[str, Any] | None = {"id": -1} snmpEngine.setUserContext(getNotificationInfo=cache) - if cache['id'] != snmpNotifyEntry.branchVersionId: - cache['targetToNotifyMap'] = {} + if cache["id"] != snmpNotifyEntry.branchVersionId: + cache["targetToNotifyMap"] = {} # type: ignore - targetToNotifyMap = cache['targetToNotifyMap'] + targetToNotifyMap = cache["targetToNotifyMap"] if notificationTarget not in targetToNotifyMap: - (snmpNotifyTag, - snmpNotifyType) = mibBuilder.importSymbols('SNMP-NOTIFICATION-MIB', - 'snmpNotifyTag', - 'snmpNotifyType') + (snmpNotifyTag, snmpNotifyType) = mibBuilder.importSymbols( # type: ignore + "SNMP-NOTIFICATION-MIB", "snmpNotifyTag", "snmpNotifyType" + ) tblIdx = snmpNotifyEntry.getInstIdFromIndices(notificationTarget) try: - snmpNotifyTag = snmpNotifyTag.getNode( - snmpNotifyTag.name + tblIdx - ).syntax - snmpNotifyType = snmpNotifyType.getNode( - snmpNotifyType.name + tblIdx - ).syntax + snmpNotifyTag = snmpNotifyTag.getNode(snmpNotifyTag.name + tblIdx).syntax + snmpNotifyType = snmpNotifyType.getNode(snmpNotifyType.name + tblIdx).syntax except NoSuchInstanceError: - raise SmiError('Target %s not configured at LCD' % notificationTarget) + raise SmiError("Target %s not configured at LCD" % notificationTarget) - targetToNotifyMap[notificationTarget] = ( - snmpNotifyTag, - snmpNotifyType - ) + targetToNotifyMap[notificationTarget] = (snmpNotifyTag, snmpNotifyType) - cache['id'] = snmpNotifyEntry.branchVersionId + cache["id"] = snmpNotifyEntry.branchVersionId return targetToNotifyMap[notificationTarget] -def getTargetNames(snmpEngine, tag): +def getTargetNames(snmpEngine: SnmpEngine, tag): mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder - snmpTargetAddrEntry, = mibBuilder.importSymbols('SNMP-TARGET-MIB', - 'snmpTargetAddrEntry') + (snmpTargetAddrEntry,) = mibBuilder.importSymbols( # type: ignore + "SNMP-TARGET-MIB", "snmpTargetAddrEntry" + ) - cache = snmpEngine.getUserContext('getTargetNames') + cache = snmpEngine.getUserContext("getTargetNames") if cache is None: - cache = {'id': -1} + cache: Dict[str, Any] | None = {"id": -1} snmpEngine.setUserContext(getTargetNames=cache) - if cache['id'] == snmpTargetAddrEntry.branchVersionId: - tagToTargetsMap = cache['tagToTargetsMap'] + if cache["id"] == snmpTargetAddrEntry.branchVersionId: + tagToTargetsMap = cache["tagToTargetsMap"] else: - cache['tagToTargetsMap'] = {} - - tagToTargetsMap = cache['tagToTargetsMap'] - - (SnmpTagValue, snmpTargetAddrName, - snmpTargetAddrTagList) = mibBuilder.importSymbols( - 'SNMP-TARGET-MIB', 'SnmpTagValue', 'snmpTargetAddrName', - 'snmpTargetAddrTagList' + cache["tagToTargetsMap"] = {} + + tagToTargetsMap = cache["tagToTargetsMap"] + + ( + SnmpTagValue, + snmpTargetAddrName, + snmpTargetAddrTagList, + ) = mibBuilder.importSymbols( # type: ignore + "SNMP-TARGET-MIB", + "SnmpTagValue", + "snmpTargetAddrName", + "snmpTargetAddrTagList", ) mibNode = snmpTargetAddrTagList while True: @@ -242,9 +274,11 @@ def getTargetNames(snmpEngine, tag): except NoSuchInstanceError: break - idx = mibNode.name[len(snmpTargetAddrTagList.name):] + idx = mibNode.name[len(snmpTargetAddrTagList.name) :] - _snmpTargetAddrName = snmpTargetAddrName.getNode(snmpTargetAddrName.name + idx).syntax + _snmpTargetAddrName = snmpTargetAddrName.getNode( + snmpTargetAddrName.name + idx + ).syntax for _tag in mibNode.syntax.asOctets().split(): _tag = SnmpTagValue(_tag) @@ -252,11 +286,12 @@ def getTargetNames(snmpEngine, tag): tagToTargetsMap[_tag] = [] tagToTargetsMap[_tag].append(_snmpTargetAddrName) - cache['id'] = snmpTargetAddrEntry.branchVersionId + cache["id"] = snmpTargetAddrEntry.branchVersionId if tag not in tagToTargetsMap: - raise SmiError('Transport tag %s not configured at LCD' % tag) + raise SmiError("Transport tag %s not configured at LCD" % tag) return tagToTargetsMap[tag] + # convert cmdrsp/cmdgen into this api diff --git a/pysnmp/hlapi/asyncio/cmdgen.py b/pysnmp/hlapi/asyncio/cmdgen.py index b7cca28ce..8545fae10 100644 --- a/pysnmp/hlapi/asyncio/cmdgen.py +++ b/pysnmp/hlapi/asyncio/cmdgen.py @@ -32,6 +32,8 @@ # THE POSSIBILITY OF SUCH DAMAGE. # import sys +from pysnmp.entity.engine import SnmpEngine +from pysnmp.hlapi.transport import AbstractTransportTarget from pysnmp.smi.rfc1902 import * from pysnmp.hlapi.auth import * @@ -44,7 +46,7 @@ import asyncio -__all__ = ['getCmd', 'nextCmd', 'setCmd', 'bulkCmd', 'isEndOfMib'] +__all__ = ["getCmd", "nextCmd", "setCmd", "bulkCmd", "isEndOfMib"] vbProcessor = CommandGeneratorVarBinds() lcd = CommandGeneratorLcdConfigurator() @@ -52,9 +54,14 @@ isEndOfMib = lambda x: not cmdgen.getNextVarBinds(x)[1] - -async def getCmd(snmpEngine, authData, transportTarget, contextData, - *varBinds, **options): +async def getCmd( + snmpEngine: SnmpEngine, + authData: CommunityData | UsmUserData, + transportTarget: AbstractTransportTarget, + contextData: ContextData, + *varBinds, + **options +): r"""Creates a generator to perform SNMP GET query. When iterator gets advanced by :py:mod:`asyncio` main loop, @@ -127,15 +134,20 @@ async def getCmd(snmpEngine, authData, transportTarget, contextData, """ - def __cbFun(snmpEngine, sendRequestHandle, - errorIndication, errorStatus, errorIndex, - varBinds, cbCtx): + def __cbFun( + snmpEngine: SnmpEngine, + sendRequestHandle, + errorIndication, + errorStatus, + errorIndex: int, + varBinds, + cbCtx, + ): lookupMib, future = cbCtx if future.cancelled(): return try: - varBindsUnmade = vbProcessor.unmakeVarBinds(snmpEngine, varBinds, - lookupMib) + varBindsUnmade = vbProcessor.unmakeVarBinds(snmpEngine, varBinds, lookupMib) except Exception: ex = sys.exc_info()[1] future.set_exception(ex) @@ -145,21 +157,31 @@ def __cbFun(snmpEngine, sendRequestHandle, ) addrName, paramsName = lcd.configure( - snmpEngine, authData, transportTarget, contextData.contextName) + snmpEngine, authData, transportTarget, contextData.contextName + ) future = asyncio.get_running_loop().create_future() cmdgen.GetCommandGenerator().sendVarBinds( - snmpEngine, addrName, contextData.contextEngineId, + snmpEngine, + addrName, + contextData.contextEngineId, contextData.contextName, - vbProcessor.makeVarBinds(snmpEngine, varBinds), __cbFun, - (options.get('lookupMib', True), future) + vbProcessor.makeVarBinds(snmpEngine, varBinds), + __cbFun, + (options.get("lookupMib", True), future), ) return await future -async def setCmd(snmpEngine, authData, transportTarget, contextData, - *varBinds, **options): +async def setCmd( + snmpEngine: SnmpEngine, + authData: CommunityData | UsmUserData, + transportTarget: AbstractTransportTarget, + contextData: ContextData, + *varBinds, + **options +): r"""Creates a generator to perform SNMP SET query. When iterator gets advanced by :py:mod:`asyncio` main loop, @@ -232,15 +254,20 @@ async def setCmd(snmpEngine, authData, transportTarget, contextData, """ - def __cbFun(snmpEngine, sendRequestHandle, - errorIndication, errorStatus, errorIndex, - varBinds, cbCtx): + def __cbFun( + snmpEngine: SnmpEngine, + sendRequestHandle, + errorIndication, + errorStatus, + errorIndex: int, + varBinds, + cbCtx, + ): lookupMib, future = cbCtx if future.cancelled(): return try: - varBindsUnmade = vbProcessor.unmakeVarBinds(snmpEngine, varBinds, - lookupMib) + varBindsUnmade = vbProcessor.unmakeVarBinds(snmpEngine, varBinds, lookupMib) except Exception: ex = sys.exc_info()[1] future.set_exception(ex) @@ -250,21 +277,31 @@ def __cbFun(snmpEngine, sendRequestHandle, ) addrName, paramsName = lcd.configure( - snmpEngine, authData, transportTarget, contextData.contextName) + snmpEngine, authData, transportTarget, contextData.contextName + ) future = asyncio.get_running_loop().create_future() cmdgen.SetCommandGenerator().sendVarBinds( - snmpEngine, addrName, contextData.contextEngineId, + snmpEngine, + addrName, + contextData.contextEngineId, contextData.contextName, - vbProcessor.makeVarBinds(snmpEngine, varBinds), __cbFun, - (options.get('lookupMib', True), future) + vbProcessor.makeVarBinds(snmpEngine, varBinds), + __cbFun, + (options.get("lookupMib", True), future), ) return await future -async def nextCmd(snmpEngine, authData, transportTarget, contextData, - *varBinds, **options): +async def nextCmd( + snmpEngine: SnmpEngine, + authData: CommunityData | UsmUserData, + transportTarget: AbstractTransportTarget, + contextData: ContextData, + *varBinds, + **options +): r"""Creates a generator to perform SNMP GETNEXT query. When iterator gets advanced by :py:mod:`asyncio` main loop, @@ -341,17 +378,23 @@ async def nextCmd(snmpEngine, authData, transportTarget, contextData, """ - def __cbFun(snmpEngine, sendRequestHandle, - errorIndication, errorStatus, errorIndex, - varBindTable, cbCtx): + def __cbFun( + snmpEngine: SnmpEngine, + sendRequestHandle, + errorIndication, + errorStatus, + errorIndex: int, + varBindTable, + cbCtx, + ): lookupMib, future = cbCtx if future.cancelled(): return try: - varBindsUnmade = [vbProcessor.unmakeVarBinds(snmpEngine, - varBindTableRow, - lookupMib) - for varBindTableRow in varBindTable] + varBindsUnmade = [ + vbProcessor.unmakeVarBinds(snmpEngine, varBindTableRow, lookupMib) + for varBindTableRow in varBindTable + ] except Exception: ex = sys.exc_info()[1] future.set_exception(ex) @@ -361,21 +404,33 @@ def __cbFun(snmpEngine, sendRequestHandle, ) addrName, paramsName = lcd.configure( - snmpEngine, authData, transportTarget, contextData.contextName) + snmpEngine, authData, transportTarget, contextData.contextName + ) future = asyncio.get_running_loop().create_future() cmdgen.NextCommandGenerator().sendVarBinds( - snmpEngine, addrName, contextData.contextEngineId, + snmpEngine, + addrName, + contextData.contextEngineId, contextData.contextName, - vbProcessor.makeVarBinds(snmpEngine, varBinds), __cbFun, - (options.get('lookupMib', True), future) + vbProcessor.makeVarBinds(snmpEngine, varBinds), + __cbFun, + (options.get("lookupMib", True), future), ) return await future -async def bulkCmd(snmpEngine, authData, transportTarget, contextData, - nonRepeaters, maxRepetitions, *varBinds, **options): +async def bulkCmd( + snmpEngine: SnmpEngine, + authData: CommunityData | UsmUserData, + transportTarget: AbstractTransportTarget, + contextData: ContextData, + nonRepeaters: int, + maxRepetitions: int, + *varBinds, + **options +): r"""Creates a generator to perform SNMP GETBULK query. When iterator gets advanced by :py:mod:`asyncio` main loop, @@ -481,17 +536,23 @@ async def bulkCmd(snmpEngine, authData, transportTarget, contextData, """ - def __cbFun(snmpEngine, sendRequestHandle, - errorIndication, errorStatus, errorIndex, - varBindTable, cbCtx): + def __cbFun( + snmpEngine: SnmpEngine, + sendRequestHandle, + errorIndication, + errorStatus, + errorIndex: int, + varBindTable, + cbCtx, + ): lookupMib, future = cbCtx if future.cancelled(): return try: - varBindsUnmade = [vbProcessor.unmakeVarBinds(snmpEngine, - varBindTableRow, - lookupMib) - for varBindTableRow in varBindTable] + varBindsUnmade = [ + vbProcessor.unmakeVarBinds(snmpEngine, varBindTableRow, lookupMib) + for varBindTableRow in varBindTable + ] except Exception: ex = sys.exc_info()[1] future.set_exception(ex) @@ -501,14 +562,20 @@ def __cbFun(snmpEngine, sendRequestHandle, ) addrName, paramsName = lcd.configure( - snmpEngine, authData, transportTarget, contextData.contextName) + snmpEngine, authData, transportTarget, contextData.contextName + ) future = asyncio.get_running_loop().create_future() cmdgen.BulkCommandGenerator().sendVarBinds( - snmpEngine, addrName, contextData.contextEngineId, - contextData.contextName, nonRepeaters, maxRepetitions, - vbProcessor.makeVarBinds(snmpEngine, varBinds), __cbFun, - (options.get('lookupMib', True), future) + snmpEngine, + addrName, + contextData.contextEngineId, + contextData.contextName, + nonRepeaters, + maxRepetitions, + vbProcessor.makeVarBinds(snmpEngine, varBinds), + __cbFun, + (options.get("lookupMib", True), future), ) return await future diff --git a/pysnmp/hlapi/asyncio/slim.py b/pysnmp/hlapi/asyncio/slim.py index dc13cc6f4..17ca5578c 100644 --- a/pysnmp/hlapi/asyncio/slim.py +++ b/pysnmp/hlapi/asyncio/slim.py @@ -34,17 +34,26 @@ class Slim: """ - def __init__(self, version=2): + def __init__(self, version: int = 2): self.snmpEngine = SnmpEngine() if version not in (1, 2): - raise PySnmpError("Not supported version {}".format(version)) + raise PySnmpError(f"Not supported version {version}") self.version = version def close(self): """Closes the wrapper to release its resources.""" - self.snmpEngine.transportDispatcher.closeDispatcher() + if self.snmpEngine.transportDispatcher is not None: + self.snmpEngine.transportDispatcher.closeDispatcher() - async def get(self, communityName, address, port, *varBinds, timeout=1, retries=5): + async def get( + self, + communityName: str, + address: str, + port: int, + *varBinds, + timeout: int = 1, + retries: int = 5, + ): """ Creates a generator to perform SNMP GET query. @@ -122,7 +131,15 @@ async def get(self, communityName, address, port, *varBinds, timeout=1, retries= *varBinds, ) - async def next(self, communityName, address, port, *varBinds, timeout=1, retries=5): + async def next( + self, + communityName: str, + address: str, + port: int, + *varBinds, + timeout: int = 1, + retries: int = 5, + ): """ Creates a generator to perform SNMP GETNEXT query. @@ -205,14 +222,14 @@ async def next(self, communityName, address, port, *varBinds, timeout=1, retries async def bulk( self, - communityName, - address, - port, - nonRepeaters, - maxRepetitions, + communityName: str, + address: str, + port: int, + nonRepeaters: int, + maxRepetitions: int, *varBinds, - timeout=1, - retries=5 + timeout: int = 1, + retries: int = 5, ): r"""Creates a generator to perform SNMP GETBULK query. @@ -327,7 +344,15 @@ async def bulk( *varBinds, ) - async def set(self, communityName, address, port, *varBinds, timeout=1, retries=5): + async def set( + self, + communityName: str, + address: str, + port: int, + *varBinds, + timeout: int = 1, + retries: int = 5, + ): """ Creates a generator to perform SNMP SET query. diff --git a/pysnmp/hlapi/asyncio/transport.py b/pysnmp/hlapi/asyncio/transport.py index 8c8202329..116fdd4e8 100644 --- a/pysnmp/hlapi/asyncio/transport.py +++ b/pysnmp/hlapi/asyncio/transport.py @@ -51,9 +51,9 @@ class instance. """ transportDomain: Tuple[int, ...] = udp.domainName - protoTransport: AbstractTransport = udp.UdpAsyncioTransport + protoTransport = udp.UdpAsyncioTransport - def _resolveAddr(self, transportAddr: AbstractTransportAddress): + def _resolveAddr(self, transportAddr: tuple) -> tuple[str, int]: try: return socket.getaddrinfo( transportAddr[0], @@ -118,9 +118,9 @@ class instance. """ transportDomain: Tuple[int, ...] = udp6.domainName - protoTransport: AbstractTransport = udp6.Udp6AsyncioTransport + protoTransport = udp6.Udp6AsyncioTransport - def _resolveAddr(self, transportAddr: AbstractTransportAddress): + def _resolveAddr(self, transportAddr: tuple): try: return socket.getaddrinfo( transportAddr[0], diff --git a/pysnmp/hlapi/transport.py b/pysnmp/hlapi/transport.py index 5a730175d..b9fd34db1 100644 --- a/pysnmp/hlapi/transport.py +++ b/pysnmp/hlapi/transport.py @@ -16,15 +16,15 @@ class AbstractTransportTarget: retries: int timeout: float - transport: AbstractTransport - transportAddr: AbstractTransportAddress + transport: AbstractTransport | None + transportAddr: tuple[str, int] transportDomain = None protoTransport = AbstractTransport def __init__( self, - transportAddr: AbstractTransportAddress, + transportAddr: tuple, timeout: float = 1, retries: int = 5, tagList=null, @@ -79,5 +79,5 @@ def verifyDispatcherCompatibility(self, snmpEngine: SnmpEngine): ) ) - def _resolveAddr(self, transportAddr: AbstractTransportAddress): + def _resolveAddr(self, transportAddr: tuple) -> tuple[str, int]: raise NotImplementedError() diff --git a/pysnmp/proto/rfc3412.py b/pysnmp/proto/rfc3412.py index 5bfe0cca0..fab6b6bcb 100644 --- a/pysnmp/proto/rfc3412.py +++ b/pysnmp/proto/rfc3412.py @@ -16,7 +16,7 @@ class MsgAndPduDispatcher: """SNMP engine PDU & message dispatcher. Exchanges SNMP PDU's with - applications and serialized messages with transport level. + applications and serialized messages with transport level. """ def __init__(self, mibInstrumController=None): @@ -28,8 +28,11 @@ def __init__(self, mibInstrumController=None): self.mibInstrumController = mibInstrumController self.mibInstrumController.mibBuilder.loadModules( - 'SNMPv2-MIB', 'SNMP-MPD-MIB', 'SNMP-COMMUNITY-MIB', - 'SNMP-TARGET-MIB', 'SNMP-USER-BASED-SM-MIB' + "SNMPv2-MIB", + "SNMP-MPD-MIB", + "SNMP-COMMUNITY-MIB", + "SNMP-TARGET-MIB", + "SNMP-USER-BASED-SM-MIB", ) # Requests cache @@ -39,7 +42,7 @@ def __init__(self, mibInstrumController=None): self.__appsRegistration = {} # Source of sendPduHandle and cache of requesting apps - self.__sendPduHandle = nextid.Integer(0xffffff) + self.__sendPduHandle = nextid.Integer(0xFFFFFF) # To pass transport info to app (legacy) self.__transportInfo = {} @@ -49,9 +52,7 @@ def getTransportInfo(self, stateReference): if stateReference in self.__transportInfo: return self.__transportInfo[stateReference] else: - raise error.ProtocolError( - 'No data for stateReference %s' % stateReference - ) + raise error.ProtocolError("No data for stateReference %s" % stateReference) # Application registration with dispatcher @@ -65,14 +66,15 @@ def registerContextEngineId(self, contextEngineId, pduTypes, processPdu): k = (contextEngineId, pduType) if k in self.__appsRegistration: raise error.ProtocolError( - f'Duplicate registration {contextEngineId!r}/{pduType}' + f"Duplicate registration {contextEngineId!r}/{pduType}" ) # 4.3.4 self.__appsRegistration[k] = processPdu debug.logger & debug.flagDsp and debug.logger( - f'registerContextEngineId: contextEngineId {contextEngineId!r} pduTypes {pduTypes}') + f"registerContextEngineId: contextEngineId {contextEngineId!r} pduTypes {pduTypes}" + ) # 4.4.1 def unregisterContextEngineId(self, contextEngineId, pduTypes): @@ -80,8 +82,9 @@ def unregisterContextEngineId(self, contextEngineId, pduTypes): # 4.3.4 if contextEngineId is None: # Default to local snmpEngineId - contextEngineId, = self.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', - 'snmpEngineID') + (contextEngineId,) = self.mibInstrumController.mibBuilder.importSymbols( + "__SNMP-FRAMEWORK-MIB", "snmpEngineID" # type: ignore + ) for pduType in pduTypes: k = (contextEngineId, pduType) @@ -89,7 +92,8 @@ def unregisterContextEngineId(self, contextEngineId, pduTypes): del self.__appsRegistration[k] debug.logger & debug.flagDsp and debug.logger( - f'unregisterContextEngineId: contextEngineId {contextEngineId!r} pduTypes {pduTypes}') + f"unregisterContextEngineId: contextEngineId {contextEngineId!r} pduTypes {pduTypes}" + ) def getRegisteredApp(self, contextEngineId, pduType): k = (contextEngineId, pduType) @@ -103,11 +107,24 @@ def getRegisteredApp(self, contextEngineId, pduType): # 4.1.1 - def sendPdu(self, snmpEngine, transportDomain, transportAddress, - messageProcessingModel, securityModel, securityName, - securityLevel, contextEngineId, contextName, - pduVersion, PDU, expectResponse, timeout=0, - cbFun=None, cbCtx=None): + def sendPdu( + self, + snmpEngine, + transportDomain, + transportAddress, + messageProcessingModel, + securityModel, + securityName, + securityLevel, + contextEngineId, + contextName, + pduVersion, + PDU, + expectResponse, + timeout: float = 0, + cbFun=None, + cbCtx=None, + ): """PDU dispatcher -- prepare and serialize a request or notification""" # 4.1.1.2 k = int(messageProcessingModel) @@ -119,7 +136,8 @@ def sendPdu(self, snmpEngine, transportDomain, transportAddress, ) debug.logger & debug.flagDsp and debug.logger( - f'sendPdu: securityName {securityName}, PDU\n{PDU.prettyPrint()}') + f"sendPdu: securityName {securityName}, PDU\n{PDU.prettyPrint()}" + ) # 4.1.1.3 sendPduHandle = self.__sendPduHandle() @@ -130,34 +148,53 @@ def sendPdu(self, snmpEngine, transportDomain, transportAddress, sendPduHandle=sendPduHandle, timeout=timeout + snmpEngine.transportDispatcher.getTimerTicks(), cbFun=cbFun, - cbCtx=cbCtx + cbCtx=cbCtx, ) - debug.logger & debug.flagDsp and debug.logger('sendPdu: current time %d ticks, one tick is %s seconds' % ( - snmpEngine.transportDispatcher.getTimerTicks(), snmpEngine.transportDispatcher.getTimerResolution())) + debug.logger & debug.flagDsp and debug.logger( + "sendPdu: current time %d ticks, one tick is %s seconds" + % ( + snmpEngine.transportDispatcher.getTimerTicks(), + snmpEngine.transportDispatcher.getTimerResolution(), + ) + ) debug.logger & debug.flagDsp and debug.logger( - f'sendPdu: new sendPduHandle {sendPduHandle}, timeout {timeout} ticks, cbFun {cbFun}') + f"sendPdu: new sendPduHandle {sendPduHandle}, timeout {timeout} ticks, cbFun {cbFun}" + ) origTransportDomain = transportDomain origTransportAddress = transportAddress # 4.1.1.4 & 4.1.1.5 try: - (transportDomain, - transportAddress, - outgoingMessage) = mpHandler.prepareOutgoingMessage( - snmpEngine, origTransportDomain, origTransportAddress, - messageProcessingModel, securityModel, securityName, - securityLevel, contextEngineId, contextName, - pduVersion, PDU, expectResponse, sendPduHandle + ( + transportDomain, + transportAddress, + outgoingMessage, + ) = mpHandler.prepareOutgoingMessage( + snmpEngine, + origTransportDomain, + origTransportAddress, + messageProcessingModel, + securityModel, + securityName, + securityLevel, + contextEngineId, + contextName, + pduVersion, + PDU, + expectResponse, + sendPduHandle, ) - debug.logger & debug.flagDsp and debug.logger('sendPdu: MP succeeded') + debug.logger & debug.flagDsp and debug.logger("sendPdu: MP succeeded") except PySnmpError: if expectResponse: self.__cache.pop(sendPduHandle) - self.releaseStateInformation(snmpEngine, sendPduHandle, messageProcessingModel) + self.releaseStateInformation( + snmpEngine, sendPduHandle, messageProcessingModel + ) raise # 4.1.1.6 @@ -165,20 +202,23 @@ def sendPdu(self, snmpEngine, transportDomain, transportAddress, if expectResponse: self.__cache.pop(sendPduHandle) - raise error.PySnmpError('Transport dispatcher not set') + raise error.PySnmpError("Transport dispatcher not set") snmpEngine.observer.storeExecutionContext( - snmpEngine, 'rfc3412.sendPdu', - dict(transportDomain=transportDomain, - transportAddress=transportAddress, - outgoingMessage=outgoingMessage, - messageProcessingModel=messageProcessingModel, - securityModel=securityModel, - securityName=securityName, - securityLevel=securityLevel, - contextEngineId=contextEngineId, - contextName=contextName, - pdu=PDU) + snmpEngine, + "rfc3412.sendPdu", + dict( + transportDomain=transportDomain, + transportAddress=transportAddress, + outgoingMessage=outgoingMessage, + messageProcessingModel=messageProcessingModel, + securityModel=securityModel, + securityName=securityName, + securityLevel=securityLevel, + contextEngineId=contextEngineId, + contextName=contextName, + pdu=PDU, + ), ) try: @@ -190,29 +230,41 @@ def sendPdu(self, snmpEngine, transportDomain, transportAddress, self.__cache.pop(sendPduHandle) raise - snmpEngine.observer.clearExecutionContext(snmpEngine, 'rfc3412.sendPdu') + snmpEngine.observer.clearExecutionContext(snmpEngine, "rfc3412.sendPdu") # Update cache with orignal req params (used for retrying) if expectResponse: - self.__cache.update(sendPduHandle, - transportDomain=origTransportDomain, - transportAddress=origTransportAddress, - securityModel=securityModel, - securityName=securityName, - securityLevel=securityLevel, - contextEngineId=contextEngineId, - contextName=contextName, - pduVersion=pduVersion, - PDU=PDU) + self.__cache.update( + sendPduHandle, + transportDomain=origTransportDomain, + transportAddress=origTransportAddress, + securityModel=securityModel, + securityName=securityName, + securityLevel=securityLevel, + contextEngineId=contextEngineId, + contextName=contextName, + pduVersion=pduVersion, + PDU=PDU, + ) return sendPduHandle # 4.1.2.1 - def returnResponsePdu(self, snmpEngine, messageProcessingModel, - securityModel, securityName, securityLevel, - contextEngineId, contextName, pduVersion, - PDU, maxSizeResponseScopedPDU, stateReference, - statusInformation): + def returnResponsePdu( + self, + snmpEngine, + messageProcessingModel, + securityModel, + securityName, + securityLevel, + contextEngineId, + contextName, + pduVersion, + PDU, + maxSizeResponseScopedPDU, + stateReference, + statusInformation, + ): # Extract input values and initialize defaults k = int(messageProcessingModel) if k in snmpEngine.messageProcessingSubsystems: @@ -223,65 +275,84 @@ def returnResponsePdu(self, snmpEngine, messageProcessingModel, ) debug.logger & debug.flagDsp and debug.logger( - 'returnResponsePdu: PDU {}'.format(PDU and PDU.prettyPrint() or "")) + "returnResponsePdu: PDU {}".format(PDU and PDU.prettyPrint() or "") + ) # 4.1.2.2 try: - (transportDomain, - transportAddress, - outgoingMessage) = mpHandler.prepareResponseMessage( - snmpEngine, messageProcessingModel, securityModel, - securityName, securityLevel, contextEngineId, contextName, - pduVersion, PDU, maxSizeResponseScopedPDU, stateReference, - statusInformation + ( + transportDomain, + transportAddress, + outgoingMessage, + ) = mpHandler.prepareResponseMessage( + snmpEngine, + messageProcessingModel, + securityModel, + securityName, + securityLevel, + contextEngineId, + contextName, + pduVersion, + PDU, + maxSizeResponseScopedPDU, + stateReference, + statusInformation, ) - debug.logger & debug.flagDsp and debug.logger('returnResponsePdu: MP suceeded') + debug.logger & debug.flagDsp and debug.logger( + "returnResponsePdu: MP suceeded" + ) except error.StatusInformation: # 4.1.2.3 raise # Handle oversized messages XXX transport constrains? - snmpEngineMaxMessageSize, = self.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', - 'snmpEngineMaxMessageSize') - if (snmpEngineMaxMessageSize.syntax and - len(outgoingMessage) > snmpEngineMaxMessageSize.syntax): - snmpSilentDrops, = self.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpSilentDrops') + ( + snmpEngineMaxMessageSize, + ) = self.mibInstrumController.mibBuilder.importSymbols( + "__SNMP-FRAMEWORK-MIB", "snmpEngineMaxMessageSize" # type: ignore + ) + if ( + snmpEngineMaxMessageSize.syntax + and len(outgoingMessage) > snmpEngineMaxMessageSize.syntax + ): + (snmpSilentDrops,) = self.mibInstrumController.mibBuilder.importSymbols("__SNMPv2-MIB", "snmpSilentDrops") # type: ignore snmpSilentDrops.syntax += 1 raise error.StatusInformation(errorIndication=errind.tooBig) snmpEngine.observer.storeExecutionContext( snmpEngine, - 'rfc3412.returnResponsePdu', - dict(transportDomain=transportDomain, - transportAddress=transportAddress, - outgoingMessage=outgoingMessage, - messageProcessingModel=messageProcessingModel, - securityModel=securityModel, - securityName=securityName, - securityLevel=securityLevel, - contextEngineId=contextEngineId, - contextName=contextName, - pdu=PDU) + "rfc3412.returnResponsePdu", + dict( + transportDomain=transportDomain, + transportAddress=transportAddress, + outgoingMessage=outgoingMessage, + messageProcessingModel=messageProcessingModel, + securityModel=securityModel, + securityName=securityName, + securityLevel=securityLevel, + contextEngineId=contextEngineId, + contextName=contextName, + pdu=PDU, + ), ) # 4.1.2.4 - snmpEngine.transportDispatcher.sendMessage(outgoingMessage, - transportDomain, - transportAddress) + snmpEngine.transportDispatcher.sendMessage( + outgoingMessage, transportDomain, transportAddress + ) snmpEngine.observer.clearExecutionContext( - snmpEngine, 'rfc3412.returnResponsePdu' + snmpEngine, "rfc3412.returnResponsePdu" ) # 4.2.1 - def receiveMessage(self, snmpEngine, transportDomain, - transportAddress, wholeMsg): + def receiveMessage(self, snmpEngine, transportDomain, transportAddress, wholeMsg): """Message dispatcher -- de-serialize message into PDU""" # 4.2.1.1 - snmpInPkts, = self.mibInstrumController.mibBuilder.importSymbols( - '__SNMPv2-MIB', 'snmpInPkts' + (snmpInPkts,) = self.mibInstrumController.mibBuilder.importSymbols( # type: ignore + "__SNMPv2-MIB", "snmpInPkts" ) snmpInPkts.syntax += 1 @@ -291,20 +362,25 @@ def receiveMessage(self, snmpEngine, transportDomain, msgVersion = verdec.decodeMessageVersion(wholeMsg) except error.ProtocolError: - snmpInASNParseErrs, = self.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', - 'snmpInASNParseErrs') + (snmpInASNParseErrs,) = self.mibInstrumController.mibBuilder.importSymbols( + "__SNMPv2-MIB", "snmpInASNParseErrs" # type: ignore + ) snmpInASNParseErrs.syntax += 1 return null # n.b the whole buffer gets dropped - debug.logger & debug.flagDsp and debug.logger('receiveMessage: msgVersion %s, msg decoded' % msgVersion) + debug.logger & debug.flagDsp and debug.logger( + "receiveMessage: msgVersion %s, msg decoded" % msgVersion + ) messageProcessingModel = msgVersion try: - mpHandler = snmpEngine.messageProcessingSubsystems[int(messageProcessingModel)] + mpHandler = snmpEngine.messageProcessingSubsystems[ + int(messageProcessingModel) + ] except KeyError: - snmpInBadVersions, = self.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInBadVersions') + (snmpInBadVersions,) = self.mibInstrumController.mibBuilder.importSymbols("__SNMPv2-MIB", "snmpInBadVersions") # type: ignore snmpInBadVersions.syntax += 1 return restOfWholeMsg @@ -312,135 +388,181 @@ def receiveMessage(self, snmpEngine, transportDomain, # 4.2.1.4 try: - (messageProcessingModel, - securityModel, - securityName, - securityLevel, - contextEngineId, - contextName, - pduVersion, - PDU, - pduType, - sendPduHandle, - maxSizeResponseScopedPDU, - statusInformation, - stateReference) = mpHandler.prepareDataElements( + ( + messageProcessingModel, + securityModel, + securityName, + securityLevel, + contextEngineId, + contextName, + pduVersion, + PDU, + pduType, + sendPduHandle, + maxSizeResponseScopedPDU, + statusInformation, + stateReference, + ) = mpHandler.prepareDataElements( snmpEngine, transportDomain, transportAddress, wholeMsg ) - debug.logger & debug.flagDsp and debug.logger('receiveMessage: MP succeded') + debug.logger & debug.flagDsp and debug.logger("receiveMessage: MP succeded") except error.StatusInformation: statusInformation = sys.exc_info()[1] - if 'sendPduHandle' in statusInformation: + if "sendPduHandle" in statusInformation: # type: ignore # Dropped REPORT -- re-run pending reqs queue as some # of them may be waiting for this REPORT debug.logger & debug.flagDsp and debug.logger( - 'receiveMessage: MP failed, statusInformation %s, forcing a retry' % statusInformation) + "receiveMessage: MP failed, statusInformation %s, forcing a retry" + % statusInformation + ) self.__expireRequest( - statusInformation['sendPduHandle'], - self.__cache.pop(statusInformation['sendPduHandle']), + statusInformation["sendPduHandle"], # type: ignore + self.__cache.pop(statusInformation["sendPduHandle"]), # type: ignore snmpEngine, - statusInformation + statusInformation, ) return restOfWholeMsg except PyAsn1Error: - debug.logger & debug.flagMP and debug.logger(f'receiveMessage: {sys.exc_info()[1]}') - snmpInASNParseErrs, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInASNParseErrs') + debug.logger & debug.flagMP and debug.logger( + f"receiveMessage: {sys.exc_info()[1]}" + ) + ( + snmpInASNParseErrs, + ) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + "__SNMPv2-MIB", "snmpInASNParseErrs" + ) snmpInASNParseErrs.syntax += 1 return restOfWholeMsg - debug.logger & debug.flagDsp and debug.logger('receiveMessage: PDU %s' % PDU.prettyPrint()) + debug.logger & debug.flagDsp and debug.logger( + "receiveMessage: PDU %s" % PDU.prettyPrint() + ) # 4.2.2 if sendPduHandle is None: # 4.2.2.1 (request or notification) - debug.logger & debug.flagDsp and debug.logger('receiveMessage: pduType %s' % pduType) + debug.logger & debug.flagDsp and debug.logger( + "receiveMessage: pduType %s" % pduType + ) # 4.2.2.1.1 processPdu = self.getRegisteredApp(contextEngineId, pduType) # 4.2.2.1.2 if processPdu is None: # 4.2.2.1.2.a - snmpUnknownPDUHandlers, = self.mibInstrumController.mibBuilder.importSymbols('__SNMP-MPD-MIB', - 'snmpUnknownPDUHandlers') + ( + snmpUnknownPDUHandlers, + ) = self.mibInstrumController.mibBuilder.importSymbols( + "__SNMP-MPD-MIB", "snmpUnknownPDUHandlers" # type: ignore + ) snmpUnknownPDUHandlers.syntax += 1 # 4.2.2.1.2.b statusInformation = { - 'errorIndication': errind.unknownPDUHandler, - 'oid': snmpUnknownPDUHandlers.name, - 'val': snmpUnknownPDUHandlers.syntax + "errorIndication": errind.unknownPDUHandler, + "oid": snmpUnknownPDUHandlers.name, + "val": snmpUnknownPDUHandlers.syntax, } - debug.logger & debug.flagDsp and debug.logger('receiveMessage: unhandled PDU type') + debug.logger & debug.flagDsp and debug.logger( + "receiveMessage: unhandled PDU type" + ) # 4.2.2.1.2.c try: - (destTransportDomain, - destTransportAddress, - outgoingMessage) = mpHandler.prepareResponseMessage( - snmpEngine, messageProcessingModel, - securityModel, securityName, securityLevel, - contextEngineId, contextName, pduVersion, - PDU, maxSizeResponseScopedPDU, stateReference, - statusInformation + ( + destTransportDomain, + destTransportAddress, + outgoingMessage, + ) = mpHandler.prepareResponseMessage( + snmpEngine, + messageProcessingModel, + securityModel, + securityName, + securityLevel, + contextEngineId, + contextName, + pduVersion, + PDU, + maxSizeResponseScopedPDU, + stateReference, + statusInformation, ) snmpEngine.transportDispatcher.sendMessage( - outgoingMessage, destTransportDomain, - destTransportAddress + outgoingMessage, destTransportDomain, destTransportAddress ) except PySnmpError: debug.logger & debug.flagDsp and debug.logger( - 'receiveMessage: report failed, statusInformation %s' % sys.exc_info()[1]) + "receiveMessage: report failed, statusInformation %s" + % sys.exc_info()[1] + ) else: - debug.logger & debug.flagDsp and debug.logger('receiveMessage: reporting succeeded') + debug.logger & debug.flagDsp and debug.logger( + "receiveMessage: reporting succeeded" + ) # 4.2.2.1.2.d return restOfWholeMsg else: snmpEngine.observer.storeExecutionContext( - snmpEngine, 'rfc3412.receiveMessage:request', - dict(transportDomain=transportDomain, - transportAddress=transportAddress, - wholeMsg=wholeMsg, - messageProcessingModel=messageProcessingModel, - securityModel=securityModel, - securityName=securityName, - securityLevel=securityLevel, - contextEngineId=contextEngineId, - contextName=contextName, - pdu=PDU) + snmpEngine, + "rfc3412.receiveMessage:request", + dict( + transportDomain=transportDomain, + transportAddress=transportAddress, + wholeMsg=wholeMsg, + messageProcessingModel=messageProcessingModel, + securityModel=securityModel, + securityName=securityName, + securityLevel=securityLevel, + contextEngineId=contextEngineId, + contextName=contextName, + pdu=PDU, + ), ) # pass transport info to app (legacy) if stateReference is not None: self.__transportInfo[stateReference] = ( - transportDomain, transportAddress + transportDomain, + transportAddress, ) # 4.2.2.1.3 - processPdu(snmpEngine, messageProcessingModel, - securityModel, securityName, securityLevel, - contextEngineId, contextName, pduVersion, - PDU, maxSizeResponseScopedPDU, stateReference) + processPdu( + snmpEngine, + messageProcessingModel, + securityModel, + securityName, + securityLevel, + contextEngineId, + contextName, + pduVersion, + PDU, + maxSizeResponseScopedPDU, + stateReference, + ) snmpEngine.observer.clearExecutionContext( - snmpEngine, 'rfc3412.receiveMessage:request' + snmpEngine, "rfc3412.receiveMessage:request" ) # legacy if stateReference is not None: del self.__transportInfo[stateReference] - debug.logger & debug.flagDsp and debug.logger('receiveMessage: processPdu succeeded') + debug.logger & debug.flagDsp and debug.logger( + "receiveMessage: processPdu succeeded" + ) return restOfWholeMsg else: # 4.2.2.2 (response) @@ -450,51 +572,69 @@ def receiveMessage(self, snmpEngine, transportDomain, # 4.2.2.2.2 if cachedParams is None: - snmpUnknownPDUHandlers, = self.mibInstrumController.mibBuilder.importSymbols('__SNMP-MPD-MIB', - 'snmpUnknownPDUHandlers') + ( + snmpUnknownPDUHandlers, + ) = self.mibInstrumController.mibBuilder.importSymbols( + "__SNMP-MPD-MIB", "snmpUnknownPDUHandlers" # type: ignore + ) snmpUnknownPDUHandlers.syntax += 1 return restOfWholeMsg debug.logger & debug.flagDsp and debug.logger( - 'receiveMessage: cache read by sendPduHandle %s' % sendPduHandle) + "receiveMessage: cache read by sendPduHandle %s" % sendPduHandle + ) # 4.2.2.2.3 # no-op ? XXX snmpEngine.observer.storeExecutionContext( - snmpEngine, 'rfc3412.receiveMessage:response', - dict(transportDomain=transportDomain, - transportAddress=transportAddress, - wholeMsg=wholeMsg, - messageProcessingModel=messageProcessingModel, - securityModel=securityModel, - securityName=securityName, - securityLevel=securityLevel, - contextEngineId=contextEngineId, - contextName=contextName, - pdu=PDU) + snmpEngine, + "rfc3412.receiveMessage:response", + dict( + transportDomain=transportDomain, + transportAddress=transportAddress, + wholeMsg=wholeMsg, + messageProcessingModel=messageProcessingModel, + securityModel=securityModel, + securityName=securityName, + securityLevel=securityLevel, + contextEngineId=contextEngineId, + contextName=contextName, + pdu=PDU, + ), ) # 4.2.2.2.4 - processResponsePdu = cachedParams['cbFun'] - - processResponsePdu(snmpEngine, messageProcessingModel, - securityModel, securityName, securityLevel, - contextEngineId, contextName, pduVersion, - PDU, statusInformation, - cachedParams['sendPduHandle'], - cachedParams['cbCtx']) + processResponsePdu = cachedParams["cbFun"] + + processResponsePdu( + snmpEngine, + messageProcessingModel, + securityModel, + securityName, + securityLevel, + contextEngineId, + contextName, + pduVersion, + PDU, + statusInformation, + cachedParams["sendPduHandle"], + cachedParams["cbCtx"], + ) snmpEngine.observer.clearExecutionContext( - snmpEngine, 'rfc3412.receiveMessage:response' + snmpEngine, "rfc3412.receiveMessage:response" ) - debug.logger & debug.flagDsp and debug.logger('receiveMessage: processResponsePdu succeeded') + debug.logger & debug.flagDsp and debug.logger( + "receiveMessage: processResponsePdu succeeded" + ) return restOfWholeMsg - def releaseStateInformation(self, snmpEngine, sendPduHandle, - messageProcessingModel): + def releaseStateInformation( + self, snmpEngine, sendPduHandle, messageProcessingModel + ): k = int(messageProcessingModel) if k in snmpEngine.messageProcessingSubsystems: mpHandler = snmpEngine.messageProcessingSubsystems[k] @@ -505,17 +645,20 @@ def releaseStateInformation(self, snmpEngine, sendPduHandle, # Cache expiration stuff # noinspection PyUnusedLocal - def __expireRequest(self, cacheKey, cachedParams, snmpEngine, - statusInformation=None): + def __expireRequest( + self, cacheKey, cachedParams, snmpEngine, statusInformation=None + ): timeNow = snmpEngine.transportDispatcher.getTimerTicks() - timeoutAt = cachedParams['timeout'] + timeoutAt = cachedParams["timeout"] if statusInformation is None and timeNow < timeoutAt: return - processResponsePdu = cachedParams['cbFun'] + processResponsePdu = cachedParams["cbFun"] - debug.logger & debug.flagDsp and debug.logger('__expireRequest: req cachedParams %s' % cachedParams) + debug.logger & debug.flagDsp and debug.logger( + "__expireRequest: req cachedParams %s" % cachedParams + ) # Fail timed-out requests if not statusInformation: @@ -523,14 +666,26 @@ def __expireRequest(self, cacheKey, cachedParams, snmpEngine, errorIndication=errind.requestTimedOut ) - self.releaseStateInformation(snmpEngine, - cachedParams['sendPduHandle'], - cachedParams['messageProcessingModel']) + self.releaseStateInformation( + snmpEngine, + cachedParams["sendPduHandle"], + cachedParams["messageProcessingModel"], + ) - processResponsePdu(snmpEngine, None, None, None, None, None, - None, None, None, statusInformation, - cachedParams['sendPduHandle'], - cachedParams['cbCtx']) + processResponsePdu( + snmpEngine, + None, + None, + None, + None, + None, + None, + None, + None, + statusInformation, + cachedParams["sendPduHandle"], + cachedParams["cbCtx"], + ) return True # noinspection PyUnusedLocal