forked from YMBo/sflow_traffic
-
Notifications
You must be signed in to change notification settings - Fork 0
/
getTraffic.py
185 lines (165 loc) · 5.33 KB
/
getTraffic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# !/usr/local/bin/python
# -*- coding: UTF-8 -*-
'''统计跨机房服务调用'''
import addpath
import pcap
from dpkt.ethernet import Ethernet
from dpkt.ip import IP as dpktIP
import dpkt
import sys
from log import logger
from IPy import IP
from conf import TIME, NAME
from datetime import datetime, timedelta
from intervalTime import Timer
from formart_server.f_s import formartS
from getDefaultIp.getDefaultIp import hostip
ETH_TYPE_ERSPAN1 = 0x88be
# 列出所有网络接口
# pcap.findalldevs()
# name接口名,
# promisc为真代表打开混杂模式,
# immediate代表立即模式,启用将不缓存数据包
# timeout_ms代表接收数据包的超时时间
# pcap.pcap对象pc是个动态数据,通常结合for循环或是while循环不断读取数据包,数据包会返回时间戳及报文数据.
# data = pcap.pcap(name='en0', promisc=True, immediate=True)
# setfilter用来设置数据包过滤器,比如只想抓http的包,那就通过setfilter(tcp port 80)实现
# 记录程序执行次数
num = 0
error = False
#所有记录
allRecord = []
# 数量
count = []
# 总次数
totalN = 0
def findin(arr, obj, obj1):
if obj1 in arr:
index = arr.index(obj1)
elif obj in arr:
index = arr.index(obj)
else:
count.append(0)
index = len(count) - 1
arr.append(obj)
count[index] += 1
# 解析sflow报文的数据,要配合sflowtool工具
def parseSflow(Ethernet_pack):
if type(Ethernet_pack.data) == dpktIP and type(
Ethernet_pack.data.data) == dpkt.udp.UDP:
# 解包,获得netFlowv5报文
ip = Ethernet_pack.data
udp = ip.data
netflowData = dpkt.netflow.Netflow5(udp.data)
data = netflowData.data
allData = data[0]
srcIp = str(IP(allData.src_addr))
dstIp = str(IP(allData.dst_addr))
sport = allData.src_port
dport = allData.dst_port
if (sport == 22 or dport == 22):
return
# 源服务
# sServer = get_s(srcIp, sport)
# # 目的服务
# dServer = get_s(dstIp, dport)
# 插入数据库
tags = {
'srcIp': srcIp,
'dstIp': dstIp,
'sport': sport,
'dport': dport,
}
tags2 = {
'srcIp': tags['dstIp'],
'dstIp': tags['srcIp'],
'sport': tags['dport'],
'dport': tags['sport'],
}
findin(allRecord, tags, tags2)
# 解析经过GRE封装过的报文,含有方向,就是通过GRE封装了一层ip和port,要获取最里面的两层数据
def parseTCP(Ethernet_pack):
# 判断是否为GRE protocol==47
if type(Ethernet_pack.data) == dpktIP and Ethernet_pack.data.p == 47:
# 协议类型 25944,35006
# IP层
greContent = Ethernet_pack.ip.gre.ethernet.ip
srcIp = '%d.%d.%d.%d' % tuple(map(ord, list(greContent.src)))
dstIp = '%d.%d.%d.%d' % tuple(map(ord, list(greContent.dst)))
sport = greContent.data.sport
dport = greContent.data.dport
if (sport == 22 or dport == 22):
return
tags = {
'srcIp': srcIp,
'dstIp': dstIp,
'sport': sport,
'dport': dport,
}
tags2 = {
'srcIp': tags['dstIp'],
'dstIp': tags['srcIp'],
'sport': tags['dport'],
'dport': tags['sport'],
}
findin(allRecord, tags, tags2)
# 开始抓包
def getIp():
global error, totalN
# 取默认网卡
# name = pcap.findalldevs()
try:
dataPack = pcap.pcap(name=NAME, promisc=True, immediate=True)
# dataPack.setfilter('udp port 9991')
# dataPack.setfilter('tcp')
logger.info('连接网卡->%s,开始抓包', NAME)
except Exception as e:
logger.error('连接网卡->%s失败,强制退出,错误信息->%s', NAME, e)
error = True
sys.exit(1)
else:
for ptime, pdata in dataPack:
totalN += 1
# 解包,获得数据链路层包
Ethernet_pack = Ethernet(pdata)
# 扩展dpkt解析ERSPAN数据
Ethernet.set_type(ETH_TYPE_ERSPAN1, Ethernet)
try:
parseTCP(Ethernet_pack)
# dataBase.insert(tags, fields)
except Exception as e:
pass
dataPack.close()
# 清空结果
def clearResult():
global allRecord, count, num, totalN
num += 1
logger.info('----------------第%s次-数据条数%s-------抓包总条数%s----------------',
num, len(allRecord), totalN)
formartS(allRecord, count)
allRecord = []
count = []
totalN = 0
# 定时器
# def setInterval(fun, time=TIME):
# if error:
# logger.error('连接网卡失败,强制退出')
# sys.exit(1)
# timer = threading.Timer(time, setInterval, (fun, time))
# fun()
# timer.start()
def main():
if not hostip:
return
# 保证清表在三台机器数据填充前完成,清表在望京机房提前5分钟完成
time = TIME
if '10.136' in hostip:
cday = datetime.strptime('2019-3-3 ' + time, '%Y-%m-%d %H:%M:%S')
cday = cday - timedelta(minutes=5)
time = cday.strftime('%H:%M:%S')
logger.info('设置时间:%s', time)
t = Timer(clearResult, time)
t.start()
getIp()
if __name__ == "__main__":
main()