forked from EVNotify/EVNotiPi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhuaweimon_ser.py
executable file
·89 lines (77 loc) · 3.19 KB
/
huaweimon_ser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python3
import os
import requests
import re
from serial import Serial
import time
import datetime
import pyrfc3339
from influxdb_client import InfluxDBClient, WriteOptions
net_re = re.compile(r"wwan0:" +
r"\s+(?P<bytes_rx>\d+)" +
r"\s+(?P<packets_rx>\d+)" +
r"\s+(?P<errs_rx>\d+)" +
r"\s+(?P<drop_rx>\d+)" +
r"\s+(?P<fifo_rx>\d+)" +
r"\s+(?P<frame_rx>\d+)" +
r"\s+(?P<compressed_rx>\d+)" +
r"\s+(?P<multicast_rx>\d+)" +
r"\s+(?P<bytes_tx>\d+)" +
r"\s+(?P<packets_tx>\d+)" +
r"\s+(?P<errs_tx>\d+)" +
r"\s+(?P<drop_tx>\d+)" +
r"\s+(?P<fifo_tx>\d+)" +
r"\s+(?P<colls_tx>\d+)" +
r"\s+(?P<carrier_tx>\d+)" +
r"\s+(?P<compressed_tx>\d+)")
# load config
if os.path.exists('config.json'):
import json
with open('config.json', encoding='utf-8') as config_file:
C = json.loads(config_file.read())
elif os.path.exists('config.yaml'):
import yaml
with open('config.yaml', encoding='utf-8') as config_file:
C = None
for c in yaml.load_all(config_file, Loader=yaml.SafeLoader):
C = c
else:
raise Exception('No config found')
Influx = InfluxDBClient(url=C['influxdb']['url'],
org=C['influxdb']['org'],
token=C['influxdb']['token'],
enable_gzip=True)
opts = WriteOptions(batch_size=10000,
flush_interval=60000,
jitter_interval=5000)
write_api = Influx.write_api(write_options=opts)
##############################################################################
if __name__ == "__main__":
ser = Serial("/dev/ttyUSB0")
while True:
line = ser.readline()
if line[:6] == b'^RSSI:':
now = time.time()
data = {
'Strength': int(line[6:]),
}
with open('/proc/net/dev', 'r') as netdev:
for line in netdev:
m = net_re.search(line)
if m is not None:
for key in ['bytes_rx', 'packets_rx', 'errs_rx', 'drop_rx',
'fifo_rx', 'frame_rx', 'multicast_rx',
'bytes_tx', 'packets_tx', 'errs_tx', 'drop_tx',
'fifo_tx', 'colls_tx', 'carrier_tx']:
data[key] = int(m[key])
break
write_api.write(bucket=C['influxdb']['bucket'],
org=C['influxdb']['org'],
record=[{
'measurement': 'mobile_net',
'tags': {'akey': C['evnotify']['akey'], 'car': C['car']['type']},
'fields': data,
'time': pyrfc3339.generate(
datetime.datetime.fromtimestamp(now, datetime.timezone.utc)),
}])
#########################