This repository has been archived by the owner on Oct 26, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
/
player.py
221 lines (177 loc) · 7.32 KB
/
player.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# -*- coding: utf-8 -*-
# @Time : 2021/3/8 18:51
# @Author : He Ruizhi
# @File : player.py
# @Software: PyCharm
from threading import Thread
import numpy as np
from time import sleep
from mcts import MCTS, evaluate_rollout
from policy_value_net import PolicyValueNet
import paddle
import os
class Player:
    """Base class shared by every kind of player.

    ``play`` launches ``step`` on a background daemon thread. The base
    ``step`` only prints a greeting; the subclasses in this file override
    it to compute a move and store it in ``self.action``.
    """

    def __init__(self):
        # Display name of this player
        self.name = 'Player'
        # Next action to execute; None while no action has been decided
        self.action = None
        # Flag: may a worker thread be started to compute the next action?
        self.allow = True
        # Whether this player is still valid; used to leave the computation loop early
        self.valid = True
        # Progress of the move computation (only used when the player is MCTS or AlphaGo)
        self.speed = None

    def play(self, game):
        """
        Start computing the next action in the background.

        Nothing happens unless ``self.allow`` is set and ``self.action`` is
        still None. Otherwise ``self.allow`` is cleared and ``self.step`` is
        started on a daemon thread.

        :param game: game simulator object, forwarded to ``step``
        :return: None
        """
        if not self.allow or self.action is not None:
            return
        self.allow = False
        # daemon=True lets all worker threads die with the main thread, so
        # clicking the quit button does not have to wait for them to finish
        worker = Thread(target=self.step, args=(game,), daemon=True)
        worker.start()

    def step(self, game):
        """
        Obtain the action to execute from the current game state.

        The base implementation is a placeholder that only prints a greeting.

        :param game: game simulator object
        :return: None
        """
        print('Hello!')
class HumanPlayer(Player):
    """Player entry representing a human.

    Only the display name is customized; ``step`` is inherited unchanged.
    Presumably the action is supplied from outside this class (e.g. by the
    UI) -- that is not visible here.
    """

    def __init__(self):
        super(HumanPlayer, self).__init__()
        # Display name of this player
        self.name = '人类玩家'
class RandomPlayer(Player):
    """Player that picks its move at random from the candidate moves."""

    def __init__(self):
        super(RandomPlayer, self).__init__()
        # Display name of this player
        self.name = '随机落子'

    def step(self, game):
        """
        Wait one second, then store a randomly chosen action in ``self.action``.

        :param game: game simulator object
        :return: None
        """
        sleep(1)
        chosen = self.get_action(game)
        self.action = chosen

    @staticmethod
    def get_action(game):
        """
        Draw one move index at random.

        :param game: game simulator object exposing
            ``game_state.advanced_valid_move_idcs()``
        :return: the randomly selected move index
        """
        candidates = game.game_state.advanced_valid_move_idcs()
        # With more than one candidate the last entry is left out
        # (presumably the pass move -- confirm against advanced_valid_move_idcs)
        pool = candidates[:-1] if len(candidates) > 1 else candidates
        return np.random.choice(pool)
class MCTSPlayer(Player):
    """Player driven by Monte Carlo tree search without a neural network.

    Priors are uniform over the available moves and node values come from
    ``evaluate_rollout`` with a random rollout policy.
    """

    def __init__(self, c_puct=5, n_playout=20):
        """
        :param c_puct: forwarded to ``MCTS`` as its second argument
        :param n_playout: forwarded to ``MCTS`` as its third argument; also
            embedded in the display name
        """
        super(MCTSPlayer, self).__init__()
        self.name = '蒙特卡洛{}'.format(n_playout)

        def random_rollout_policy(game_state_simulator):
            # Pair every available move with a random score
            legal = game_state_simulator.valid_move_idcs()
            scores = np.random.rand(len(legal))
            return zip(legal, scores)

        def uniform_policy_value(game_state_simulator):
            # Uniform probabilities plus a node value obtained by random rollout
            legal = game_state_simulator.valid_move_idcs()
            count = len(legal)
            priors = np.ones(count) / count
            move_priors = zip(legal, priors)
            leaf_value = evaluate_rollout(game_state_simulator, random_rollout_policy)
            return move_priors, leaf_value

        self.mcts = MCTS(uniform_policy_value, c_puct, n_playout)

    def step(self, game):
        """
        Compute the next action and publish it through ``self.action``.

        When ``get_action`` yields -1 (presumably an interrupted search --
        confirm in ``mcts``), no action is stored and ``self.allow`` is set
        again so another computation can be started.

        :param game: game simulator object
        :return: None
        """
        chosen = self.get_action(game)
        if chosen == -1:
            self.allow = True
            chosen = None
        self.action = chosen
        # Clear the progress area once the action has been obtained
        self.speed = (0, 1)

    def reset_player(self):
        """Call ``update_with_move(-1)`` on the search tree (presumably a reset -- see ``mcts``)."""
        self.mcts.update_with_move(-1)

    def get_action(self, game):
        """
        Ask the search tree for a move, then call ``update_with_move(-1)`` on it.

        :param game: game simulator object
        :return: whatever ``MCTS.get_move`` returned
        """
        selected = self.mcts.get_move(game, self)
        self.mcts.update_with_move(-1)
        return selected
