Skip to content

Commit

Permalink
Fixed v10
Browse files Browse the repository at this point in the history
  • Loading branch information
bumseb1ene committed Aug 22, 2024
1 parent 2509a15 commit 07acae6
Showing 1 changed file with 71 additions and 37 deletions.
108 changes: 71 additions & 37 deletions ban_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,26 +179,37 @@ async def consume_unban_messages(connection, channel, queue, api_clients):
await message.ack()
except json.JSONDecodeError:
logging.error("Fehler beim Parsen der JSON-Daten")
await message.nack(requeue(True))
await message.nack(requeue=True)
except Exception as e:
logging.error(f"Unerwarteter Fehler beim Verarbeiten der Nachricht: {e}")
await message.nack(requeue(True))
await message.nack(requeue=True)

async def connect_to_tempban_rabbitmq(client_id):
logging.info(f"Versuche, eine Verbindung zu RabbitMQ für Tempban-Nachrichten herzustellen für Client {client_id}...")
tempban_connection = await aio_pika.connect_robust(
f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/",
loop=asyncio.get_running_loop(),
heartbeat=600,
client_properties={'connection_name': f'tempban_connection_{client_id}'}
)
tempban_channel = await tempban_connection.channel()
exchange_name = f'tempbans_fanout_{client_id}'
tempban_exchange = await tempban_channel.declare_exchange(exchange_name, ExchangeType.FANOUT, durable=True)
tempban_queue = await tempban_channel.declare_queue(f'tempbans_queue_{client_id}', durable=True)
await tempban_queue.bind(tempban_exchange, routing_key='')
logging.info(f"RabbitMQ Queue tempbans_queue_{client_id} deklariert und gebunden.")
return tempban_connection, tempban_channel, tempban_queue
try:
logging.info(f"Versuche, eine Verbindung zu RabbitMQ für Tempban-Nachrichten herzustellen für Client {client_id}...")

tempban_connection = await aio_pika.connect_robust(
f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/",
loop=asyncio.get_running_loop(),
heartbeat=600,
client_properties={'connection_name': f'tempban_connection_{client_id}'}
)

tempban_channel = await tempban_connection.channel()
exchange_name = f'tempbans_fanout_{client_id}'

tempban_exchange = await tempban_channel.declare_exchange(exchange_name, ExchangeType.FANOUT, durable=True)
queue_name = f'tempbans_queue_{client_id}'
tempban_queue = await tempban_channel.declare_queue(queue_name, durable=True)

await tempban_queue.bind(tempban_exchange, routing_key='')
logging.info(f"RabbitMQ Queue {queue_name} deklariert und gebunden.")

return tempban_connection, tempban_channel, tempban_queue

except Exception as e:
logging.error(f"Fehler beim Verbinden mit RabbitMQ für Tempban-Nachrichten: {e}")
raise # Optional: Weiterleiten des Fehlers oder Versuch eines erneuten Verbindungsaufbaus

async def consume_tempban_messages(connection, channel, queue, api_clients):
logging.info("Beginne mit dem Empfang von Tempban-Nachrichten...")
Expand All @@ -210,15 +221,16 @@ async def consume_tempban_messages(connection, channel, queue, api_clients):
ban_data = json.loads(message.body.decode())
logging.info(f"Empfangene Tempban-Daten: {ban_data}")

player_name = ban_data.get('player_name', ban_data.get('player'))
player_id = ban_data.get('player_id', ban_data.get('steam_id_64'))
# Dynamische Zuordnung von player_id und player_name basierend auf der API-Version
player_name = ban_data.get('player_name') or ban_data.get('player')
player_id = ban_data.get('player_id') or ban_data.get('steam_id_64')

if not player_name:
logging.error(f"Fehlendes 'player_name' in den Tempban-Daten: {ban_data}")
if not player_id:
logging.error(f"Fehlendes 'player_id' in den Tempban-Daten: {ban_data}")
if not player_name or not player_id:
await message.nack(requeue(True))
await message.nack(requeue=True)
continue

