-
Notifications
You must be signed in to change notification settings - Fork 43
/
lichess-listener.py
92 lines (72 loc) · 3.17 KB
/
lichess-listener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""Stream listener for Irwin. Acts on player status updates, and analysis requests"""
from default_imports import *
from conf.ConfigWrapper import ConfigWrapper
import requests
from requests.exceptions import ChunkedEncodingError, ConnectionError
from requests.packages.urllib3.exceptions import NewConnectionError, ProtocolError, MaxRetryError
from http.client import IncompleteRead
from socket import gaierror
from webapp.Env import Env
from modules import http
from modules.lichess.Request import Request
from modules.queue.EngineQueue import EngineQueue
import json
import argparse
import logging
import sys
from datetime import datetime, timedelta
from time import sleep
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--quiet", dest="loglevel",
default=logging.DEBUG, action="store_const", const=logging.INFO,
help="reduce the number of logged messages")
settings = parser.parse_args()
logging.basicConfig(format="%(message)s", level=settings.loglevel, stream=sys.stdout)
logging.getLogger("requests.packages.urllib3").setLevel(logging.WARNING)
logging.getLogger("chess.uci").setLevel(logging.WARNING)
logging.getLogger("modules.fishnet.fishnet").setLevel(logging.WARNING)
config = ConfigWrapper.new('conf/server_config.json')
env = Env(config)
"""
Possible messages that lichess will emit
{'t':'request', 'origin': 'moderator', 'user': {'id': 'userId', 'titled': bool, 'engine': bool, 'games': int}, 'games': [<game>]}
"""
def handleLine(payload: Dict):
request = Request.fromJson(payload)
playerId = request.player.id
if request is not None:
logging.info(f'Processing request for {request.player}')
# store user
env.gameApi.writePlayer(request.player)
# store games
env.gameApi.writeGames(request.games)
existingEngineQueue = env.queue.engineQueueById(playerId)
newEngineQueue = EngineQueue.new(
playerId=playerId,
origin=request.origin,
gamesAndPredictions=list(zip(request.games, env.irwin.basicGameModel.predict(playerId, request.games))))
if existingEngineQueue is not None and not existingEngineQueue.completed:
newEngineQueue = EngineQueue.merge(existingEngineQueue, newEngineQueue)
requiredGames = env.gameApi.gamesForAnalysis(playerId, newEngineQueue.requiredGameIds)
if len(requiredGames) > 0:
env.queue.queueEngineAnalysis(newEngineQueue)
session = http.get_requests_session_with_keepalive()
while True:
try:
r = session.get(
config.api.url + 'api/stream/irwin',
headers = {
'User-Agent': 'Irwin',
'Authorization': f'Bearer {config.api.token}'
},
stream = True
)
for line in r.iter_lines():
try:
payload = json.loads(line.decode("utf-8"))
handleLine(payload)
except json.decoder.JSONDecodeError:
logging.warning(f"Failed to decode: {line.text}")
except (ChunkedEncodingError, ConnectionError, NewConnectionError, ProtocolError, MaxRetryError, IncompleteRead, gaierror):
sleep(5)
continue