policy_parameterizations.py

from typing import Tuple, Callable, Union, Optional
import numpy as np
import torch


class DMP:
    """Dynamic movement primitives.

    Attributes:
        n_dmps: Number of DMPs. For the experiment this corresponds to the
            degrees of freedom.
        n_bfs: Number of basis functions.
        time: Execution time for each trial.
    """

    def __init__(
        self,
        n_dmps: int,
        n_bfs: int,
        time: float,
    ):
        """Inits DMP."""
        self.n_dmps = n_dmps
        self.n_bfs = n_bfs
        self.time = time
        self.len_params = self.n_bfs * self.n_dmps

    def __call__(self, state: torch.Tensor, params: torch.Tensor) -> torch.Tensor:
        """Maps a state and DMP parameters to an action.

        Args:
            state: The state tensor; state[0] is the current time value.
            params: Parameters of the DMP; the last entry is an is_real
                flag that is passed through unchanged.

        Returns:
            Output of the DMP plus the is_real flag.
        """
        is_real = params[:, -1]
        param_dmp = params.clone().detach()
        param_dmp = param_dmp[:, :-1]
        # Basis function weights, one row per DMP.
        w = param_dmp.view(self.n_dmps, self.n_bfs)
        # Centers of the basis functions, equally spaced over one period.
        c = torch.linspace(0, 2 * np.pi, self.n_bfs + 1)
        c = c[0:-1]
        # Widths of the basis functions.
        h = torch.ones(self.n_bfs) * self.n_bfs
        # Periodic basis functions evaluated at the current time value.
        psi = torch.exp(h * (torch.cos((2 * np.pi) / self.time * state[0].item() - c) - 1))
        # Note that the output construction assumes n_dmps == 2.
        out = torch.tensor([torch.dot(psi, w[0, :]), torch.dot(psi, w[1, :]), is_real])
        return out
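
# Usage sketch (illustrative, not from the original module): evaluating the
# DMP policy above. The shapes and values are assumptions; the extra last
# parameter entry is the is_real flag that __call__ passes through.
#
# >>> dmp = DMP(n_dmps=2, n_bfs=5, time=10.0)
# >>> params = torch.rand(1, dmp.len_params + 1)  # weights plus is_real flag
# >>> state = torch.tensor([0.5])                 # state[0]: current time
# >>> action = dmp(state, params)                 # tensor([a_0, a_1, is_real])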


class MLP:
    """Multilayer perceptron.

    Consists of at least two layers of nodes: an input layer and an output
    layer. Optionally it can be extended with arbitrarily many hidden layers.
    Except for the input nodes, each node is a neuron that can optionally use
    a nonlinear activation function.

    Attributes:
        L0: Number of input nodes. For a gym environment objective this
            corresponds to the states.
        Ls: List of node counts for the optional hidden layers and the
            output layer. For a gym environment objective the last number of
            the list has to correspond to the actions.
        add_bias: If True, every layer has one bias vector of the same
            dimension as the output dimension of the layer.
        nonlinearity: Optional nonlinearity (activation) function; defaults
            to the identity.
    """

    def __init__(
        self,
        L0: int,
        *Ls: int,
        add_bias: bool = False,
        nonlinearity: Optional[Callable[[torch.Tensor], torch.Tensor]] = None,
    ):
        """Inits MLP."""
        self.L0 = L0
        self.Ls = Ls
        self.add_bias = add_bias
        # One weight matrix (plus optionally one bias vector) per layer.
        self.len_params = sum(
            (in_size + 1 * add_bias) * out_size
            for in_size, out_size in zip((L0,) + Ls[:-1], Ls)
        )
        if nonlinearity is None:
            nonlinearity = lambda x: x
        self.nonlinearity = nonlinearity
    def __call__(self, state: torch.Tensor, params: torch.Tensor) -> torch.Tensor:
        """Maps states and parameters of the MLP to its actions.

        Args:
            state: The state tensor.
            params: Parameters of the MLP.

        Returns:
            Output of the MLP/actions.
        """
        with torch.no_grad():
            params = params.view(self.len_params)
            out = state
            start, end = (0, 0)
            in_size = self.L0
            for out_size in self.Ls:
                # Linear mapping.
                start, end = end, end + in_size * out_size
                out = out @ params[start:end].view(in_size, out_size)
                # Add bias.
                if self.add_bias:
                    start, end = end, end + out_size
                    out = out + params[start:end]
                # Apply nonlinearity.
                out = self.nonlinearity(out)
                in_size = out_size
        return out

    def _select_first_layer(
        self, params: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Helper function for state normalization.

        Normalization is only applied to the first layer out = state @ A + b.

        Args:
            params: Parameters of the MLP.

        Returns:
            A tuple of the first-layer weights A and biases b.

        Raises:
            ValueError: If the MLP has no bias vectors.
        """
        if self.add_bias is False:
            raise ValueError(
                f"For state normalization the MLP should have biases, but add_bias is {self.add_bias}."
            )
        in_size = self.L0
        out_size = self.Ls[0]
        A = params[:, : in_size * out_size].view(-1, in_size, out_size)
        b = params[:, in_size * out_size : (in_size + 1) * out_size]
        return A, b

    def normalize_params(
        self,
        params: torch.Tensor,
        mean: Union[float, torch.Tensor],
        std: Union[float, torch.Tensor],
    ) -> torch.Tensor:
        """State normalization for an MLP.

        Only the first layer is transformed affinely.
        For further information see thesis section 4.2, Extensions.

        Args:
            params: Parameters of the MLP.
            mean: Mean of states.
            std: Standard deviation of states.

        Returns:
            Normalized parameters of the MLP.
        """
        params = params.clone()
        A, b = self._select_first_layer(params)
        # Absorb mean and std into the first layer so that the returned
        # parameters, evaluated on normalized states (state - mean) / std,
        # behave like the original parameters evaluated on raw states.
        if type(mean) is torch.Tensor:
            b += mean @ A
        else:
            b += (mean * torch.ones(A.shape[1])) @ A
        if type(std) is torch.Tensor:
            A *= std.view(-1, 1)
        else:
            A *= std
        return params

    def unnormalize_params(
        self,
        params: torch.Tensor,
        mean: Union[float, torch.Tensor],
        std: Union[float, torch.Tensor],
    ) -> torch.Tensor:
        """State unnormalization for an MLP.

        Only the first layer is transformed affinely.
        For further information see thesis section 4.2, Extensions.

        Args:
            params: Parameters of the MLP.
            mean: Mean of states.
            std: Standard deviation of states.

        Returns:
            Unnormalized parameters of the MLP.
        """
        params = params.clone()
        A, b = self._select_first_layer(params)
        # Inverse of normalize_params: the returned parameters, evaluated on
        # raw states, behave like the original parameters evaluated on
        # normalized states (state - mean) / std.
        if type(std) is torch.Tensor:
            A /= std.view(-1, 1)
        else:
            A /= std
        if type(mean) is torch.Tensor:
            b -= mean @ A
        else:
            b -= (mean * torch.ones(A.shape[1])) @ A
        return params
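
# Usage sketch (illustrative, not from the original module): a two-layer MLP
# with tanh activation and a normalize/unnormalize round trip, which should
# recover the original parameters up to floating point error. Sizes are
# assumptions.
#
# >>> mlp = MLP(4, 8, 2, add_bias=True, nonlinearity=torch.tanh)
# >>> params = torch.rand(1, mlp.len_params)
# >>> action = mlp(torch.rand(4), params)  # tensor of shape (2,)
# >>> mean, std = torch.zeros(4), torch.ones(4)
# >>> p_norm = mlp.normalize_params(params, mean, std)
# >>> p_raw = mlp.unnormalize_params(p_norm, mean, std)  # equals params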


def discretize(function: Callable, num_actions: int):
    """Discretizes the output/actions of an MLP.

    For instance necessary for the CartPole environment.

    Args:
        function: Mapping from states and parameters to actions, e.g. an MLP.
        num_actions: Number of discrete actions.

    Returns:
        Function with num_actions discrete outputs.
    """

    def discrete_policy_2(state, params):
        # Binary action: threshold the scalar output at zero.
        return (function(state, params) > 0.0) * 1

    def discrete_policy_n(state, params):
        # n-ary action: pick the index of the largest output.
        return torch.argmax(function(state, params))

    if num_actions == 2:
        discrete_policy = discrete_policy_2
    elif num_actions > 2:
        discrete_policy = discrete_policy_n
    else:
        raise ValueError(
            f"Argument num_actions is {num_actions} but has to be greater than 1."
        )
    return discrete_policy
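
# Usage sketch (illustrative, not from the original module): wrapping a
# single-output MLP for a binary-action environment such as CartPole. The
# sizes are assumptions.
#
# >>> mlp = MLP(4, 1, add_bias=True)
# >>> policy = discretize(mlp, num_actions=2)
# >>> action = policy(torch.rand(4), torch.rand(mlp.len_params))  # 0 or 1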