-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaz_parallel2.py
333 lines (271 loc) · 19.5 KB
/
az_parallel2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
import numpy as np
from aMCTS_parallel import MCTSParallel, MCTS_Node
from ataxx import AtaxxBoard
from go import GoBoard
import random
#from tqdm import tqdm
from tqdm.notebook import tqdm
import torch
import torch.nn.functional as F
torch.manual_seed(0)
import warnings
warnings.filterwarnings("ignore")
# for the data augmentation process
# Order of the augmented samples as (quarter_turns, flip_vertically):
# flip only, then each rotation followed by the same rotation plus a flip.
_SYMMETRIES = (
    (0, True),
    (1, False), (1, True),
    (2, False), (2, True),
    (3, False), (3, True),
)


def _transform_board(board_state, quarter_turns, flip):
    """Rotate the board by `quarter_turns` and then optionally flip it vertically."""
    board = board_state.rotate90(quarter_turns) if quarter_turns else board_state
    return board.flip_vertical() if flip else board


def _transform_go_policy(action_probs, side, quarter_turns, flip):
    """Apply rotate-then-flip to the side x side part of a Go policy, keeping the last entry in place."""
    probs = np.asarray(action_probs)
    grid = np.rot90(probs[:-1].reshape(side, side), quarter_turns)
    if flip:
        grid = np.flip(grid, 0)
    # flatten() copies, so the caller's array is never aliased by the result.
    return np.append(grid.flatten(), probs[-1])


def _transform_ataxx_policy(action_probs, side, quarter_turns, flip, fill_size):
    """Apply rotate-then-flip to both coordinate pairs of an Ataxx policy, re-padding to fill_size if set."""
    full = fill_size if fill_size != 0 else side
    # Only the leading side^4 corner of a padded policy belongs to the real board.
    moves = np.asarray(action_probs).reshape(full, full, full, full)[:side, :side, :side, :side]
    # Axes (0, 1) and (2, 3) are transformed identically
    # (presumably source square and destination square - confirm against the move encoding).
    moves = np.rot90(np.rot90(moves, quarter_turns, (2, 3)), quarter_turns, (0, 1))
    if flip:
        moves = np.flip(np.flip(moves, 2), 0)
    if fill_size != 0:
        # Zero-pad every axis at its end, back up to fill_size.
        moves = np.pad(moves, (0, fill_size - side), 'constant', constant_values=0)
    return moves.flatten()


def transformations(board_state, action_probs, outcome, gameType, fill_size=0):
    """
    Build the symmetric variants of one training sample (data augmentation).

    Seven variants are returned, in this order: vertical flip; rotate 90;
    rotate 90 + vertical flip; rotate 180; rotate 180 + vertical flip;
    rotate 270; rotate 270 + vertical flip. The policy is always transformed
    with the same rotate-then-flip sequence that is applied to the board.
    The previous Go code applied only a horizontal flip to the policy for all
    three "rotate + flip" variants, which is right for 180 degrees only.

    Assumes board_state.rotate90(k) and board_state.flip_vertical() act on the
    grid like np.rot90(grid, k) and np.flip(grid, 0) - TODO confirm in the
    board classes.

    Parameters:
    - board_state: Board exposing `size`, `rotate90(k)`, `flip_vertical()` and
      `EncodedGameStateChanged(...)`.
    - action_probs: Flat policy. For 'G': size*size entries plus one trailing
      entry that is left untouched (presumably the pass move). For 'A': n**4
      entries, where n is fill_size when it is non-zero and the board size
      otherwise.
    - outcome: Value target, copied unchanged into every variant.
    - gameType: 'G' (Go) or 'A' (Ataxx); any other value returns [].
    - fill_size: Padded side length for Ataxx (0 = no padding). Ignored for Go.

    Returns:
    - List of (encoded_state, transformed_policy, outcome) tuples.
    """
    if gameType not in ('G', 'A'):
        return []

    side = board_state.size
    transf = []
    for quarter_turns, flip in _SYMMETRIES:
        board = _transform_board(board_state, quarter_turns, flip)
        if gameType == 'G':
            encoded = board.EncodedGameStateChanged()
            policy = _transform_go_policy(action_probs, side, quarter_turns, flip)
        else:
            if fill_size == 0:
                encoded = board.EncodedGameStateChanged()
            else:
                encoded = board.EncodedGameStateChanged(fill_size)
            policy = _transform_ataxx_policy(action_probs, side, quarter_turns, flip, fill_size)
        transf.append((encoded, policy, outcome))
    return transf
