-
Notifications
You must be signed in to change notification settings - Fork 0
/
connection_manager.py
152 lines (104 loc) · 3.86 KB
/
connection_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from typing import Any, Union
from fastapi import WebSocket
from logger import Logger
class PairedList:
    """ Ordered key-value store kept in plain lists rather than a hash table

    Keys are located with a linear equality search, so they do not need to be
    hashable. Three attributes are kept in step with one another:
    ``pairs`` (the (key, value) entries), ``keys`` and ``values``.
    """

    def __init__(self, *elements: tuple) -> None:
        """ Create the store from zero or more initial pairs
        Args:
            elements (tuple): (key, value) tuples, stored in the order given
        """
        self.pairs = list(elements)
        self._refresh_columns()

    def _refresh_columns(self) -> None:
        """ Rebuild ``keys`` and ``values`` as new lists derived from ``pairs``
        """
        self.keys = [entry[0] for entry in self.pairs]
        self.values = [entry[1] for entry in self.pairs]

    def _refresh_pairs(self) -> None:
        """ Rebuild ``pairs`` as a new list zipped from ``keys`` and ``values``
        """
        self.pairs = list(zip(self.keys, self.values))

    def append(self, element: tuple) -> None:
        """ Store a pair, overwriting the value if the key is already present
        Args:
            element (tuple): tuple containing (key, value)
        """
        key = element[0]
        if key in self.keys:
            # Known key: keep its position and replace only the value.
            slot = self.keys.index(key)
            self.values[slot] = element[1]
            self._refresh_pairs()
            return
        # New key: the pair goes to the end and the columns are re-derived.
        self.pairs.append(element)
        self._refresh_columns()

    def remove(self, key: Any) -> None:
        """ Drop the pair stored under key
        Args:
            key (Any): The key corresponding to the pair which is to be deleted
        Raises:
            ValueError: if key is not present
        """
        slot = self.keys.index(key)
        del self.keys[slot]
        del self.values[slot]
        self._refresh_pairs()

    def clear(self) -> None:
        """ Empty the store, clearing all three lists in place
        """
        for column in (self.pairs, self.keys, self.values):
            column.clear()

    def get(self, key: Any) -> Union[WebSocket, Any]:
        """ Look up the value stored under key
        Args:
            key (Any): The key corresponding to the pair
        Returns:
            [Any]: The stored value (a WebSocket object when used by the
            connection manager)
        Raises:
            ValueError: if key is not present
        """
        return self.values[self.keys.index(key)]

    def __str__(self) -> str:
        return str(self.pairs)

    def __repr__(self) -> str:
        return str(self.pairs)
class ConnectionManager:
    """ Handles incoming and outgoing websocket connections and messages

    Accepted sockets are kept in ``active_connections`` keyed by the user's
    UID, and connect/disconnect events are written to the server log.
    """

    def __init__(self) -> None:
        """ Initialise Connection Manager
        """
        self.active_connections = PairedList()
        self.logger = Logger("CHATROOM-SERVER-LOG", "chatroom_server.log")
        self.logger.info("Connection Manager Initialised")

    async def connect(self, uid: str, websocket: WebSocket) -> None:
        """ Accept a websocket and register it under the user's UID

        Ordinary failures are logged instead of raised, so the caller is not
        interrupted; the socket is only registered if the accept succeeds.

        Args:
            uid (str): Unique Identification (UID) of user
            websocket (WebSocket): WebSocket object
        """
        try:
            await websocket.accept()
            self.active_connections.append((uid, websocket))
            self.logger.info(f"Connected : {uid}")
        except Exception as error:
            # Exception rather than a bare except: task cancellation and
            # interpreter-exit signals must still reach the event loop.
            self.logger.error(f"Connection Failed : {uid} : {error!r}")

    def disconnect(self, uid: str) -> None:
        """ Forget the connection registered for a user

        Disconnecting a UID that is not registered is a silent no-op.

        Args:
            uid (str): Unique Identification (UID) of user
        """
        try:
            self.active_connections.remove(uid)
        except ValueError:
            # Raised by the registry's list lookup when the uid is unknown
            # (e.g. already disconnected); nothing to remove or log.
            return
        self.logger.info(f"Disconnected : {uid}")

    async def send_message(self, message: str, websocket: WebSocket) -> None:
        """ Send message to a single websocket
        Args:
            message (str): Message to be sent
            websocket (WebSocket): WebSocket object
        """
        await websocket.send_text(message)

    async def broadcast_message(self, sender_websocket: WebSocket, message: str) -> None:
        """ Broadcast message to every registered user except the sender

        Errors raised while sending to a recipient propagate to the caller.

        Args:
            sender_websocket (WebSocket): WebSocket object of sender
            message (str): Message to be sent
        """
        # Iterate over a snapshot: each await hands control back to the event
        # loop, where a disconnect may pop from the live list and make a
        # direct iteration skip the next recipient.
        for connection in list(self.active_connections.values):
            if sender_websocket == connection:
                continue
            # Skip recipients that were unregistered while earlier sends
            # were in flight.
            if connection not in self.active_connections.values:
                continue
            await connection.send_text(message)