-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwsgi.py
67 lines (46 loc) · 1.49 KB
/
wsgi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import gevent.monkey
# Enable gevent monkey patching. This call is deliberately placed ahead of the
# remaining imports: gevent recommends patching as early as possible so that
# modules imported afterwards pick up the patched standard library.
gevent.monkey.patch_all()
from app import create_app
import socketio
from gevent.lock import Semaphore
import sys
# Binary semaphore (a single permit). It is not used anywhere in the visible
# part of this file; presumably other modules import it to serialize socket
# operations -- TODO confirm against callers.
socket_lock = Semaphore(1)
# Socket.IO server running on the gevent async mode, with library logging on.
# NOTE(review): cors_allowed_origins="*" accepts connections from any origin;
# confirm this is intended outside of development.
socket_io = socketio.Server(
async_mode="gevent",
cors_allowed_origins="*",
logger=True,
)
# Build the application returned by create_app(), then rebind `application`
# to a socketio.WSGIApp that wraps it together with the Socket.IO server.
# After this point `application` refers to the wrapper, not to the object
# returned by create_app().
application = create_app()
application = socketio.WSGIApp(socket_io, application)
#### connect && disconnect ####
@socket_io.on("connect")
def handle_connect(sid, environ, auth=None):
    """Validate the client's auth payload and register the new connection.

    Args:
        sid: Socket.IO session id of the connecting client.
        environ: WSGI environ of the connection request (unused here).
        auth: Auth payload sent by the client; expected to be a mapping with
            a truthy ``"id"`` entry that ``int()`` can convert.

    Returns:
        ``False`` when the payload is missing or malformed, which tells
        python-socketio to reject the connection; ``None`` otherwise.
    """
    try:
        # A missing/empty payload or a falsy id is rejected, as before.
        raw_id = auth["id"] if auth else None
        user_id = int(raw_id) if raw_id else None
    except (KeyError, TypeError, ValueError, OverflowError):
        # KeyError/TypeError: no "id" entry, or the payload is not a mapping.
        # TypeError/ValueError/OverflowError: the id is not convertible to int.
        # Only these are caught on purpose: a bare ``except`` would also
        # swallow gevent.GreenletExit, KeyboardInterrupt and SystemExit.
        user_id = None

    if user_id is None:
        print("auth error: ", auth, flush=True)
        return False

    # Imported at call time rather than at module load
    # (presumably to avoid a circular import with the app package -- confirm).
    from app.socket import socketService as socketServ

    socketServ.handle_connect(user_id, sid)
@socket_io.on("disconnect")
def handle_disconnect(sid):
    """Forward a Socket.IO ``disconnect`` event to the socket service.

    Args:
        sid: Socket.IO session id of the client that disconnected.
    """
    # Resolved at call time rather than at module load
    # (presumably to avoid a circular import -- confirm).
    from app.socket import socketService as service

    service.handle_disconnect(sid)
#### chat ####
@socket_io.on("send_message")
def send_message(sid, data):
    """Forward a ``send_message`` event to the socket service.

    Args:
        sid: Socket.IO session id of the sending client.
        data: Event payload, passed through unchanged.
    """
    # Resolved at call time rather than at module load
    # (presumably to avoid a circular import -- confirm).
    from app.socket import socketService as service

    # The service takes the payload first and the session id second.
    service.send_message(data, sid)
@socket_io.on("read_message")
def read_message(sid, data):
    """Forward a ``read_message`` event to the socket service.

    Args:
        sid: Socket.IO session id of the client that emitted the event.
        data: Event payload, passed through unchanged.
    """
    # Resolved at call time rather than at module load
    # (presumably to avoid a circular import -- confirm).
    from app.socket import socketService as service

    # The service takes the payload first and the session id second.
    service.read_message(data, sid)
if __name__ == "__main__":
    # Development entry point: serve the combined Socket.IO + WSGI application
    # when this file is executed directly.
    #
    # socketio.Server has no run() method (that API belongs to Flask-SocketIO's
    # SocketIO class), and `application` is a socketio.WSGIApp wrapper, which
    # has no `config` attribute. Because the server is created with
    # async_mode="gevent", the app is served with gevent's own WSGI server.
    # That server has no debug/reloader mode, so no DEBUG flag is forwarded.
    from gevent import pywsgi

    # NOTE(review): host and port follow the usual Flask development defaults;
    # adjust to the deployment's needs.
    # NOTE(review): the WebSocket transport additionally requires a
    # WebSocket-capable handler class; without one, clients are expected to
    # stay on long-polling -- confirm against the python-socketio docs.
    pywsgi.WSGIServer(("127.0.0.1", 5000), application).serve_forever()