From 162a8d8fa1e02036977b4b3d027c48eac6b5774b Mon Sep 17 00:00:00 2001 From: davidkneipp Date: Fri, 17 Nov 2023 06:45:12 +1000 Subject: [PATCH] Staging fix for geored socket failure on single peer --- services/georedService.py | 123 +++++++++++++++++++------------------- 1 file changed, 63 insertions(+), 60 deletions(-) diff --git a/services/georedService.py b/services/georedService.py index 861e8b8..12f1680 100644 --- a/services/georedService.py +++ b/services/georedService.py @@ -251,91 +251,94 @@ async def handleAsymmetricGeoredQueue(self): """ Collects and processes asymmetric geored messages. """ - async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session: - while True: - try: - if self.benchmarking: - startTime = time.perf_counter() - georedMessage = json.loads((await(self.redisGeoredMessaging.awaitMessage(key='asymmetric-geored')))[1]) - await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleAsymmetricGeoredQueue] Message: {georedMessage}")) - - georedOperation = georedMessage['operation'] - georedBody = georedMessage['body'] - georedUrls = georedMessage['urls'] - georedTasks = [] - + while True: + try: + if self.benchmarking: + startTime = time.perf_counter() + georedMessage = json.loads((await(self.redisGeoredMessaging.awaitMessage(key='asymmetric-geored')))[1]) + await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleAsymmetricGeoredQueue] Message: {georedMessage}")) + + georedOperation = georedMessage['operation'] + georedBody = georedMessage['body'] + georedUrls = georedMessage['urls'] + georedTasks = [] + + socketSession = aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) + async with socketSession as session: for georedEndpoint in georedUrls: - georedTasks.append(self.sendGeored(asyncSession=session, url=georedEndpoint, operation=georedOperation, body=georedBody)) + georedTasks.append(self.sendGeored(asyncSession=session, url=georedEndpoint, operation=georedOperation, body=georedBody)) await asyncio.gather(*georedTasks) - if self.benchmarking: - await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleAsymmetricGeoredQueue] Time taken to send asymmetric geored message to specified peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms")) + if self.benchmarking: + await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleAsymmetricGeoredQueue] Time taken to send asymmetric geored message to specified peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms")) - await(asyncio.sleep(0)) + await(asyncio.sleep(0)) - except Exception as e: - await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleAsymmetricGeoredQueue] Error handling asymmetric geored queue: {e}")) - await(asyncio.sleep(0)) - continue + except Exception as e: + await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleAsymmetricGeoredQueue] Error handling asymmetric geored queue: {e}")) + await(asyncio.sleep(0)) + continue async def handleGeoredQueue(self): """ Collects and processes queued geored messages. """ - async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session: - while True: - try: - if self.benchmarking: - startTime = time.perf_counter() - georedMessage = json.loads((await(self.redisGeoredMessaging.awaitMessage(key='geored')))[1]) - await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleGeoredQueue] Message: {georedMessage}")) - - georedOperation = georedMessage['operation'] - georedBody = georedMessage['body'] - georedTasks = [] - + while True: + try: + if self.benchmarking: + startTime = time.perf_counter() + georedMessage = json.loads((await(self.redisGeoredMessaging.awaitMessage(key='geored')))[1]) + await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleGeoredQueue] Message: {georedMessage}")) + + georedOperation = georedMessage['operation'] + georedBody = georedMessage['body'] + georedTasks = [] + + socketSession = aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) + async with socketSession as session: for remotePeer in self.georedPeers: - georedTasks.append(self.sendGeored(asyncSession=session, url=remotePeer+'/geored/', operation=georedOperation, body=georedBody)) + georedTasks.append(self.sendGeored(asyncSession=session, url=remotePeer+'/geored/', operation=georedOperation, body=georedBody)) await asyncio.gather(*georedTasks) - if self.benchmarking: - await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleGeoredQueue] Time taken to send geored message to all geored peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms")) + if self.benchmarking: + await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleGeoredQueue] Time taken to send geored message to all geored peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms")) - await(asyncio.sleep(0)) + await(asyncio.sleep(0)) - except Exception as e: - await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleGeoredQueue] Error handling geored queue: {e}")) - await(asyncio.sleep(0)) - continue + except Exception as e: + await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleGeoredQueue] Error handling geored queue: {e}")) + await(asyncio.sleep(0)) + continue async def handleWebhookQueue(self): """ Collects and processes queued webhook messages. """ - async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session: - while True: - try: - if self.benchmarking: - startTime = time.perf_counter() - webhookMessage = json.loads((await(self.redisWebhookMessaging.awaitMessage(key='webhook')))[1]) + while True: + try: + if self.benchmarking: + startTime = time.perf_counter() + webhookMessage = json.loads((await(self.redisWebhookMessaging.awaitMessage(key='webhook')))[1]) - await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Message: {webhookMessage}")) + await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Message: {webhookMessage}")) - webhookHeaders = webhookMessage['headers'] - webhookOperation = webhookMessage['operation'] - webhookBody = webhookMessage['body'] - webhookTasks = [] + webhookHeaders = webhookMessage['headers'] + webhookOperation = webhookMessage['operation'] + webhookBody = webhookMessage['body'] + webhookTasks = [] + socketSession = aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) + async with socketSession as session: for remotePeer in self.webhookPeers: - webhookTasks.append(self.sendWebhook(asyncSession=session, url=remotePeer, operation=webhookOperation, body=webhookBody, headers=webhookHeaders)) + webhookTasks.append(self.sendWebhook(asyncSession=session, url=remotePeer, operation=webhookOperation, body=webhookBody, headers=webhookHeaders)) await asyncio.gather(*webhookTasks) - if self.benchmarking: - await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleWebhookQueue] Time taken to send webhook to all geored peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms")) + if self.benchmarking: + await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleWebhookQueue] Time taken to send webhook to all geored peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms")) - await(asyncio.sleep(0.001)) + await(asyncio.sleep(0.001)) - except Exception as e: - await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Error handling webhook queue: {e}")) - await(asyncio.sleep(0.001)) - continue + except Exception as e: + await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Error handling webhook queue: {e}")) + await(asyncio.sleep(0.001)) + continue async def startService(self): """