-
Notifications
You must be signed in to change notification settings - Fork 0
/
port_scan.py
executable file
·1142 lines (1043 loc) · 48 KB
/
port_scan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
#Copyright (C) 2009 Allen Sanabria
#This program is free software; you can redistribute it and/or modify it under
#the terms of the GNU General Public License as published by the Free Software Foundation;
#either version 2 of the License, or (at your option) any later version.
#This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
#without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#See the GNU General Public License for more details. You should have received a copy of
#the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc.,
#51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Revision 1.14 07/17/15
* Fixed some spelling, such as 'retreive_communities'.
* Added more verbose statements.
* Added some comments by the dictionaries
"""
"""Revision 1.13 10/01/09
* Fixed line 348 as per christianha. return nmac.lower()
This fix will allow you to pass a MAC in all uppercase and still match though the switch is responding in lowercase.
"""
"""Revision 1.12 09/18/09
* Added follow switch option. Now when you run the Port Report with the --report option, you can also
pass the --follow option. This option will follow every cdp neighbor connected to the switch you are
scanning as well as the switches it is scanning.
"""
"""Revision 1.11 09/13/09
* More code clean up and another increase in speed.
* Also port_report can now follow EtherChannel
* Fixed issue, where the matching of the cdp neighbor was not matching correctly
"""
"""Revision 1.10 09/09/09
Code Clean up and a slight increase in speed ( by a few seconds ) during the search by mac or ip
"""
"""Revision 1.9 05/08/2009
Code changes and Added CDP support..
* Detect CDP Neighbors during the scan for MAC Addresses or IP Addresses
"""
""" Revision 1.7 04/30/09
This is a big update for Port Report.... In this revision the following brands and devices are supported
1. Cisco
* Catalyst 6509 w/ Supervisor 720 running IOS
* Catalyst 3560
* Catalyst 3550 (SMI)
* Cisco CIGESM series Chassis Blades
* Cisco Catalyst 2960
2. Foundry
* Foundry Server Iron
3. Nortel
* Nortel Passport 8600
* Nortel 5520 Ethernet Routing Switch
4. HP
* HP Procurve 5406xl
"""
#Now at the 5th revision 1.5 04/21/09
"""This is a complete rewrite of the get_port.py script 04/12/09
This script now accurately reports all MAC Addresses on the Port that you specified
*Also better error checking added
*Cleaner Code
*Reusable Functions
This script is intended for Administrators/Engineers who need to find the port on a switch
that they are plugged into using either the MAC Address or the IP Address.
So far this has been tested on Cisco Switches, though I assume it will work on other ones as well"""
import sys
import re
import string
import getopt
from time import ctime
from socket import gethostbyaddr
try:
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.hlapi import IpAddress
except Exception as e:
print("You need to download pysnmp and pyasn1", e)
sys.exit(1)
def usage():
    """Print the command-line help text.

    When the file is executed as a script the process then exits with
    status 0; when it has been imported the function just returns ``None``.
    """
    help_text = """
-d, --device This is the device you want to scan
-c, --community This is the SNMP community string to use
-m, --mac This is the MAC Address you are using to search foir what port your device is plugged into
-i, --ip This is the IP Address you are using to find the MAC Address of the device and The port in the switched it is plugged into
-n, --pname This is the Port Name you are searching For
-v, --verbose This will print out the time stamps of the for loops and of the functions
-r, --report This will print out all the mac addresses on the switch you are scanning and what ports they are connected to.
-f, --follow This will follow all the cdp neighbors found per switch when using the -r or --report option.
example below..
python port_report.py -d 192.168.101.1 -c public -i "192.168.101.201"
This IPAddress is not in the ARP table
python port_report.py -d 192.168.101.1 -c public -i "192.168.101.209"
MAC = 00 14 38 7f 6e 38
Port = GigabitEthernet1/17
Vlan = 175
IPAddr = 192.168.101.209
python port_report.py -d 192.168.101.1 -c public -m "00 14 38 4f 5e 39"
MAC = 00 14 38 4f 5e 39
Port = GigabitEthernet1/17
Vlan = 175
IPAddr = 192.168.101.201
python port_report.py -d 192.168.101.1 -c public -n "1/40"
Port 1/40 has the below MAC Addresses associated with it
MAC = 00 1b 95 97 3c 81
Port = GigabitEthernet1/40
Vlan = 1
IPAddr = The IP Address for this MAC is not in the ARP Table
MAC = 00 15 fa b4 10 06
Port = GigabitEthernet1/40
Vlan = 174
IPAddr = The IP Address for this MAC is not in the ARP Table
Total MAC Addresses associated with this interface 2
python port_report.py -d 192.168.101.1 -c public -n "1/2"
Port 1/2 has the below MAC Addresses associated with it
MAC = 08 00 0f 20 b3 aa
Port = GigabitEthernet1/2
Vlan = 176
IPAddr = 192.168.101.104
python port_report.py -d 192.168.101.1 -c public -r -f
Running Switch Report on 192.168.101.1
GigabitEthernet1/40,00 1b 90 99 3d 83,None,None,vlan1,up,up,unknown,1000mb,
GigabitEthernet10/15,00 23 5e ef 34 81,192.168.101.2,Pointer Record Not set for 192.168.101.62,vlan1,up,up,fullDuplex,1000mb,GIG Laser to 71Fifth
GigabitEthernet1/34,00 01 02 03 03 05,None,None,vlan174,up,up,unknown,1000mb,CCA_CAS Untrusted Interface IP 168.3
"""
    print(help_text)
    # Only terminate the interpreter when this file is the program entry point.
    if __name__ == '__main__':
        sys.exit(0)
    return
def main():
"""Run the lookup or report selected by the module-level option globals.

Reads the names ``verbose``, ``community``, ``device``, ``mac``, ``ip``,
``pname``, ``report`` and ``follow``; none of them is assigned in this
function, so they must exist at module level before the call. All results
are written to stdout with ``print``.

Three modes are visible below: a search by MAC or IP (``mac`` / ``ip``),
a search by port name (``pname``) and a full switch report (``report``).