# function that applies temperature to the given probabilities distribution and normalizes the result, for the current AlphaZero iteration
def probs_with_temperature(probabilities, az_iteration):
    """
    Apply an iteration-dependent temperature to a distribution and renormalize.

    The temperature is 0.5 + 1 / (1 + e**(az_iteration - 5)): about 1.49 at
    iteration 0, 1.0 at iteration 5, approaching 0.5 for later iterations.
    Each probability is raised to 1 / temperature, so early iterations flatten
    the distribution and later ones sharpen it.

    Parameters:
    - probabilities: Array-like of non-negative weights (lists are accepted).
    - az_iteration: Current AlphaZero iteration.

    Returns:
    - New array with the tempered weights divided by their sum. The input is
      not modified. If every weight is zero the division yields NaNs.
    """
    def temperature_function(az_iter):
        # log(1 + e**x) via logaddexp never overflows, unlike np.e**x, which
        # raises OverflowError for large integer exponents.
        # float() keeps the exponent a plain Python scalar so the dtype of a
        # float32 input is not promoted.
        return float(np.exp(-np.logaddexp(0, az_iter - 5))) + 0.5

    prob_temp = np.asarray(probabilities) ** (1 / temperature_function(az_iteration))
    prob_temp /= np.sum(prob_temp)
    return prob_temp
