#
# @see PCAP Next Generation Dump File Format
# @see https://github.com/pcapng/pcapng
# @see https://www.tcpdump.org/linktypes/LINKTYPE_NORDIC_BLE.html
#
import copy
from datetime import datetime
import logging
import os
from os.path import exists
from SnifferAPI import Logger, Packet, Exceptions, SnifferCollector
from SnifferAPI.Types import *
import time
from SnifferAPI.Packet import all_tifs
import sys
from pcapng import FileScanner
from SnifferAPI.Packet import test_log
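def parse_pcapng_file(file_type, file):
    """Replay a saved sniffer pcapng capture through the SnifferAPI packet decoder.

    Each Enhanced Packet Block in the file is decoded with ``Packet.Packet``;
    packets that decode as valid are passed to
    ``packet_reader.handlePacketCompatibility`` and then to
    ``packet_reader.handlePacketHistory``. Blocks of any other type
    (section header, interface description, ...) are skipped.

    @see [python-pcapng Wireshark packet parsing](https://blog.csdn.net/weifengdq/article/details/117751828)

    Args:
        file_type:
            0: Wireshark saved pcapng file
            1: pcap file converted pcapng file
        file: the saved sniffer pcapng file name with path
    """
    # pcapng is already a dependency of this file (FileScanner); the block
    # class is imported locally so the type check below is self-contained.
    from pcapng.blocks import EnhancedPacket

    # Packet IDs that are recognised but need no action when replaying a file.
    # RESP_TIMESTAMP is in this list on purpose: the code that re-based the
    # timestamp reference on the current wall-clock time is disabled here.
    ignored_ids = (
        EVENT_PACKET_DATA_PDU,
        EVENT_PACKET_ADV_PDU,
        EVENT_FOLLOW,  # This packet has no value for the user.
        EVENT_CONNECT,
        EVENT_DISCONNECT,
        SWITCH_BAUD_RATE_RESP,
        RESP_VERSION,
        RESP_TIMESTAMP,
    )
    versions = {1116: '3.1.0',
                1115: '3.0.0',
                1114: '2.0.0',
                1113: '2.0.0-beta-3',
                1112: '2.0.0-beta-1'}

    #
    # utilize the nRF sniffer code
    #
    packet_reader = Packet.PacketReader(pcapng_parser=True)
    with open(file, 'rb') as fp:
        for block in FileScanner(fp):
            # Select packet blocks by type rather than by position in the
            # file. A capture is not guaranteed to be exactly one section
            # header, one interface description and then only packets; any
            # other block in third or later position has no packet_data and
            # would abort the run with AttributeError.
            if not isinstance(block, EnhancedPacket):
                continue

            # The capture file is untrusted input. Only InvalidPacketException
            # is treated as "skip this record".
            # NOTE(review): any other exception raised while decoding a
            # malformed record propagates and ends the parse - confirm that
            # Packet.Packet bounds-checks its input.
            try:
                # The first byte of each record is not passed to the decoder.
                # NOTE(review): presumably the LINKTYPE_NORDIC_BLE board id
                # byte - confirm against the link-type description.
                packet = Packet.Packet(block.packet_data[1:], is_parser=True,
                                       packet_reader=packet_reader, file_type=file_type,
                                       packet_time_from_pcap=block.timestamp)
                if packet.valid:
                    packet_reader.handlePacketCompatibility(packet)
                if not packet.valid:
                    continue
            except Exceptions.InvalidPacketException:
                continue

            if packet.id == PING_RESP:
                if hasattr(packet, 'version'):
                    fwversion = versions.get(packet.version, 'SVN rev: %d' % packet.version)
                    print(f'fw version: {fwversion}')
            elif packet.id not in ignored_ids:
                logging.info("Unknown packet ID")
            packet_reader.handlePacketHistory(packet)  # Will save this packet as last packet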
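if __name__ == "__main__":
    # Usage: pcapng_file_parser.py <file_type> <pcapng_file>
    #
    # Exit codes:
    #    1  wrong number of arguments
    #    2  file type is not 0 or 1 (including a non-numeric value)
    #    3  capture file does not exist
    #   10  no TIFS values were collected
    #   11  a TIFS value is outside the accepted window
    # sys.exit is used instead of the site-provided exit(), which is not
    # guaranteed to be defined in every interpreter configuration.
    if len(sys.argv) != 3:
        print('Please give the pcapng file type and name with full path.')
        sys.exit(1)

    # A non-numeric file type is reported like any other bad value instead of
    # ending in an uncaught ValueError traceback.
    try:
        file_type = int(sys.argv[1])
    except ValueError:
        file_type = None
    if file_type not in (0, 1):
        print('file type should be 0 (Wireshark saved pcapng file) or 1 (pcap converted pcapng file).')
        sys.exit(2)

    file_name = sys.argv[2]
    if not exists(file_name):
        print(f'File "{file_name}" does not exist.')
        sys.exit(3)

    parse_pcapng_file(file_type, file_name)

    # all_tifs is imported from SnifferAPI.Packet; this script only reads it
    # after the capture has been parsed.
    if not all_tifs:
        print("No TIFS captured.")
        sys.exit(10)

    max_tifs = max(all_tifs)
    min_tifs = min(all_tifs)
    avg = sum(all_tifs) / len(all_tifs)
    print(f'TIFS, total: {len(all_tifs)}, max: {max_tifs}, min: {min_tifs}, average: {avg:.1f}')
    # Accepted window is 148..152 inclusive.
    # NOTE(review): presumably microseconds, i.e. 150 +/- 2 - confirm the
    # unit of all_tifs in SnifferAPI.Packet.
    if max_tifs <= 152 and min_tifs >= 148:
        print("  TIFS verification: PASS")
    else:
        print("  TIFS verification: FAIL")
        sys.exit(11)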