From 5c3476514d3bd5022e37847d34b39f73461a2eca Mon Sep 17 00:00:00 2001 From: Till Skrodzki Date: Sun, 21 Jan 2024 05:54:51 +0100 Subject: [PATCH] Add handling of connections not finishing to set up within 5minutes to stop loops of constant connecting --- mapadroid/websocket/WebsocketServer.py | 122 +++++++++++++------------ 1 file changed, 65 insertions(+), 57 deletions(-) diff --git a/mapadroid/websocket/WebsocketServer.py b/mapadroid/websocket/WebsocketServer.py index 6b4083d4b..bbe3ec081 100644 --- a/mapadroid/websocket/WebsocketServer.py +++ b/mapadroid/websocket/WebsocketServer.py @@ -1,6 +1,7 @@ import asyncio import logging import random as rand +import time from asyncio import CancelledError from typing import Dict, List, Optional, Set, Tuple @@ -36,6 +37,8 @@ from mapadroid.worker.Worker import Worker from mapadroid.worker.WorkerState import WorkerState +CONNECTING_TIMEOUT = 300 + logging.getLogger('websockets.server').setLevel(logging.DEBUG) logging.getLogger('websockets.protocol').setLevel(logging.DEBUG) logging.getLogger('websockets.server').addHandler(InterceptHandler(log_section=LoggerEnums.websocket)) @@ -65,7 +68,7 @@ def __init__(self, args, mitm_mapper: AbstractMitmMapper, stats_handler: Abstrac # Do think twice before plainly accessing, there's locks to be used self.__current_users: Dict[str, WebsocketConnectedClientEntry] = {} self.__current_users_mutex: Optional[asyncio.Lock] = None - self.__users_connecting: Set[str] = set() + self.__users_connecting: Dict[str, int] = {} self.__users_connecting_mutex: Optional[asyncio.Lock] = None self.__strategy_factory: StrategyFactory = StrategyFactory(self.__args, self.__mapping_manager, @@ -144,66 +147,15 @@ async def __connection_handler(self, websocket_client_connection: websockets.Web with logger.contextualize(identifier=origin, name="websocket"): logger.info("New connection from {}", websocket_client_connection.remote_address) async with self.__users_connecting_mutex: - if origin in self.__users_connecting: - # TODO: Limit the timeframe within a device has to be connected... + if origin in self.__users_connecting and self.__users_connecting[origin] + CONNECTING_TIMEOUT > int(time.time()): logger.info("Client is already connecting") return else: - self.__users_connecting.add(origin) + self.__users_connecting[origin] = int(time.time()) entry: Optional[WebsocketConnectedClientEntry] = None try: - device: Optional[SettingsDevice] = None - device_paused: bool = self.__enable_configmode - device_id: int = -1 - if not self.__enable_configmode: - logger.debug("Fetching device settings") - async with self.__db_wrapper as session, session: - device = await SettingsDeviceHelper.get_by_origin(session, self.__db_wrapper.get_instance_id(), - origin) - if not device: - logger.warning("Device {} unknown", origin) - return - else: - device_id = device.device_id - logger.debug("Checking if device is active") - if not await self.__mapping_manager.is_device_active(device.device_id): - logger.warning('Origin is currently paused. Unpause through MADmin to begin working') - device_paused = True - - async with self.__current_users_mutex: - logger.debug("Checking if an entry is already present") - entry = self.__current_users.get(origin, None) - - # First check if an entry is present, worker running etc... - if entry and entry.websocket_client_connection: - await self.__handle_existing_connection(entry, origin) - entry.websocket_client_connection = websocket_client_connection - elif not entry: - async with self.__db_wrapper as session, session: - current_auth: Optional[SettingsPogoauth] = await SettingsPogoauthHelper \ - .get_assigned_to_device(session, device_id) - if current_auth: - session.expunge(current_auth) - # Just create a new entry... - worker_state: WorkerState = WorkerState(origin=origin, - device_id=device_id, - stop_worker_event=asyncio.Event(), - pogo_windows=self.__pogo_window_manager, - active_event=self.__pogo_event, - current_auth=current_auth) - entry = WebsocketConnectedClientEntry(origin=origin, - websocket_client_connection=websocket_client_connection, - worker_instance=None, - worker_state=worker_state) - self.__current_users[origin] = entry - - # No connection known or already at a point where we can continue creating worker - # -> we can just create a new task - if not await self.__add_worker_and_thread_to_entry(entry, origin, use_configmode=device_paused): - logger.warning("Failed to start worker for {}", origin) - raise WebsocketAbortRegistrationException("Failed starting worker") - else: - logger.info("Worker added/started successfully for {}", origin) + entry = await self.__setup_client_entry(origin=origin, + websocket_client_connection=websocket_client_connection) except WebsocketAbortRegistrationException: await asyncio.sleep(rand.uniform(3, 15)) return @@ -226,10 +178,66 @@ async def __connection_handler(self, websocket_client_connection: websockets.Web logger.info("Done with connection ({})", websocket_client_connection.remote_address) + async def __setup_client_entry(self, origin: str, websocket_client_connection: websockets.WebSocketClientProtocol) \ + -> Optional[WebsocketConnectedClientEntry]: + device: Optional[SettingsDevice] = None + device_paused: bool = self.__enable_configmode + device_id: int = -1 + if not self.__enable_configmode: + logger.debug("Fetching device settings") + async with self.__db_wrapper as session, session: + device = await SettingsDeviceHelper.get_by_origin(session, self.__db_wrapper.get_instance_id(), + origin) + if not device: + logger.warning("Device {} unknown", origin) + return None + else: + device_id = device.device_id + logger.debug("Checking if device is active") + if not await self.__mapping_manager.is_device_active(device.device_id): + logger.warning('Origin is currently paused. Unpause through MADmin to begin working') + device_paused = True + entry: Optional[WebsocketConnectedClientEntry] = None + async with self.__current_users_mutex: + logger.debug("Checking if an entry is already present") + entry = self.__current_users.get(origin, None) + + # First check if an entry is present, worker running etc... + if entry and entry.websocket_client_connection: + await self.__handle_existing_connection(entry, origin) + entry.websocket_client_connection = websocket_client_connection + elif not entry: + async with self.__db_wrapper as session, session: + current_auth: Optional[SettingsPogoauth] = await SettingsPogoauthHelper \ + .get_assigned_to_device(session, device_id) + if current_auth: + session.expunge(current_auth) + # Just create a new entry... + worker_state: WorkerState = WorkerState(origin=origin, + device_id=device_id, + stop_worker_event=asyncio.Event(), + pogo_windows=self.__pogo_window_manager, + active_event=self.__pogo_event, + current_auth=current_auth) + entry = WebsocketConnectedClientEntry(origin=origin, + websocket_client_connection=websocket_client_connection, + worker_instance=None, + worker_state=worker_state) + self.__current_users[origin] = entry + + # No connection known or already at a point where we can continue creating worker + # -> we can just create a new task + if not await self.__add_worker_and_thread_to_entry(entry, origin, use_configmode=device_paused): + logger.warning("Failed to start worker for {}", origin) + raise WebsocketAbortRegistrationException("Failed starting worker") + else: + logger.info("Worker added/started successfully for {}", origin) + return entry + async def __remove_from_users_connecting(self, origin): async with self.__users_connecting_mutex: if origin in self.__users_connecting: - self.__users_connecting.remove(origin) + self.__users_connecting.pop(origin) async def __remove_from_current_users(self, origin): async with self.__current_users_mutex: