# main.py - Flask web interface and asyncio control loop for a NanoVNA-driven RC car.
import pynanovna
import threading
import requests
from flask import Flask, render_template, jsonify, request
from signal_processing import SignalProcessing
from bt_car_control import BTSender
import time
import asyncio
import subprocess
# Value passed to stream_data() in setup_nanovna(). A commented-out example path
# follows on the same line; presumably False selects the live VNA stream rather
# than a recorded file - confirm against pynanovna.
DATA_FILE = False#"../save_plate.csv"
# Calibration file loaded in setup_nanovna() via calibrate(load_file=...).
# May be replaced at runtime by the "updateCalibrationFile" command.
CALIBRATION_FILE = "../cal0514.cal"
# Verbosity flag forwarded to both NanoVNAWorker and SignalProcessing.
VERBOSE = True
# Forwarded to SignalProcessing as process_sleep_time (presumably seconds - confirm).
PROCESS_SLEEP_TIME = 0.0001
# Device name handed to BTSender when the car connection object is created.
CAR_DEVICE_NAME = "BT05-BLE"
app = Flask(__name__)
# Latest measurement payload served by /data; replaced by /update_data and by main_loop().
latest_data = {"angle": 1, "throttle": 1, 's11': [[0, 0, 0]], 's21':[[0, 0, 0]]}
# Most recent command posted to /receive_data; consumed by main_loop().
received_data = {}
# Log entries served by /logs; appended by log_message() and cleared by main_loop().
log_messages = []
@app.route("/")
def index():
    """Render and return the ``index.html`` template for the root URL."""
    page = render_template("index.html")
    return page
@app.route("/data")
def data():
    """Return the current contents of ``latest_data`` as a JSON response."""
    snapshot = latest_data
    return jsonify(snapshot)
@app.route("/update_data", methods=["POST"])
def update_data():
    """Replace ``latest_data`` with the JSON body of the POST request.

    Returns:
        A JSON response ``{"status": "success"}``.
    """
    global latest_data
    # The parsed request body becomes the new payload served by /data.
    latest_data = request.get_json()
    reply = {"status": "success"}
    return jsonify(reply)
@app.route("/receive_data", methods=["POST"])
def receive_data():
    """Store a command posted by the web client for the main loop to pick up.

    The JSON body must be an object (dict): the main loop looks commands up
    by key and calls ``.clear()`` on the stored value, so any other JSON type
    (``null``, a list, a number, ...) would crash the control loop.

    Returns:
        ``{"status": "success", "data": <payload>}`` when the payload was
        accepted, or an ``{"status": "error", ...}`` body with HTTP status 400
        when the payload is not a JSON object.
    """
    global received_data
    payload = request.get_json()
    if not isinstance(payload, dict):
        # Refuse the payload instead of handing the main loop something it
        # cannot index or clear.
        return jsonify({"status": "error", "message": "Expected a JSON object."}), 400
    received_data = payload
    return jsonify({"status": "success", "data": payload})
@app.route("/logs")
def logs():
    """Return the buffered log entries (``log_messages``) as a JSON response."""
    entries = log_messages
    return jsonify(entries)
def run_flask():
    """Run the Flask app on all interfaces, port 5000, with debug mode off.

    The call blocks for as long as the server runs, which is why it is used
    as a thread target.
    """
    app.run(
        debug=False,
        port=5000,
        host="0.0.0.0",
    )
def log_message(message):
    """Buffer *message* for retrieval through ``/logs`` and echo it to stdout."""
    entry = {'message': message}
    log_messages.append(entry)
    print(message)
