-
Notifications
You must be signed in to change notification settings - Fork 0
/
globals.py
235 lines (201 loc) · 6.34 KB
/
globals.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import redis, time
import simplejson as json
from decimal import Decimal
from fastapi.security import HTTPBearer
from fastapi import Request
from datetime import datetime
# HTTP bearer-token security scheme instance
token_auth_scheme = HTTPBearer()

# Config: parsed from config.json when this module is imported
with open("config.json") as f:
    config = json.load(f)

# Requests Log: entries already stored in requests.json
with open("requests.json", "r") as fp:
    log = json.load(fp)

# Denied Log: entries already stored in denied.json
with open("denied.json", "r") as fd:
    denied = json.load(fd)

# Initiate Redis connection using the "REDIS" section of the config
_redis_settings = config["REDIS"]
redis_client = redis.Redis(
    host=_redis_settings["HOST"],
    port=_redis_settings["PORT"],
    password=_redis_settings["PASSWORD"],
)
# Tag descriptions (name + description pairs).
# NOTE(review): presumably passed to FastAPI as `openapi_tags` - confirm at the call site.

# Description shared by every tag that maps to a table in `queries.sp`
_QUERIES_SP_DESCRIPTION = "All queries concerning this table from `queries.sp`"

# Tables that get a `queries.sp` tag, in display order.
# NOTE: "ck_playertimes" is currently disabled (its entry was commented out).
_QUERIES_SP_TABLES = (
    "ck_bonus",
    "ck_checkpoints",
    "ck_latestrecords",
    "ck_maptier",
    "ck_playeroptions2",
    "ck_playerrank",
    "ck_playertemp",
    "ck_prinfo",
    "ck_replays",
    "ck_spawnlocations",
    "ck_zones",
)

tags_metadata = [
    # One tag per table, all sharing the same description
    *(
        {"name": table, "description": _QUERIES_SP_DESCRIPTION}
        for table in _QUERIES_SP_TABLES
    ),
    # Tags that are not tied to a single table
    {
        "name": "strays",
        "description": "All queries that were NOT contained in `queries.sp` for each table",
    },
    {
        "name": "Points Calculation",
        "description": "Queries that are involved in **player points** calculation",
    },
    {
        "name": "Wipe",
        "description": "Wiping player related data",
    },
    {
        "name": "Refactored",
        "description": "Queries that have been refactored or merged together",
    },
]
# Whitelisted IPs, read from the `WHITELISTED_IPS` key of the loaded config
WHITELISTED_IPS = config["WHITELISTED_IPS"]
# All styles
# Names are listed in style-ID order, following the mapping below, so the
# list index of a name corresponds to its style number:
# 0 = Normal, 1 = Sideways (SW), 2 = Half-Sideways (HSW), 3 = Backwards (BW),
# 4 = Low-Gravity, 5 = Slow Motion, 6 = Fast Forward, 7 = Freestyle
all_styles = [
"Normal",
"Sideways",
"Half-Sideways",
"Backwards",
"Low-Gravity",
"Slow Motion",
"Fast Forward",
"Freestyle",
]
def append_request_log(request: Request):
    """Logs some general info about the request received in `requests.json`

    Appends one entry (URL, client IP, HTTP method, headers and the current
    local time) to the in-memory `log` list and then rewrites `requests.json`
    with the complete list.

    `request`: the incoming request to describe
    """
    # `request.client` can be None when the client address is unavailable,
    # so the IP is logged as null instead of raising AttributeError
    client = request.client
    log.append(
        {
            "url": str(request.url),
            "ip": client.host if client else None,
            "method": request.method,
            "headers": dict(request.headers),
            "time": str(datetime.now()),
        }
    )
    # The whole log is dumped again on every call
    with open("requests.json", "w") as json_file:
        json.dump(log, json_file, indent=4, separators=(",", ": "))
def append_denied_log(request: Request):
    """Logs some general info about the denied request received in `denied.json`

    Appends one entry (URL, client IP, HTTP method, cookies, headers and the
    current local time) to the in-memory `denied` list and then rewrites
    `denied.json` with the complete list.

    `request`: the denied request to describe
    """
    # `request.client` can be None when the client address is unavailable,
    # so the IP is logged as null instead of raising AttributeError
    client = request.client
    denied.append(
        {
            "url": str(request.url),
            "ip": client.host if client else None,
            "method": request.method,
            "cookies": request.cookies,
            "headers": dict(request.headers),
            "time": str(datetime.now()),
        }
    )
    # The whole log is dumped again on every call
    with open("denied.json", "w") as json_file:
        json.dump(denied, json_file, indent=4, separators=(",", ": "))
