-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
executable file
·188 lines (157 loc) · 5.58 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# TODO: build in more resilience, comm_based host
from utilities import pi
from utilities.comms import Server
from utilities.utils import animate_and_save
from utilities.collision import check_surroundings, check_collision
from socket import gethostbyname
from atexit import register
from time import process_time
from threading import Thread
from _thread import interrupt_main
from rplidar import RPLidar
from math import floor
# Parameters
collision_space = 20 # Range of angles to check for obstacles in front of car
collision_threshold = 5 # Minimum distance for the code to consider as obstacle.
max_speed = 100
scan_error_tolerance = 10
scan_error_point = 0
battery_dead_threshold = 11.3 # 7.4
battery_low_threshold = 11.5
trash_lvl_threshold = 1
spin_intensity = 4 # Divides max_speed by this to spin robot in setCourse()
save = True
scan_save_dir = "/home/gg/PycharmProjects/SojiSim/saves/scans_test" # Last word is filename
arm_tolerance = 5
async_life = 1
# Program variables
collision_bounds = (270 - (collision_space // 2), 270 + (collision_space // 2))
scans = []
wait_id = {}
arm_position = []
scan = [0]*360
serv = Server(gethostbyname("soji.local"), 9160)
print("Connecting...")
conn, addr = serv.connect()
print("Connected to", addr)
# Control Systems
motors = pi.Drive(21, 20, 16, 12) # Motor A = Left, Motor B = Right, P1 = Direction, P2 = Speed. Change to BCM numbering.
battery = pi.Battery()
motors.brake()
lidar = RPLidar('/dev/ttyUSB0', timeout=3)
arm = pi.Arm(8)
trash_lvl = 0
# TODO: Insert Encoders, Insert Comms, Insert Arm, Insert AI
# On testing controller, here goes camera
# Class instantiations.
def wait(time, do_something=lambda x=5: None, args=None):
# Wait for a certain amount of time.
# You can set multiple waits, as long as they have different times.
try:
time_stamp = wait_id[str(time)]
except KeyError:
time_stamp = wait_id[str(time)] = process_time()
if process_time() - time_stamp >= time:
wait_id.pop(str(time))
do_something(args) if args else do_something()
del time_stamp
return 1
else:
return 0
def set_course(lidar_data):
if check_collision(lidar_data, collision_bounds, collision_threshold): # Check for obstacles ahead
# If there is an obstacle...
direction, angles = check_surroundings(lidar_data, collision_threshold) # Find out the clearest side
if direction == "Left":
motors.setLeftSpeed(max_speed/spin_intensity)
motors.setRightSpeed(max_speed)
else:
motors.setLeftSpeed(max_speed)
motors.setRightSpeed(max_speed/spin_intensity)
return direction, angles # Return the direction and the angles of the obstacle for future optional odometry.
else:
motors.setLeftSpeed(max_speed)
motors.setRightSpeed(max_speed)
return "Forward", collision_bounds
def exit_handle():
global async_life
motors.exit()
lidar.stop()
lidar.stop_motor()
lidar.disconnect()
async_life = 0
async_thread.join()
if save:
animate_and_save(scans, filename=scan_save_dir, bounds=collision_bounds)
print("Clean exit. Robot stopped.")
def async_ops(check_period=30):
from time import sleep
trash_lvl_check = 0
while async_life:
current_trash_lvl = trash_lvl
bat_lvl = battery.read()
if current_trash_lvl<=trash_lvl_threshold and trash_lvl_check >= 10:
print("Trash LVL Threshold Reached.")
break
else:
trash_lvl_check += 1
if bat_lvl <= battery_dead_threshold:
break
elif bat_lvl <= battery_low_threshold:
print("WARNING: BATTERY LOW")
sleep(check_period)
interrupt_main()
def grab():
c2c = serv.rx(conn) # Get initial center-to-center distance.
while 80<=arm_position[0]<=100: # Phase 1: Car Alignment
if abs(c2c) < 25:
motors.setLeftSpeed(max_speed/spin_intensity)
motors.setRightSpeed(max_speed/(spin_intensity/2))
elif abs(c2c) > 25:
motors.setLeftSpeed(max_speed/(spin_intensity/2))
motors.setRightSpeed(max_speed/spin_intensity)
c2c = serv.rx(conn)
serv.tx("received", conn)
motors.brake()
while 80<=arm_position[1]<=100:
motors.setLeftSpeed(max_speed/spin_intensity)
motors.setRightSpeed(max_speed/spin_intensity)
arm.grab_item()
serv.rx(conn)
serv.tx("grabbed", conn)
global trash_lvl
trash_lvl += 1
motors.brake()
# Preloop preparations.
register(exit_handle) # Register exit handler
async_thread = Thread(target=async_ops)
async_thread.start()
lidar.clean_input()
scan = [0]*360
# Main Loop
for scan in lidar.iter_scans():
try:
for _, distance, angle in scan:
scan[min(359, int(angle))] = distance/10
# Data Fetching
position, center = serv.rx(conn)
if center:
serv.tx("received", conn)
motors.brake()
grab()
else:
arm.move(position)
serv.tx(scan, conn) # Here, you can send any relevant cart data. [scans, battery, trash, etc.]
# Data Processing
if scan.count(scan_error_point) < scan_error_tolerance: # Check for bad scans (how many 0-scans are there)
scans.append(scan)
else: # If there is a bad scan, skip the rest of the loop
continue
# Robot Control
direction, angles = set_course(scan)
# Cleanup
wait(1, print, process_time())
except KeyboardInterrupt as e:
print(f"{e}. Exiting now.")
break
exit_handle()