-
Notifications
You must be signed in to change notification settings - Fork 0
/
game.py
223 lines (170 loc) · 6.8 KB
/
game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
""" Air Hockey Training Simulator """
import argparse
import json
import logging
import os
import random
import sys
import time
from datetime import datetime
from typing import Dict, Tuple, Union
import numpy as np
import redis
from pytz import timezone
from environment import AirHockey
from lib.agents import Agent
from lib.strategy import Strategy
from lib.types import Observation, State
from lib.utils.connect import RedisConnection
from lib.utils.io import get_runid, record_data, record_data_csv
# Logging setup: create this module's logger, make sure the root logger has an
# INFO-level configuration, and pin the module logger to INFO as well.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
logger.setLevel(logging.INFO)
class Game:
    """Run the air hockey simulation loop for a robot and its opponent.

    The robot is built through ``Strategy.make``. The opponent is either a
    human whose mallet position is read from Redis (``--human``) or the
    environment's built-in "computer" agent. One call to :meth:`play`
    advances the simulation by one frame; :meth:`run` loops forever, or
    until the training time budget is spent when in training mode.
    """

    # Score at which this class treats a game as won.
    WIN_SCORE = 10

    def __init__(self, args: Dict[str, Union[str, int, bool]]):
        """Create the environment, the players and the bookkeeping state.

        Args:
            args: Parsed command-line options. The keys read by this class
                are ``model``, ``run``, ``play``, ``time``, ``fps`` and
                ``human``. ``play`` is truthy in training mode (the CLI flag
                ``--play`` is a ``store_false`` switch).
        """
        # Keep the cli args around; several methods consult them later
        self.args = args

        # Set up Redis
        self.redis = RedisConnection()

        # Load Environment
        self.env = AirHockey()

        runid, path = get_runid(os.path.join("/", "data", "air-hockey", "output"))
        logger.info(f"Run id: {runid}")
        logger.info(f"Run path: {path}")

        # Environment variables must be strings, so a missing/None past run
        # is exported as an empty string instead of raising TypeError.
        # NOTE(review): presumably read by the strategy code - confirm.
        os.environ["LOAD_RUN"] = self.args.get("run") or ""
        os.environ["PROJECT"] = path

        # Set up our robot
        self.robot = Strategy.make(
            env=self.env,
            strategy=self.args.get("model"),
            train=self.args.get("play"),
        )
        self.robot.name = "robot"

        # Set up our opponent. The opponent can also be a human player.
        if self.args.get("human"):
            self.opponent = Agent(self.env)
            self.opponent.name = "human"

        # Save model architectures and rewards with an unique run id
        record_data(self.args.get("model"))

        # The very first robot move is random, see robot_player()
        self.init = True

        # Cumulative scores, Cumulative wins
        # NOTE(review): the two win counters are initialised here but are
        # never updated anywhere in this class.
        self.robot_cumulative_score, self.opponent_cumulative_score = 0, 0
        self.robot_cumulative_win, self.opponent_cumulative_win = 0, 0

        # Training mode: remember the start time and the time budget (seconds)
        if self.args.get("play"):
            logger.info("Game mode: Train")
            self.time = time.time()
            self.wait = (60 ** 2) * float(self.args.get("time"))  # hours -> seconds
            logger.info(f"Training time: {self.args.get('time')} hours")

    @property
    def human_location(self) -> Tuple[int, int]:
        """Return the human opponent's ``(x, y)`` location read from Redis."""
        payload = self.redis.get("new-opponent-location")
        spot = payload["new-opponent-location"]
        return spot["x"], spot["y"]

    def stats(self) -> None:
        """Record per-frame goal/win flags to the ``scores`` CSV.

        A goal flag is set when the environment's score is ahead of the
        locally tracked cumulative score, which is then incremented by one.
        A win flag is set when a side's score equals ``WIN_SCORE``; the
        local cumulative scores are reset to zero in that case.
        """
        results = {
            "created_at": datetime.now(timezone("America/Chicago")),
            "robot_goal": 0,
            "opponent_goal": 0,
            "robot_win": 0,
            "opponent_win": 0,
        }

        if self.env.robot_score > self.robot_cumulative_score:
            results["robot_goal"] = 1
            self.robot_cumulative_score += 1

        if self.env.opponent_score > self.opponent_cumulative_score:
            results["opponent_goal"] = 1
            self.opponent_cumulative_score += 1

        robot_won = self.env.robot_score == self.WIN_SCORE
        opponent_won = self.env.opponent_score == self.WIN_SCORE
        if robot_won:
            results["robot_win"] = 1
        if opponent_won:
            results["opponent_win"] = 1
        if robot_won or opponent_won:
            # Start counting goals from scratch for the next game
            self.robot_cumulative_score = 0
            self.opponent_cumulative_score = 0

        # Save to csv
        record_data_csv("scores", results)

    def robot_player(self) -> None:
        """Advance the robot by one move and record the outcome."""
        if self.init:
            # For the first move, move in a random direction
            if getattr(self.robot, "continuous", False):
                # Continuous actions
                action = np.random.uniform(-3, 3), np.random.uniform(-3, 3)
            else:
                # Discrete actions
                action = np.random.randint(0, 4)
        else:
            # Determine next action
            action = self.robot.get_action()

        # Update game state
        self.robot.move(action)
        self.init = False

        # Take a new step in the MDP; the returned observation is not used here
        score, _ = self.robot.step(action)

        # Update environment if the action we took was one that scores
        self.env.update_score(score)

        # Save stats to CSV
        self.stats()

    def human_player(self) -> None:
        """Move the human opponent to the location published in Redis."""
        self.opponent.move(self.human_location)

    def play(self) -> None:
        """Play a single frame: robot move, opponent move, then win check."""
        # Our Agent
        self.robot_player()

        # Our opponent
        if self.args.get("human"):
            self.human_player()
        else:
            self.env.update_state(agent_name="computer")

        # A side reaching WIN_SCORE ends the game and resets the environment
        if self.env.opponent_score == self.WIN_SCORE:
            logger.info(f"Robot {self.env.robot_score}, Computer {self.env.opponent_score}")
            logger.info("Computer wins!")
            self.env.reset(total=True)

        if self.env.robot_score == self.WIN_SCORE:
            logger.info(f"Robot {self.env.robot_score}, Computer {self.env.opponent_score} ")
            logger.info("Robot wins!")
            self.env.reset(total=True)

    def run(self) -> None:
        """Loop over frames forever, or until the training budget is spent.

        Raises:
            SystemExit: in training mode, once more than ``self.wait``
                seconds have passed since ``self.time``.
        """
        while True:
            # Train for an allotted amount of time. The mode flag is "play"
            # (the same key __init__ uses when it sets self.time/self.wait);
            # there is no "train" key in the parsed arguments.
            if self.args.get("play") and time.time() - self.time > self.wait:
                logger.info("Training time elasped")
                sys.exit()

            # Alert the positions are different
            self.redis.publish("position-update")

            # Play a frame
            self.play()

            # Optional frame-rate cap; a missing or non-positive fps means no cap
            fps = self.args.get("fps") or 0
            if fps > 0:
                time.sleep(1 / fps)
if __name__ == "__main__":
    # Entry point: parse the command line, build the game and start the loop.
    parser = argparse.ArgumentParser(description="Process stuff for training.")
    parser.add_argument("--model", type=str, help="Robot strategy")
    parser.add_argument("--run", type=str, default="", help="Specific past run for robot strategy to use")
    parser.add_argument("--time", default=3, type=float, help="Time per train. Units in hours. (Default to 3 hours)")
    parser.add_argument("--fps", default=-1, type=int, help="Frame per second")
    parser.add_argument("--human", action="store_true", help="Human players")
    # store_false: "play" is True (training mode) unless --play is passed
    parser.add_argument("--play", action="store_false", help="Play game instead of train")
    args = vars(parser.parse_args())

    # Validation: a strategy name is mandatory. Exit with a non-zero status
    # so that callers can tell the run did not start.
    if not args.get("model"):
        logger.error("Robot strategy Undefined")
        sys.exit(1)

    # Building the game opens the Redis connection; report a failure to
    # connect as an error exit rather than a successful one.
    try:
        game = Game(args)
    except redis.ConnectionError:
        logger.error("Cannot connect to Redis. Please make sure Redis is up and active.")
        sys.exit(1)

    # Run program
    game.run()