# NanoVNA Setup
def setup_nanovna(verbose, calibration_file, data_file, process_sleep_time):
    """Create a calibrated NanoVNA worker and a signal processor fed by it.

    Args:
        verbose: Passed as ``verbose`` to both ``NanoVNAWorker`` and
            ``SignalProcessing``.
        calibration_file: Passed to ``calibrate(load_file=...)``.
        data_file: Passed to ``stream_data()``.
        process_sleep_time: Passed to ``SignalProcessing``.

    Returns:
        A ``(signal_processing, data_source)`` tuple: the ``SignalProcessing``
        instance and the ``NanoVNAWorker`` it reads from.
    """
    vna_worker = pynanovna.NanoVNAWorker(verbose=verbose)
    # Calibration currently has to be done through a terminal.
    vna_worker.calibrate(load_file=calibration_file)
    # Presumably a 2.9-3.1 GHz sweep with 101 points - confirm the argument
    # meanings against the pynanovna documentation.
    vna_worker.set_sweep(2.9e9, 3.1e9, 1, 101)
    processor = SignalProcessing(
        vna_worker.stream_data(data_file),
        process_sleep_time=process_sleep_time,
        verbose=verbose,
        interactive_mode=False,
    )
    return processor, vna_worker
# Start Flask in a separate thread so the web server runs alongside the
# asyncio main loop, which occupies the main thread.
# NOTE(review): the thread is not created with daemon=True, so it will keep the
# process alive even if the main loop exits - confirm this is intended.
flask_thread = threading.Thread(target=run_flask)
flask_thread.start()
# Instantiate the BTSender class; this single instance is used by main_loop()
# both directly and as the argument passed to handle_received_data().
bt_sender = BTSender(device_name=CAR_DEVICE_NAME)
async def _service_pending_command(signal_processing):
    """Run one command-polling cycle shared by both phases of ``main_loop``.

    Clears the log buffer, hands the most recent web command (if any) to
    ``handle_received_data`` and then pauses for one second without blocking
    the event loop.
    """
    global received_data
    # Clear logs.
    log_messages.clear()
    # Detach the pending command *before* handling it. The Flask thread may
    # post a new command while this one is being handled; clearing the shared
    # dict afterwards would silently discard that new command.
    pending, received_data = received_data, {}
    await handle_received_data(pending, bt_sender, signal_processing)
    # Increase sleep time to 1 if running a pre recorded file.
    # asyncio.sleep (not time.sleep) so other tasks on the loop keep running.
    await asyncio.sleep(1)


async def main_loop():
    """Drive the measurement and car-control cycle forever.

    Each pass of the outer loop:

    1. builds a fresh NanoVNA worker and signal processor,
    2. services web commands once per second until
       ``signal_processing.reference_setup_done`` becomes true,
    3. connects to the car and, for every ``(angle, throttle, s11, s21)``
       sample, publishes it to the web server, forwards it to the car when
       connected, and services web commands,
    4. kills the VNA worker once the sample generator is exhausted and
       starts over.
    """
    global latest_data

    while True:
        signal_processing, vna = setup_nanovna(
            VERBOSE, CALIBRATION_FILE, DATA_FILE, PROCESS_SLEEP_TIME
        )

        # Wait for the reference setup, which is driven by web commands.
        while not signal_processing.reference_setup_done:
            await _service_pending_command(signal_processing)

        await bt_sender.connect()

        for angle, throttle, s11, s21 in signal_processing.process_data_continuously():
            send_data = {'angle': angle / 2, 'throttle': throttle, 's11': s11, 's21': s21}
            latest_data = send_data.copy()
            try:
                # A timeout keeps a stalled web server from freezing the
                # control loop; Timeout is a RequestException, so it is
                # reported through the same handler below.
                response = requests.post(
                    "http://127.0.0.1:5000/update_data", json=send_data, timeout=5
                )
                response.raise_for_status()
            except requests.exceptions.RequestException as e:
                log_message(f"Error sending data to server: {e}")

            # Send the data to the RC car
            if bt_sender.is_connected():
                await bt_sender.update_speed(angle, throttle)

            await _service_pending_command(signal_processing)

        # Kill vna.
        vna.kill()
        signal_processing = None
