This repository has been archived by the owner on Dec 17, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
websocket_poc_client.py
187 lines (165 loc) · 6.58 KB
/
websocket_poc_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Author:
Marcus Voß (m.voss@laposte.net)
Description:
POC websocket client that automatically reconnects and should prevent message loss.
"""
from ubxReceiver.ubx_receiver import UBX_receiver, UBX_message, NMEA_message
from collections import deque
import asyncio
import websockets
import itertools
import time
import json
import logging
# Logging setup: root logger at INFO; the 'asyncio' and 'websockets' library
# loggers are raised to WARNING so only their warnings/errors are emitted.
logging.basicConfig(level=logging.INFO)
logging.getLogger('asyncio').setLevel(logging.WARNING)
logging.getLogger('websockets').setLevel(logging.WARNING)
# FIFO of messages waiting to be delivered: the gather_* coroutines append on
# the right, listen_forever() sends the leftmost entry and pops it only after
# the server acknowledged it, so entries pile up while there is no connection.
DATA = deque() #message cache that is filled if there is no websocket connection
# Websocket connection currently in use; listen_forever() resets it to None
# before every (re)connection attempt.
SERVER = None #the current websocket server instance
# Sent to the server as the "uuid" header when connecting.
UUID = "test-client"
# Serial port and baud rate passed to UBX_receiver.
SERIAL_PORT = "COM5"
BAUDRATE = 115200
# NOTE(review): credentials are hard-coded in source and embedded in a ws://
# (unencrypted) URL below - consider loading them from the environment or a
# config file and using wss:// before this is used outside a POC.
USERNAME = "MVO"
PASSWORD = "e5W7Avnr6NgUiZ"
WEBSOCKET_PORT = "5678"
WEBSOCKET_SERVER = '127.0.0.1' # "10.0.20.237"
# Full connection URL in the form ws://user:password@host:port
WEBSOCKET_ADDRESS = f"ws://{USERNAME}:{PASSWORD}@{WEBSOCKET_SERVER}:{WEBSOCKET_PORT}"
# Running counter (starting at 0) that provides the "msgID" of each queued
# message; listen_forever() expects the server to echo it back as "OK <msgID>".
msgID = itertools.count()
def handle_msg(msg):
    """Write the received message to stdout, prefixed with '> '."""
    line = f"> {msg}"
    print(line)
def _build_record(msg):
    """Turn a parsed receiver message into the dict that is queued for upload.

    Returns None when msg is neither a UBX_message nor an NMEA_message, in
    which case nothing is queued. Each record consumes one value of the
    module-wide msgID counter.
    """
    if isinstance(msg, UBX_message):
        return {
            "msgID": next(msgID),
            "type": "GNSS",
            "protocol": "UBX",
            "timestamp": time.time(),
            "class": msg.cl,
            "id": msg.id,
            "payload": list(msg.payload),
        }
    if isinstance(msg, NMEA_message):
        return {
            "msgID": next(msgID),
            "type": "GNSS",
            "protocol": "NMEA",
            "timestamp": time.time(),
            "talker": msg.talker_id + msg.msg_type,
            "data": msg.data,
        }
    return None


async def gather_data():
    """Read messages from the serial UBX receiver and append them to DATA.

    Runs forever. If the receiver cannot be opened, the attempt is repeated
    after 5 seconds. If parsing raises, the receiver is released and a new
    connection is opened. A str returned by parse() is logged as an error
    instead of being queued.

    The parse() call is executed in the default thread-pool executor so that
    a read waiting for serial data does not stall the event loop (and with it
    the websocket sender and the queue reporter).
    NOTE(review): this assumes UBX_receiver.parse() is a blocking read that
    may be called from a worker thread - confirm against ubxReceiver.
    """
    loop = asyncio.get_event_loop()
    while True:
        logging.info("Attempting serial connection to UBX-GNSS receiver")
        try:
            receiver = UBX_receiver(SERIAL_PORT, BAUDRATE)
        except Exception as err:
            logging.error(f"serial connection not successfull! {err}")
            await asyncio.sleep(5)
            continue
        try:
            logging.info("Starting to listen for UBX packets")
            receiver.ubx_config_disable_all()
            receiver.ubx_config_enable(
                "GGA_UART1", "RAWX_UART1", "SFRBX_UART1")
            while True:
                try:
                    # only one parse() is in flight at a time because it is
                    # awaited before the next iteration starts
                    msg = await loop.run_in_executor(None, receiver.parse)
                except Exception as err:
                    logging.error(f"Serial connection Error: {err}")
                    break  # leave the read loop and reconnect
                try:
                    if isinstance(msg, str):
                        logging.error(f"received error: {msg}")
                    else:
                        record = _build_record(msg)
                        if record is not None:
                            DATA.append(record)
                except (ValueError, IOError) as err:
                    logging.error(err)
                # give the other tasks a chance to run between two messages
                await asyncio.sleep(0.001)
        finally:
            del receiver  # clean up serial connection
async def gather_placeholder_data():
    """Queue one placeholder message per second, for use without a serial receiver.

    Runs forever. Every entry is a dict carrying a "msgID" taken from the
    module-wide counter, because listen_forever() calls entry.get("msgID") on
    each queued entry and compares it with the server's acknowledgement; a
    bare string in the queue would raise AttributeError there.
    The remaining keys are only a stand-in payload - nothing in this file
    prescribes a schema for placeholder messages.
    """
    logging.debug("starting task: gather_placeholder_data")
    for counter in itertools.count(1):
        DATA.append({
            "msgID": next(msgID),
            "type": "placeholder",
            "timestamp": time.time(),
            "data": str(counter),
        })
        await asyncio.sleep(1)
async def report_queque_length():
    """Log the number of cached messages whenever it differs from the last reported value.

    Runs forever; while the length is unchanged it is polled once per second.
    """
    previous = 0
    while True:
        current = len(DATA)
        if current == previous:
            # nothing new to report: wait before looking again
            await asyncio.sleep(1)
            continue
        logging.info(f"queque length: {current}")
        previous = current
async def listen_forever():
    """
    Deliver the queued messages to the websocket server, reconnecting on failure.

    Runs forever. The outer loop (re)connects with a 5 second timeout and
    waits 10 seconds after a failed attempt. The inner loop sends the leftmost
    entry of DATA as JSON, waits for the answer and removes the entry only if
    the answer is exactly "OK <msgID>"; otherwise the same entry is sent again.
    When the connection is closed the inner loop is left and a new connection
    is opened, so unacknowledged entries stay queued.

    An entry that cannot be turned into JSON can never be delivered; it is
    logged and dropped instead of raising out of this task and stopping all
    further delivery.

    General design based on https://github.com/aaugustin/websockets/issues/414
    """
    global SERVER
    logging.debug("starting task: listen_forever")
    while True:
        # outer loop restarted every time the connection fails
        SERVER = None
        logging.info("Connecting to websocket server...")
        connection = websockets.connect(
            WEBSOCKET_ADDRESS, extra_headers=[("uuid", UUID)])
        try:
            SERVER = await asyncio.wait_for(connection, 5)
        except asyncio.TimeoutError as err:
            # Must come before OSError: since Python 3.11 asyncio.TimeoutError
            # is the builtin TimeoutError, which is a subclass of OSError.
            failure = f"websocket connection timeout: {err}"
        except (OSError, websockets.exceptions.InvalidMessage) as err:
            failure = f"websocket connection unsuccessful: {err}"
        except websockets.exceptions.InvalidStatusCode as err:
            failure = f"websocket connection refused: {err}"
        except Exception as err:
            failure = f"websocket connection failed for unknown reason: {err}"
        else:
            failure = None
        if failure is not None:
            logging.error(failure)
            await asyncio.sleep(10)
            continue
        logging.info("websocket connected!")
        while True:
            # sender loop
            if not DATA or SERVER is None:
                await asyncio.sleep(0.001)
                continue
            # Peek at the leftmost item; it is removed only after the ack.
            # Producers append on the right and only this task pops, so
            # DATA[0] is still this entry after the awaits below.
            entry = DATA[0]
            try:
                msg_ID = entry.get("msgID")
                msg = json.dumps(entry)
            except (AttributeError, TypeError, ValueError) as err:
                logging.error(
                    f"dropping unsendable message {entry!r}: {err}")
                DATA.popleft()
                continue
            try:
                logging.debug(f"> {msg}")
                await SERVER.send(msg)
                answ = await SERVER.recv()
                logging.info(f"< {answ}")
                # check if answer contains correct msg_ID to make sure the message was received
                if answ == f"OK {msg_ID}":
                    DATA.popleft()
                else:
                    logging.warning(
                        f"received wrong ack msgID: {answ} != OK {msg_ID}")
            except websockets.exceptions.ConnectionClosed:
                logging.error("websocket connection lost")
                await asyncio.sleep(1)
                break  # inner loop
if __name__ == "__main__":
    def _report_task_exit(task):
        """Log why a background task stopped; all of them are meant to run forever."""
        if task.cancelled():
            logging.error(f"background task was cancelled: {task!r}")
            return
        err = task.exception()
        if err is None:
            logging.error(f"background task returned unexpectedly: {task!r}")
        else:
            logging.error(f"background task crashed: {task!r}", exc_info=err)

    loop = asyncio.get_event_loop()
    # The event loop only keeps weak references to tasks, so hold on to them
    # here for the lifetime of the program.
    background_tasks = [
        loop.create_task(start())
        for start in (gather_data, report_queque_length, listen_forever)
    ]
    for task in background_tasks:
        # Without this a crashed task would go unnoticed while the remaining
        # tasks keep running.
        task.add_done_callback(_report_task_exit)
    loop.run_forever()