NOTE(review): the indentation of this file has been flattened, so the exact
nesting of the statements below cannot be read from the text alone --
confirm the flow against a correctly indented copy.
"""
if verbose: print(ctime(), " Main Started ")
# Nothing is attempted unless a community string, a device and at least one
# of mac / ip / pname / report are set.
if ( community and device and ( mac or ip or pname or report ) ):
# Probe sysDescr first; an error here is reported as a community-string,
# firewall or SNMP-availability problem.
snmperror, switchtype = get( device, community, oTable["sysDescr"], 0 )
switchtype = str(switchtype)
if snmperror:
print(snmperror, "Either Wrong Community String or Firewall or SNMP Not Running")
# Exit only when run as a script; as an imported module just return.
if __name__ == '__main__':
sys.exit(1)
else:
return
sbrand = None
scanned_neighbor = False
dswitch = device
switch = followSwitch( dswitch, community )
# entIpList maps a device to the list of addresses walked from its
# ipAdEntAddr table; it is consulted later to spot devices already visited.
entIpList = {}
ipList = []
entaddr = walk( device, community, oTable["ipAdEntAddr"] )
for line in entaddr:
ipList.append(switch.convertOctectIp( line[0][1] ))
entIpList[device] = ipList
nip = ""
nmac = ""
count = 0
# ---- search by MAC or IP ----
if mac or ip:
switch.set_duplex()
switch.set_speed()
switch.set_port_name()
switch.set_phys_addr()
if verbose: print("\nYou should get the switchtype after this:\n")
if verbose: print(switchtype)
mTable = ""
if mac:
# verify_mac() returns the normalised MAC plus a validity flag.
nmac = mac
nmac, valid = verify_mac( nmac )
if not valid:
print("Your mac %s is in the wrong format" % (mac))
if __name__ == '__main__':
sys.exit(1)
else:
return
nip = switch.findIpByMac( nmac )
if ip:
# findMacByIp() returns None when the address is not found.
nip = ip
nmac = switch.findMacByIp( nip )
if nmac == None:
print("This IP Address, %s , is not in the ARP table" % (ip))
if __name__ == '__main__':
sys.exit(1)
else:
return
# The search is only run for sysDescr strings matching one of these brands.
if ( re.search("Cisco|PROCURVE|HP|Nortel|ERS|Foundry", switchtype, re.IGNORECASE ) ):
mTable, ifIndex = switch.find_mac( nmac, nip )
count = 0
sswitch = dswitch
if (mTable):
count += 1
for key, val in list(mTable.items()):
print("Switch Connected to %s" % ( dswitch ))
print("SwitchPort = %s\nSwitchPortSpeed = %s\nSwitchPortDuplex = %s\nSwitchVlan = %s" % \
( val["ifDescr"], val["ifSpeed"], val["ifDuplex"], val["vlan"] ))
print("SnmpHostName = %s\nSnmpHostDescr = %s\nHostMAC = %s\nHostIP = %s\nHostName = %s\n" % \
( val["sysName"], val["sysDescr"], val["nmac"], val["ipAddr"], val["hostName"] ))
# For a Port-channel interface, carry on with the first member ifIndex
# returned by get_pagp_ports().
if re.search("Port-channel", val["ifDescr"]):
if verbose: print("ifIndex %d is a %s interface" % ( ifIndex, val["ifDescr"] ))
ifIndex_pagp_list = switch.get_pagp_ports( ifIndex )
if verbose: print("List of Interfaces is in EtherChannel: ", ifIndex_pagp_list)
ifIndex = ifIndex_pagp_list[0]
neighbor = followSwitch(dswitch, community).get_cdp_neighbor_ip( ifIndex )
if neighbor:
if verbose: print(neighbor)
sswitch = dswitch
# Keep hopping to the CDP neighbour reported for the current ifIndex until
# none is reported, the lookup raises, or the neighbour was already seen.
while neighbor:
# NOTE(review): old_neighbor is assigned but never read in the visible code.
old_neighbor = neighbor
new_neighbor = followSwitch(sswitch, community)
try:
ifIndex, count1 = new_neighbor.get_mac_from_cdp_neighbor(neighbor, nmac, nip)
except Exception as e:
if verbose: print("Stopping: {}".format(e))
break
count += count1
sswitch = neighbor
neighbor = followSwitch(sswitch, community).get_cdp_neighbor_ip( ifIndex )
if neighbor:
entaddr = walk( sswitch, community, oTable["ipAdEntAddr"] )
# NOTE(review): `iplist` (lower-case "l") is assigned here but the loop
# below appends to `ipList` -- looks like a typo; confirm the intent.
iplist = []
for line in entaddr:
ipList.append(new_neighbor.convertOctectIp( line[0][1] ))
entIpList[neighbor] = ipList
for key, value in list(entIpList.items()):
for line in value:
if line == neighbor:
scanned_neighbor = True
print("this neighbor has already been scanned")
print(line, neighbor)
# A None neighbour makes the `while neighbor` condition false.
neighbor = None
if ( count >= 1 ):
print("This MAC %s was finally traced to this switch %s" % ( nmac, sswitch ))
if ( count == 0 ):
print("The Mac Address %s is not on this switch %s" % ( nmac, dswitch ))
# ---- search by port name ----
if pname:
count = 0
switch.set_duplex()
switch.set_speed()
switch.set_port_name()
switch.set_phys_addr()
# find_port_match() stores its result on the switch object; the getters
# below read it back.
switch.find_port_match( pname )
ifIndex = switch.get_ifIndex()
ifName = switch.get_ifName()
sbrand = switch.get_sbrand()
connected_macs = []
# Cisco / ProCurve / HP: query once per (community, vlan) pair.
if ( re.search("Cisco|PROCURVE|HP", switchtype, re.IGNORECASE ) ):
lcomm, lvlan = switch.retrieve_communities( )
if verbose: print(ctime(), "Retrieving Community Strings %s\n" % ( lcomm ))
for i in range(len(lcomm)):
mdict = switch.return_mac_by_ifIndex( lcomm[i], lvlan[i] )
if mdict:
connected_macs.append(mdict)
# Nortel / ERS / Foundry: a single query with the plain community and no vlan.
elif ( re.search("Nortel|ERS|Foundry", switchtype, re.IGNORECASE ) ):
comm = community
vlan = None
mdict = switch.return_mac_by_ifIndex( comm, vlan )
if mdict:
connected_macs.append(mdict)
if len(connected_macs) > 0:
for host in connected_macs:
for key, val in list(host.items()):
count += 1
print("SwitchPort = %s\nSwitchPortSpeed = %s\nSwitchPortDuplex = %s\nSwitchVlan = %s" \
% ( val["ifDescr"], val["ifSpeed"], val["ifDuplex"], val["vlan"] ))
print("SnmpHostName = %s\nHostDescr = %s\nHostMAC = %s\nHostIP = %s\nHostName = %s\n" \
% ( val["sysName"], val["sysDescr"], val["nmac"], val["ipAddr"], val["hostName"] ))
print("There are %d MAC Addresses connected to port %s" % ( count, pname ))
else:
print("There are not any MAC Addresses connected to this %s port" % ( pname ))
# ---- full switch report ----
if report:
dswitch = device
count = write_report(dswitch, entIpList)
tcount = 0
# With follow set the value returned by write_report() is iterated and
# summed -- presumably a list of per-query counts; confirm in write_report().
if follow:
for item in count:
tcount += item
print("Total MAC Address(es) found: %d" % tcount)
def write_report( dev, entIpList, tcount = None ):
    """Write the MAC/port report for ``dev`` and, if ``follow`` is set, its CDP neighbours.

    Every line produced by ``followSwitch.switch_report`` is appended to
    ``connnected_ports_on_<dev>.csv`` (file name kept as-is) and echoed to
    stdout by that method.

    Parameters:
        dev: address of the switch to report on.
        entIpList: dict mapping an already-visited device to the list of its
            own IP addresses; updated in place as neighbours are visited.
        tcount: accumulator list of per-query MAC counts shared by the
            recursive calls. Leave it unset for a top-level call.

    Returns:
        ``tcount`` (a list) when the module-level ``follow`` flag is set,
        otherwise the integer number of MACs found on ``dev``. ``None`` when
        the initial sysDescr query fails and the file was imported rather
        than run as a script (a script run exits with status 1 instead).

    Reads the module-level names ``community``, ``verbose`` and ``follow``.
    """
    # A fresh accumulator per top-level call: the former ``tcount = []``
    # default was created once and kept growing across calls.
    if tcount is None:
        tcount = []
    print("Running Switch Report on %s" % ( dev ))
    snmperror, switchtype = get( dev, community, oTable["sysDescr"], 0 )
    switchtype = str(switchtype)
    if snmperror:
        print(snmperror, "Either Wrong Community String or Firewall or SNMP Not Running")
        if __name__ == '__main__':
            sys.exit(1)
        else:
            return
    switch = followSwitch( dev, community )
    switch.set_duplex()
    switch.set_speed()
    switch.set_port_name()
    switch.set_phys_addr()
    switch.set_oper_status()
    switch.set_admin_status()
    switch.set_alias()
    count = 0
    # The context manager closes the CSV even if a query raises part-way through.
    with open("connnected_ports_on_"+dev+".csv", "a") as conn_output:
        if ( re.search("Cisco|PROCURVE|HP", switchtype, re.IGNORECASE ) ):
            # These brands are queried once per (community, vlan) pair.
            lcomm, lvlan = switch.retrieve_communities( )
            if verbose: print(ctime(), "Retreiving Community Strings\n %s" % ( lcomm ))
            for comm, vlan in zip(lcomm, lvlan):
                macout = switch.switch_report( comm, vlan, conn_output )
                count += macout
                tcount.append(macout)
            print("total MAC Addresses found on %s: %d\n" % ( dev, count ))
        elif ( re.search("Nortel|ERS|Foundry", switchtype, re.IGNORECASE ) ):
            macout = switch.switch_report( community, None, conn_output )
            count += macout
            tcount.append(macout)
            print("total MAC Addresses found on %s: %d\n" % ( dev, count ))
    if not follow:
        return count
    for neighbor in switch.get_cdp_neighbor_ip_table():
        # This trace used to be a bare string expression and never printed.
        if verbose: print("Print neighbor %s " % neighbor)
        # Decide per neighbour: the old flag was never reset, so one
        # already-visited neighbour caused all later ones to be skipped.
        if any(neighbor in addresses for addresses in entIpList.values()):
            if verbose:
                print("this neighbor has already been scanned")
                # Same output as before: the matching address equals neighbor.
                print(neighbor, neighbor)
            continue
        entaddr = walk( neighbor, community, oTable["ipAdEntAddr"] )
        if entaddr == "requestTimedOut":
            continue
        # Record the neighbour's own addresses before recursing so that it is
        # not visited again from further down the topology.
        entIpList[neighbor] = [switch.convertOctectIp( line[0][1] ) for line in entaddr]
        write_report(neighbor, entIpList, tcount)
    return tcount
def verify_mac( nmac ):
    """Validate a user-supplied MAC address and normalise it.

    Accepted notations (hex digits in either case, surrounding whitespace
    ignored): ``00:14:38:7f:6e:38``, ``00-14-38-7f-6e-38``,
    ``0014.387f.6e38`` and ``00 14 38 7f 6e 38``.

    Parameters:
        nmac: the MAC address text to check.

    Returns:
        ``(nmac, valid)``. For a valid address ``nmac`` is rewritten as six
        space-separated hex pairs (case preserved) and ``valid`` is ``1``;
        otherwise the argument is returned unchanged with ``valid`` ``0``.
    """
    pair = r"[0-9a-fA-F]{2}"
    valid = 0
    # The argument itself is checked; the earlier code tested the global
    # ``mac`` and ignored what the caller passed in.
    if isinstance(nmac, str):
        text = nmac.strip()
        # fullmatch rejects strings that merely contain a MAC somewhere.
        for separator in (":", "-", r"\s"):
            if re.fullmatch("(?:%s%s){5}%s" % (pair, separator, pair), text):
                nmac = " ".join(re.findall(pair, text))
                valid = 1
                break
        else:
            if re.fullmatch(r"(?:[0-9a-fA-F]{4}\.){2}[0-9a-fA-F]{4}", text):
                nmac = " ".join(re.findall(pair, text))
                valid = 1
    # ``verbose`` is a module-level flag; tolerate it being unset so the
    # helper can be used on its own.
    if globals().get("verbose"): print(ctime(), " Finished Checking for mac")
    return( nmac, valid )
def walk( dswitch, commVlan, oid ):
    """Walk ``oid`` on ``dswitch`` (UDP port 161) using community ``commVlan``.

    Returns the error indication when the request reports one, otherwise the
    table of results produced by ``nextCmd``.
    """
    generator = cmdgen.CommandGenerator()
    response = generator.nextCmd(
        cmdgen.CommunityData(commVlan),
        cmdgen.UdpTransportTarget((dswitch, 161)),
        oid,
        lookupNames=True, lookupValues=True
    )
    error_indication, _error_status, _error_index, table = response
    return error_indication if error_indication else table
def get( device, commVlan, oid, rval, indexOid="None" ):
    """Issue one SNMP GET against ``device`` (UDP port 161).

    ``oid`` is copied into a list and optionally extended: an ``int``
    ``indexOid`` is appended as one sub-identifier, a ``list`` is appended
    element by element after ``int()`` conversion, and anything else (such
    as the default string ``"None"``) leaves the OID as it is.

    ``rval`` selects the payload: ``0`` or ``2`` gives ``str()`` of the first
    returned value, ``1`` gives the first returned name. A non-``int``
    ``rval`` is treated as ``0``; any other integer falls through and the
    function returns ``None``.

    Returns ``(errorIndication, payload)``; when ``errorIndication`` is
    truthy the payload is the raw result from ``getCmd``.
    """
    generator = cmdgen.CommandGenerator()
    mode = rval if isinstance(rval, int) else 0
    parts = list(oid)
    if isinstance(indexOid, int):
        parts.append(indexOid)
    elif type(indexOid) == list:
        parts.extend(map(int, indexOid))
    response = generator.getCmd(
        cmdgen.CommunityData(commVlan),
        cmdgen.UdpTransportTarget((device, 161)),
        tuple(parts),
        lookupNames=True, lookupValues=True
    )
    errorIndication, _error_status, _error_index, generic = response
    if errorIndication:
        return (errorIndication, generic )
    if mode == 0 or mode == 2:
        return (errorIndication, str(generic[0][1]) )
    if mode == 1:
        return (errorIndication, generic[0][0] )
def hex2dec(mack):
    """Return the integer value of the hexadecimal string ``mack``."""
    value = int(mack, 16)
    return value
def dec2hex(mack):
    """Return ``hex(mack)`` with a leading ``0x`` removed."""
    text = hex(mack)
    # Only a prefix at the very start is dropped, so "-0x.." is left alone.
    if text.startswith("0x"):
        return text[2:]
    return text
def alias_name( switch ):
    """Walk ``ifAlias`` on ``switch`` and return ``{last OID sub-id: str(value)}``.

    Uses the module-level ``community`` string.
    """
    rows = walk( switch, community, oTable['ifAlias'] )
    return {row[0][0][-1]: str(row[0][1]) for row in rows}
def admin_settings( switch ):
    """Walk ``ifAdminStatus`` on ``switch`` and return ``{last OID sub-id: status}``.

    Each raw value is translated through the module-level ``ostatus`` mapping
    and stored as a string. Uses the module-level ``community`` string.
    """
    rows = walk( switch , community, oTable['ifAdminStatus'] )
    return {row[0][0][-1]: str(ostatus[row[0][1]]) for row in rows}
def oper_settings( switch ):
    """Walk ``ifOperStatus`` on ``switch`` and return ``{last OID sub-id: status}``.

    Each raw value is translated through the module-level ``ostatus`` mapping
    and stored as a string. Uses the module-level ``community`` string.
    """
    rows = walk( switch, community, oTable['ifOperStatus'] )
    return {row[0][0][-1]: str(ostatus[row[0][1]]) for row in rows}
def duplex_settings( switch ):
    """Walk ``dot3StatsDuplexStatus`` on ``switch`` and return ``{last OID sub-id: duplex}``.

    Each raw value is translated through the module-level ``duplex`` mapping
    and stored as a string. Uses the module-level ``community`` string.
    """
    rows = walk( switch, community, oTable['dot3StatsDuplexStatus'] )
    return {row[0][0][-1]: str(duplex[row[0][1]]) for row in rows}
def speed_settings( switch ):
    """Walk ``ifSpeed`` on ``switch`` and return ``{last OID sub-id: speed text}``.

    Each value is converted with ``int()`` and formatted by ``port_speed()``.
    Uses the module-level ``community`` string.
    """
    rows = walk( switch, community, oTable['ifSpeed'] )
    return {row[0][0][-1]: str(port_speed(int(row[0][1]))) for row in rows}
def port_name( switch ):
    """Walk ``ifDescr`` on ``switch`` and return ``{last OID sub-id: str(value)}``.

    Uses the module-level ``community`` string.
    """
    rows = walk( switch, community, oTable['ifDescr'] )
    return {row[0][0][-1]: str(row[0][1]) for row in rows}
def port_speed( speed ):
    """Format a speed given in bits per second as whole megabits, e.g. ``"1000mb"``.

    Parameters:
        speed: the interface speed in bits per second.

    Returns:
        The speed divided by 1,000,000 (rounded down) followed by ``"mb"``.
    """
    # Floor division keeps the result an integer; ``/`` is true division in
    # Python 3 and produced "1000.0mb" instead of "1000mb".
    return str(speed // 1000000) + "mb"
def exit_error():
    '''Exit with status 1, but only when this file is running as a script.'''
    running_as_script = __name__ == '__main__'
    if running_as_script:
        sys.exit(1)
def exit():
    '''Exit with status 0, but only when this file is running as a script.'''
    running_as_script = __name__ == '__main__'
    if running_as_script:
        sys.exit(0)
class followSwitch(object):
def __init__(self, switch, comm="public" ):
    """Remember the target switch and community and detect the switch brand.

    ``sysDescr`` is fetched with ``get()``; ``self.sbrand`` becomes the brand
    of the first pattern (tried in the order below, case-insensitively) found
    in that description, or stays ``None`` when none matches.
    """
    self.community = comm
    self.switch = switch
    self.sbrand = None
    self.snmperror, self.switchtype = get( self.switch, self.community, oTable["sysDescr"], 2)
    # (pattern, brand) pairs; order matters because the first hit wins.
    brand_patterns = (
        ("Cisco", "Cisco"),
        ("PROCURVE", "PROCURVE"),
        ("HP", "HP"),
        ("Nortel|ERS", "Nortel"),
        ("Foundry", "Foundry"),
    )
    for pattern, brand in brand_patterns:
        if re.search(pattern, self.switchtype, re.IGNORECASE):
            self.sbrand = brand
            break
def get_sbrand( self ):
    """Return the brand stored in ``self.sbrand``."""
    brand = self.sbrand
    return brand
def set_duplex( self ):
    """Cache the duplex table from ``duplex_settings(self.switch)`` in ``self.duplexH``.

    The lookup is best-effort: if it raises, ``self.duplexH`` is set to
    ``None`` instead of propagating the error.
    """
    try:
        self.duplexH = duplex_settings( self.switch )
    except Exception:
        # Narrowed from a bare ``except:`` so that Ctrl-C and SystemExit are
        # no longer swallowed; ordinary failures still fall back to None.
        self.duplexH = None
def set_speed( self ):
    """Cache the speed table from ``speed_settings(self.switch)`` in ``self.speedH``.

    The lookup is best-effort: if it raises, ``self.speedH`` is set to
    ``None`` instead of propagating the error.
    """
    try:
        self.speedH = speed_settings( self.switch )
    except Exception:
        # Narrowed from a bare ``except:`` so that Ctrl-C and SystemExit are
        # no longer swallowed; ordinary failures still fall back to None.
        self.speedH = None
def set_port_name( self ):
    """Cache the table from ``port_name(self.switch)`` in ``self.portNameH``."""
    names = port_name( self.switch )
    self.portNameH = names
def set_oper_status( self ):
    """Cache the table from ``oper_settings(self.switch)`` in ``self.operH``."""
    statuses = oper_settings( self.switch )
    self.operH = statuses
def set_admin_status( self ):
    """Cache the table from ``admin_settings(self.switch)`` in ``self.adminH``."""
    statuses = admin_settings( self.switch )
    self.adminH = statuses
def set_alias( self ):
    """Cache the table from ``alias_name(self.switch)`` in ``self.aliasH``."""
    aliases = alias_name( self.switch )
    self.aliasH = aliases
def set_phys_addr( self ):
    """Walk ``atPhysAddress`` on this switch and keep the result in ``self.PhysAddr``.

    Note that the walk uses the module-level ``community`` string, not
    ``self.community``.
    """
    table = walk( self.switch, community, oTable["atPhysAddress"] )
    self.PhysAddr = table
def set_ifIndex_dict( self ):
    """Populate every per-interface lookup table cached on this object.

    Sets ``portNameH``, ``operH``, ``adminH``, ``aliasH`` and ``PhysAddr``
    unconditionally. ``duplexH`` and ``speedH`` are best-effort: if their
    lookup raises, the attribute is set to ``None``.

    The ``atPhysAddress`` walk uses the module-level ``community`` string.
    """
    self.portNameH = port_name( self.switch )
    self.operH = oper_settings( self.switch )
    self.adminH = admin_settings( self.switch )
    self.aliasH = alias_name( self.switch )
    self.PhysAddr = walk( self.switch, community, oTable["atPhysAddress"] )
    # ``except Exception`` replaces bare ``except:`` so Ctrl-C and SystemExit
    # are not swallowed while ordinary lookup failures still degrade to None.
    try:
        self.duplexH = duplex_settings( self.switch )
    except Exception:
        self.duplexH = None
    try:
        self.speedH = speed_settings( self.switch )
    except Exception:
        self.speedH = None
def findMacByIp( self, nip ):
    """Return the MAC recorded for IP address ``nip`` in ``self.PhysAddr``.

    The last four sub-identifiers of each entry's name are turned into a
    dotted string and compared with ``nip``; the first match is converted
    with ``convertOctectMac``. Returns ``None`` when nothing matches.
    """
    for entry in self.PhysAddr:
        dotted = re.sub(",\s", ".", str(entry[0][0][-4:]))
        candidate = re.sub("\'|\[|\]|\(|\)", "", dotted)
        if nip == candidate:
            return self.convertOctectMac(entry[0][1])
    return None
def findIpByMac( self, nmac ):
    """Return the IP address recorded for MAC ``nmac`` in ``self.PhysAddr``.

    ``nmac`` is used as a case-insensitive regular expression against each
    converted MAC. The result (or ``None`` when nothing matches) is also
    stored in ``self.ipAddr``.
    """
    self.ipAddr = None
    hit = next(
        (
            entry
            for entry in self.PhysAddr
            if re.search(nmac, self.convertOctectMac(entry[0][1]), re.IGNORECASE)
        ),
        None,
    )
    if hit is not None:
        # The last four sub-identifiers of the entry's name are the address.
        raw = str(hit[0][0][-4:])
        self.ipAddr = re.sub("\'|\(|\)|,", "", raw).replace(" ", ".")
    return self.ipAddr
def convertOctectMac(self, mack):
    """Format the octets of ``mack`` as lower-case hex pairs separated by spaces.

    Parameters:
        mack: an iterable of integer octets (for example ``bytes``).

    Returns:
        A string such as ``"00 14 38 7f 6e 38"``; an empty string for empty
        input.
    """
    # Direct formatting replaces hex() plus regex clean-up of str(list) and
    # always yields a string, including for empty input.
    return " ".join("%02x" % octet for octet in mack)
def convertOctectIp(self, hexip):
    """Return ``IpAddress(hexip).prettyPrint()`` for the given raw address value."""
    return IpAddress( hexip ).prettyPrint()
def convertDecMac(self, mack):
    """Format a sequence of decimal octets as lower-case hex pairs separated by spaces.

    Parameters:
        mack: an iterable of integers, e.g. ``[0, 20, 56, 127, 110, 56]``.

    Returns:
        A string such as ``"00 14 38 7f 6e 38"``; an empty string for empty
        input.
    """
    # Direct formatting replaces hex() plus regex clean-up of str(list) and
    # always yields a string, including for empty input.
    return " ".join("%02x" % octet for octet in mack)
def get_pagp_ports( self, pagp_ifIndex ):
    """Return the member ifIndex values of the EtherChannel ``pagp_ifIndex``.

    Walks ``pagpGroupIfIndex`` and collects, as a tuple of ints, the last
    OID sub-identifier of every row whose value equals ``pagp_ifIndex``.
    """
    rows = walk( self.switch, self.community, pagpTable["pagpGroupIfIndex"] )
    return tuple(
        int(row[0][0][-1])
        for row in rows
        if pagp_ifIndex == int(row[0][1])
    )
def retrieve_communities( self ):
    """Return the distinct community strings from ``entLogicalCommunity``.

    Returns:
        ``(lcomm, lvlan)``: two parallel lists. ``lcomm`` holds each
        community string once, in the order first seen; ``lvlan`` holds the
        last OID sub-identifier of the row in which it was first seen.

    Reads the module-level ``verbose`` flag for trace output.
    """
    commTable = walk( self.switch, self.community, oTable["entLogicalCommunity"] )
    if verbose: print(commTable)
    lcomm = []
    lvlan = []
    seen = set()
    for row in commTable:
        if verbose: print(ctime(), " Looping Through CommTable")
        vlan = int(row[0][0][-1])
        comm = str(row[0][1])
        # Compare against every community collected so far. The earlier
        # slice ``lcomm[:len(comm[-1])]`` only ever looked at the first one
        # and raised IndexError for an empty community string.
        if comm in seen:
            if verbose: print("Duplicate Community String")
            continue
        seen.add(comm)
        lcomm.append(comm)
        lvlan.append(vlan)
    return( lcomm, lvlan )
def find_port_match( self, pname ):
    """Locate the interface whose ``ifName`` matches the port name ``pname``.

    If ``pname`` contains a ``slot/port`` pair, a brand-specific name is
    built first (ProCurve/HP via ``hpTable``, Nortel ``"Slot: S Port: P"``,
    Foundry ``"FastEthernetP"``). The ``ifName`` table is then scanned and
    the first row matching either that name or ``pname`` itself wins.

    The result is stored in ``self.ifIndex`` and ``self.ifName``; both are
    ``None`` when nothing matches. Nothing is returned.

    NOTE(review): both names are used as regular expressions and are not
    anchored, so "1/4" can also match "1/40" -- behaviour kept as found.
    """
    if verbose: print(ctime(), " In generic_pname Function")
    self.ifIndex = None
    self.ifName = None
    ifNameTable = walk( self.switch, community, oTable["ifName"] )
    new_pname = ""
    # Use the search result directly. The earlier code tested search() but
    # read groups from match(), which is None (AttributeError) whenever the
    # pair is not at the very start, e.g. "Gi1/9".
    slot_port = re.search("([0-9]{1,2})\\/([0-9]{1,2})", pname)  # "1/9"
    if slot_port:
        slot, port = slot_port.group(1), slot_port.group(2)
        if self.sbrand in ("PROCURVE", "HP"):
            new_pname = hpTable[slot] + port
        elif self.sbrand == "Nortel":
            new_pname = "Slot: %s Port: %s" % ( slot, port )
        elif self.sbrand == "Foundry":
            new_pname = "FastEthernet%s" % ( port )
        if new_pname and verbose: print(new_pname)
    for iface in ifNameTable:
        ifIndex = int(iface[0][0][-1])
        ifName = str(iface[0][1])
        # The brand-specific name is tried before the raw port name.
        if new_pname and re.search(new_pname, ifName):
            matched = new_pname
        elif re.search("[A-Z]+"+pname+"|"+pname, ifName):
            matched = pname
        else:
            continue
        self.ifIndex = ifIndex
        self.ifName = ifName
        if verbose: print("Found %s on %s and the ifIndex is %d" % ( matched, ifName, ifIndex ))
        break
def get_ifIndex(self):
    """Return the interface index stored in ``self.ifIndex``."""
    index = self.ifIndex
    return index
def get_ifName(self):
    """Return the interface name stored in ``self.ifName``."""
    name = self.ifName
    return name
def find_mac(self, nmac, nip=None):
    """Look for ``nmac`` on this switch and return ``(mTable, ifIndex)``.

    For Cisco/ProCurve/HP brands each (community, vlan) pair from
    ``retrieve_communities()`` is tried in turn until ``find_mac_or_ip``
    returns a non-empty table. For Nortel/ERS/Foundry a single query with
    ``self.community`` and no vlan is made. Both results stay ``None`` when
    no branch applies or no community is available.
    """
    self.mac = nmac
    # The sysDescr result is not used below; the request is kept so the
    # sequence of SNMP queries stays the same.
    snmperror, switchtype = get( self.switch, self.community, oTable["sysDescr"], 2)
    mTable = None
    ifIndex = None
    if re.search("Cisco|PROCURVE|HP", self.sbrand, re.IGNORECASE):
        lcomm, lvlan = self.retrieve_communities( )
        if verbose: print(ctime(), "Retrieving Community Strings\n %s" % ( lcomm ))
        for position, comm in enumerate(lcomm):
            mTable, ifIndex = self.find_mac_or_ip( self.mac, nip, comm, lvlan[position] )
            if len(mTable) >= 1:
                break
    elif re.search("Nortel|ERS|Foundry", self.sbrand, re.IGNORECASE):
        mTable, ifIndex = self.find_mac_or_ip( self.mac, nip, self.community, None )
    return mTable, ifIndex
def find_mac_or_ip( self, nmac, nip, comm, vlanID ):
"""Search the dot1dTpFdbPort table for ``nmac`` and describe the port it is on.

