-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patharris.py
executable file
·267 lines (230 loc) · 7.78 KB
/
arris.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
#!/usr/bin/env python3
"""
=head1 NAME
arris - MUNIN Plugin to monitor status of Arris TG3442 / TG2492LG-85
and compatible cable modems
=head1 DESCRIPTION
Connect to the web-frontend and get current DOCSIS status of upstream and
downstream channels. (Signal Power, SNR, Lock Status)
=head1 REQUIREMENTS
- BeautifulSoup (bs4), used with the lxml parser
- pycryptodome
- requests
=head1 CONFIGURATION
=head2 Example
[arris]
env.url http://192.168.100.1
env.username admin
env.password yourpassword
=head2 Parameters
url - URL to web-frontend
username - defaults to "admin"
password - valid password
=head1 REFERENCES
https://www.arris.com/products/touchstone-tg3442-cable-voice-gateway/
=head1 AUTHOR
Copyright (c) 2019 Daniel Hiepler <d-munin@coderdu.de>
Copyright (c) 2004-2009 Nicolas Stransky <Nico@stransky.cx>
Copyright (c) 2018 Lars Kruse <devel@sumpfralle.de>
=head1 LICENSE
Permission to use, copy, and modify this software with or without fee
is hereby granted, provided that this entire notice is included in
all source code copies of any software which is or includes a copy or
modification of this software.
THIS SOFTWARE IS BEING PROVIDED "AS IS", WITHOUT ANY EXPRESS OR
IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY
REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE
MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR
PURPOSE.
=head1 MAGIC MARKERS
#%# family=contrib
=cut
"""
import binascii
from bs4 import BeautifulSoup
from Crypto.Cipher import AES
import hashlib
import json
import re
import requests
import sys
import os
def login(session, url, username, password):
    """Log in to the modem's web frontend and prepare *session* for later requests.

    The login page is fetched to read the ``currentSessionId`` value from the
    javascript in its head. The password and that session id are then
    AES-CCM encrypted with a key derived from the password via PBKDF2 and
    sent to ``/php/ajaxSet_Password.php``.

    On success the session is modified in place: the CSRF nonce returned by
    the modem and a few request headers are stored in ``session.headers``, a
    ``credential`` cookie is set, and ``/php/ajaxSet_Session.php`` is POSTed.

    Args:
        session: HTTP session object providing ``get``/``put``/``post``,
            ``headers`` and ``cookies`` (a ``requests.Session`` in this file).
        url: base URL of the web frontend, without trailing slash.
        username: user name to log in with.
        password: password of that user; must be ASCII-encodable.

    Returns:
        None.

    Raises:
        SystemExit: with status -1 if no session id is found on the login
            page or the modem reports a failed login.
    """
    # get login page
    r = session.get(f"{url}")
    # parse HTML
    h = BeautifulSoup(r.text, "lxml")
    # get session id from javascript in head
    head_text = h.head.text if h.head is not None else ""
    match = re.search(r".*var currentSessionId = '(.+)';.*", head_text)
    if match is None:
        # without this check a changed/unexpected page ends in an opaque
        # "NoneType is not subscriptable" traceback
        print("login failure: no session id found on login page", file=sys.stderr)
        sys.exit(-1)
    current_session_id = match[1]

    # encrypt password
    salt = os.urandom(8)
    iv = os.urandom(8)
    key = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode("ascii"),
        salt,
        iterations=1000,
        # integer division: "128/8" is the float 16.0, which pbkdf2_hmac
        # does not accept as a key length
        dklen=128 // 8
    )
    secret = {"Password": password, "Nonce": current_session_id}
    plaintext = json.dumps(secret).encode("ascii")
    associated_data = "loginPassword"
    # initialize cipher
    cipher = AES.new(key, AES.MODE_CCM, iv)
    # set associated data
    cipher.update(associated_data.encode("ascii"))
    # encrypt plaintext and append digest
    encrypt_data = cipher.encrypt(plaintext) + cipher.digest()

    # everything binary is transferred hex-encoded
    login_data = {
        'EncryptData': binascii.hexlify(encrypt_data).decode("ascii"),
        'Name': username,
        'Salt': binascii.hexlify(salt).decode("ascii"),
        'Iv': binascii.hexlify(iv).decode("ascii"),
        'AuthData': associated_data
    }

    # login
    r = session.put(
        f"{url}/php/ajaxSet_Password.php",
        headers={
            "Content-Type": "application/json",
            "csrfNonce": "undefined"
        },
        data=json.dumps(login_data)
    )
    # parse result
    result = json.loads(r.text)
    # success?
    if result['p_status'] == "Fail":
        print("login failure", file=sys.stderr)
        # sys.exit instead of the interactive-only "exit" helper from site
        sys.exit(-1)

    # remember CSRF nonce
    csrf_nonce = result['nonce']
    # prepare headers
    session.headers.update({
        "X-Requested-With": "XMLHttpRequest",
        "csrfNonce": csrf_nonce,
        "Origin": f"{url}/",
        "Referer": f"{url}/"
    })
    # set credentials cookie
    # NOTE(review): fixed base64 blob kept verbatim; it is not derived from
    # the supplied credentials - confirm it is valid for the target modem
    session.cookies.set(
        "credential",
        "eyAidW5pcXVlIjoiMjgwb2FQU0xpRiIsICJmYW1pbHkiOiI4NTIiLCAibW9kZWxuYW1lIjoiV"
        "EcyNDkyTEctODUiLCAibmFtZSI6InRlY2huaWNpYW4iLCAidGVjaCI6dHJ1ZSwgIm1vY2EiOj"
        "AsICJ3aWZpIjo1LCAiY29uVHlwZSI6IldBTiIsICJnd1dhbiI6ImYiLCAiRGVmUGFzc3dkQ2h"
        "hbmdlZCI6IllFUyIgfQ=="
    )
    # set session
    session.post(f"{url}/php/ajaxSet_Session.php")