def set_cache(cache_key: str, data):
    """Serialize `data` to JSON and store it in Redis under `cache_key`\n
    The entry is written with the expiry configured in `config["REDIS"]["EXPIRY"]`\n
    ### Always returns `True`, also when Redis functionality is disabled"""
    # NOTE(review): with `use_decimal=True` simplejson presumably encodes
    # `Decimal` values natively, so `default_serializer` would only be reached
    # for other types (e.g. `datetime`) - confirm against the simplejson docs.
    if config["REDIS"]["ENABLED"] != 0:
        payload = json.dumps(
            data,
            use_decimal=True,
            encoding="utf-8",
            ensure_ascii=False,
            default=default_serializer,
            allow_nan=True,
        )
        redis_client.set(cache_key, payload, ex=config["REDIS"]["EXPIRY"])
    return True
def get_cache(cache_key: str):
    """Look up `cache_key` in Redis and return the raw cached value\n
    ### Returns `None` when nothing (or an empty value) is cached, or when Redis functionality is disabled"""
    if config["REDIS"]["ENABLED"] != 0:
        hit = redis_client.get(cache_key)
        if hit:
            # Cached data is handed back exactly as Redis returned it
            return hit
    return None
def ordinal(n):
    """Return `n` followed by its English ordinal suffix

    `1` > `"1st"`, `2` > `"2nd"`, `3` > `"3rd"`, `4` > `"4th"`, `11` > `"11th"`"""
    endings = ("th", "st", "nd", "rd", "th")
    last_digit = n % 10
    # Last digits 4-9 all share the final "th" slot
    ending = endings[last_digit if last_digit < 4 else 4]
    # 11, 12 and 13 are irregular and always take "th"
    ending = "th" if 11 <= (n % 100) <= 13 else ending
    return str(n) + ending
def custom_date_format(dt):
    """Format `dt` as `<ordinal day> of <month name> <year>, <HH:MM:SS>`

    e.g. `"1st of January 2024, 13:05:09"` (the month name comes from `strftime("%B")`)"""
    return "{} of {} {}, {}".format(
        ordinal(dt.day),
        dt.strftime("%B"),
        dt.year,
        dt.strftime("%H:%M:%S"),
    )
def custom_time_format(time_value) -> str:
    """Format a time in seconds as `M:SS.ffff`, or `S.ffff` below one minute

    `time_value`: number of seconds; anything `Decimal()` accepts
    (`Decimal`, `int`, `str`, `float`)

    Returns the time rounded to 4 decimal places:
    `65.1234` > `"1:05.1234"`
    `14.7363` > `"14.7363"`
    `59.99996` > `"1:00.0000"`"""
    # Convert to Decimal for precise arithmetic and round up front, so the
    # seconds part can never be displayed as "60.0000" after formatting
    total = Decimal(time_value).quantize(Decimal("0.0001"))
    # Calculate minutes and remaining seconds
    whole_minutes, seconds = divmod(total, 60)
    minutes = int(whole_minutes)
    if minutes > 0:
        # Width 7 = two integer digits + "." + four decimals; a width of 6
        # never pads because "d.dddd" is already six characters long
        return f"{minutes}:{seconds:07.4f}"
    return f"{seconds:.4f}"
def default_serializer(obj):
    """Fallback encoder for values the JSON encoder cannot handle itself

    `datetime` values go through `custom_date_format`, `Decimal` values
    through `custom_time_format`; any other type raises `TypeError`"""
    if isinstance(obj, Decimal):
        return custom_time_format(obj)
    if isinstance(obj, datetime):
        return custom_date_format(obj)
    type_name = type(obj).__name__
    raise TypeError(f"Object of type {type_name} is not JSON serializable")
def json_decimal(obj):
    """Convert `Decimal` values to `String`

    A list is modified in place and returned: in every `dict` element, each
    value that is a `Decimal` is replaced by its string form
    `"runtime": 14.7363` > `"runtime": "14.736300"`
    `"runtime": 11.25` > `"runtime": "11.250000"`
    Anything that is not a list (a single `Decimal` included) is returned as `str(obj)`"""
    if isinstance(obj, list):
        for row in obj:
            # Only dictionaries directly inside the list are inspected
            if not isinstance(row, dict):
                continue
            decimal_keys = [
                key for key, value in row.items() if isinstance(value, Decimal)
            ]
            for key in decimal_keys:
                row[key] = str(row[key])
        return obj
    # A lone Decimal and every other non-list value are stringified
    return str(obj)