-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
197 lines (157 loc) · 10 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# By Pelmen#2920, made for Kaiserreich
# The script needs .json config and false positives .txt to work
import json
import logging
import os
import shutil
import subprocess
import sys
import time
import glob
import psutil
import webdriver_form_filler as wdff
# Configuration and reading files
logging.basicConfig(filename='loop_script.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', filemode='w')
APPLICATION_PATH = os.path.dirname(sys.executable) if getattr(sys, 'frozen', False) else os.path.dirname(__file__)
def read_input_file(filename: str) -> dict:
try:
if ".json" in filename:
with open(f"{APPLICATION_PATH}\\{filename}", 'r') as file:
return json.loads(file.read())
elif ".txt" in filename:
with open(f"{APPLICATION_PATH}\\{filename}", 'r') as file:
return file.read().split('\n')
except Exception as ex:
print(ex)
input(f"Error occurred, can't read input file {filename}")
# -------------------------------- CONFIG ------------------------------
config = read_input_file(filename="config.json") # Read config file and return a dict with config parameters
# 1 - Generate logs or not
generate_logs = config["generate_logs"]
# 1.1 - Logs generation settings
NUM_OF_INSTANCES = config["num_of_instances"] # Number of .exe instances
CPU_AFFINITIES_MODE = config["num_of_threads_per_exe"] if config["num_of_threads_per_exe"] in [2, 4] else 2 # Number of threads per exe. Defaults to 2
TIMES_TO_LAUNCH = config["times_to_launch"] # How many times to launch the game in a row
GAME_DURATION = config["game_duration_minutes"] * 60 # How long each game will last before killed
FALSE_POSITIVE_LINES = read_input_file(filename="false_positives.txt") # list with false positives from external file
# 1.2 Paths
HOI_PATH = config["hoi4_exe_fullpath"] # Path to hoi4 exe
LOGS_PATH = f"{os.environ.get('USERPROFILE')}\\Documents\\Paradox Interactive\\Hearts of Iron IV\\logs" if "custom_logs_path" not in config.keys() else config["custom_logs_path"]
# 1.3 Hoi4 args
ENABLE_CRASH_LOGGING = config["crash_logging"] # hoi4 arg
ENABLE_DEBUG_MODE = config["debug_mode"] # hoi4 arg
ENABLE_HANDS_OFF = True if "hands_off" not in config.keys() else config["hands_off"] # hoi4 arg
affinities_list_for_2_threads_per_exe = [[i, i + 1] for i in [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]]
affinities_list_for_4_threads_per_exe = [[i, i + 1, i + 2, i + 3] for i in [0, 4, 8, 12, 16, 18, 20]]
if "cpu_affinities_override" in config.keys():
CPU_AFFINITIES_LIST = json.loads(config["cpu_affinities_override"])
else:
CPU_AFFINITIES_LIST = affinities_list_for_4_threads_per_exe if CPU_AFFINITIES_MODE == 4 else affinities_list_for_2_threads_per_exe
# 2 - Process logs and send data to google forms
publish_to_google_form = config["publish_to_google_form"]
raw_backup_files_path = f"{APPLICATION_PATH}\\game_log_files\\backup"
# 3 - Additionally clean generated error.log files if needed
if "clean_error_log_files" in config.keys():
clean_error_log_files = config["clean_error_log_files"]
else:
clean_error_log_files = False
# -------------------------------- Functions code ------------------------------
def launch_the_game(run: int, instance: int, cpu_affinity: list):
"""Function that governs game running
Args:
run (int): Number of iteration. Used for logging and files naming
instance (int): Instance number.
cpu_affinity (list): What CPU affinity to assign to the particular instance.
Returns:
game (obj): Game process object. Used to kill it later
"""
game_setup = f'{HOI_PATH} -nolauncher {"-hands_off" if ENABLE_HANDS_OFF else "-debug_smooth=no -start_tag=BHU -start_speed=4"} {"-crash_data_log" if ENABLE_CRASH_LOGGING else ""} {"-debug" if ENABLE_DEBUG_MODE else ""} -historical=no -logpostfix=_{instance}'
game = subprocess.Popen(game_setup)
process_id = game.pid
game_process = psutil.Process(pid=process_id)
game_process.cpu_affinity(cpu_affinity)
print(f"{time.strftime('%H:%M', time.localtime())} - Started the run #{run}/{TIMES_TO_LAUNCH}, run duration - {GAME_DURATION} seconds, logpostfix - _{instance}")
logging.info(f"Started the run #{run}/{TIMES_TO_LAUNCH}, run duration - {GAME_DURATION} seconds, args - {game_setup}")
return game
def process_error_log(filepath):
print(f'Processing error.log {os.path.basename(filepath)}')
logging.info(f'Processing error.log {os.path.basename(filepath)}')
with open(filepath, 'r', encoding='utf-8-sig') as initial_error_log: # Open log file
initial_error_log = initial_error_log.read()
initial_error_log = initial_error_log.split("\n")
cleared_error_log = initial_error_log.copy()
for line in initial_error_log: # Remove lines that contain text from false positives
for check in FALSE_POSITIVE_LINES:
if check in line:
try:
cleared_error_log.remove(line)
except Exception:
continue
os.remove(filepath)
with open(filepath, 'w', encoding='utf-8-sig') as new_file: # Create cleaned log file
new_file.write("\n".join(cleared_error_log))
def copy_log_file(run: int, instance: int, log_type: str):
if log_type == "game_log":
shutil.copyfile(f"{LOGS_PATH}\\game_{instance}.log", f"{APPLICATION_PATH}\\game_log_files\\game_log_instance_{instance}_run#{run}_{time.strftime('%Y%m%d_%H%M', time.localtime())}.log")
shutil.copyfile(f"{LOGS_PATH}\\game_{instance}.log", f"{APPLICATION_PATH}\\error_log_files\\game_log_instance_{instance}_run#{run}_{time.strftime('%Y%m%d_%H%M', time.localtime())}.log")
elif log_type == "error_log":
error_log_name = f"error_log_instance_{instance}_run#{run}_{time.strftime('%Y%m%d_%H%M', time.localtime())}.log"
error_log_path = f"{APPLICATION_PATH}\\error_log_files\\{error_log_name}"
shutil.copyfile(f"{LOGS_PATH}\\error_{instance}.log", error_log_path)
return error_log_path
def main():
print("Hoi4 launch script/form filler by Pelmen#2920. Make sure you set correct paths in config file. Starting the script...\n\n")
if generate_logs:
for run in range(1, TIMES_TO_LAUNCH + 1):
try:
game_processes_list = []
for instance in range(NUM_OF_INSTANCES): # 1. Launch all games
game_processes_list.append(launch_the_game(run, instance, CPU_AFFINITIES_LIST[instance]))
time.sleep(GAME_DURATION)
for game in game_processes_list: # 1.1. Kill all games
game.kill()
except Exception as ex:
logging.error(ex)
print(ex)
continue
for instance in range(NUM_OF_INSTANCES): # 1.2. Process logs for all instances
try:
copy_log_file(run, instance, log_type="game_log") # Copy game.log file to storage
error_log_path = copy_log_file(run, instance, log_type="error_log") # Copy error.log file to storage
process_error_log(error_log_path) # Process error.log
print(f"{time.strftime('%H:%M', time.localtime())} - The run #{run}/{TIMES_TO_LAUNCH} is over!")
logging.info(f"The run #{run}/{TIMES_TO_LAUNCH} is over!")
except Exception as ex:
logging.error(ex)
print(ex)
continue
if publish_to_google_form:
try:
for filename in glob.iglob(APPLICATION_PATH + '\\game_log_files\\game**.log'):
print(f"{time.strftime('%H:%M', time.localtime())} - Processing {os.path.basename(filename)}")
with open(filename, 'r', encoding='utf-8-sig') as file:
log_file = file.read().split('\n') # Extract lines from game.log
x = {line.split(";")[1]: line.split(";")[2] for line in log_file if "KR_Event_Logging" in line}
data_to_report = dict(sorted(x.items(), key=lambda item: item[1][-4:])) # Sort log dict to get correct ending date
data_to_report["END"] = list(data_to_report.values())[-1]
data_to_report.update({line.split(": ")[1]: "8:00, 1 March, 1937" for line in log_file if "_data" in line})
for key, value in data_to_report.items():
if "_data" not in key:
print("\t\t{: <40} {: <40}".format(key, value)) # Print info in console
wdff.fill_google_form(wdff.extract_data_for_webdriver_script(log_data=data_to_report)) # Send logs to forms
shutil.move(filename, raw_backup_files_path)
except Exception as ex:
print(ex)
if clean_error_log_files:
try:
for filename in glob.iglob(APPLICATION_PATH + '\\error_log_files\\error**.log'):
print(f"{time.strftime('%H:%M', time.localtime())} - Processing {os.path.basename(filename)}")
process_error_log(filename)
except Exception as ex:
logging.error(ex)
print(ex)
print(f"{time.strftime('%H:%M', time.localtime())} - The session is finished")
logging.info("The session is finished")
input("Press any key to exit")
if __name__ == '__main__':
main()