def docsis_status(session, url=None):
    """Get the current DOCSIS status page, parse it and return channel data.

    The page at ``/php/status_docsis_data.php`` embeds two javascript
    assignments, ``json_dsData = ...;`` and ``json_usData = ...;``. Both
    values are extracted and decoded as JSON. The ``LockStatus`` entry of
    every channel is then replaced by a number: 1 for "ACTIVE" or "Locked",
    0 for anything else.

    Args:
        session: HTTP session object providing ``get`` (already logged in).
        url: base URL of the web frontend, without trailing slash. If
            omitted, a module-level ``url`` variable is used, which is how
            this function located the modem before the parameter existed.

    Returns:
        Tuple ``(downstream_data, upstream_data)`` of decoded channel lists.

    Raises:
        ValueError: if no URL is available or the status page does not
            contain the expected channel data.
    """
    if url is None:
        # backward compatibility: fall back to the global set by the script
        url = globals().get("url")
        if url is None:
            raise ValueError("no modem URL given")

    r = session.get(f"{url}/php/status_docsis_data.php")

    # extract json from javascript
    channel_data = []
    for variable in ("json_dsData", "json_usData"):
        match = re.search(rf".*{variable} = (.+);.*", r.text)
        if match is None:
            raise ValueError(f"{variable} not found in DOCSIS status page")
        # parse json
        channel_data.append(json.loads(match[1]))
    downstream_data, upstream_data = channel_data

    # convert lock status to numeric values
    for channels in (upstream_data, downstream_data):
        for c in channels:
            c['LockStatus'] = 1 if c['LockStatus'] in ("ACTIVE", "Locked") else 0

    return downstream_data, upstream_data
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    # Script entry point: read the plugin configuration from the
    # environment, log in to the modem, fetch the DOCSIS channel data and
    # print either the munin graph configuration ("config" argument) or the
    # current values (no argument).

    # get config
    url = os.getenv("url")
    # username is optional; the plugin documentation promises "admin" as default
    username = os.getenv("username") or "admin"
    password = os.getenv("password")

    # validate config
    if not url or not password:
        print("Set url and password first.", file=sys.stderr)
        # sys.exit instead of the interactive-only "exit" helper from site
        sys.exit(1)

    # create session
    session = requests.Session()
    # login with username and password
    login(session, url, username, password)
    # get DOCSIS status
    downstream, upstream = docsis_status(session)

    # prepare munin graph info
    # "data" is the channel list to plot, "key" the channel field to report
    graph_descriptions = [
        {
            "name": "up_signal",
            "title": "DOCSIS Upstream signal strength",
            "vlabel": "dBmV",
            "info": "DOCSIS upstream signal strength by channel",
            "data": upstream,
            "key": "PowerLevel"
        },
        {
            "name": "up_lock",
            "title": "DOCSIS Upstream lock",
            "vlabel": "locked",
            "info": "DOCSIS upstream channel lock status",
            "data": upstream,
            "key": "LockStatus"
        },
        {
            "name": "down_signal",
            "title": "DOCSIS Downstream signal strength",
            "vlabel": "dBmV",
            "info": "DOCSIS downstream signal strength by channel",
            "data": downstream,
            "key": "PowerLevel"
        },
        {
            "name": "down_lock",
            "title": "DOCSIS Downstream lock",
            "vlabel": "locked",
            "info": "DOCSIS downstream channel lock status",
            "data": downstream,
            "key": "LockStatus"
        },
        {
            "name": "down_snr",
            "title": "DOCSIS Downstream signal/noise ratio",
            "vlabel": "dB",
            "info": "SNR/MER",
            "data": downstream,
            "key": "SNRLevel"
        }
    ]

    # configure ?
    configure = len(sys.argv) > 1 and sys.argv[1] == "config"

    # process all graphs
    for g in graph_descriptions:
        # only use channels with PowerLevel
        channels = [c for c in g['data'] if c['PowerLevel']]

        if configure:
            # graph config
            print(
                f"multigraph docsis_{g['name']}\n"
                f"graph_title {g['title']}\n"
                f"graph_category network\n"
                f"graph_vlabel {g['vlabel']}\n"
                f"graph_info {g['info']}\n"
                f"graph_scale no\n"
            )
            # channels
            for c in channels:
                print(
                    f"channel_{c['ChannelID']}.label {c['ChannelID']} ({c['Frequency']} MHz)\n"
                    f"channel_{c['ChannelID']}.info Channel type: {c['ChannelType']}, Modulation: {c['Modulation']}"
                )
        else:
            # output values
            print(f"multigraph docsis_{g['name']}")
            # channels
            for c in channels:
                print(f"channel_{c['ChannelID']}.value {c[g['key']]}")