Skip to content

Commit

Permalink
Add handling of connections not finishing to set up within 5minutes t…
Browse files Browse the repository at this point in the history
…o stop loops of constant connecting
  • Loading branch information
Grennith committed Jan 21, 2024
1 parent d09c85e commit 5c34765
Showing 1 changed file with 65 additions and 57 deletions.
122 changes: 65 additions & 57 deletions mapadroid/websocket/WebsocketServer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import random as rand
import time
from asyncio import CancelledError
from typing import Dict, List, Optional, Set, Tuple

Expand Down Expand Up @@ -36,6 +37,8 @@
from mapadroid.worker.Worker import Worker
from mapadroid.worker.WorkerState import WorkerState

CONNECTING_TIMEOUT = 300

logging.getLogger('websockets.server').setLevel(logging.DEBUG)
logging.getLogger('websockets.protocol').setLevel(logging.DEBUG)
logging.getLogger('websockets.server').addHandler(InterceptHandler(log_section=LoggerEnums.websocket))
Expand Down Expand Up @@ -65,7 +68,7 @@ def __init__(self, args, mitm_mapper: AbstractMitmMapper, stats_handler: Abstrac
# Do think twice before plainly accessing, there's locks to be used
self.__current_users: Dict[str, WebsocketConnectedClientEntry] = {}
self.__current_users_mutex: Optional[asyncio.Lock] = None
self.__users_connecting: Set[str] = set()
self.__users_connecting: Dict[str, int] = {}
self.__users_connecting_mutex: Optional[asyncio.Lock] = None

self.__strategy_factory: StrategyFactory = StrategyFactory(self.__args, self.__mapping_manager,
Expand Down Expand Up @@ -144,66 +147,15 @@ async def __connection_handler(self, websocket_client_connection: websockets.Web
with logger.contextualize(identifier=origin, name="websocket"):
logger.info("New connection from {}", websocket_client_connection.remote_address)
async with self.__users_connecting_mutex:
if origin in self.__users_connecting:
# TODO: Limit the timeframe within a device has to be connected...
if origin in self.__users_connecting and self.__users_connecting[origin] + CONNECTING_TIMEOUT > int(time.time()):
logger.info("Client is already connecting")
return
else:
self.__users_connecting.add(origin)
self.__users_connecting[origin] = int(time.time())
entry: Optional[WebsocketConnectedClientEntry] = None
try:
device: Optional[SettingsDevice] = None
device_paused: bool = self.__enable_configmode
device_id: int = -1
if not self.__enable_configmode:
logger.debug("Fetching device settings")
async with self.__db_wrapper as session, session:
device = await SettingsDeviceHelper.get_by_origin(session, self.__db_wrapper.get_instance_id(),
origin)
if not device:
logger.warning("Device {} unknown", origin)
return
else:
device_id = device.device_id
logger.debug("Checking if device is active")
if not await self.__mapping_manager.is_device_active(device.device_id):
logger.warning('Origin is currently paused. Unpause through MADmin to begin working')
device_paused = True

async with self.__current_users_mutex:
logger.debug("Checking if an entry is already present")
entry = self.__current_users.get(origin, None)

# First check if an entry is present, worker running etc...
if entry and entry.websocket_client_connection:
await self.__handle_existing_connection(entry, origin)
entry.websocket_client_connection = websocket_client_connection
elif not entry:
async with self.__db_wrapper as session, session:
current_auth: Optional[SettingsPogoauth] = await SettingsPogoauthHelper \
.get_assigned_to_device(session, device_id)
if current_auth:
session.expunge(current_auth)
# Just create a new entry...
worker_state: WorkerState = WorkerState(origin=origin,
device_id=device_id,
stop_worker_event=asyncio.Event(),
pogo_windows=self.__pogo_window_manager,
active_event=self.__pogo_event,
current_auth=current_auth)
entry = WebsocketConnectedClientEntry(origin=origin,
websocket_client_connection=websocket_client_connection,
worker_instance=None,
worker_state=worker_state)
self.__current_users[origin] = entry

# No connection known or already at a point where we can continue creating worker
# -> we can just create a new task
if not await self.__add_worker_and_thread_to_entry(entry, origin, use_configmode=device_paused):
logger.warning("Failed to start worker for {}", origin)
raise WebsocketAbortRegistrationException("Failed starting worker")
else:
logger.info("Worker added/started successfully for {}", origin)
entry = await self.__setup_client_entry(origin=origin,
websocket_client_connection=websocket_client_connection)
except WebsocketAbortRegistrationException:
await asyncio.sleep(rand.uniform(3, 15))
return
Expand All @@ -226,10 +178,66 @@ async def __connection_handler(self, websocket_client_connection: websockets.Web

logger.info("Done with connection ({})", websocket_client_connection.remote_address)

async def __setup_client_entry(self, origin: str, websocket_client_connection: websockets.WebSocketClientProtocol) \
-> Optional[WebsocketConnectedClientEntry]:
device: Optional[SettingsDevice] = None
device_paused: bool = self.__enable_configmode
device_id: int = -1
if not self.__enable_configmode:
logger.debug("Fetching device settings")
async with self.__db_wrapper as session, session:
device = await SettingsDeviceHelper.get_by_origin(session, self.__db_wrapper.get_instance_id(),
origin)
if not device:
logger.warning("Device {} unknown", origin)
return None
else:
device_id = device.device_id
logger.debug("Checking if device is active")
if not await self.__mapping_manager.is_device_active(device.device_id):
logger.warning('Origin is currently paused. Unpause through MADmin to begin working')
device_paused = True
entry: Optional[WebsocketConnectedClientEntry] = None
async with self.__current_users_mutex:
logger.debug("Checking if an entry is already present")
entry = self.__current_users.get(origin, None)

# First check if an entry is present, worker running etc...
if entry and entry.websocket_client_connection:
await self.__handle_existing_connection(entry, origin)
entry.websocket_client_connection = websocket_client_connection
elif not entry:
async with self.__db_wrapper as session, session:
current_auth: Optional[SettingsPogoauth] = await SettingsPogoauthHelper \
.get_assigned_to_device(session, device_id)
if current_auth:
session.expunge(current_auth)
# Just create a new entry...
worker_state: WorkerState = WorkerState(origin=origin,
device_id=device_id,
stop_worker_event=asyncio.Event(),
pogo_windows=self.__pogo_window_manager,
active_event=self.__pogo_event,
current_auth=current_auth)
entry = WebsocketConnectedClientEntry(origin=origin,
websocket_client_connection=websocket_client_connection,
worker_instance=None,
worker_state=worker_state)
self.__current_users[origin] = entry

# No connection known or already at a point where we can continue creating worker
# -> we can just create a new task
if not await self.__add_worker_and_thread_to_entry(entry, origin, use_configmode=device_paused):
logger.warning("Failed to start worker for {}", origin)
raise WebsocketAbortRegistrationException("Failed starting worker")
else:
logger.info("Worker added/started successfully for {}", origin)
return entry

async def __remove_from_users_connecting(self, origin):
async with self.__users_connecting_mutex:
if origin in self.__users_connecting:
self.__users_connecting.remove(origin)
self.__users_connecting.pop(origin)

async def __remove_from_current_users(self, origin):
async with self.__current_users_mutex:
Expand Down

0 comments on commit 5c34765

Please sign in to comment.