class AlphaZeroParallel2:
    """
    AlphaZero training loop with batched ("parallel") self-play, MCTS and training.

    Parameters:
    - model: The neural network model. Must expose a `device` attribute and
      return (policy_logits, value) when called on a batch of encoded boards.
    - optimizer: The optimizer used for training the neural network.
    - board: The game board (its `size` is used when fill_size is 0).
    - gameType: Type of the game ('A' for Ataxx; anything else is treated as Go
      when boards are created).
    - data_augmentation: Flag for enabling data augmentation during self-play.
    - verbose: Stored on the instance; not read by any method of this class.
    - fill_size: Padded board side for the variable-size ("Flex") setup,
      0 to disable.
    - **params: Configuration read by the methods: "n_self_play_parallel",
      "self_play_iterations", "mcts_iterations", "move_cap", "batch_size",
      "n_epochs", "n_iterations".

    Methods:
    - SelfPlay: Play games against itself and return the training samples.
    - Train: Run one pass of gradient updates over a dataset.
    - Learn: Alternate self-play and training, saving checkpoints.
    """

    def __init__(self, model, optimizer, board, gameType, data_augmentation=False, verbose=False, fill_size=0, **params):
        """
        Store the model, optimizer, board and configuration on the instance.

        Parameters:
        - model: The neural network model.
        - optimizer: The optimizer used for training the neural network.
        - board: The game board.
        - gameType: Type of the game ('G' for Go, 'A' for Ataxx).
        - data_augmentation: Flag for enabling data augmentation during self-play.
        - verbose: Flag for printing progress information (currently unused).
        - fill_size: The fill size (used for games with a fill).
        - **params: Additional parameters for configuration.
        """
        self.model = model
        self.optimizer = optimizer
        self.board = board
        self.gameType = gameType
        self.params = params
        self.data_augmentation = data_augmentation
        self.verbose = verbose
        self.fill_size = fill_size

    def _new_board(self, size):
        """Create and start a fresh board of the configured game type."""
        board = AtaxxBoard(size) if self.gameType == "A" else GoBoard(size)
        board.Start(render=False)
        return board

    def SelfPlay(self, az_iteration):
        """
        Play self-play games in parallel slots and collect training samples.

        Each slot holds one running game. A finished game is scored, its
        samples are emitted, and the slot is either refilled with a new game
        or removed once enough games have been started.

        Parameters:
        - az_iteration: The current iteration of the AlphaZero algorithm
          (accepted for the caller's convenience; not used here).

        Returns:
        - return_dataset: A list of (encoded_state, action_probs, outcome)
          samples, including augmented copies when data_augmentation is set.
        """
        n_parallel = self.params["n_self_play_parallel"]
        flexible = self.fill_size != 0

        # In flexible mode board sizes cycle fill_size, fill_size - 1, ..., 4.
        # max(1, ...) keeps the modulo below well-defined for fill_size <= 3.
        size_cycle = max(1, self.fill_size - 3) if flexible else 1
        if flexible:
            # Wrap the countdown so that having more slots than distinct sizes
            # never creates boards smaller than the cycle's minimum.
            start_sizes = [self.fill_size - (slot % size_cycle) for slot in range(n_parallel)]
        else:
            start_sizes = [self.board.size] * n_parallel

        return_dataset = []
        selfplays_done = 0

        # Per-slot state; the four lists below are always indexed together.
        boards = [self._new_board(start_size) for start_size in start_sizes]
        boards_dataset = [[] for _ in range(n_parallel)]
        boards_play_count = [0] * n_parallel

        # Size used for the next refilled slot.
        size = self.fill_size if flexible else self.board.size

        self.mcts = MCTSParallel(self.model, fill_size=self.fill_size)
        root_boards = [MCTS_Node(board, fill_size=self.fill_size) for board in boards]

        while boards:
            # One batched search; result i belongs to slot i.
            boards_actions_probs = self.mcts.Search(root_boards, self.params["mcts_iterations"])

            # Walk the slots backwards so deleting slot i never shifts a slot
            # that is still to be visited in this pass.
            for i in reversed(range(len(boards))):
                action_probs = boards_actions_probs[i]
                board = boards[i]

                # Record the position before the move together with the player to move.
                boards_dataset[i].append((board.copy(), action_probs, board.player))

                # Sample the move from the search distribution.
                action = np.random.choice(len(action_probs), p=action_probs)
                played_child = self.mcts.roots[i].children[action]

                board.Move(played_child.originMove)
                board.NextPlayer()
                board.CheckFinish()
                boards_play_count[i] += 1

                # Continue the next search from the played child and detach it
                # from its parent so the old tree is no longer referenced.
                root_boards[i] = played_child
                played_child.parent = None

                # A game that hits the move cap without a winner is closed
                # with winner 3, which gives outcome 0 to the recorded players
                # (presumably 1 and 2).
                if boards_play_count[i] >= self.params["move_cap"] and board.winner == 0:
                    board.winner = 3

                if not board.hasFinished():
                    continue

                # Record the final position once for each side to move,
                # reusing the last search distribution.
                boards_dataset[i].append((board.copy(), action_probs, board.player))
                board.NextPlayer()
                boards_dataset[i].append((board.copy(), action_probs, board.player))

                winner = board.winner
                for state, probs, player in boards_dataset[i]:
                    if player == winner:
                        outcome = 1
                    elif 3 - player == winner:
                        outcome = -1
                    else:
                        outcome = 0
                    return_dataset.append((state.EncodedGameStateChanged(self.fill_size), probs, outcome))
                    if self.data_augmentation:
                        return_dataset.extend(
                            transformations(state, probs, outcome, self.gameType, fill_size=self.fill_size)
                        )

                if selfplays_done >= self.params["self_play_iterations"] - n_parallel:
                    # Enough games have been started: retire this slot.
                    # boards_dataset must be removed too; otherwise the
                    # remaining games would append to, and later re-emit,
                    # another game's already-scored samples.
                    del boards[i], root_boards[i], boards_play_count[i], boards_dataset[i]
                else:
                    # Refill the slot with a fresh game.
                    boards[i] = self._new_board(size)
                    root_boards[i] = MCTS_Node(boards[i], fill_size=self.fill_size)
                    boards_dataset[i] = []
                    boards_play_count[i] = 0
                    if flexible:
                        # Step to the next size of the cycle.
                        if (selfplays_done + 1) % size_cycle == 0:
                            size = self.fill_size
                        else:
                            size -= 1

                selfplays_done += 1
                if selfplays_done % n_parallel == 0:
                    print("\nSELFPLAY:", selfplays_done * 100 // self.params["self_play_iterations"], "%")

        print("\nSELFPLAY: 100 %")
        return return_dataset

    def Train(self, dataset):
        """
        Run one pass of mini-batch updates over the dataset.

        The dataset list is shuffled in place before batching.

        Parameters:
        - dataset: List of (encoded_board, policy_target, value_target) samples.
        """
        random.shuffle(dataset)
        batch_size = self.params["batch_size"]
        device = self.model.device

        for batch_index in range(0, len(dataset), batch_size):
            sample = dataset[batch_index: batch_index + batch_size]
            board_encoded, policy_targets, value_targets = zip(*sample)

            # Stack into arrays first; value targets become a column vector
            # to match the shape of the value head's output.
            board_encoded = np.array(board_encoded)
            policy_targets = np.array(policy_targets)
            value_targets = np.array(value_targets).reshape(-1, 1)

            board_encoded = torch.tensor(board_encoded, dtype=torch.float32, device=device)
            policy_targets = torch.tensor(policy_targets, dtype=torch.float32, device=device)
            value_targets = torch.tensor(value_targets, dtype=torch.float32, device=device)

            out_policy, out_value = self.model(board_encoded)

            # Cross-entropy against the search distribution, MSE on the value.
            policy_loss = F.cross_entropy(out_policy, policy_targets)
            value_loss = F.mse_loss(out_value, value_targets)
            # The policy term is down-weighted relative to the value term.
            loss = policy_loss * 0.1 + value_loss

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

    def Learn(self):
        """
        Run the full AlphaZero loop: self-play, training, checkpointing.

        After every iteration the model and optimizer state dicts are saved
        under ./Models/<tag>/ and ./Optimizers/<tag>/, where <tag> is
        "<GAME><size>" or, when fill_size is set, "<GAME>Flex". The
        directories are created if they do not exist.
        """
        import os  # local import: only needed for checkpoint directories

        game = self.gameType.upper()
        tag = f"{game}{self.board.size}" if self.fill_size == 0 else f"{game}Flex"
        model_dir = f"./Models/{tag}"
        optimizer_dir = f"./Optimizers/{tag}"

        for az_iteration in tqdm(range(self.params["n_iterations"]), desc="AlphaZero Algorithm Iterations", leave=False, unit="iter", ncols=100, colour="#fc6a65"):
            # Evaluation mode while the network only guides the search.
            self.model.eval()
            dataset = self.SelfPlay(az_iteration)

            # Training mode for the parameter updates.
            self.model.train()
            for epoch in tqdm(range(self.params["n_epochs"]), desc="Training Model", leave=False, unit="epoch", ncols=100, colour="#9ffc65"):
                self.Train(dataset)

            # Make sure the targets exist so a missing folder cannot throw
            # away the iteration that was just computed.
            os.makedirs(model_dir, exist_ok=True)
            os.makedirs(optimizer_dir, exist_ok=True)
            torch.save(self.model.state_dict(), f"{model_dir}/{tag}_{az_iteration}.pt")
            torch.save(self.optimizer.state_dict(), f"{optimizer_dir}/{tag}_{az_iteration}_opt.pt")