``comm`` is the community string used for the walk and the follow-up
gets; ``vlanID`` (when truthy) is used as the index for an
``entLogicalDescr`` lookup. ``nip`` is the known IP address, if any;
otherwise ``findIpByMac`` is consulted.

Returns ``(mTable, ifIndex)`` where ``mTable`` maps ``nmac`` to a dict with
the keys ifDescr, ifSpeed, ifDuplex, vlan, sysName, sysDescr, nmac, ipAddr
and hostName. ``mTable`` is empty and ``ifIndex`` is ``None`` when the MAC
is not found.

NOTE(review): the indentation of this file has been flattened, so the exact
nesting of the statements below cannot be read from the text alone.
"""
if verbose: print(ctime(), " In generic 'find_mac_or_ip' Function ")
if verbose: print(nmac, nip, self.switch, comm)
macVlanTable = walk( self.switch, comm, oTable["dot1dTpFdbPort"] )
if verbose: print(macVlanTable)
mTable = {}
# NOTE(review): count is incremented below but never read in this function.
count = 0
ifIndex = None
if ( len(macVlanTable) > 0 ):
if verbose: print(ctime(), " First If Statement in the find_mac_or_ip Function ")
for macVlan in macVlanTable:
# The last six sub-identifiers of the row name are the MAC in decimal.
cmac = self.convertDecMac(list(macVlan[0][0][-6:]))
# nmac is used as a case-insensitive regular expression here.
if re.search(nmac, cmac, re.IGNORECASE):
if verbose: print("MAC Addresses Match %s and %s" % ( nmac, cmac ))
count += 1
# The row value is the bridge port; map it to an ifIndex.
bIndex = int(macVlan[0][1])
snmperror, ifIndex = get( self.switch, comm, oTable["dot1dBasePortIfIndex"], 2, bIndex )
if snmperror:
print(snmperror)
if __name__ == '__main__':
sys.exit(1)
else:
# NOTE(review): this bare return yields None, while find_mac() unpacks
# two values from the result -- confirm how callers cope with that.
return
ifIndex = int(ifIndex)
ifSpeed = self.speedH[ifIndex]
try:
ifDuplex = self.duplexH[ifIndex]
except:
ifDuplex = None
port = self.portNameH[ifIndex]
vlan = ""
hname = None
if vlanID:
try:
snmperror, vlan = get( self.switch, comm, oTable["entLogicalDescr"], 2, vlanID )
except:
vlan = None
if not nip:
ipAddr = str(self.findIpByMac( nmac ))
else:
ipAddr = nip
# Only when ipAddr looks like a dotted quad are the reverse lookup and the
# SNMP queries against that host attempted.
if re.search("(\d{1,3}\.){3}\d{1,3}", str(ipAddr) ):
try:
hname = gethostbyaddr(ipAddr)
hname = hname[0]
except:
hname = "Pointer Record Not set for %s" % ( ipAddr )
# These gets use the module-level ``community``, not ``comm``.
snmperror, sysName = get( ipAddr, community, oTable["sysName"], 2 )
if snmperror: sysName = snmperror
snmperror, sysDescr = get( ipAddr, community, oTable["sysDescr"], 2 )
if snmperror: sysDescr = snmperror
mTable[nmac] = {
"ifDescr" : port,
"ifSpeed" : ifSpeed,
"ifDuplex" : ifDuplex,
"vlan" : vlan,
"sysName" : sysName,
"sysDescr" : sysDescr,
"nmac" : nmac,
"ipAddr" : ipAddr,
"hostName" : hname
}
count += 1
else:
# No usable IP address: the sysName/sysDescr locals set here are not the
# values stored; the entry records "No SNMP Access" instead.
sysName = "None"
sysDescr = "None"
mTable[nmac] = {
"ifDescr" : port,
"ifSpeed" : ifSpeed,
"ifDuplex" : ifDuplex,
"vlan" : vlan,
"sysName" : "No SNMP Access",
"sysDescr" : "No SNMP Access",
"nmac" : nmac,
"ipAddr" : ipAddr,
"hostName" : hname
}
count += 1
if verbose: print(ctime(), " Done ")
# Stop scanning the table after this point.
break
return mTable, ifIndex
def return_mac_by_ifIndex( self, comm, vlanID ):
""" Return the MAC addresses learned on the interface ``self.ifIndex``.

