Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

traps: allow trap filtering with multiple key values #26

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions src/SnmpLibrary/traps.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from pyasn1.codec.ber import decoder

from . import utils
from robot.api import logger


def _generic_trap_filter(domain, sock, pdu, **kwargs):
Expand All @@ -34,11 +35,18 @@ def _generic_trap_filter(domain, sock, pdu, **kwargs):
if sock[0] != kwargs['host']:
return False

for oid, val in v2c.apiPDU.getVarBindList(pdu):
for oid, val in v2c.apiPDU.getVarBinds(pdu):
if 'oid' in kwargs and kwargs['oid']:
if oid == snmpTrapOID:
if val[0][0][2] != v2c.ObjectIdentifier(kwargs['oid']):
if val != v2c.ObjectIdentifier(kwargs['oid']):
return False

if 'varBinds' in kwargs and kwargs['varBinds']:
for varBind in kwargs['varBinds']:
if oid == utils.parse_oid(varBind[0]):
if val != varBind[1]:
return False

return True


Expand All @@ -55,6 +63,7 @@ def _trap_receiver_cb(transport, domain, sock, msg):
raise RuntimeError('Only SNMP v2c traps are supported.')

req, msg = decoder.decode(msg, asn1Spec=v2c.Message())
logger.info(f"Trap Received: {req.prettyPrint()}")
pdu = v2c.apiMessage.getPDU(req)

# ignore any non trap PDUs
Expand Down Expand Up @@ -85,15 +94,20 @@ class _Traps:
def __init__(self):
self._trap_filters = dict()

def new_trap_filter(self, name, host=None, oid=None):
def new_trap_filter(self, name, host=None, oid=None, varBinds=None):
"""Defines a new SNMP trap filter.

At the moment, you can only filter on the sending host and on the trap
At the moment, you can filter on the sending host and on the trap
OID.

You can also filter on the variable bindings.
Here, varBinds is a list of tuples where each tuple is oid and value pair.
Eg : [(oid1,val1),(oid2,val2)]
"""
trap_filter = functools.partial(_generic_trap_filter,
host=host,
oid=utils.parse_oid(oid))
oid=utils.parse_oid(oid),
varBinds=varBinds)
self._trap_filters[name] = trap_filter

def wait_until_trap_is_received(self, trap_filter_name, timeout=5.0,
Expand Down
Loading