-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathars.py
141 lines (111 loc) · 4.71 KB
/
ars.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# AI 2018
# Importing the libraries
import os
import numpy as np
import gym
from gym import wrappers
import pybullet_envs
# Setting the Hyper Parameters
class Hp():
def __init__(self):
self.nb_steps = 1000
self.episode_length = 1000
self.learning_rate = 0.02
self.nb_directions = 16
self.nb_best_directions = 16
assert self.nb_best_directions <= self.nb_directions
self.noise = 0.03
self.seed = 1
self.env_name = 'HalfCheetahBulletEnv-v0'
# Normalizing the states
class Normalizer():
def __init__(self, nb_inputs):
self.n = np.zeros(nb_inputs)
self.mean = np.zeros(nb_inputs)
self.mean_diff = np.zeros(nb_inputs)
self.var = np.zeros(nb_inputs)
def observe(self, x):
self.n += 1.
last_mean = self.mean.copy()
self.mean += (x - self.mean) / self.n
self.mean_diff += (x - last_mean) * (x - self.mean)
self.var = (self.mean_diff / self.n).clip(min = 1e-2)
def normalize(self, inputs):
obs_mean = self.mean
obs_std = np.sqrt(self.var)
return (inputs - obs_mean) / obs_std
# Building the AI
class Policy():
def __init__(self, input_size, output_size):
self.theta = np.zeros((output_size, input_size))
def evaluate(self, input, delta = None, direction = None):
if direction is None:
return self.theta.dot(input)
elif direction == "positive":
return (self.theta + hp.noise*delta).dot(input)
else:
return (self.theta - hp.noise*delta).dot(input)
def sample_deltas(self):
return [np.random.randn(*self.theta.shape) for _ in range(hp.nb_directions)]
def update(self, rollouts, sigma_r):
step = np.zeros(self.theta.shape)
for r_pos, r_neg, d in rollouts:
step += (r_pos - r_neg) * d
self.theta += hp.learning_rate / (hp.nb_best_directions * sigma_r) * step
# Exploring the policy on one specific direction and over one episode
def explore(env, normalizer, policy, direction = None, delta = None):
state = env.reset()
done = False
num_plays = 0.
sum_rewards = 0
while not done and num_plays < hp.episode_length:
normalizer.observe(state)
state = normalizer.normalize(state)
action = policy.evaluate(state, delta, direction)
state, reward, done, _ = env.step(action)
reward = max(min(reward, 1), -1)
sum_rewards += reward
num_plays += 1
return sum_rewards
# Training the AI
def train(env, policy, normalizer, hp):
for step in range(hp.nb_steps):
# Initializing the perturbations deltas and the positive/negative rewards
deltas = policy.sample_deltas()
positive_rewards = [0] * hp.nb_directions
negative_rewards = [0] * hp.nb_directions
# Getting the positive rewards in the positive directions
for k in range(hp.nb_directions):
positive_rewards[k] = explore(env, normalizer, policy, direction = "positive", delta = deltas[k])
# Getting the negative rewards in the negative/opposite directions
for k in range(hp.nb_directions):
negative_rewards[k] = explore(env, normalizer, policy, direction = "negative", delta = deltas[k])
# Gathering all the positive/negative rewards to compute the standard deviation of these rewards
all_rewards = np.array(positive_rewards + negative_rewards)
sigma_r = all_rewards.std()
# Sorting the rollouts by the max(r_pos, r_neg) and selecting the best directions
scores = {k:max(r_pos, r_neg) for k,(r_pos,r_neg) in enumerate(zip(positive_rewards, negative_rewards))}
order = sorted(scores.keys(), key = lambda x:scores[x], reverse = True)[:hp.nb_best_directions]
rollouts = [(positive_rewards[k], negative_rewards[k], deltas[k]) for k in order]
# Updating our policy
policy.update(rollouts, sigma_r)
# Printing the final reward of the policy after the update
reward_evaluation = explore(env, normalizer, policy)
print('Step:', step, 'Reward:', reward_evaluation)
# Running the main code
def mkdir(base, name):
path = os.path.join(base, name)
if not os.path.exists(path):
os.makedirs(path)
return path
work_dir = mkdir('exp', 'brs')
monitor_dir = mkdir(work_dir, 'monitor')
hp = Hp()
np.random.seed(hp.seed)
env = gym.make(hp.env_name)
env = wrappers.Monitor(env, monitor_dir, force = True)
nb_inputs = env.observation_space.shape[0]
nb_outputs = env.action_space.shape[0]
policy = Policy(nb_inputs, nb_outputs)
normalizer = Normalizer(nb_inputs)
train(env, policy, normalizer, hp)