From ae40490957119e57c175bd71168784d374602d84 Mon Sep 17 00:00:00 2001 From: Lex Li Date: Sun, 15 Sep 2024 03:36:34 -0400 Subject: [PATCH] Refined notification related code. --- pysnmp/carrier/base.py | 4 +-- pysnmp/entity/engine.py | 2 +- pysnmp/entity/rfc3413/ntforg.py | 36 ++++++++++++++------------- pysnmp/hlapi/v1arch/asyncio/ntforg.py | 12 ++++----- pysnmp/hlapi/v3arch/asyncio/ntforg.py | 18 ++++++++------ pysnmp/hlapi/varbinds.py | 28 +++++++++++++++------ pysnmp/smi/rfc1902.py | 12 +++++++-- 7 files changed, 69 insertions(+), 43 deletions(-) diff --git a/pysnmp/carrier/base.py b/pysnmp/carrier/base.py index 8f21e15c..5b1ec9ee 100644 --- a/pysnmp/carrier/base.py +++ b/pysnmp/carrier/base.py @@ -100,7 +100,7 @@ def sendMessage(self, outgoingMessage, transportAddress: AbstractTransportAddres class AbstractTransportDispatcher: __transports: "dict[tuple[int, ...], AbstractTransport]" __transportDomainMap: "dict[AbstractTransport, tuple[int, ...]]" - __recvCallables: "dict['tuple[int, ...] | None', Callable]" + __recvCallables: "dict['tuple[int, ...] | str | None', Callable]" __timerCallables: "list[TimerCallable]" __ticks: int __timerResolution: float @@ -161,7 +161,7 @@ def unregisterRoutingCbFun(self): if self.__routingCbFun: self.__routingCbFun = None - def registerRecvCbFun(self, recvCb, recvId: "tuple[int, ...] | None" = None): + def registerRecvCbFun(self, recvCb, recvId: "tuple[int, ...] | str | None" = None): if recvId in self.__recvCallables: raise error.CarrierError( "Receive callback {!r} already registered".format( diff --git a/pysnmp/entity/engine.py b/pysnmp/entity/engine.py index 7c59d77e..584be1f1 100644 --- a/pysnmp/entity/engine.py +++ b/pysnmp/entity/engine.py @@ -232,7 +232,7 @@ def __receiveTimerTickCbFun(self, timeNow: float): def registerTransportDispatcher( self, transportDispatcher: AbstractTransportDispatcher, - recvId: "tuple[int, ...] | None" = None, + recvId: "tuple[int, ...] | str | None" = None, ): if ( self.transportDispatcher is not None diff --git a/pysnmp/entity/rfc3413/ntforg.py b/pysnmp/entity/rfc3413/ntforg.py index ae80e342..895a2357 100644 --- a/pysnmp/entity/rfc3413/ntforg.py +++ b/pysnmp/entity/rfc3413/ntforg.py @@ -8,10 +8,12 @@ from pysnmp import debug, nextid +from pysnmp.entity.engine import SnmpEngine from pysnmp.entity.rfc3413 import config from pysnmp.proto import errind, error, rfc3411 from pysnmp.proto.api import v2c from pysnmp.proto.proxy import rfc2576 +from pysnmp.smi.rfc1902 import ObjectType getNextHandle = nextid.Integer(0x7FFFFFFF) # noqa: N816 @@ -325,11 +327,11 @@ def processResponseVarBinds( # def sendVarBinds( self, - snmpEngine, + snmpEngine: SnmpEngine, notificationTarget, contextEngineId, contextName, - varBinds=(), + varBinds: "tuple[ObjectType, ...]" = (), cbFun=None, cbCtx=None, ): @@ -362,7 +364,7 @@ def sendVarBinds( "notifyType %s" % (notificationHandle, notifyTag, notifyType) ) - varBinds = [(v2c.ObjectIdentifier(x), y) for x, y in varBinds] + inputVarBinds = [(v2c.ObjectIdentifier(x), y) for (x, y) in varBinds] # 3.3.2 & 3.3.3 snmpTrapOID, sysUpTime = mibBuilder.importSymbols( @@ -373,33 +375,33 @@ def sendVarBinds( sysUpTime, uptime = sysUpTime.getName(), sysUpTime.getSyntax() # Add sysUpTime if not present already - if not varBinds or varBinds[0][0] != sysUpTime: - varBinds.insert(0, (v2c.ObjectIdentifier(sysUpTime), uptime.clone())) + if not inputVarBinds or inputVarBinds[0][0] != sysUpTime: + inputVarBinds.insert(0, (v2c.ObjectIdentifier(sysUpTime), uptime.clone())) # Search for and reposition sysUpTime if it's elsewhere - for idx, varBind in enumerate(varBinds[1:]): + for idx, varBind in enumerate(inputVarBinds[1:]): if varBind[0] == sysUpTime: - varBinds[0] = varBind - del varBinds[idx + 1] + inputVarBinds[0] = varBind + del inputVarBinds[idx + 1] break - if len(varBinds) < 2: + if len(inputVarBinds) < 2: raise error.PySnmpError( "SNMP notification PDU requires " "SNMPv2-MIB::snmpTrapOID.0 to be present" ) # Search for and reposition snmpTrapOID if it's elsewhere - for idx, varBind in enumerate(varBinds[2:]): + for idx, varBind in enumerate(inputVarBinds[2:]): if varBind[0] == snmpTrapOID: - del varBinds[idx + 2] - if varBinds[1][0] == snmpTrapOID: - varBinds[1] = varBind + del inputVarBinds[idx + 2] + if inputVarBinds[1][0] == snmpTrapOID: + inputVarBinds[1] = varBind else: - varBinds.insert(1, varBind) + inputVarBinds.insert(1, varBind) break - if varBinds[1][0] != snmpTrapOID: + if inputVarBinds[1][0] != snmpTrapOID: raise error.PySnmpError( "SNMP notification PDU requires " "SNMPv2-MIB::snmpTrapOID.0 to be present" @@ -445,7 +447,7 @@ def sendVarBinds( ) ) - for varName, varVal in varBinds: + for varName, varVal in inputVarBinds: if varName in (sysUpTime, snmpTrapOID): continue try: @@ -480,7 +482,7 @@ def sendVarBinds( raise error.ProtocolError("Unknown notify-type %r", notifyType) v2c.apiPDU.setDefaults(pdu) - v2c.apiPDU.setVarBinds(pdu, varBinds) + v2c.apiPDU.setVarBinds(pdu, inputVarBinds) # 3.3.5 try: diff --git a/pysnmp/hlapi/v1arch/asyncio/ntforg.py b/pysnmp/hlapi/v1arch/asyncio/ntforg.py index 0e15268b..158dc5e6 100644 --- a/pysnmp/hlapi/v1arch/asyncio/ntforg.py +++ b/pysnmp/hlapi/v1arch/asyncio/ntforg.py @@ -27,7 +27,7 @@ async def sendNotification( authData: CommunityData, transportTarget: AbstractTransportTarget, notifyType: str, - *varBinds: ObjectType, + *varBinds: NotificationType, **options ) -> "tuple[errind.ErrorIndication, Integer32 | int, Integer32 | int, tuple[ObjectType, ...]]": r"""Creates a generator to send SNMP notification. @@ -185,7 +185,7 @@ def _cbFun(snmpDispatcher, stateHandle, errorIndication, rspPdu, _cbCtx): try: varBindsUnmade = VB_PROCESSOR.unmakeVarBinds( - snmpDispatcher.cache, varBinds, lookupMib + snmpDispatcher.cache, varBinds, lookupMib # type: ignore ) except Exception as e: future.set_exception(e) @@ -203,7 +203,7 @@ def _cbFun(snmpDispatcher, stateHandle, errorIndication, rspPdu, _cbCtx): lookupMib = True if lookupMib: - varBinds = VB_PROCESSOR.makeVarBinds(snmpDispatcher.cache, varBinds) + inputVarBinds = VB_PROCESSOR.makeVarBinds(snmpDispatcher.cache, varBinds) # # make sure required PDU payload is in place # completeVarBinds = [] @@ -229,11 +229,11 @@ def _cbFun(snmpDispatcher, stateHandle, errorIndication, rspPdu, _cbCtx): reqPdu = v2c.InformRequestPDU() v2c.apiTrapPDU.setDefaults(reqPdu) - v2c.apiTrapPDU.setVarBinds(reqPdu, varBinds) + v2c.apiTrapPDU.setVarBinds(reqPdu, inputVarBinds) - varBinds = v2c.apiTrapPDU.getVarBinds(reqPdu) + inputVarBinds = v2c.apiTrapPDU.getVarBinds(reqPdu) - v2c.apiTrapPDU.setVarBinds(reqPdu, _ensureVarBinds(varBinds)) + v2c.apiTrapPDU.setVarBinds(reqPdu, _ensureVarBinds(inputVarBinds)) if authData.mpModel == 0: reqPdu = rfc2576.v2ToV1(reqPdu) diff --git a/pysnmp/hlapi/v3arch/asyncio/ntforg.py b/pysnmp/hlapi/v3arch/asyncio/ntforg.py index a78e02e8..a9c7fc1a 100644 --- a/pysnmp/hlapi/v3arch/asyncio/ntforg.py +++ b/pysnmp/hlapi/v3arch/asyncio/ntforg.py @@ -9,18 +9,16 @@ # Zachary Lorusso # import asyncio -import sys + +from pysnmp.entity.engine import SnmpEngine from pysnmp.entity.rfc3413 import ntforg +from pysnmp.hlapi.transport import AbstractTransportTarget from pysnmp.hlapi.v3arch.asyncio.auth import CommunityData, UsmUserData from pysnmp.hlapi.v3arch.asyncio.context import ContextData from pysnmp.hlapi.v3arch.asyncio.lcd import NotificationOriginatorLcdConfigurator -from pysnmp.hlapi.v3arch.asyncio.transport import ( - Udp6TransportTarget, - UdpTransportTarget, -) from pysnmp.hlapi.varbinds import NotificationOriginatorVarBinds -from pysnmp.smi.rfc1902 import NotificationType, ObjectIdentity +from pysnmp.smi.rfc1902 import NotificationType __all__ = ["sendNotification"] @@ -29,7 +27,13 @@ async def sendNotification( - snmpEngine, authData, transportTarget, contextData, notifyType, varBinds, **options + snmpEngine: SnmpEngine, + authData: "CommunityData | UsmUserData", + transportTarget: AbstractTransportTarget, + contextData: ContextData, + notifyType: str, + *varBinds: NotificationType, + **options ): r"""Creates a generator to send SNMP notification. diff --git a/pysnmp/hlapi/varbinds.py b/pysnmp/hlapi/varbinds.py index 6e3cc375..1751e646 100644 --- a/pysnmp/hlapi/varbinds.py +++ b/pysnmp/hlapi/varbinds.py @@ -86,18 +86,25 @@ def unmakeVarBinds( class NotificationOriginatorVarBinds(MibViewControllerManager): - def makeVarBinds(self, userCache: Dict[str, Any], varBinds): + def makeVarBinds( + self, userCache: Dict[str, Any], varBinds: "tuple[NotificationType, ...]" + ) -> "tuple[ObjectType, ...]": mibViewController = self.getMibViewController(userCache) + # TODO: this shouldn't be needed if isinstance(varBinds, NotificationType): - return varBinds.resolveWithMib(mibViewController, ignoreErrors=False) + return varBinds.resolveWithMib( + mibViewController, ignoreErrors=False + ).toVarBinds() - resolvedVarBinds = [] + resolvedVarBinds: "list[ObjectType]" = [] for varBind in varBinds: if isinstance(varBind, NotificationType): resolvedVarBinds.extend( - varBind.resolveWithMib(mibViewController, ignoreErrors=False) + varBind.resolveWithMib( + mibViewController, ignoreErrors=False + ).toVarBinds() ) continue @@ -114,13 +121,18 @@ def makeVarBinds(self, userCache: Dict[str, Any], varBinds): varBind.resolveWithMib(mibViewController, ignoreErrors=False) ) - return resolvedVarBinds + return tuple(resolvedVarBinds) - def unmakeVarBinds(self, userCache: Dict[str, Any], varBinds, lookupMib=False): + def unmakeVarBinds( + self, + userCache: Dict[str, Any], + varBinds: "tuple[ObjectType, ...]", + lookupMib=False, + ) -> "tuple[ObjectType, ...]": if lookupMib: mibViewController = self.getMibViewController(userCache) - varBinds = [ + varBinds = tuple( ObjectType(ObjectIdentity(x[0]), x[1]).resolveWithMib(mibViewController) for x in varBinds - ] + ) return varBinds diff --git a/pysnmp/smi/rfc1902.py b/pysnmp/smi/rfc1902.py index 94cfbabb..07184103 100644 --- a/pysnmp/smi/rfc1902.py +++ b/pysnmp/smi/rfc1902.py @@ -891,7 +891,7 @@ def loadMibs(self, *modNames): self.__args[0].loadMibs(*modNames) return self - def resolveWithMib(self, mibViewController, ignoreErrors=True): + def resolveWithMib(self, mibViewController, ignoreErrors=True) -> "ObjectType": """Perform MIB variable ID and associated value conversion. Parameters @@ -1219,7 +1219,9 @@ def loadMibs(self, *modNames): def isFullyResolved(self): return self.__state & self.ST_CLEAN - def resolveWithMib(self, mibViewController, ignoreErrors=True): + def resolveWithMib( + self, mibViewController, ignoreErrors=True + ) -> "NotificationType": """Perform MIB variable ID conversion and notification objects expansion. Parameters @@ -1328,3 +1330,9 @@ def prettyPrint(self): ) else: raise SmiError("%s object not fully initialized" % self.__class__.__name__) + + def toVarBinds(self) -> "tuple[ObjectType, ...]": + if self.__state & self.ST_CLEAN: + return tuple(self.__varBinds) + else: + raise SmiError("%s object not fully initialized" % self.__class__.__name__)