From 07acae60e036b4002f0800acda2ccbdce98cd030 Mon Sep 17 00:00:00 2001 From: bumseb1ene <122762145+bumseb1ene@users.noreply.github.com> Date: Thu, 22 Aug 2024 22:00:56 +0200 Subject: [PATCH] Fixed v10 --- ban_client.py | 108 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 71 insertions(+), 37 deletions(-) diff --git a/ban_client.py b/ban_client.py index 05b69cb..fccdf64 100644 --- a/ban_client.py +++ b/ban_client.py @@ -179,26 +179,37 @@ async def consume_unban_messages(connection, channel, queue, api_clients): await message.ack() except json.JSONDecodeError: logging.error("Fehler beim Parsen der JSON-Daten") - await message.nack(requeue(True)) + await message.nack(requeue=True) except Exception as e: logging.error(f"Unerwarteter Fehler beim Verarbeiten der Nachricht: {e}") - await message.nack(requeue(True)) + await message.nack(requeue=True) async def connect_to_tempban_rabbitmq(client_id): - logging.info(f"Versuche, eine Verbindung zu RabbitMQ für Tempban-Nachrichten herzustellen für Client {client_id}...") - tempban_connection = await aio_pika.connect_robust( - f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/", - loop=asyncio.get_running_loop(), - heartbeat=600, - client_properties={'connection_name': f'tempban_connection_{client_id}'} - ) - tempban_channel = await tempban_connection.channel() - exchange_name = f'tempbans_fanout_{client_id}' - tempban_exchange = await tempban_channel.declare_exchange(exchange_name, ExchangeType.FANOUT, durable=True) - tempban_queue = await tempban_channel.declare_queue(f'tempbans_queue_{client_id}', durable=True) - await tempban_queue.bind(tempban_exchange, routing_key='') - logging.info(f"RabbitMQ Queue tempbans_queue_{client_id} deklariert und gebunden.") - return tempban_connection, tempban_channel, tempban_queue + try: + logging.info(f"Versuche, eine Verbindung zu RabbitMQ für Tempban-Nachrichten herzustellen für Client {client_id}...") + + tempban_connection = await aio_pika.connect_robust( + f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/", + loop=asyncio.get_running_loop(), + heartbeat=600, + client_properties={'connection_name': f'tempban_connection_{client_id}'} + ) + + tempban_channel = await tempban_connection.channel() + exchange_name = f'tempbans_fanout_{client_id}' + + tempban_exchange = await tempban_channel.declare_exchange(exchange_name, ExchangeType.FANOUT, durable=True) + queue_name = f'tempbans_queue_{client_id}' + tempban_queue = await tempban_channel.declare_queue(queue_name, durable=True) + + await tempban_queue.bind(tempban_exchange, routing_key='') + logging.info(f"RabbitMQ Queue {queue_name} deklariert und gebunden.") + + return tempban_connection, tempban_channel, tempban_queue + + except Exception as e: + logging.error(f"Fehler beim Verbinden mit RabbitMQ für Tempban-Nachrichten: {e}") + raise # Optional: Weiterleiten des Fehlers oder Versuch eines erneuten Verbindungsaufbaus async def consume_tempban_messages(connection, channel, queue, api_clients): logging.info("Beginne mit dem Empfang von Tempban-Nachrichten...") @@ -210,15 +221,16 @@ async def consume_tempban_messages(connection, channel, queue, api_clients): ban_data = json.loads(message.body.decode()) logging.info(f"Empfangene Tempban-Daten: {ban_data}") - player_name = ban_data.get('player_name', ban_data.get('player')) - player_id = ban_data.get('player_id', ban_data.get('steam_id_64')) + # Dynamische Zuordnung von player_id und player_name basierend auf der API-Version + player_name = ban_data.get('player_name') or ban_data.get('player') + player_id = ban_data.get('player_id') or ban_data.get('steam_id_64') if not player_name: logging.error(f"Fehlendes 'player_name' in den Tempban-Daten: {ban_data}") if not player_id: logging.error(f"Fehlendes 'player_id' in den Tempban-Daten: {ban_data}") if not player_name or not player_id: - await message.nack(requeue(True)) + await message.nack(requeue=True) continue for api_client in api_clients: @@ -234,7 +246,7 @@ async def consume_tempban_messages(connection, channel, queue, api_clients): await message.nack(requeue=True) except Exception as e: logging.error(f"Unerwarteter Fehler beim Verarbeiten der Nachricht: {e}") - await message.nack(requeue(True)) + await message.nack(requeue=True) async def connect_to_watchlist_rabbitmq(client_id): logging.info(f"Versuche, eine Verbindung zu RabbitMQ für Watchlist-Nachrichten herzustellen für Client {client_id}...") @@ -258,36 +270,58 @@ async def consume_watchlist_messages(connection, channel, queue, api_clients): try: watchlist_data = json.loads(message.body.decode()) logging.info(f"Empfangene Watchlist-Daten: {watchlist_data}") - - player_name = watchlist_data.get('player_name', watchlist_data.get('player')) - player_id = watchlist_data.get('player_id', watchlist_data.get('steam_id_64')) - - # Überprüfung auf fehlende oder ungültige player_id - if not player_id: - logging.error(f"Fehlendes 'player_id' in den Watchlist-Daten: {watchlist_data}") - await message.nack(requeue=False) # Nachricht nicht erneut in die Queue einreihen - continue - # Überprüfung auf fehlende erforderliche Felder - if not player_name: - logging.error(f"Fehlende erforderliche Datenfelder in den Watchlist-Daten: {watchlist_data}") - await message.nack(requeue(False)) - continue + processed = False # Flag to track if the message has been processed + # Iteriere über die API-Clients, um die Nachricht an den richtigen API-Client zu senden for api_client in api_clients: version = api_client.api_version + if version.startswith("v10"): + # v10 API verwendet player_name und player_id + player_name = watchlist_data.get('player_name') + player_id = watchlist_data.get('player_id') + else: + # v9.x API verwendet player und steam_id_64 + player_name = watchlist_data.get('player') + player_id = watchlist_data.get('steam_id_64') + + # Überprüfung auf fehlende oder ungültige player_id + if not player_id: + logging.error(f"Fehlendes 'player_id' oder 'steam_id_64' in den Watchlist-Daten: {watchlist_data}") + break # Verlassen der Schleife, um die Nachricht nicht weiter zu verarbeiten + + # Überprüfung auf fehlende erforderliche Felder + if not player_name: + logging.error(f"Fehlende erforderliche Datenfelder in den Watchlist-Daten: {watchlist_data}") + break # Verlassen der Schleife, um die Nachricht nicht weiter zu verarbeiten + + # Call the appropriate API method based on the client version if api_client.do_watch_player(player_name, player_id, watchlist_data['reason'], watchlist_data['by']): logging.info(f"Spieler erfolgreich zur Watchlist hinzugefügt: {player_id}") + processed = True else: logging.error(f"Fehler beim Hinzufügen zur Watchlist für Player ID: {player_id}") - await message.ack() + break # Nachricht nicht weiterverarbeiten + + if processed: + await message.ack() + else: + await message.nack(requeue=False) + except json.JSONDecodeError: logging.error("Fehler beim Parsen der JSON-Daten") - await message.nack(requeue(True)) + try: + await message.nack(requeue=True) + except aio_pika.exceptions.MessageProcessError: + logging.error("Nachricht konnte nicht zurückgestellt werden, da sie bereits verarbeitet wurde.") + except Exception as e: logging.error(f"Unerwarteter Fehler beim Verarbeiten der Nachricht: {e}") - await message.nack(requeue(True)) + try: + await message.nack(requeue=True) + except aio_pika.exceptions.MessageProcessError: + logging.error("Nachricht konnte nicht zurückgestellt werden, da sie bereits verarbeitet wurde.") async def connect_to_unwatch_rabbitmq(client_id): logging.info(f"Versuche, eine Verbindung zu RabbitMQ für Unwatch-Nachrichten herzustellen für Client {client_id}...")