# stop_logic.py
from collections import deque
import math
from statistics import mean, stdev
from ray.tune import Stopper
from ray.rllib.env.apis.task_settable_env import TaskSettableEnv
#TODO: consider adding a condition where eval mean reward starts to drop, as long as its peak was above
#the acceptable threshold, and as long as the training mean reward is/was also above threshold
#TODO: add condition for eval mean reward below 0
#TODO: add condition where either training or eval mean reward continues to drop significantly below winning threshold
# Trial stopping decision
class StopLogic(Stopper):
"""Determines when to stop a trial, either for success or failure. It allows some rudimentary curriculum learning
by defining multiple phases, where each phase can have its own duration (in time steps), reward thresholds
for success and failure, and stopping criteria. If only a single phase is desired (no curriculum), then these
quantities can bevspecified as scalars. If a multi-phase curriculum is desired, then these 4 quantities must
be specified with lists (each list containing an entry for each phase).
"""
def __init__(self,
max_ep_timesteps : int = None, #max time steps allowed in a single episode
                 max_iterations : int = None, #max total iterations allowed over all phases combined
min_timesteps : int = 0, #num timesteps that must be completed before considering a stop;
# use a list for multi-phase; list values are per phase (restart counting each phase)
avg_over_latest : int = 10, #num most recent iterations over which statistics will be calculated
success_threshold : float = 1.0, #reward above which we can call it a win; use a list for multi-phase
failure_threshold : float = -1.0, #reward below which we can early terminate (after min required timesteps);
# use a list for multi-phase
degrade_threshold : float = 0.25, #fraction of mean reward range below its peak that mean is allowed to degrade
compl_std_dev : float = 0.01, #std deviation of reward below which we can terminate (success or failure)
                 let_it_run : bool = False #should we allow the trial to run to the max iterations, regardless of rewards?
                                           # use a list for multi-phase; a scalar value applies to all phases
):
# Check for proper multi-phase inputs
self.num_phases = 1
if type(min_timesteps) == list:
self.num_phases = len(min_timesteps)
assert type(success_threshold) == list, "///// StopLogic: success_threshold needs to be a list but is a scalar."
assert type(failure_threshold) == list, "///// StopLogic: failure_threshold needs to be a list but is a scalar."
assert len(success_threshold) == self.num_phases, "///// StopLogic: success_threshold list is different length from min_timesteps."
assert len(failure_threshold) == self.num_phases, "///// StopLogic: failure_threshold list is different length from min_timesteps."
if type(let_it_run) == list:
assert len(let_it_run) == self.num_phases, "///// StopLogic: let_it_run list is different length from min_timesteps."
else:
let_it_run = [let_it_run] * self.num_phases
self.max_ep_timesteps = max_ep_timesteps
self.max_iterations = max_iterations
self.required_min_timesteps = min_timesteps #num required before declaring failure
self.most_recent = avg_over_latest #num recent trials to consider when deciding to stop
self.success_avg_threshold = success_threshold
self.failure_avg_threshold = failure_threshold
self.degrade_threshold = degrade_threshold
self.completion_std_threshold = compl_std_dev
self.let_it_run = let_it_run
print("\n///// StopLogic initialized with max_ep_timesteps = {}, max_iterations = {}, {} phases."
.format(self.max_ep_timesteps, self.max_iterations, self.num_phases))
print(" min_timesteps = {}".format(self.required_min_timesteps))
print(" most_recent = {}, degrade_threshold = {:.2f}, compl_std_thresh = {:.3f}"
.format(self.most_recent, self.degrade_threshold, self.completion_std_threshold))
print(" success_avg_thresh = {}, failure_avg_thresh = {}, let_it_run = {}"
.format(self.success_avg_threshold, self.failure_avg_threshold, self.let_it_run))
# Each entry will have key = trial_id and value = dict containing the following:
# "stop" (bool) - should this trial be stopped?
# "num_entries" (int) - num meaningful entries in the deque
# "mean_rewards" (deque) - the most recent N mean rewards
# "max_rewards" (deque) - the most recent N max rewards
# "min_rewards" (deque) - the most recent N min rewards
# "worst_mean" (float) - the lowest mean reward achieved at any point in the trial so far
# "best_mean" (float) - the highest mean reward achieved at any point in the trial so far
self.trials = {}
# Other internal variables
self.env = None
self.steps_at_phase_begin = 0
self.steps_prev_call = 0
self.prev_phase = 0
self.crossed_min_timesteps = False
def __call__(self,
trial_id : str, #ID of the trial being considered for stopping
result : dict #collection of results of the trial so far
) -> bool: #return: should the trial be terminated?
""" Will be called after each iteration to decide if learning should be stopped for this trial."""
#print("\n///// StopLogic - result = ")
#for item in result:
# print("{}: {}".format(item, result[item]))
#print("///// - end of result\n")
# Determine if this is a multi-phase trial and what phase we are in, then assign local variables for the phase-dependent items
# NOTE: the iteration count and time step counts don't increase monotonically when a PBT scheduler is used, so these are probably
# not good choices for stopping criteria in such a use case.
# TODO: most calls will experience self.env == None. I don't know why. This logic needs to be rewritten in order to use the
# stop logic for any phase > 0.
phase = 0
if self.env is not None:
phase = self.env.get_task()
#print("///// StopLogic call: env exists and phase is {} of {} total phases".format(phase, self.num_phases))
else:
pass #print("///// WARNING: StopLogic called with no environment passed!")
min_timesteps = 0
success_avg_thresh = -math.inf
failure_avg_thresh = -math.inf
let_it_run = False
if self.num_phases == 1:
min_timesteps = self.required_min_timesteps
success_avg_thresh = self.success_avg_threshold
failure_avg_thresh = self.failure_avg_threshold
let_it_run = self.let_it_run
else:
if phase != self.prev_phase:
print("///// StopLogic: Beginning phase {}".format(phase))
self.crossed_min_timesteps = False
self.steps_at_phase_begin = result["timesteps_total"]
self.prev_phase = phase
min_timesteps = self.required_min_timesteps[phase]
success_avg_thresh = self.success_avg_threshold[phase]
failure_avg_thresh = self.failure_avg_threshold[phase]
let_it_run = self.let_it_run[phase]
# Determine the total iteration count and number of steps passed in the current phase - THESE ARE ONLY APPROXIMATE!
total_iters = result["training_iteration"] #was "iterations_since_restore"
steps_this_phase = result["timesteps_total"] - self.steps_at_phase_begin
# If this trial is already underway and being tracked, then
if trial_id in self.trials:
# Capture the values of max, min and mean rewards for this iteration
ep_mean = result["episode_reward_mean"]
mean_rew = -100.0
if not math.isnan(ep_mean):
mean_rew = ep_mean
self.trials[trial_id]["mean_rewards"].append(mean_rew)
max_rew = -100.0
if not math.isnan(result["episode_reward_max"]):
max_rew = result["episode_reward_max"]
self.trials[trial_id]["max_rewards"].append(max_rew)
min_rew = -100.0
if not math.isnan(result["episode_reward_min"]):
min_rew = result["episode_reward_min"]
self.trials[trial_id]["min_rewards"].append(min_rew)
#print("///// Appending reward ", mean_rew, max_rew)
if ep_mean < self.trials[trial_id]["worst_mean"]:
self.trials[trial_id]["worst_mean"] = ep_mean
if ep_mean > self.trials[trial_id]["best_mean"]:
self.trials[trial_id]["best_mean"] = ep_mean
# If the deque of N most recent rewards is not yet full then increment its count
if self.trials[trial_id]["num_entries"] < self.most_recent:
self.trials[trial_id]["num_entries"] += 1
#print("\n///// StopLogic: trial {} has completed {} iterations.".format(trial_id, self.trials[trial_id]["num_entries"]))
# Else the deque is full so we can start analyzing stop criteria
else:
                # Stop if we are in the final phase, avg of mean rewards over recent history is above the success threshold and
# its standard deviation is small.
avg_of_mean = mean(self.trials[trial_id]["mean_rewards"])
std_of_mean = stdev(self.trials[trial_id]["mean_rewards"])
#print("///// StopLogic: iter #{}, avg reward = {:.2f}, std of mean = {:.3f}".format(total_iters, avg, std_of_mean))
if phase == self.num_phases-1 and avg_of_mean >= success_avg_thresh and std_of_mean <= self.completion_std_threshold:
print("\n///// Stopping trial - SUCCESS!\n")
return True
if steps_this_phase > min_timesteps:
if not self.crossed_min_timesteps:
print("///// StopLogic: Beyond min time steps for phase {}".format(phase))
self.crossed_min_timesteps = True
# Stop if max iterations reached
if self.max_iterations is not None and total_iters >= self.max_iterations:
print("\n///// Stopping trial - reached max iterations.")
return True
# Stop if max timesteps reached
if self.max_ep_timesteps is not None and result["timesteps_since_restore"] >= self.max_ep_timesteps:
print("\n///// Stopping trial - reached max timesteps of {}.".format(self.max_ep_timesteps))
return True
# Stop if mean and max rewards haven't significantly changed in recent history
std_of_max = stdev(self.trials[trial_id]["max_rewards"])
if std_of_mean <= self.completion_std_threshold and std_of_max <= self.completion_std_threshold:
if avg_of_mean >= success_avg_thresh:
print("\n///// Stopping trial - winner with no further change. Mean avg = {:.1f}, mean std = {:.2f}"
.format(avg_of_mean, std_of_mean))
else:
print("\n///// Stopping trial - loser with no further change. Mean avg = {:.1f}, mean std = {:.2f}"
.format(avg_of_mean, std_of_mean))
return True
# If user chooses to let it run, regardless of reward trends, then let it run
if let_it_run:
return False
# If the avg mean reward over recent history is below the failure threshold then
if avg_of_mean < failure_avg_thresh:
done = False
slope_mean = self._get_slope(self.trials[trial_id]["mean_rewards"])
avg_of_min = mean(list(self.trials[trial_id]["min_rewards"]))
# If the max reward is below success threshold and not climbing significantly, then stop as a failure
dq_max = self.trials[trial_id]["max_rewards"]
avg_of_max = mean(dq_max)
slope_max = self._get_slope(dq_max)
if avg_of_max < success_avg_thresh:
if avg_of_max <= failure_avg_thresh or (slope_max < 0.0 and slope_mean < 0.04):
print("\n///// Stopping trial - max is toast in {} iters with little hope of turnaround.\n".format(self.most_recent))
done = True
# If the avg mean is well below the best achieved so far and going south, then stop as failure
delta = max((self.trials[trial_id]["best_mean"] - self.trials[trial_id]["worst_mean"]) * self.degrade_threshold, 10.0)
thresh_value = self.trials[trial_id]["best_mean"] - delta
if avg_of_mean < thresh_value and slope_mean < 0.0:
print("\n///// Stopping trial - mean reward is failing and {:.0f}% below its peak of {:.1f}."
.format(100*self.degrade_threshold, self.trials[trial_id]["best_mean"]))
done = True
# If the mean curve is heading down and the max is not increasing then stop as a failure
if slope_max < -0.04 and slope_mean < -0.04:
print("\n///// Stopping trial - mean reward bad & getting worse, max is not improving in latest {} iters."
.format(self.most_recent))
done = True
# If the mean is a lot closer to the min than to the max and no sign of improving then stop as failure
if avg_of_mean - avg_of_min < 0.25*(avg_of_max - avg_of_min) and slope_mean <= 0.0:
print("\n///// Stopping trial - no improvement and min reward is dominating in latest {} iters."
.format(self.most_recent))
done = True
if done:
print("///// Trial {} ended; mean avg = {:.1f}, mean slope = {:.2f}, max avg = {:.1f}, max slope = {:.2f}"
.format(trial_id, avg_of_mean, slope_mean, avg_of_max, slope_max))
print(" Phase = {}, steps complete = {}, min timesteps = {}, success threshold = {:.2f}, failure threshold = {:.2f}"
.format(phase, steps_this_phase, min_timesteps, success_avg_thresh, failure_avg_thresh))
print(" latest means:")
for i in range(len(self.trials[trial_id]["mean_rewards"]) // 5):
print(" {:3d}: ".format(5*i), end="")
for j in range(5):
print("{:5.1f}, ".format(self.trials[trial_id]["mean_rewards"][5*i + j]), end="")
print(" ")
return True
# Else, it is a brand new trial
else:
mean_rew = deque(maxlen = self.most_recent)
mean_rew.append(result["episode_reward_mean"])
max_rew = deque(maxlen = self.most_recent)
max_rew.append(result["episode_reward_max"])
min_rew = deque(maxlen = self.most_recent)
min_rew.append(result["episode_reward_min"])
self.trials[trial_id] = {"stop": False, "num_entries": 1, \
"mean_rewards": mean_rew, "max_rewards": max_rew, "min_rewards": min_rew, "worst_mean": math.inf,
"best_mean": -math.inf}
print("///// StopLogic adding new trial: {}".format(trial_id))
return False
def set_environment_model(self,
env: TaskSettableEnv #a reference to the environment model
) -> None:
"""This is required to be called by the environment if multi-phase learning is to be done."""
self.env = env
#print("///// StopLogic: environment model has been made known, at ", self.env)
def get_success_thresholds(self) -> list:
"""Returns a list of the success thresholds defined for the various phases."""
if type(self.success_avg_threshold) == list:
return self.success_avg_threshold
else:
return [self.success_avg_threshold]
"""Not sure when this is called"""
def stop_all(self):
#print("\n\n///// In StopLogic.stop_all /////\n")
return False
def _get_slope(self, dq):
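        """Returns a rough slope indicator for the values in dq: the difference between an average over the newest
            few entries and an average over the oldest few entries (both averages are further scaled by 1/3 for
            mid-size deques and by 1/10 for large ones).
        """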
begin = 0.0
end = 0.0
dq_size = len(dq)
if dq_size < 4:
begin = dq[0]
end = dq[-1]
elif dq_size < 21:
begin = 0.333333 * mean([dq[i] for i in range(3)])
end = 0.333333 * mean([dq[i] for i in range(-3, 0)])
else:
begin = 0.1 * mean([dq[i] for i in range(10)])
end = 0.1 * mean([dq[i] for i in range(-10, 0)])
return end - begin
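
#------------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of this module's original logic):
# one way a StopLogic instance could be handed to Ray Tune via the `stop`
# argument of tune.run(). The algorithm, environment and threshold values
# below are hypothetical placeholders.
#------------------------------------------------------------------------------
if __name__ == "__main__":
    from ray import tune

    stopper = StopLogic(max_ep_timesteps  = 1_000_000,
                        max_iterations    = 500,
                        min_timesteps     = 100_000,
                        avg_over_latest   = 10,
                        success_threshold = 450.0,
                        failure_threshold = 20.0,
                        compl_std_dev     = 1.0)

    # Tune calls the stopper after every training iteration with the trial ID and the latest result dict;
    # a True return terminates that trial. For a multi-phase curriculum the environment would also need to
    # call stopper.set_environment_model(env) so that the current phase can be queried.
    tune.run("PPO",
             config = {"env": "CartPole-v1", "framework": "torch"},
             stop   = stopper)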