class AlphaGoPlayer(Player):
    """Player combining Monte Carlo tree search with a policy-value network."""

    # Display names for the known checkpoint paths
    _NAME_BY_MODEL_PATH = {
        'models/alpha_go.pdparams': '阿尔法狗',
        'models/my_alpha_go.pdparams': '幼生阿尔法狗',
    }
    # Display name used for any other checkpoint path
    _FALLBACK_NAME = '预期之外的错误名称'

    def __init__(self, model_path='models/pdparams', c_puct=5, n_playout=400, is_selfplay=False):
        """
        :param model_path: path of the network parameters; they are loaded
            only when the file exists, otherwise the network keeps the
            weights it was constructed with
        :param c_puct: forwarded to ``MCTS`` as its second argument
        :param n_playout: forwarded to ``MCTS`` as its third argument
        :param is_selfplay: True when this player decides the moves of both
            colours (self-play data generation)
        """
        super(AlphaGoPlayer, self).__init__()
        self.name = self._NAME_BY_MODEL_PATH.get(model_path, self._FALLBACK_NAME)

        self.policy_value_net = PolicyValueNet()
        self.policy_value_net.eval()
        if os.path.exists(model_path):
            state_dict = paddle.load(model_path)
            self.policy_value_net.set_state_dict(state_dict)

        self.mcts = MCTS(self.policy_value_net.policy_value_fn, c_puct, n_playout)
        self.is_selfplay = is_selfplay

    @staticmethod
    def _search_aborted(acts, probs):
        """Return True when ``get_move_probs`` answered with the scalar sentinel pair (-1, -1)."""
        # Only scalars can be the sentinel. Checking the rank first avoids the
        # ambiguous-truth-value error that `array == -1 and ...` raises when
        # the search hands back arrays.
        if np.ndim(acts) != 0 or np.ndim(probs) != 0:
            return False
        return bool(acts == -1 and probs == -1)

    def reset_player(self):
        """Call ``update_with_move(-1)`` on the search tree (presumably a reset -- see ``mcts``)."""
        self.mcts.update_with_move(-1)

    def step(self, game):
        """
        Compute the next action and publish it through ``self.action``.

        When ``get_action`` yields -1, no action is stored and ``self.allow``
        is set again so another computation can be started.

        :param game: game simulator object
        :return: None
        """
        action = self.get_action(game)
        if action == -1:
            action = None
            self.allow = True
        self.action = action
        # Clear the progress area once the action has been obtained
        self.speed = (0, 1)

    def get_action(self, game, temp=1e-3, return_probs=False):
        """
        Choose a move from the probabilities produced by the tree search.

        :param game: game simulator object; ``game.board_size`` sizes the
            returned probability vector
        :param temp: forwarded to ``MCTS.get_move_probs``
        :param return_probs: when True, also return the probability vector
        :return: -1 when the search answered with the (-1, -1) sentinel
            (regardless of ``return_probs``); otherwise the chosen move, or
            ``(move, move_probs)`` when ``return_probs`` is True, where
            ``move_probs`` has ``board_size ** 2 + 1`` entries
        """
        acts, probs = self.mcts.get_move_probs(game, temp, self)
        if self._search_aborted(acts, probs):
            return -1

        # Allocate only once a real result is known to exist
        move_probs = np.zeros(game.board_size ** 2 + 1)
        move_probs[list(acts)] = probs

        if self.is_selfplay:
            # Add Dirichlet noise for exploration (during training)
            noise = np.random.dirichlet(0.3 * np.ones(len(probs)))
            move = np.random.choice(acts, p=0.75 * probs + 0.25 * noise)
            # Advance the search tree: in self-play both colours alternate
            # and every move is decided by this player
            self.mcts.update_with_move(move)
        else:
            move = np.random.choice(acts, p=probs)
            # Against another opponent only one colour is controlled,
            # so every step passes -1
            self.mcts.update_with_move(-1)

        if return_probs:
            return move, move_probs
        return move