async def handle_received_data(received_data, bt_sender, signal_processing):
    """Execute one command posted by the web client.

    Malformed input (missing keys, non-numeric values, a zero minimum speed)
    is logged and ignored, so a bad request cannot crash the caller's loop.

    Args:
        received_data: Command payload from ``/receive_data``. Expected to be
            a dict whose ``"button"`` entry names the action; the remaining
            keys depend on the action (see the branches below).
        bt_sender: The ``BTSender`` whose limits are updated and which is
            asked to connect to the car.
        signal_processing: The active ``SignalProcessing`` instance used by
            the reference-measurement commands.

    Returns:
        ``False`` when there is nothing to handle (empty payload, or no
        usable ``"button"`` name); otherwise ``None`` once the command has
        been processed and a one-second pause has elapsed.

    Raises:
        subprocess.CalledProcessError: If the reboot or shutdown command
            exits with a non-zero status.
    """
    global CALIBRATION_FILE

    if not received_data:
        return False

    button = received_data.get("button") if isinstance(received_data, dict) else None
    if not isinstance(button, str):
        log_message("Ignoring received data without a valid button name.")
        return False

    if button == "updateAngleButton":
        try:
            min_threshold = int(received_data["angleThresholdMin"])
            max_threshold = int(received_data["angleThresholdMax"])
            max_angle = int(received_data["angleMax"])
            # A value of 0 means "leave this setting unchanged".
            if min_threshold != 0:
                bt_sender.angle_min_threshold = min_threshold * 2
            if max_threshold != 0:
                bt_sender.angle_max_threshold = max_threshold * 2
            if max_angle != 0:
                bt_sender.max_steering_angle = max_angle
            log_message("Car angle thresholds have been updated.")
        except (KeyError, TypeError, ValueError):
            log_message("Could not update the car angle thresholds, are they valid?")
    elif button == "updateSpeedButton":
        try:
            min_speed = int(received_data["speedMin"])
            max_speed = int(received_data["speedMax"])
            if abs(min_speed) > 255 or abs(max_speed) > 255:
                raise ValueError('255 is max speed.')
            # The ratio test requires max to exceed min with the same sign.
            # A zero minimum would divide by zero, so reject it up front.
            if min_speed == 0 or max_speed / min_speed <= 1:
                raise ValueError('Max speed must exceed a non-zero min speed.')
            # Both values are guaranteed non-zero past the checks above.
            bt_sender.min_motor_speed = min_speed
            bt_sender.max_motor_speed = max_speed
            log_message("Car speed thresholds have been updated.")
        except (KeyError, TypeError, ValueError):
            log_message("Could not update the car speed thresholds, are they valid?")
    elif button == "rebootButton":
        log_message("Rebooting the system.")
        subprocess.run(['sudo', 'reboot', 'now'], check=True)
    elif button == "shutdownButton":
        log_message("Shutting down the system. Goodbye.")
        subprocess.run(['sudo', 'shutdown', 'now'], check=True)
    elif button == "connectCar":
        log_message("Trying to connect to the car.")
        if not bt_sender.is_connected():
            log_message("Car not connected!")
            log_message("Trying to connect car...")
            await bt_sender.connect()
    elif button == "refMeasure0":
        log_message("Running reference measure CLEAR.")
        signal_processing.reference_step0()
        log_message("Action performed.")
    elif button == "refMeasureSetup":
        log_message("Running reference setup.")
        signal_processing.setup(False)
        await asyncio.sleep(1)
    elif "refMeasure" in button:
        # The step number is the last character of the button name.
        try:
            stepno = int(button[-1])
        except ValueError:
            log_message(f"Unknown reference measure command: {button}")
        else:
            log_message(f"Running reference measure step {stepno}.")
            signal_processing.reference_step_n(stepno)
            log_message("Action performed.")
    elif button == "updateCalibrationFile":
        if "calibrationFile" not in received_data:
            log_message("Could not change the calibration file, no file was given.")
        else:
            log_message(f"Changing calibration file. Old file: {CALIBRATION_FILE}")
            CALIBRATION_FILE = received_data["calibrationFile"]
            log_message(f"Calibration file changed. New file: {CALIBRATION_FILE}")
            log_message("THIS FUNCTION HAS SOME MAJOR PROBLEMS, SYSTEM MAY BE BROKEN.")
            await asyncio.sleep(1)

    # Pause after every handled command. asyncio.sleep is used instead of
    # time.sleep so the event loop is not blocked while waiting.
    await asyncio.sleep(1)
# Run the main loop: asyncio.run() creates an event loop and blocks here while
# main_loop() runs. main_loop() loops forever, so this normally only returns
# if an exception escapes it.
asyncio.run(main_loop())