``comm`` is the community string used for the bridge-table walks;
``vlanID`` (when truthy) is used as the index for an ``entLogicalDescr``
lookup, otherwise the vlan is recorded as ``None``.

Returns a dict keyed by MAC address; each value is a dict with the keys
ifDescr, ifSpeed, ifDuplex, vlan, sysName, sysDescr, nmac, ipAddr and
hostName. The dict is empty when nothing is found.

NOTE(review): the indentation of this file has been flattened, so the exact
nesting of the statements below cannot be read from the text alone.
"""
mTable = {}
# NOTE(review): sTable and mcount are never read, and the ``switch`` list is
# filled below but not returned, in the code visible here.
sTable = {}
switch = []
mcount = 0
vIfIndexTable = walk( self.switch, comm, oTable["dot1dBasePortIfIndex"] )
if ( len(vIfIndexTable) > 0 ):
for v in vIfIndexTable:
# Row value = ifIndex, last sub-identifier of the row name = bridge port.
vIndex = int(v[0][1])
bIndex = int(v[0][0][-1])
if verbose: print("Now Trying to Match the %d ifIndex to the bridge %d ifIndex table" % ( self.ifIndex, vIndex ))
if ( self.ifIndex == vIndex ):
if verbose: print("Match %d ifIndex to %d BridgeifIndex Table" % ( self.ifIndex, vIndex ))
dM = walk( self.switch, comm, oTable["dot1dTpFdbPort"] )
for d in dM:
decmac = int(d[0][1])
if verbose: print("Now Trying to Match the %d bIndex to the decimal bridge %d Index table" % ( bIndex, decmac ))
if vlanID:
snmperror, vlan = get( self.switch, comm, oTable["entLogicalDescr"], 2, vlanID )
else:
vlan = None
if ( bIndex == decmac ):
mcount += 1
# The last six sub-identifiers of the row name are the MAC in decimal.
indexListOid = list(d[0][0][-6:])
nmac = self.convertDecMac( indexListOid )
if verbose: print("Found %s Decimal Mac and Coverted into a HEX MAC %s " % ( indexListOid, nmac ))
ipAddr = str( self.findIpByMac( nmac ) )
hname = None
try:
ifSpeed = self.speedH[self.ifIndex]
except:
ifSpeed = None
try:
ifDuplex = self.duplexH[self.ifIndex]
except:
ifDuplex = None
port = self.portNameH[self.ifIndex]
# Only when ipAddr looks like a dotted quad are the host lookups attempted.
if re.search("(\d{1,3}\.){3}\d{1,3}", ipAddr ):
if re.search("127.0.0.\d+", ipAddr ):
pass
else:
try:
hname = gethostbyaddr(ipAddr)
hname = hname[0]
except:
hname = "Pointer Record Not set for %s" % ( ipAddr )
# These gets use the module-level ``community``, not ``comm``.
snmperror, sysName = get( ipAddr, community, oTable["sysName"], 2 )
if snmperror: sysName = snmperror
snmperror, sysDescr = get( ipAddr, community, oTable["sysDescr"], 2 )
sysName = str(sysName)
if snmperror: sysDescr = snmperror
sysDescr = str(sysDescr)
snmperror, sysModel = get( ipAddr, community, oTable["entPhysicalModelName"], 2 )
# A host whose sysDescr names one of these brands is appended to the
# ``switch`` list instead of being added to mTable.
if re.search("Cisco|PROCURVE|Nortel|Foundry", sysDescr, re.IGNORECASE):
switch.append(ipAddr)
switch.append(sysName)
switch.append(sysDescr)
switch.append(sysModel)
switch.append(vlan)
switch.append(self.speedH[self.ifIndex])
switch.append(self.portNameH[self.ifIndex])
else:
mTable[nmac] = {
"ifDescr" : port,
"ifSpeed" : ifSpeed,
"ifDuplex" : ifDuplex,
"vlan" : vlan,
"sysName" : sysName,
"sysDescr" : sysDescr,
"nmac" : nmac,
"ipAddr" : ipAddr,
"hostName" : hname
}
else:
mTable[nmac] = {
"ifDescr" : port,
"ifSpeed" : ifSpeed,
"ifDuplex" : ifDuplex,
"vlan" : vlan,
"sysName" : "No SNMP Access",
"sysDescr" : "No SNMP Access",
"nmac" : nmac,
"ipAddr" : ipAddr,
"hostName" : hname
}
return mTable
def switch_report( self, comm, vlanID, conn_output ):
    """Print and log one CSV line per MAC address learned on this switch.

    ``comm`` is the community string used for every query; ``vlanID`` (when
    truthy) is used as the index for an ``entLogicalDescr`` lookup whose
    result fills the vlan column, otherwise that column is ``None``. Each
    line is printed, written to ``conn_output`` and flushed.

    Columns: port, MAC, IP, host name, vlan, admin status, oper status,
    duplex, speed, alias.

    Returns the number of lines written.
    """
    written = 0
    vlan = None
    if vlanID:
        snmperror, vlan = get( self.switch, comm, oTable["entLogicalDescr"], 2, vlanID )
    bridgeTable = walk( self.switch, comm, oTable['dot1dBasePort'] )
    decMacTable = walk( self.switch, comm, oTable['dot1dTpFdbPort'] )
    if verbose: print(decMacTable)
    if not len(decMacTable) > 0:
        return written
    for bridge_row in bridgeTable:
        bIndex = int(bridge_row[0][1])
        snmperror, ifIndex = get( self.switch, comm, oTable['dot1dBasePortIfIndex'], 2, int(bIndex) )
        ifIndex = int(ifIndex)
        for fdb_row in decMacTable:
            # Only forwarding entries that sit on this bridge port are reported.
            if not (fdb_row[0][1] == bIndex):
                continue
            ifSpeed = self.speedH[ifIndex]
            # Bare except kept on purpose so the fallback behaves as before.
            try:
                ifDuplex = self.duplexH[ifIndex]
            except:
                ifDuplex = "Not Known"
            port = self.portNameH[ifIndex]
            ifOperStatus = self.operH[ifIndex]
            ifAdminStatus = self.adminH[ifIndex]
            alias = self.aliasH[ifIndex]
            # The last six sub-identifiers of the row name are the MAC in decimal.
            nmac = self.convertDecMac( list(fdb_row[0][0][-6:]) )
            ipAddr = str( self.findIpByMac( nmac ) )
            hname = None
            looks_like_ip = re.search("(\d{1,3}\.){3}\d{1,3}", ipAddr )
            # No reverse lookup is attempted for 127.0.0.x addresses.
            if looks_like_ip and not re.search("127.0.0.\d+", ipAddr ):
                try:
                    hname = gethostbyaddr(ipAddr)[0]
                except:
                    hname = "Pointer Record Not set for %s" % ( ipAddr )
            line = "%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n" % ( port,nmac,ipAddr,hname,vlan,ifAdminStatus,ifOperStatus,ifDuplex,ifSpeed,alias )
            written += 1
            print(line)
            conn_output.write(line)
            conn_output.flush()
    return written
def get_cdp_neighbor_ip(self, ifIndex):
    """Return the address of the CDP neighbour seen on interface ``ifIndex``.

    Walks ``cdpCacheAddress`` and compares ``ifIndex`` with the
    second-to-last sub-identifier of each row name. The address of the last
    matching row is stored in ``self.ip`` and returned; ``None`` when no row
    matches. ``ifIndex`` is also stored in ``self.ifIndex``.
    """
    self.ifIndex = ifIndex
    ctable = walk( self.switch, self.community, cdpTable["cdpCacheAddress"])
    self.ip = None
    for row in ctable:
        name_and_value = tuple(row[0])
        cache_if_index = name_and_value[0][-2]
        if verbose:
            print(self.ifIndex, cache_if_index, self.convertOctectIp(str(row[0][1])))
        if not ( self.ifIndex == cache_if_index ):
            continue
        if verbose:
            print("found ifIndex %s and here is the ip address of the cdpneighbor %s" \
            % (str(self.ifIndex), self.convertOctectIp(str(row[0][1]))))
        self.ip = self.convertOctectIp(str(row[0][1]))
    return self.ip
def get_cdp_neighbor_ip_table(self):
    """Return a tuple with the address of every row in ``cdpCacheAddress``.

    The tuple is also stored in ``self.ipTable``. Each address is printed
    when the module-level ``verbose`` flag is set.
    """
    ctable = walk( self.switch, self.community, cdpTable["cdpCacheAddress"])
    self.ipTable = []
    for row in ctable:
        address = self.convertOctectIp(str(row[0][1]))
        self.ipTable.append(address)
        if verbose:
            print(address)
    self.ipTable = tuple(self.ipTable)
    return self.ipTable
def get_mac_from_cdp_neighbor( self, cswitch, nmac, nip ):
self.cswitch = cswitch
self.nmac = nmac
self.nip = nip
snmperror, switchtype = get( self.cswitch, self.community, oTable["sysDescr"], 2)
if snmperror:
print("Wrong Community String %s for device %s" % ( self.community, self.cswitch ))
if __name__ == '__main__':
sys.exit(1)
else:
return
get_nmac = followSwitch( self.cswitch, self.community )
get_nmac.set_duplex()
get_nmac.set_speed()
get_nmac.set_port_name()
if self.nip == None:
get_nmac.set_phys_addr()
cTable, self.new_ifIndex = get_nmac.find_mac( self.nmac, self.nip )
count = 0
if cTable:
count += 1
print("Found %s on %s\n" % ( self.nmac, self.cswitch ))
for key, val in list(cTable.items()):
print("Switch Connected to %s" % ( self.cswitch ))
print("SwitchPort = %s\nSwitchPortSpeed = %s\nSwitchPortDuplex = %s\nSwitchVlan = %s" % \
( val["ifDescr"], val["ifSpeed"], val["ifDuplex"], val["vlan"] ))
print("SnmpHostName = %s\nSnmpHostDescr = %s\nHostMAC = %s\nHostIP = %s\nHostName = %s\n" % \