Skip to content

Commit

Permalink
Staging fix for geored socket failure on single peer
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkneipp committed Nov 16, 2023
1 parent a6f8dce commit 162a8d8
Showing 1 changed file with 63 additions and 60 deletions.
123 changes: 63 additions & 60 deletions services/georedService.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,91 +251,94 @@ async def handleAsymmetricGeoredQueue(self):
"""
Collects and processes asymmetric geored messages.
"""
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
while True:
try:
if self.benchmarking:
startTime = time.perf_counter()
georedMessage = json.loads((await(self.redisGeoredMessaging.awaitMessage(key='asymmetric-geored')))[1])
await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleAsymmetricGeoredQueue] Message: {georedMessage}"))

georedOperation = georedMessage['operation']
georedBody = georedMessage['body']
georedUrls = georedMessage['urls']
georedTasks = []

while True:
try:
if self.benchmarking:
startTime = time.perf_counter()
georedMessage = json.loads((await(self.redisGeoredMessaging.awaitMessage(key='asymmetric-geored')))[1])
await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleAsymmetricGeoredQueue] Message: {georedMessage}"))

georedOperation = georedMessage['operation']
georedBody = georedMessage['body']
georedUrls = georedMessage['urls']
georedTasks = []

socketSession = aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False))
async with socketSession as session:
for georedEndpoint in georedUrls:
georedTasks.append(self.sendGeored(asyncSession=session, url=georedEndpoint, operation=georedOperation, body=georedBody))
georedTasks.append(self.sendGeored(asyncSession=session, url=georedEndpoint, operation=georedOperation, body=georedBody))
await asyncio.gather(*georedTasks)
if self.benchmarking:
await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleAsymmetricGeoredQueue] Time taken to send asymmetric geored message to specified peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms"))
if self.benchmarking:
await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleAsymmetricGeoredQueue] Time taken to send asymmetric geored message to specified peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms"))

await(asyncio.sleep(0))
await(asyncio.sleep(0))

except Exception as e:
await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleAsymmetricGeoredQueue] Error handling asymmetric geored queue: {e}"))
await(asyncio.sleep(0))
continue
except Exception as e:
await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleAsymmetricGeoredQueue] Error handling asymmetric geored queue: {e}"))
await(asyncio.sleep(0))
continue

async def handleGeoredQueue(self):
"""
Collects and processes queued geored messages.
"""
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
while True:
try:
if self.benchmarking:
startTime = time.perf_counter()
georedMessage = json.loads((await(self.redisGeoredMessaging.awaitMessage(key='geored')))[1])
await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleGeoredQueue] Message: {georedMessage}"))

georedOperation = georedMessage['operation']
georedBody = georedMessage['body']
georedTasks = []

while True:
try:
if self.benchmarking:
startTime = time.perf_counter()
georedMessage = json.loads((await(self.redisGeoredMessaging.awaitMessage(key='geored')))[1])
await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleGeoredQueue] Message: {georedMessage}"))

georedOperation = georedMessage['operation']
georedBody = georedMessage['body']
georedTasks = []

socketSession = aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False))
async with socketSession as session:
for remotePeer in self.georedPeers:
georedTasks.append(self.sendGeored(asyncSession=session, url=remotePeer+'/geored/', operation=georedOperation, body=georedBody))
georedTasks.append(self.sendGeored(asyncSession=session, url=remotePeer+'/geored/', operation=georedOperation, body=georedBody))
await asyncio.gather(*georedTasks)
if self.benchmarking:
await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleGeoredQueue] Time taken to send geored message to all geored peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms"))
if self.benchmarking:
await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleGeoredQueue] Time taken to send geored message to all geored peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms"))

await(asyncio.sleep(0))
await(asyncio.sleep(0))

except Exception as e:
await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleGeoredQueue] Error handling geored queue: {e}"))
await(asyncio.sleep(0))
continue
except Exception as e:
await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleGeoredQueue] Error handling geored queue: {e}"))
await(asyncio.sleep(0))
continue

async def handleWebhookQueue(self):
"""
Collects and processes queued webhook messages.
"""
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
while True:
try:
if self.benchmarking:
startTime = time.perf_counter()
webhookMessage = json.loads((await(self.redisWebhookMessaging.awaitMessage(key='webhook')))[1])
while True:
try:
if self.benchmarking:
startTime = time.perf_counter()
webhookMessage = json.loads((await(self.redisWebhookMessaging.awaitMessage(key='webhook')))[1])

await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Message: {webhookMessage}"))
await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Message: {webhookMessage}"))

webhookHeaders = webhookMessage['headers']
webhookOperation = webhookMessage['operation']
webhookBody = webhookMessage['body']
webhookTasks = []
webhookHeaders = webhookMessage['headers']
webhookOperation = webhookMessage['operation']
webhookBody = webhookMessage['body']
webhookTasks = []

socketSession = aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False))
async with socketSession as session:
for remotePeer in self.webhookPeers:
webhookTasks.append(self.sendWebhook(asyncSession=session, url=remotePeer, operation=webhookOperation, body=webhookBody, headers=webhookHeaders))
webhookTasks.append(self.sendWebhook(asyncSession=session, url=remotePeer, operation=webhookOperation, body=webhookBody, headers=webhookHeaders))
await asyncio.gather(*webhookTasks)
if self.benchmarking:
await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleWebhookQueue] Time taken to send webhook to all geored peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms"))
if self.benchmarking:
await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleWebhookQueue] Time taken to send webhook to all geored peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms"))

await(asyncio.sleep(0.001))
await(asyncio.sleep(0.001))

except Exception as e:
await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Error handling webhook queue: {e}"))
await(asyncio.sleep(0.001))
continue
except Exception as e:
await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Error handling webhook queue: {e}"))
await(asyncio.sleep(0.001))
continue

async def startService(self):
"""
Expand Down

0 comments on commit 162a8d8

Please sign in to comment.