Skip to content

Commit

Permalink
Upgraded test cases.
Browse files Browse the repository at this point in the history
  • Loading branch information
lextm committed Oct 13, 2024
1 parent 7e708dc commit fbeb35d
Show file tree
Hide file tree
Showing 35 changed files with 239 additions and 237 deletions.
60 changes: 31 additions & 29 deletions tests/agent_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,31 @@ async def start_agent(
snmpEngine = engine.SnmpEngine()

# Set up transport endpoint
config.addTransport(
config.add_transport(
snmpEngine,
udp.DOMAIN_NAME,
udp.UdpTransport().openServerMode(("localhost", AGENT_PORT)),
udp.UdpTransport().open_server_mode(("localhost", AGENT_PORT)),
)

if enable_ipv6:
config.addTransport(
config.add_transport(
snmpEngine,
udp6.DOMAIN_NAME,
udp6.Udp6Transport().openServerMode(("localhost", AGENT_PORT)),
udp6.Udp6Transport().open_server_mode(("localhost", AGENT_PORT)),
)

# Set up community data
config.addV1System(snmpEngine, "public", "public")
config.add_v1_system(snmpEngine, "public", "public")
# Add SNMP v3 user
config.addV3User(
config.add_v3_user(
snmpEngine, "usr-none-none", config.USM_AUTH_NONE, config.USM_PRIV_NONE
)

config.addV3User(snmpEngine, "usr-sha-none", config.USM_AUTH_HMAC96_SHA, "authkey1")
config.add_v3_user(
snmpEngine, "usr-sha-none", config.USM_AUTH_HMAC96_SHA, "authkey1"
)

config.addV3User(
config.add_v3_user(
snmpEngine,
"usr-sha-aes",
config.USM_AUTH_HMAC96_SHA,
Expand All @@ -53,7 +55,7 @@ async def start_agent(
"privkey1",
)

config.addV3User(
config.add_v3_user(
snmpEngine,
"usr-sha-aes256",
config.USM_AUTH_HMAC96_SHA,
Expand All @@ -63,21 +65,21 @@ async def start_agent(
)

# Allow read MIB access for this user / securityModels at VACM
config.addVacmUser(snmpEngine, 1, "public", "noAuthNoPriv", (1, 3, 6), (1, 3, 6))
config.addVacmUser(snmpEngine, 2, "public", "noAuthNoPriv", (1, 3, 6), (1, 3, 6))
config.addVacmUser(snmpEngine, 3, "usr-none-none", "noAuthNoPriv", (1, 3, 6))
config.addVacmUser(snmpEngine, 3, "usr-sha-none", "authNoPriv", (1, 3, 6))
config.addVacmUser(snmpEngine, 3, "usr-sha-aes", "authPriv", (1, 3, 6))
config.addVacmUser(snmpEngine, 3, "usr-sha-aes256", "authPriv", (1, 3, 6))
config.add_vacm_user(snmpEngine, 1, "public", "noAuthNoPriv", (1, 3, 6), (1, 3, 6))
config.add_vacm_user(snmpEngine, 2, "public", "noAuthNoPriv", (1, 3, 6), (1, 3, 6))
config.add_vacm_user(snmpEngine, 3, "usr-none-none", "noAuthNoPriv", (1, 3, 6))
config.add_vacm_user(snmpEngine, 3, "usr-sha-none", "authNoPriv", (1, 3, 6))
config.add_vacm_user(snmpEngine, 3, "usr-sha-aes", "authPriv", (1, 3, 6))
config.add_vacm_user(snmpEngine, 3, "usr-sha-aes256", "authPriv", (1, 3, 6))

# Configure SNMP context
snmpContext = context.SnmpContext(snmpEngine)

if enable_custom_objects:
# --- create custom Managed Object Instances ---
mibBuilder = snmpContext.getMibInstrum().getMibBuilder()
mibBuilder = snmpContext.get_mib_instrum().get_mib_builder()

MibScalar, MibScalarInstance = mibBuilder.importSymbols(
MibScalar, MibScalarInstance = mibBuilder.import_symbols(
"SNMPv2-SMI", "MibScalar", "MibScalarInstance"
)

Expand All @@ -99,7 +101,7 @@ def setValue(self, name, idx, value):
print(f"SET operation received. New value: {value}")
return self.getSyntax().clone(value)

mibBuilder.exportSymbols(
mibBuilder.export_symbols(
"__MY_MIB",
MibScalar((1, 3, 6, 1, 4, 1, 60069, 9, 1), v2c.OctetString()),
SlowMibScalarInstance(
Expand All @@ -118,24 +120,24 @@ def setValue(self, name, idx, value):
if enable_table_creation:
# --- define custom SNMP Table within a newly defined EXAMPLE-MIB ---

mibBuilder = snmpContext.getMibInstrum().getMibBuilder()
mibBuilder = snmpContext.get_mib_instrum().get_mib_builder()

(
MibTable,
MibTableRow,
MibTableColumn,
MibScalarInstance,
) = mibBuilder.importSymbols(
) = mibBuilder.import_symbols(
"SNMPv2-SMI",
"MibTable",
"MibTableRow",
"MibTableColumn",
"MibScalarInstance",
)

(RowStatus,) = mibBuilder.importSymbols("SNMPv2-TC", "RowStatus")
(RowStatus,) = mibBuilder.import_symbols("SNMPv2-TC", "RowStatus")

mibBuilder.exportSymbols(
mibBuilder.export_symbols(
"__EXAMPLE-MIB",
# table object
exampleTable=MibTable((1, 3, 6, 6, 1)).setMaxAccess("read-create"),
Expand Down Expand Up @@ -170,16 +172,16 @@ def setValue(self, name, idx, value):
exampleTableColumn2,
exampleTableColumn3,
exampleTableStatus,
) = mibBuilder.importSymbols(
) = mibBuilder.import_symbols(
"__EXAMPLE-MIB",
"exampleTableEntry",
"exampleTableColumn2",
"exampleTableColumn3",
"exampleTableStatus",
)
rowInstanceId = exampleTableEntry.getInstIdFromIndices("example record one")
mibInstrumentation = snmpContext.getMibInstrum()
mibInstrumentation.writeVars(
mibInstrumentation = snmpContext.get_mib_instrum()
mibInstrumentation.write_variables(
(exampleTableColumn2.name + rowInstanceId, "my string value"),
(exampleTableColumn3.name + rowInstanceId, 123456),
(exampleTableStatus.name + rowInstanceId, "createAndGo"),
Expand All @@ -192,9 +194,9 @@ def setValue(self, name, idx, value):
cmdrsp.SetCommandResponder(snmpEngine, snmpContext)

# Start the event loop
snmpEngine.transportDispatcher.jobStarted(1)
snmpEngine.transport_dispatcher.job_started(1)

snmpEngine.openDispatcher()
snmpEngine.open_dispatcher()

# Wait for the agent to start
await asyncio.sleep(1)
Expand Down Expand Up @@ -234,5 +236,5 @@ async def __aenter__(self):
return self.agent

async def __aexit__(self, exc_type, exc_val, exc_tb):
self.agent.transportDispatcher.jobFinished(1)
self.agent.closeDispatcher()
self.agent.transport_dispatcher.job_finished(1)
self.agent.close_dispatcher()
2 changes: 1 addition & 1 deletion tests/hlapi/v1arch/asyncio/manager/cmdgen/test_slim.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def test_v1_next():
"localhost", # "demo.pysnmp.com",
AGENT_PORT, # 161,
ObjectType(
ObjectIdentity("SNMPv2-MIB", "sysDescr", 0).loadMibs("PYSNMP-MIB")
ObjectIdentity("SNMPv2-MIB", "sysDescr", 0).load_mibs("PYSNMP-MIB")
),
)

Expand Down
16 changes: 8 additions & 8 deletions tests/hlapi/v1arch/asyncio/manager/cmdgen/test_v1arch_v1_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ async def test_v1_get():
async with AgentContextManager():
snmpDispatcher = SnmpDispatcher()

iterator = await getCmd(
iterator = await get_cmd(
snmpDispatcher,
CommunityData("public", mpModel=0),
await UdpTransportTarget.create(("localhost", AGENT_PORT)),
Expand All @@ -31,15 +31,15 @@ async def test_v1_get():
name = pysnmp_errorStatus.namedValues.getName(errorStatus)
assert name == "noError"

snmpDispatcher.transportDispatcher.closeDispatcher()
snmpDispatcher.transportDispatcher.close_dispatcher()


@pytest.mark.asyncio
async def test_v1_get_ipv6():
async with AgentContextManager(enable_ipv6=True):
snmpDispatcher = SnmpDispatcher()

iterator = await getCmd(
iterator = await get_cmd(
snmpDispatcher,
CommunityData("public", mpModel=0),
await Udp6TransportTarget.create(("localhost", AGENT_PORT)),
Expand All @@ -58,7 +58,7 @@ async def test_v1_get_ipv6():
name = pysnmp_errorStatus.namedValues.getName(errorStatus)
assert name == "noError"

snmpDispatcher.transportDispatcher.closeDispatcher()
snmpDispatcher.transportDispatcher.close_dispatcher()


# TODO:
Expand All @@ -85,7 +85,7 @@ async def test_v1_get_ipv6():
# except asyncio.TimeoutError:
# assert False, "Test case timed out"
# finally:
# snmpDispatcher.transportDispatcher.closeDispatcher()
# snmpDispatcher.transportDispatcher.close_dispatcher()


# @pytest.mark.asyncio
Expand Down Expand Up @@ -114,14 +114,14 @@ async def test_v1_get_ipv6():
# except asyncio.TimeoutError:
# assert False, "Test case timed out"
# finally:
# snmpDispatcher.transportDispatcher.closeDispatcher()
# snmpDispatcher.transportDispatcher.close_dispatcher()


@pytest.mark.asyncio
async def test_v1_get_no_access_object():
async with AgentContextManager(enable_custom_objects=True):
snmpDispatcher = SnmpDispatcher()
errorIndication, errorStatus, errorIndex, varBinds = await getCmd(
errorIndication, errorStatus, errorIndex, varBinds = await get_cmd(
snmpDispatcher,
CommunityData("public", mpModel=0),
await UdpTransportTarget.create(
Expand All @@ -131,4 +131,4 @@ async def test_v1_get_no_access_object():
)
assert errorIndication is None
assert errorStatus.prettyPrint() == "noSuchName" # v1 does not have noAccess
snmpDispatcher.transportDispatcher.closeDispatcher()
snmpDispatcher.transportDispatcher.close_dispatcher()
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
async def test_v1_next():
async with AgentContextManager():
snmpDispatcher = SnmpDispatcher()
errorIndication, errorStatus, errorIndex, varBinds = await nextCmd(
errorIndication, errorStatus, errorIndex, varBinds = await next_cmd(
snmpDispatcher,
CommunityData("public", mpModel=0),
await UdpTransportTarget.create(("localhost", AGENT_PORT)),
ObjectType(
ObjectIdentity("SNMPv2-MIB", "sysDescr", 0).loadMibs("PYSNMP-MIB")
ObjectIdentity("SNMPv2-MIB", "sysDescr", 0).load_mibs("PYSNMP-MIB")
),
)

Expand All @@ -44,4 +44,4 @@ async def test_v1_next():
) # IMPORTANT: MIB is needed to resolve this name
assert type(varBinds[0][1]).__name__ == "ObjectIdentity"

snmpDispatcher.transportDispatcher.closeDispatcher()
snmpDispatcher.transportDispatcher.close_dispatcher()
16 changes: 8 additions & 8 deletions tests/hlapi/v1arch/asyncio/manager/cmdgen/test_v1arch_v1_set.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest
from pysnmp.entity.engine import SnmpEngine
from pysnmp.hlapi.v1arch.asyncio.cmdgen import setCmd, walkCmd
from pysnmp.hlapi.v1arch.asyncio.cmdgen import set_cmd, walk_cmd
from pysnmp.hlapi.v1arch.asyncio.dispatch import SnmpDispatcher
from pysnmp.hlapi.v1arch.asyncio.transport import UdpTransportTarget
from pysnmp.hlapi.v1arch.asyncio.auth import CommunityData
Expand All @@ -14,7 +14,7 @@ async def test_v1_set():
async with AgentContextManager():
snmpDispatcher = SnmpDispatcher()

iterator = await setCmd(
iterator = await set_cmd(
snmpDispatcher,
CommunityData("public", mpModel=0),
await UdpTransportTarget.create(("localhost", AGENT_PORT)),
Expand All @@ -30,7 +30,7 @@ async def test_v1_set():
assert varBinds[0][1].prettyPrint() == "Shanghai"
assert isinstance(varBinds[0][1], OctetString)

snmpDispatcher.transportDispatcher.closeDispatcher()
snmpDispatcher.transportDispatcher.close_dispatcher()


@pytest.mark.asyncio
Expand All @@ -39,7 +39,7 @@ async def test_v1_set_table_creation():
snmpDispatcher = SnmpDispatcher()

# Perform a SNMP walk to get all object counts
objects = walkCmd(
objects = walk_cmd(
snmpDispatcher,
CommunityData("public", mpModel=0),
await UdpTransportTarget.create(("localhost", AGENT_PORT)),
Expand All @@ -51,7 +51,7 @@ async def test_v1_set_table_creation():

object_counts = len(objects_list)

errorIndication, errorStatus, errorIndex, varBinds = await setCmd(
errorIndication, errorStatus, errorIndex, varBinds = await set_cmd(
snmpDispatcher,
CommunityData("public", mpModel=0),
await UdpTransportTarget.create(("localhost", AGENT_PORT)),
Expand All @@ -67,7 +67,7 @@ async def test_v1_set_table_creation():
assert varBinds[0][1].prettyPrint() == "My value"
assert type(varBinds[0][1]).__name__ == "OctetString"

errorIndication, errorStatus, errorIndex, varBinds = await setCmd(
errorIndication, errorStatus, errorIndex, varBinds = await set_cmd(
snmpDispatcher,
CommunityData("public", mpModel=0),
await UdpTransportTarget.create(("localhost", AGENT_PORT)),
Expand All @@ -82,7 +82,7 @@ async def test_v1_set_table_creation():
# assert isinstance(varBinds[0][1], Integer)

# Perform a SNMP walk to get all object counts
objects = walkCmd(
objects = walk_cmd(
snmpDispatcher,
CommunityData("public", mpModel=0),
await UdpTransportTarget.create(("localhost", AGENT_PORT)),
Expand All @@ -94,4 +94,4 @@ async def test_v1_set_table_creation():

assert len(objects_list) == object_counts + 4

snmpDispatcher.transportDispatcher.closeDispatcher()
snmpDispatcher.transportDispatcher.close_dispatcher()
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
async def test_v1_walk():
async with AgentContextManager():
snmpDispatcher = SnmpDispatcher()
objects = walkCmd(
objects = walk_cmd(
snmpDispatcher,
CommunityData("public", mpModel=0),
await UdpTransportTarget.create(("localhost", AGENT_PORT)),
Expand All @@ -32,14 +32,14 @@ async def test_v1_walk():

assert len(objects_list) == 267

snmpDispatcher.transportDispatcher.closeDispatcher()
snmpDispatcher.transportDispatcher.close_dispatcher()


@pytest.mark.asyncio
async def test_v1_walk_subtree():
async with AgentContextManager():
snmpDispatcher = SnmpDispatcher()
objects = walkCmd(
objects = walk_cmd(
snmpDispatcher,
CommunityData("public", mpModel=0),
await UdpTransportTarget.create(("localhost", AGENT_PORT)),
Expand All @@ -65,4 +65,4 @@ async def test_v1_walk_subtree():

assert len(objects_list) == 8

snmpDispatcher.transportDispatcher.closeDispatcher()
snmpDispatcher.transportDispatcher.close_dispatcher()
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
async def test_v2_walk(): # some agents have different v2 GET NEXT behavior
async with AgentContextManager():
snmpDispatcher = SnmpDispatcher()
objects = walkCmd(
objects = walk_cmd(
snmpDispatcher,
CommunityData("public"),
await UdpTransportTarget.create(("localhost", AGENT_PORT)),
Expand All @@ -32,14 +32,14 @@ async def test_v2_walk(): # some agents have different v2 GET NEXT behavior

assert len(objects_list) == 267

snmpDispatcher.transportDispatcher.closeDispatcher()
snmpDispatcher.transportDispatcher.close_dispatcher()


@pytest.mark.asyncio
async def test_v2_walk_subtree():
async with AgentContextManager():
snmpDispatcher = SnmpDispatcher()
objects = walkCmd(
objects = walk_cmd(
snmpDispatcher,
CommunityData("public"),
await UdpTransportTarget.create(("localhost", AGENT_PORT)),
Expand All @@ -65,4 +65,4 @@ async def test_v2_walk_subtree():

assert len(objects_list) == 8

snmpDispatcher.transportDispatcher.closeDispatcher()
snmpDispatcher.transportDispatcher.close_dispatcher()
Loading

0 comments on commit fbeb35d

Please sign in to comment.