class PolicyNetPlayer(Player):
    """Player that samples its move directly from the policy output of the network."""

    def __init__(self, model_path='models/model.pdparams'):
        """
        :param model_path: path of the network parameters; they are loaded
            only when the file exists, otherwise the network keeps the
            weights it was constructed with
        """
        super(PolicyNetPlayer, self).__init__()
        self.name = '策略网络'
        self.policy_value_net = PolicyValueNet()

        if os.path.exists(model_path):
            state_dict = paddle.load(model_path)
            self.policy_value_net.set_state_dict(state_dict)
        self.policy_value_net.eval()

    def step(self, game):
        """
        Wait one second, then store the sampled action in ``self.action``.

        :param game: game simulator object
        :return: None
        """
        sleep(1)
        self.action = self.get_action(game)

    def get_action(self, game):
        """
        Sample a move from the network's policy output, restricted by the valid-move mask.

        :param game: game simulator object exposing ``game_state.valid_moves()``
            and ``game_state.get_board_state()``
        :return: index of the sampled move
        """
        valid_moves = paddle.to_tensor(game.game_state.valid_moves())

        current_state = game.game_state.get_board_state()
        current_state = paddle.to_tensor([current_state], dtype='float32')

        probs, _ = self.policy_value_net(current_state)
        probs = probs[0]
        # Multiply by the valid-move vector, then renormalise so the result sums to 1
        probs *= valid_moves
        probs = probs / paddle.sum(probs)

        probs = probs.numpy()
        # The number of actions comes from the probability vector itself rather
        # than a fixed 82, so any board size the network was built for works.
        return np.random.choice(probs.shape[0], p=probs)
class ValueNetPlayer(Player):
    """Player that tries each valid move and keeps the one whose resulting position scores lowest."""

    def __init__(self, model_path='models/model.pdparams'):
        """
        :param model_path: path of the network parameters; they are loaded
            only when the file exists, otherwise the network keeps the
            weights it was constructed with
        """
        super().__init__()
        self.name = '价值网络'
        net = PolicyValueNet()
        self.policy_value_net = net
        if os.path.exists(model_path):
            net.set_state_dict(paddle.load(model_path))
        net.eval()

    def step(self, game):
        """
        Wait one second, then store the selected action in ``self.action``.

        :param game: game simulator object
        :return: None
        """
        sleep(1)
        chosen = self.get_action(game)
        self.action = chosen

    def get_action(self, game):
        """
        Evaluate the position after every valid move and pick the lowest-valued one.

        The value is computed for the position after the move (the opponent's
        position, per the original author's comment), so the smallest value
        is preferred.

        :param game: game simulator object exposing
            ``game_state.valid_move_idcs()``, ``game_state_simulator()`` and
            ``board_size``
        :return: the selected move index; ``board_size ** 2`` when no
            candidate scores below 1 (presumably the pass move -- confirm)
        """
        candidates = game.game_state.valid_move_idcs()
        # Threshold a candidate has to undercut, and the fallback action
        lowest_value = 1
        best_action = game.board_size ** 2
        for candidate in candidates:
            # Play the candidate on a fresh simulator and evaluate the result
            simulator = game.game_state_simulator()
            simulator.step(candidate)
            board = paddle.to_tensor([simulator.get_board_state()], dtype='float32')
            _, value = self.policy_value_net(board)
            opponent_value = value.numpy().flatten()[0]
            if opponent_value < lowest_value:
                lowest_value, best_action = opponent_value, candidate
        return best_action