diff --git a/pysnmp/entity/engine.py b/pysnmp/entity/engine.py index 535eeff7d..b1547e536 100644 --- a/pysnmp/entity/engine.py +++ b/pysnmp/entity/engine.py @@ -58,7 +58,7 @@ class SnmpEngine: """ - transportDispatcher: AbstractTransportDispatcher | None + transportDispatcher: "AbstractTransportDispatcher | None" def __init__( self, snmpEngineID=None, maxMessageSize: int = 65507, msgAndPduDsp=None @@ -215,7 +215,9 @@ def getMibBuilder(self): def setUserContext(self, **kwargs): self.cache.update({"__%s" % k: kwargs[k] for k in kwargs}) - def getUserContext(self, arg) -> Dict[str, Any] | None: # TODO: fix this type check + def getUserContext( + self, arg + ) -> "Dict[str, Any] | None": # TODO: fix this type check return self.cache.get("__%s" % arg) def delUserContext(self, arg): diff --git a/pysnmp/entity/rfc3413/config.py b/pysnmp/entity/rfc3413/config.py index 778192b7e..45cc3f1d0 100644 --- a/pysnmp/entity/rfc3413/config.py +++ b/pysnmp/entity/rfc3413/config.py @@ -20,7 +20,7 @@ def getTargetAddr(snmpEngine: SnmpEngine, snmpTargetAddrName): cache = snmpEngine.getUserContext("getTargetAddr") if cache is None: - cache: Dict[str, Any] | None = {"id": -1} + cache: "Dict[str, Any] | None" = {"id": -1} snmpEngine.setUserContext(getTargetAddr=cache) if cache["id"] != snmpTargetAddrEntry.branchVersionId: @@ -119,7 +119,7 @@ def getTargetParams(snmpEngine: SnmpEngine, paramsName): cache = snmpEngine.getUserContext("getTargetParams") if cache is None: - cache: Dict[str, Any] | None = {"id": -1} + cache: "Dict[str, Any] | None" = {"id": -1} snmpEngine.setUserContext(getTargetParams=cache) if cache["id"] != snmpTargetParamsEntry.branchVersionId: @@ -209,7 +209,7 @@ def getNotificationInfo(snmpEngine: SnmpEngine, notificationTarget): cache = snmpEngine.getUserContext("getNotificationInfo") if cache is None: - cache: Dict[str, Any] | None = {"id": -1} + cache: "Dict[str, Any] | None" = {"id": -1} snmpEngine.setUserContext(getNotificationInfo=cache) if cache["id"] != snmpNotifyEntry.branchVersionId: @@ -247,7 +247,7 @@ def getTargetNames(snmpEngine: SnmpEngine, tag): cache = snmpEngine.getUserContext("getTargetNames") if cache is None: - cache: Dict[str, Any] | None = {"id": -1} + cache: "Dict[str, Any] | None" = {"id": -1} snmpEngine.setUserContext(getTargetNames=cache) if cache["id"] == snmpTargetAddrEntry.branchVersionId: diff --git a/pysnmp/hlapi/asyncio/cmdgen.py b/pysnmp/hlapi/asyncio/cmdgen.py index 962a4ae7a..973bc2278 100644 --- a/pysnmp/hlapi/asyncio/cmdgen.py +++ b/pysnmp/hlapi/asyncio/cmdgen.py @@ -57,7 +57,7 @@ async def getCmd( snmpEngine: SnmpEngine, - authData: CommunityData | UsmUserData, + authData: "CommunityData | UsmUserData", transportTarget: AbstractTransportTarget, contextData: ContextData, *varBinds, @@ -177,7 +177,7 @@ def __cbFun( async def setCmd( snmpEngine: SnmpEngine, - authData: CommunityData | UsmUserData, + authData: "CommunityData | UsmUserData", transportTarget: AbstractTransportTarget, contextData: ContextData, *varBinds, @@ -297,7 +297,7 @@ def __cbFun( async def nextCmd( snmpEngine: SnmpEngine, - authData: CommunityData | UsmUserData, + authData: "CommunityData | UsmUserData", transportTarget: AbstractTransportTarget, contextData: ContextData, *varBinds, @@ -395,8 +395,11 @@ def __cbFun( lookupMib, future = cbCtx if future.cancelled(): return - if (options.get('ignoreNonIncreasingOid', False) and - errorIndication and isinstance(errorIndication, errind.OidNotIncreasing)): + if ( + options.get("ignoreNonIncreasingOid", False) + and errorIndication + and isinstance(errorIndication, errind.OidNotIncreasing) + ): errorIndication = None try: varBindsUnmade = [ @@ -431,7 +434,7 @@ def __cbFun( async def bulkCmd( snmpEngine: SnmpEngine, - authData: CommunityData | UsmUserData, + authData: "CommunityData | UsmUserData", transportTarget: AbstractTransportTarget, contextData: ContextData, nonRepeaters: int, @@ -560,8 +563,11 @@ def __cbFun( lookupMib, future = cbCtx if future.cancelled(): return - if (options.get('ignoreNonIncreasingOid', False) and - errorIndication and isinstance(errorIndication, errind.OidNotIncreasing)): + if ( + options.get("ignoreNonIncreasingOid", False) + and errorIndication + and isinstance(errorIndication, errind.OidNotIncreasing) + ): errorIndication = None try: varBindsUnmade = [ diff --git a/pysnmp/hlapi/asyncio/transport.py b/pysnmp/hlapi/asyncio/transport.py index 116fdd4e8..5ed5c647f 100644 --- a/pysnmp/hlapi/asyncio/transport.py +++ b/pysnmp/hlapi/asyncio/transport.py @@ -53,7 +53,7 @@ class instance. transportDomain: Tuple[int, ...] = udp.domainName protoTransport = udp.UdpAsyncioTransport - def _resolveAddr(self, transportAddr: tuple) -> tuple[str, int]: + def _resolveAddr(self, transportAddr: Tuple) -> Tuple[str, int]: try: return socket.getaddrinfo( transportAddr[0], diff --git a/pysnmp/hlapi/transport.py b/pysnmp/hlapi/transport.py index b9fd34db1..a8b18d0e9 100644 --- a/pysnmp/hlapi/transport.py +++ b/pysnmp/hlapi/transport.py @@ -4,6 +4,7 @@ # Copyright (c) 2005-2019, Ilya Etingof # License: https://www.pysnmp.com/pysnmp/license.html # +from typing import Tuple from pyasn1.compat.octets import null from pysnmp.carrier.base import AbstractTransport, AbstractTransportAddress from pysnmp import error @@ -16,15 +17,15 @@ class AbstractTransportTarget: retries: int timeout: float - transport: AbstractTransport | None - transportAddr: tuple[str, int] + transport: "AbstractTransport | None" + transportAddr: Tuple[str, int] transportDomain = None protoTransport = AbstractTransport def __init__( self, - transportAddr: tuple, + transportAddr: Tuple, timeout: float = 1, retries: int = 5, tagList=null, @@ -79,5 +80,5 @@ def verifyDispatcherCompatibility(self, snmpEngine: SnmpEngine): ) ) - def _resolveAddr(self, transportAddr: tuple) -> tuple[str, int]: + def _resolveAddr(self, transportAddr: Tuple) -> Tuple[str, int]: raise NotImplementedError()