-
Notifications
You must be signed in to change notification settings - Fork 0
/
Fianchetto_fastBot_buffer.py
412 lines (336 loc) · 20.7 KB
/
Fianchetto_fastBot_buffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
import multiprocessing as mp
import os
from termcolor import colored
from collections import defaultdict
from functools import partial
from time import time
from tqdm import tqdm
from typing import Optional, List, Tuple, Set, Callable, DefaultDict
from csv import writer
import chess.engine
from reconchess import Player, Color, GameHistory, WinReason, Square
from Fianchetto_fast_tournament_2022 import defaults
from Fianchetto_fast_tournament_2022.strategies.auxiliary_strategies import contingency_strategy
from Fianchetto_fast_tournament_2022.utilities import board_matches_sense, move_would_happen_on_board, union_dict#, populate_next_board_set
from Fianchetto_fast_tournament_2022.utilities.player_logging import create_main_logger
from Fianchetto_fast_tournament_2022.utilities.timing import Timer
import copy
# ---- Minor bot behaviors ----------------------------------------------------
# Progress bars (tqdm) are disabled when the environment variable
# RC_DISABLE_PBAR is set to "true" (compared case-insensitively).
RC_DISABLE_PBAR = os.environ.get('RC_DISABLE_PBAR', 'false').lower() == 'true'
# Minimum number of seconds spent looping in self.while_we_wait().
WAIT_LOOP_RATE_LIMIT = 1