for api_client in api_clients:
Expand All @@ -234,7 +246,7 @@ async def consume_tempban_messages(connection, channel, queue, api_clients):
await message.nack(requeue=True)
except Exception as e:
logging.error(f"Unerwarteter Fehler beim Verarbeiten der Nachricht: {e}")
await message.nack(requeue(True))
await message.nack(requeue=True)

async def connect_to_watchlist_rabbitmq(client_id):
logging.info(f"Versuche, eine Verbindung zu RabbitMQ für Watchlist-Nachrichten herzustellen für Client {client_id}...")
Expand All @@ -258,36 +270,58 @@ async def consume_watchlist_messages(connection, channel, queue, api_clients):
try:
watchlist_data = json.loads(message.body.decode())
logging.info(f"Empfangene Watchlist-Daten: {watchlist_data}")

player_name = watchlist_data.get('player_name', watchlist_data.get('player'))
player_id = watchlist_data.get('player_id', watchlist_data.get('steam_id_64'))

# Überprüfung auf fehlende oder ungültige player_id
if not player_id:
logging.error(f"Fehlendes 'player_id' in den Watchlist-Daten: {watchlist_data}")
await message.nack(requeue=False) # Nachricht nicht erneut in die Queue einreihen
continue

# Überprüfung auf fehlende erforderliche Felder
if not player_name:
logging.error(f"Fehlende erforderliche Datenfelder in den Watchlist-Daten: {watchlist_data}")
await message.nack(requeue(False))
continue
processed = False # Flag to track if the message has been processed

# Iteriere über die API-Clients, um die Nachricht an den richtigen API-Client zu senden
for api_client in api_clients:
version = api_client.api_version

if version.startswith("v10"):
# v10 API verwendet player_name und player_id
player_name = watchlist_data.get('player_name')
player_id = watchlist_data.get('player_id')
else:
# v9.x API verwendet player und steam_id_64
player_name = watchlist_data.get('player')
player_id = watchlist_data.get('steam_id_64')

# Überprüfung auf fehlende oder ungültige player_id
if not player_id:
logging.error(f"Fehlendes 'player_id' oder 'steam_id_64' in den Watchlist-Daten: {watchlist_data}")
break # Verlassen der Schleife, um die Nachricht nicht weiter zu verarbeiten

# Überprüfung auf fehlende erforderliche Felder
if not player_name:
logging.error(f"Fehlende erforderliche Datenfelder in den Watchlist-Daten: {watchlist_data}")
break # Verlassen der Schleife, um die Nachricht nicht weiter zu verarbeiten

# Call the appropriate API method based on the client version
if api_client.do_watch_player(player_name, player_id, watchlist_data['reason'], watchlist_data['by']):
logging.info(f"Spieler erfolgreich zur Watchlist hinzugefügt: {player_id}")
processed = True
else:
logging.error(f"Fehler beim Hinzufügen zur Watchlist für Player ID: {player_id}")
await message.ack()
break # Nachricht nicht weiterverarbeiten

if processed:
await message.ack()
else:
await message.nack(requeue=False)

except json.JSONDecodeError:
logging.error("Fehler beim Parsen der JSON-Daten")
await message.nack(requeue(True))
try:
await message.nack(requeue=True)
except aio_pika.exceptions.MessageProcessError:
logging.error("Nachricht konnte nicht zurückgestellt werden, da sie bereits verarbeitet wurde.")

except Exception as e:
logging.error(f"Unerwarteter Fehler beim Verarbeiten der Nachricht: {e}")
await message.nack(requeue(True))
try:
await message.nack(requeue=True)
except aio_pika.exceptions.MessageProcessError:
logging.error("Nachricht konnte nicht zurückgestellt werden, da sie bereits verarbeitet wurde.")

async def connect_to_unwatch_rabbitmq(client_id):
logging.info(f"Versuche, eine Verbindung zu RabbitMQ für Unwatch-Nachrichten herzustellen für Client {client_id}...")
Expand Down

0 comments on commit 07acae6

Please sign in to comment.