-
Notifications
You must be signed in to change notification settings - Fork 7
/
run_batch.py
163 lines (140 loc) · 5.53 KB
/
run_batch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""
Range of data to run for batch run
x: [-20, 30]
y: [-2, 60]
theta: [0, 2*pi]
v: [3, 10]
Time horizon: {3.0, 4.0, 5.0, ..., 10.0}
Constant Goal: (x, y, radius)
6.0, 40.0, 2.0
Constant Obstacles: (x, y, radius)
9.0, 25.0, 4.5
20.0, 35.0, 3.0
6.5, 50.0, 3.0
-4.0, 33.0, 2.0
-5.0, 44.0, 2.0
2.5, 50.0, 2.0
Max runtime before termination: 150
"""
from bdb import effective
import numpy as np
import os
import logging
import time
import math
datestr = time.strftime("%Y-%m-%d")
if os.path.exists("result/batch-{}/".format(datestr)):
input("You are running a new batch onto an existing one, please append name or delete old run before running, or press Enter to continue running")
else:
os.makedirs("result/batch-{}/".format(datestr))
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(name)-20s '
'%(levelname)-8s %(message)s',
handlers=[
logging.FileHandler(
"result/batch-{}/batch_test.log".format(datestr)),
logging.StreamHandler()])
car_params = {
"wheelbase": 2.413,
"length": 4.267,
"width": 1.988
}
collision_r = math.sqrt((0.5 * (car_params["length"] - car_params["wheelbase"])) ** 2 + (0.5 * car_params["width"]) ** 2)
# Constant params
base_flag = " python3 run.py \
--no_players 1 \
--env_type goal_with_obs \
--eps_control 0.1 --eps_state 0.1 \
--alpha_scaling trust_region \
--trust_region_type ratio \
--batch_run \
--initial_margin 5.0 \
--plot \
--log \
--store_freq 1 \
--hallucinated \
"
obstacle_flag = " --obstacles \
9.0 25.0 4.5 \
20.0 35.0 3.0 \
6.5 50.0 3.0 \
-4.0 33.0 2.0 \
-5.0 44.0 2.0 \
2.5 50.0 2.0 \
"
goal_flag = " --goal 6.0 40.0 2.0"
# Dynamic params
no_of_runs = 100
v = np.random.uniform(3, 10, no_of_runs)
time_horizon = np.random.randint(3, 11, no_of_runs)
max_runtime = 150
run_type = "both"
if run_type not in ["time_consistent", "time_inconsistent", "both"]:
raise ValueError("Run type not found: {}".format(run_type))
obstacles = [float(i) for i in obstacle_flag.replace("--obstacles", "").split()]
goal = [float(i) for i in goal_flag.replace("--goal", "").split()]
goal_and_obs = obstacles + goal
def get_random_initial_pos():
x = np.random.uniform(-20, 30)
y = np.random.uniform(-2, 60)
return x, y
def format_angle(angle):
"""
Format angle to be within the range of [-pi,pi]
"""
while angle > np.pi or angle < -np.pi:
if angle > np.pi:
angle = angle - 2*np.pi
elif angle < -np.pi:
angle = angle + 2*np.pi
return angle
def run_time_consistent(cmd):
cmd = cmd + " --time_consistency --exp_name exp_time_consistent"
cmd = " ".join(cmd.split())
logging.info("Running test#{} with data: \n\r\t\n".format(i) + cmd.strip("\t"))
os.system(cmd)
def run_time_inconsistent(cmd):
cmd = cmd + " --exp_name exp_time_inconsistent"
cmd = " ".join(cmd.split())
logging.info("Running test#{} with data: \n\r\t\n".format(i) + cmd.strip("\t"))
os.system(cmd)
note = input("Do you want to note something about this run?: ")
if note != "":
with open("result/batch-{}/note.txt".format(datestr), 'w') as file:
file.write(note)
for i in range(no_of_runs):
found_good_initial_pos = False
# check if the sampled x, y are too close to obstacle or the goal
while not found_good_initial_pos:
x, y = get_random_initial_pos()
found_good_initial_pos = True
for j in range(int(len(goal_and_obs)/3.0)):
dx = x - goal_and_obs[j*3]
dy = y - goal_and_obs[j*3 + 1]
r = goal_and_obs[j*3 + 2]
relative_distance = math.sqrt(dx*dx + dy*dy)
if r - relative_distance + 3.0 * collision_r > 0:
# the initial position collides with an obstacle
found_good_initial_pos = False
break
# calculate the effective theta heading of the car to the goal
effective_theta = math.atan2(40.0 - y, 6.0 - x)
# sampling theta in the range [-pi/4 + effective_theta, pi/4 + effective theta]
theta = format_angle(np.random.uniform(-np.pi/4.0, np.pi/4.0) + effective_theta)
while abs(theta - format_angle(effective_theta)) < np.pi/60.0:
theta = format_angle(np.random.uniform(-np.pi/4.0, np.pi/4.0) + effective_theta)
init_state_flag = " --init_states {} {} {} 0.0 {}".format(x, y, theta, v[i])
time_horizon_flag = " --t_horizon {}".format(time_horizon[i])
max_runtime_flag = " --max_steps {}".format(max_runtime)
cmd = base_flag + obstacle_flag + goal_flag + init_state_flag + time_horizon_flag + max_runtime_flag
if run_type == "time_consistent":
run_time_consistent(cmd)
elif run_type == "time_inconsistent":
run_time_inconsistent(cmd)
elif run_type == "both":
run_time_consistent(cmd)
run_time_inconsistent(cmd)
else:
raise NotImplementedError