import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from gym.utils import EzPickle
from gym import spaces
from gym.envs.mujoco.mujoco_env import MujocoEnv


class MultiGoalEnv(MujocoEnv, EzPickle):
    """
    Move a 2D point mass as far as possible from the origin.

    State: position.
    Action: velocity.
    """
    def __init__(self,
                 goal_reward=10,
                 actuation_cost_coeff=30.0,
                 distance_cost_coeff=1.0,
                 init_sigma=0.1):
        EzPickle.__init__(**locals())

        self.dynamics = PointDynamics(dim=2, sigma=0)
        self.init_mu = np.zeros(2, dtype=np.float32)
        self.init_sigma = init_sigma
        self.goal_positions = np.array(
            (
                (-5, 0),
                (5, 0),
                (0, 5),
                (0, -5),
            ),
            dtype=np.float32)
        self.goal_threshold = 1.0
        self.goal_reward = goal_reward
        self.action_cost_coeff = actuation_cost_coeff
        self.distance_cost_coeff = distance_cost_coeff
        self.xlim = (-7, 7)
        self.ylim = (-7, 7)
        self.vel_bound = 1.
        self.reset()
        self.observation = None

        self._ax = None
        self._env_lines = []
        self.fixed_plots = None
        self.dynamic_plots = []
    def reset(self):
        unclipped_observation = (
            self.init_mu
            + self.init_sigma
            * np.random.normal(size=self.dynamics.s_dim))
        self.observation = np.clip(
            unclipped_observation,
            self.observation_space.low,
            self.observation_space.high)
        return self.observation

    @property
    def observation_space(self):
        return spaces.Box(
            low=np.array((self.xlim[0], self.ylim[0])),
            high=np.array((self.xlim[1], self.ylim[1])),
            dtype=np.float32,
            shape=None)

    @property
    def action_space(self):
        return spaces.Box(
            low=-self.vel_bound,
            high=self.vel_bound,
            shape=(self.dynamics.a_dim, ),
            dtype=np.float32)

    def get_current_obs(self):
        return np.copy(self.observation)
    def step(self, action):
        action = action.ravel()

        action = np.clip(
            action,
            self.action_space.low,
            self.action_space.high).ravel()

        observation = self.dynamics.forward(self.observation, action)
        observation = np.clip(
            observation,
            self.observation_space.low,
            self.observation_space.high)

        reward = self.compute_reward(observation, action)
        dist_to_goal = np.amin([
            np.linalg.norm(observation - goal_position)
            for goal_position in self.goal_positions
        ])
        done = dist_to_goal < self.goal_threshold
        # if done:
        #     reward += self.goal_reward

        self.observation = np.copy(observation)

        return observation, reward, done, {'pos': observation}
    def _init_plot(self):
        fig_env = plt.figure(figsize=(7, 7))
        self._ax = fig_env.add_subplot(111)
        self._ax.axis('equal')

        self._env_lines = []
        self._ax.set_xlim((-7, 7))
        self._ax.set_ylim((-7, 7))

        self._ax.set_title('Multigoal Environment')
        self._ax.set_xlabel('x')
        self._ax.set_ylabel('y')

        self._plot_position_cost(self._ax)

    def render_rollouts(self, paths=()):
        """Render the past rollouts of the environment."""
        if self._ax is None:
            self._init_plot()

        # noinspection PyArgumentList
        [line.remove() for line in self._env_lines]
        self._env_lines = []

        for path in paths:
            positions = np.stack([info['pos'] for info in path['infos']])
            xx = positions[:, 0]
            yy = positions[:, 1]
            self._env_lines += self._ax.plot(xx, yy, 'b')

        plt.draw()
        plt.pause(0.01)

    def render(self, mode='human'):
        """Render the current state of the environment (no-op)."""
        pass
    def compute_reward(self, observation, action):
        # Penalize the squared L2 norm of the action (the commanded velocity).
        # noinspection PyTypeChecker
        action_cost = np.sum(action ** 2) * self.action_cost_coeff

        # Squared distance to the nearest goal; computed here but not used in
        # the reward below.
        cur_position = observation
        # noinspection PyTypeChecker
        goal_cost = self.distance_cost_coeff * np.amin([
            np.sum((cur_position - goal_position) ** 2)
            for goal_position in self.goal_positions
        ])

        # Reward distance from the origin, encouraging the agent to move
        # away from it.
        dist_cost = self.distance_cost_coeff * np.sqrt(np.sum(cur_position**2))

        # costs = [action_cost, goal_cost]
        costs = [-action_cost, dist_cost]
        reward = np.sum(costs)
        return reward
    def _plot_position_cost(self, ax):
        delta = 0.01
        x_min, x_max = tuple(1.1 * np.array(self.xlim))
        y_min, y_max = tuple(1.1 * np.array(self.ylim))
        X, Y = np.meshgrid(
            np.arange(x_min, x_max, delta),
            np.arange(y_min, y_max, delta)
        )
        goal_costs = np.amin([
            (X - goal_x) ** 2 + (Y - goal_y) ** 2
            for goal_x, goal_y in self.goal_positions
        ], axis=0)
        costs = goal_costs

        contours = ax.contour(X, Y, costs, 20)
        ax.clabel(contours, inline=1, fontsize=10, fmt='%.0f')
        ax.set_xlim([x_min, x_max])
        ax.set_ylim([y_min, y_max])
        goal = ax.plot(self.goal_positions[:, 0],
                       self.goal_positions[:, 1], 'ro')
        return [contours, goal]


class PointDynamics(object):
    """
    State: position.
    Action: velocity.
    """

    def __init__(self, dim, sigma):
        self.dim = dim
        self.sigma = sigma
        self.s_dim = dim
        self.a_dim = dim

    def forward(self, state, action):
        mu_next = state + action
        state_next = mu_next + self.sigma * \
            np.random.normal(size=self.s_dim)
        return state_next
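

if __name__ == '__main__':
    # A minimal usage sketch: roll the environment out with random actions
    # and plot the resulting trajectory. It relies only on the gym-style
    # reset()/step() interface and render_rollouts() defined above; the
    # episode length of 30 is an arbitrary choice for illustration.
    env = MultiGoalEnv()
    env.reset()

    infos = []
    for _ in range(30):
        action = env.action_space.sample()
        observation, reward, done, info = env.step(action)
        infos.append(info)
        if done:
            break

    # render_rollouts() expects a sequence of paths, each carrying the
    # per-step info dicts returned by step().
    env.render_rollouts([{'infos': infos}])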