# ---- Thresholds for switching to the emergency backup plan ------------------
# Number of boards in the set at which we stop processing and revert to the backup plan.
BOARD_SET_LIMIT = 1_000_000
# Number of seconds left at which we stop processing and revert to the backup plan.
TIMEOUT_RESIGN_LIMIT = 10
# Number of moves on each board: mean 33, std 10 according to ~320k boards in logs.
AVG_BOARD_EXP = 33
class Fianchetto_fastBot_buffer(Player):
    """
    Reconchess player built on the StrangeFish skeleton.

    Its primary role is to manage the set of all possible board states (EPD string -> probability) based on the
    given information. Decision making for sense and move choices is handed off to the strategy callables passed
    to the constructor. On top of that, the first ``len(sense_list)`` sense choices and the first
    ``len(move_list)`` move choices are overridden by the scripted values supplied by the caller.

    The class itself does not use the Stockfish chess engine, but most strategies do use it to make sensing and
    moving decisions. In order to run with one of those strategies, you'll need to download Stockfish from
    https://stockfishchess.org/download/ and create an environment variable called STOCKFISH_EXECUTABLE that is the
    path to the downloaded Stockfish executable.
    """

    def __init__(
            self, move_list, sense_list,
            fast_divide_factor,
            modifier,
            choose_sense: Callable[[DefaultDict[str, float], bool, List[Square], List[chess.Move], float, float], Square] = defaults.choose_sense,
            choose_move: Callable[[DefaultDict[str, float], bool, List[chess.Move], float, float], chess.Move] = defaults.choose_move,
            while_we_wait: Optional[Callable[[DefaultDict[str, float], bool], None]] = defaults.while_we_wait,
            end_game: Optional[Callable[[DefaultDict[str, float]], None]] = defaults.end_game,
            populate_next_board_set: Optional[Callable] = defaults.populate_next_board_set,
            batch_size: Optional[int] = 1024,
            pool_size: Optional[int] = 2,
            log_to_file=True,
            save_debug_history=False,
            rc_disable_pbar=RC_DISABLE_PBAR,
    ):
        """
        Set up the player with decision-making capabilities inherited from the strategy callables.

        :param move_list: Scripted moves; call number k of choose_move returns move_list[k-1] while k <= len(move_list).
            None is treated as an empty list.
        :param sense_list: Scripted sense squares, consumed the same way by choose_sense. None is treated as empty.
        :param fast_divide_factor: Stored on the instance; in this class it is only logged and written to the
            results CSV in handle_game_end
        :param modifier: Stored on the instance; in this class it is only written to the results CSV
        :param choose_sense: Strategy callable which chooses and returns the sense square. It is called as
            choose_sense(boards, color, sense_actions, move_actions, seconds_left, pool, turn_num)
        :param choose_move: Strategy callable which chooses and returns the move. It is called as
            choose_move(boards, color, move_actions, seconds_left, pool, turn_num)
        :param while_we_wait: An optional strategy callable which uses time between our turns. It is called as
            while_we_wait(next_turn_boards_unsorted, color, turn_num)
        :param end_game: An optional strategy callable invoked with no arguments at the end of the game
            (typically shuts down StockFish)
        :param populate_next_board_set: Callable which expands a board set by the opponent's possible moves and
            returns a mapping keyed by capture square
        :param batch_size: Maximum number of boards expanded per iteration in while_we_wait (None means no limit)
        :param pool_size: Number of processes to use when multiprocessing board set expansion and filtering
        :param log_to_file: A boolean flag to turn on/off logging to file (forwarded to create_main_logger)
        :param save_debug_history: A boolean flag to turn on/off the generation of a turn-by-turn internal history
        :param rc_disable_pbar: A boolean flag to turn on/off the tqdm progress bars
        """
        self._choose_sense = choose_sense
        self._choose_move = choose_move
        self._while_we_wait = while_we_wait
        self._end_game = end_game
        self._populate_next_board_set = populate_next_board_set

        self.batch_size = batch_size
        self.fast_divide_factor = fast_divide_factor
        self.modifier = modifier

        # Current board set: EPD string -> probability
        self.boards: DefaultDict[str, float] = defaultdict(float)
        # Next turn's board sets, keyed by the square on which the opponent would have captured (None = no capture)
        self.next_turn_boards: DefaultDict[Optional[Square], DefaultDict[str, float]] = \
            defaultdict(lambda: defaultdict(float))
        # Union of next turn's boards (see while_we_wait), handed to the strategy's waiting method
        self.next_turn_boards_unsorted: DefaultDict[str, float] = defaultdict(float)

        self.color = None
        self.turn_num = None

        self.pool = mp.Pool(pool_size)

        self.save_debug_history = save_debug_history
        self.debug_memory = []
        self.rc_disable_pbar = rc_disable_pbar

        self.timeout_resign = None  # flag used to skip later turn processes if we have run out of time

        self.logger = create_main_logger(log_to_file=log_to_file)
        self.logger.debug("A new StrangeFish player was initialized.")

        # Counters and scripts for the scripted-opening override (see choose_move / choose_sense)
        self.move_number = 0
        self.sense_number = 0
        self.move_list = move_list if move_list is not None else []
        self.sense_list = sense_list if sense_list is not None else []

        # move_number -> list of (probability, EPD) for every board held when choose_move was called
        self.allfen = {}

    def _normalize_boards(self, boards):
        """
        Return a new plain dict with the probabilities of `boards` rescaled so that they sum to 1.

        An empty input yields an empty dict. If the set is not empty but its total probability is not positive,
        every board is given the same probability instead of dividing by zero.
        """
        total_prob = sum(boards.values())
        if total_prob > 0:
            return {board_epd: prob / total_prob for board_epd, prob in boards.items()}
        if boards:
            self.logger.warning('Board set of size %d has no probability mass; falling back to a uniform '
                                'distribution.', len(boards))
            uniform_prob = 1 / len(boards)
            return {board_epd: uniform_prob for board_epd in boards}
        return {}

    def _game_state_log(self, step_name='-'):
        """Append a snapshot of the current board set to the debug history (only if save_debug_history is set)."""
        if self.save_debug_history:
            info = {
                'name': __name__,
                'color': chess.COLOR_NAMES[self.color],
                'turn': self.turn_num,
                'step': step_name,
                'boards': list(self.boards),
            }
            self.debug_memory.append(info)

    def _emergency_plan(self):
        """Switch to the emergency backup plan: drop all tracked boards and use the contingency strategy."""
        self.boards = defaultdict(float)
        self.next_turn_boards = defaultdict(lambda: defaultdict(float))
        self._choose_sense, self._choose_move = contingency_strategy()
        # Shadow the while_we_wait method with None on this instance.
        # NOTE(review): presumably the game runner checks this attribute before calling it - confirm.
        setattr(self, 'while_we_wait', None)

    def get_debug_history(self):
        """Return the list of snapshots recorded by _game_state_log (possible board states from each turn)."""
        return self.debug_memory

    def fen_to_board(self, fen, prob):
        """
        Print `fen` and `prob`, then convert the piece placement of `fen` into a list of rows.

        Each row is a list of two-character codes: '--' for an empty square, 'wp' / 'bp' for pawns,
        'w' + letter for other upper-case (white) pieces and 'b' + upper-cased letter for lower-case (black)
        pieces. Parsing of a row stops at the first space, which discards the fields after the placement.

        :param fen: FEN/EPD string
        :param prob: Only printed alongside the FEN
        :return: List of rows in the order they appear in `fen`
        """
        print(fen, " ", prob)
        board = []
        for row in fen.split('/'):
            brow = []
            for c in row:
                if c == ' ':
                    break
                elif c in '12345678':
                    brow.extend(['--'] * int(c))
                elif c == 'p':
                    brow.append('bp')
                elif c == 'P':
                    brow.append('wp')
                elif c > 'Z':
                    brow.append('b' + c.upper())
                else:
                    brow.append('w' + c)
            board.append(brow)
        return board

    def handle_game_start(self, color: Color, board: chess.Board, opponent_name: str):
        """Reset the per-game state: the board set starts as the given board with probability 1."""
        color_name = chess.COLOR_NAMES[color]
        self.logger.info('Starting a new game as %s against %s.', color_name, opponent_name)

        self.boards = {board.epd(): 1}
        self.color = color
        self.turn_num = 0
        self.timeout_resign = False

        # Save game state for advanced replay (black records one extra entry)
        if self.color == chess.BLACK:
            self._game_state_log()
        self._game_state_log()

    def handle_opponent_move_result(self, captured_my_piece: bool, capture_square: Optional[Square]):
        """
        Advance the turn counter and replace the board set with the boards consistent with the opponent's move.

        Boards not yet expanded during while_we_wait are expanded here; the resulting sets are keyed by capture
        square, and the set for `capture_square` becomes the (normalised) current board set.
        """
        self.turn_num += 1
        self.logger.debug("Starting turn %d.", self.turn_num)

        # Do not "handle_opponent_move_result" if no one has moved yet
        if self.turn_num == 1 and self.color == chess.WHITE:
            self._game_state_log()
            return

        if captured_my_piece:
            self.logger.debug('Opponent captured my piece at %s.', chess.SQUARE_NAMES[capture_square])
        else:
            self.logger.debug("Opponent's move was not a capture.")

        num_precomputed = len(self.next_turn_boards[None])
        num_to_expand = AVG_BOARD_EXP * len(self.boards)
        self.logger.debug('Already calculated scores for %d possible boards, '
                          'approximately %d x %d = %d boards left to analyse.',
                          num_precomputed, len(self.boards), AVG_BOARD_EXP, num_to_expand)

        # Check for board set over-growth and switch to emergency plan if needed
        if not captured_my_piece and (num_precomputed + num_to_expand) > BOARD_SET_LIMIT:
            self.logger.warning("Board set grew too large, switching to contingency plan. "
                                "Set size expected to grow to %d; limit is %d",
                                num_precomputed + num_to_expand, BOARD_SET_LIMIT)
            self._emergency_plan()

        # If creation of new board set didn't complete during op's turn (self.boards will not be empty)
        if self.boards:
            new_board_set = self._populate_next_board_set(self.boards, self.color,
                                                          rc_disable_pbar=self.rc_disable_pbar, pool=self.pool,
                                                          turn_num=self.turn_num)
            for square in new_board_set.keys():
                self.next_turn_boards[square] = union_dict(self.next_turn_boards[square], new_board_set[square])

        # Get this turn's board set from a dictionary keyed by the possible capture squares
        self.boards = self._normalize_boards(self.next_turn_boards[capture_square])

        self.logger.debug('Finished expanding and filtering the set of possible board states. '
                          'There are %d possible boards at the start of our turn %d.',
                          len(self.boards), self.turn_num)

        # Save game state for advanced replay
        self._game_state_log('post-op-move')

    def choose_sense(self, sense_actions: List[Square], move_actions: List[chess.Move], seconds_left: float
                     ) -> Optional[Square]:
        """
        Choose the square to sense this turn.

        The strategy's choice is always computed, but while scripted senses remain (call number k with
        k <= len(sense_list)) the scripted square sense_list[k-1] is returned instead.
        Note that `move_actions` is extended in place with the null move.
        """
        # Check if time is up (or if we already changed to the emergency plan)
        if not self.timeout_resign and seconds_left <= TIMEOUT_RESIGN_LIMIT:
            self.logger.warning('Time is nearly up, go to backup plan.')
            self._emergency_plan()
            self.timeout_resign = True

        self.logger.debug('Choosing a sensing square for turn %d with %d boards and %.0f seconds remaining.',
                          self.turn_num, len(self.boards), seconds_left)

        # The option to pass isn't included in the reconchess input
        move_actions += [chess.Move.null()]

        with Timer(self.logger.debug, 'choosing sense location'):
            # Pass the needed information to the decision-making function to choose a sense square
            sense_choice = self._choose_sense(self.boards, self.color, sense_actions, move_actions,
                                              seconds_left, self.pool, self.turn_num)

        # Compare against None explicitly: square A1 has the value 0 and is therefore falsy
        has_sense_choice = sense_choice is not None
        self.logger.debug('Chose to sense %s', chess.SQUARE_NAMES[sense_choice] if has_sense_choice else 'nowhere')
        if has_sense_choice:
            print(sense_choice, chess.SQUARE_NAMES[sense_choice], chess.SQUARE_NAMES)

        # Scripted override: the first len(sense_list) calls return the scripted squares
        self.sense_number += 1
        if self.sense_number <= len(self.sense_list):
            print("--------------------Here-------------------", self.sense_number)
            return self.sense_list[self.sense_number - 1]
        return sense_choice

    def handle_sense_result(self, sense_result: List[Tuple[Square, Optional[chess.Piece]]]):
        """Keep only the boards which would have produced the observed sense result, then renormalise."""
        num_before = len(self.boards)
        progress = tqdm(
            self.boards.items(),
            disable=self.rc_disable_pbar,
            desc=f'{chess.COLOR_NAMES[self.color]} Filtering {len(self.boards)} boards by sense results',
            unit='boards',
        )
        # board_matches_sense is mapped over (epd, prob) items; results whose key is None are discarded
        matching_boards = {
            board_epd: prob for board_epd, prob in
            self.pool.imap_unordered(partial(board_matches_sense, sense_result=sense_result), progress)
            if board_epd is not None
        }
        self.boards = self._normalize_boards(matching_boards)

        self.logger.debug('There were %d possible boards before sensing and %d after.', num_before, len(self.boards))

        # Save game state for advanced replay
        self._game_state_log('post-sense')

    def choose_move(self, move_actions: List[chess.Move], seconds_left: float) -> Optional[chess.Move]:
        """
        Choose the move to request this turn.

        The strategy's choice is always computed and the current board set is recorded in self.allfen, but while
        scripted moves remain (call number k with k <= len(move_list)) the scripted move move_list[k-1] is
        used instead. The null move is reported as None in either case.
        """
        # Currently, move_actions is passed by reference, so if we add the null move here it will be in the list twice
        #  since we added it in choose_sense also. Instead of removing this line altogether, I'm leaving a check so we
        #  are prepared in the case that reconchess is updated to pass a copy of the move_actions list instead.
        if chess.Move.null() not in move_actions:
            move_actions += [chess.Move.null()]

        self.logger.debug('Choosing move for turn %d from %d moves over %d boards with %.2f seconds remaining.',
                          self.turn_num, len(move_actions), len(self.boards), seconds_left)

        with Timer(self.logger.debug, 'choosing move'):
            # Pass the needed information to the decision-making function to choose a move
            move_choice = self._choose_move(self.boards, self.color, move_actions, seconds_left,
                                            self.pool, self.turn_num)
        print("move_choice=", move_choice)

        self.logger.debug('The chosen move was %s', move_choice)

        # Record every (probability, EPD) pair held before this move, keyed by the 0-based move counter
        self.allfen[self.move_number] = [(prob, board_epd) for board_epd, prob in self.boards.items()]

        # Scripted override: the first len(move_list) calls use the scripted moves
        self.move_number += 1
        if self.move_number <= len(self.move_list):
            print("-----------------Here---------------", self.move_list[self.move_number - 1])
            move_choice = self.move_list[self.move_number - 1]

        # reconchess uses None for the null move, so correct the output if that was our choice
        return move_choice if move_choice != chess.Move.null() else None

    def handle_move_result(self, requested_move: Optional[chess.Move], taken_move: Optional[chess.Move],
                           captured_opponent_piece: bool, capture_square: Optional[Square]):
        """
        Keep only the boards on which the requested move would have resulted in the taken move, renormalise,
        and reset the containers which collect next turn's boards.
        """
        self.logger.debug('The requested move was %s and the taken move was %s.', requested_move, taken_move)
        if captured_opponent_piece:
            self.logger.debug('Move %s was a capture!', taken_move)

        num_boards_before_filtering = len(self.boards)

        # The filter works on Move objects, so translate reconchess' None back to the null move
        if requested_move is None:
            requested_move = chess.Move.null()
        if taken_move is None:
            taken_move = chess.Move.null()

        # Filter the possible board set to only boards on which the requested move would have resulted in the taken move
        progress = tqdm(
            self.boards.items(),
            disable=self.rc_disable_pbar,
            desc=f'{chess.COLOR_NAMES[self.color]} Filtering {len(self.boards)} boards by move results',
            unit='boards',
        )
        matching_boards = {
            board_epd: prob for board_epd, prob in
            self.pool.imap_unordered(partial(move_would_happen_on_board, requested_move, taken_move,
                                             captured_opponent_piece, capture_square), progress)
            if board_epd is not None
        }
        self.boards = self._normalize_boards(matching_boards)

        self.logger.debug('There were %d possible boards before filtering and %d after.',
                          num_boards_before_filtering, len(self.boards))

        # Save game state for advanced replay
        self._game_state_log('post-move')
        self._game_state_log()

        # Re-initialize the set of boards for next turn (filled in while_we_wait and/or handle_opponent_move_result)
        self.next_turn_boards = defaultdict(lambda: defaultdict(float))
        self.next_turn_boards_unsorted = defaultdict(float)

    def while_we_wait(self):
        """
        Use the opponent's thinking time, for at least WAIT_LOOP_RATE_LIMIT seconds per call.

        While boards from last turn remain, batches of them are removed from self.boards and expanded into
        self.next_turn_boards. Once all are expanded, control is passed to the strategy's waiting method if there
        is one; otherwise the remainder of the time window is slept away.
        """
        from time import sleep  # local import: only the idle branch below needs it

        start_time = time()
        self.logger.debug('Running the "while_we_wait" method. '
                          f'{len(self.boards)} boards left to expand for next turn.')

        # Our king's square on the first board of the set; boards keyed by this capture square are kept out of
        # the unsorted set below
        our_king_square = chess.Board(next(iter(self.boards))).king(self.color) if self.boards else None

        while time() - start_time < WAIT_LOOP_RATE_LIMIT:

            # If there are still boards in the set from last turn, remove a batch and expand it by all possible moves
            if self.boards:
                batch_limit = len(self.boards) if self.batch_size is None else self.batch_size
                popped_boards = {}
                while self.boards and len(popped_boards) < batch_limit:
                    board, prob = self.boards.popitem()
                    if prob > 0:  # boards without probability mass are dropped unexpanded
                        popped_boards[board] = prob
                new_board_set = self._populate_next_board_set(popped_boards, self.color, rc_disable_pbar=True,
                                                              pool=self.pool, turn_num=self.turn_num)
                for square in new_board_set.keys():
                    self.next_turn_boards[square] = union_dict(self.next_turn_boards[square], new_board_set[square])
                    if square != our_king_square:
                        self.next_turn_boards_unsorted = union_dict(self.next_turn_boards_unsorted,
                                                                    new_board_set[square])

            # If all of last turn's boards have been expanded, pass to the sense/move function's waiting method
            elif self._while_we_wait:
                self._while_we_wait(self.next_turn_boards_unsorted, self.color, self.turn_num)

            # Nothing left to do: wait out the rest of the window instead of spinning on the clock
            else:
                remaining = WAIT_LOOP_RATE_LIMIT - (time() - start_time)
                if remaining > 0:
                    sleep(remaining)

    def handle_game_end(self, winner_color: Optional[Color], win_reason: Optional[WinReason],
                        game_history: GameHistory, op_name="None"):
        """
        Log the result, append it to fianchetto_game_results.csv and release the pool and the strategy's resources.

        The CSV row is [opponent, result, fianchetto_color, fast_divide_factor, modifier], where result is one of
        "win", "loss" or "draw" (a winner_color of None is a draw).

        :param op_name: Name of the opponent as written to the log and the CSV
        :raises OSError: if the CSV file cannot be written (the cleanup below still runs)
        """
        if winner_color is None:
            result, outcome_verb = "draw", "drew"
        elif winner_color == self.color:
            result, outcome_verb = "win", "won"
        else:
            result, outcome_verb = "loss", "lost"

        self.logger.info('I %s by %s against %s with a fast_divide_factor of %d', outcome_verb,
                         win_reason.name if hasattr(win_reason, "name") else win_reason, op_name,
                         self.fast_divide_factor)

        # column names = ["opponent", "result", "fianchetto_color", "fast_divide_factor", "modifier"]
        row = [op_name, result, "white" if self.color else "black", self.fast_divide_factor, self.modifier]
        try:
            # newline='' lets the csv module control line endings itself
            with open('fianchetto_game_results.csv', 'a', newline='') as csv_file:
                writer(csv_file).writerow(row)
        finally:
            # Always release the worker processes and let the strategy clean up, even if the CSV write failed
            self.pool.terminate()
            if self._end_game:
                self._end_game()