From 457b9f975d174eb0f50f8829c2c2090b25a157bb Mon Sep 17 00:00:00 2001 From: Kozejin Date: Sat, 23 Mar 2024 21:48:03 -0400 Subject: [PATCH] Python Black Format --- cogs/connect.py | 28 ++- cogs/help.py | 74 ++++-- cogs/kits.py | 42 +++- cogs/palcon.py | 206 +++++++++++++---- cogs/palguard.py | 198 ++++++++++++---- cogs/query.py | 83 +++++-- cogs/restart.py | 18 +- cogs/statustracker.py | 18 +- cogs/steam.py | 80 +++++-- cogs/whitelist.py | 251 +++++++++++++++------ gamedata/{kits.json => kits.json.template} | 0 main.py | 24 +- settings.py | 8 +- util/rconutility.py | 39 +++- util/steam_protocol.py | 14 +- 15 files changed, 805 insertions(+), 278 deletions(-) rename gamedata/{kits.json => kits.json.template} (100%) diff --git a/cogs/connect.py b/cogs/connect.py index e28e093..b51e034 100644 --- a/cogs/connect.py +++ b/cogs/connect.py @@ -18,7 +18,7 @@ def load_config(self): with open(config_path) as config_file: config = json.load(config_file) return config["PALWORLD_SERVERS"] - + async def run_command(self, server_name): try: response = await self.rcon_util.rcon_command(server_name, "ShowPlayers") @@ -26,7 +26,7 @@ async def run_command(self, server_name): except Exception as e: print(f"Error sending command to {server_name}: {e}") return "" - + async def monitor_player_activity(self): while True: for server_name, server_info in self.servers.items(): @@ -52,10 +52,10 @@ async def announce_player_changes(self, server_name, current_players): def extract_players(self, player_data): players = set() - lines = player_data.split('\n')[1:] + lines = player_data.split("\n")[1:] for line in lines: if line.strip(): - parts = line.split(',') + parts = line.split(",") if len(parts) == 3: name, _, steamid = parts players.add((name.strip(), steamid.strip())) @@ -64,24 +64,32 @@ def extract_players(self, player_data): async def announce_player_join(self, server_name, player): name, steamid = player if "CONNECTION_CHANNEL" in self.servers[server_name]: - announcement_channel_id = self.servers[server_name]["CONNECTION_CHANNEL"] - channel = self.bot.get_channel(announcement_channel_id) + channel_id = self.servers[server_name]["CONNECTION_CHANNEL"] + channel = self.bot.get_channel(channel_id) if channel: now = datetime.datetime.now() timestamp = now.strftime("%m-%d-%Y at %I:%M:%S %p") - embed = nextcord.Embed(title="Player Joined", description=f"Player joined {server_name}: {name} (SteamID: {steamid})", color=nextcord.Color.blurple()) + embed = nextcord.Embed( + title="Player Joined", + description=f"Player joined {server_name}: {name} (SteamID: {steamid})", + color=nextcord.Color.blurple(), + ) embed.set_footer(text=f"Time: {timestamp}") await channel.send(embed=embed) async def announce_player_leave(self, server_name, player): name, steamid = player if "CONNECTION_CHANNEL" in self.servers[server_name]: - announcement_channel_id = self.servers[server_name]["CONNECTION_CHANNEL"] - channel = self.bot.get_channel(announcement_channel_id) + channel_id = self.servers[server_name]["CONNECTION_CHANNEL"] + channel = self.bot.get_channel(channel_id) if channel: now = datetime.datetime.now() timestamp = now.strftime("%m-%d-%Y at %I:%M:%S %p") - embed = nextcord.Embed(title="Player Left", description=f"Player left {server_name}: {name} (SteamID: {steamid})", color=nextcord.Color.red()) + embed = nextcord.Embed( + title="Player Left", + description=f"Player left {server_name}: {name} (SteamID: {steamid})", + color=nextcord.Color.red(), + ) embed.set_footer(text=f"Time: {timestamp}") await channel.send(embed=embed) diff --git a/cogs/help.py b/cogs/help.py index fcaa3b4..16dfcde 100644 --- a/cogs/help.py +++ b/cogs/help.py @@ -10,16 +10,30 @@ def __init__(self, bot): self.current_page = 0 async def generate_help_embed(self): - embed = nextcord.Embed(title="Help Menu", description="List of all available commands.", color=nextcord.Color.blue()) - embed.set_footer(text=f"{constants.FOOTER_TEXT}: Page {self.current_page + 1}", icon_url=constants.FOOTER_IMAGE) - - commands = self.bot.all_slash_commands if hasattr(self.bot, 'all_slash_commands') else [] + embed = nextcord.Embed( + title="Help Menu", + description="List of all available commands.", + color=nextcord.Color.blue(), + ) + embed.set_footer( + text=f"{constants.FOOTER_TEXT}: Page {self.current_page + 1}", + icon_url=constants.FOOTER_IMAGE, + ) + + commands = ( + self.bot.all_slash_commands + if hasattr(self.bot, "all_slash_commands") + else [] + ) start = self.current_page * 9 end = min(start + 9, len(commands)) for command in commands[start:end]: - embed.add_field(name=f"`/{command.name}`", value=command.description or "No description", inline=True) - + embed.add_field( + name=f"`/{command.name}`", + value=command.description or "No description", + inline=True, + ) return embed @nextcord.ui.button(label="Previous", style=nextcord.ButtonStyle.grey) @@ -30,7 +44,11 @@ async def previous_button_callback(self, button, interaction): @nextcord.ui.button(label="Next", style=nextcord.ButtonStyle.grey) async def next_button_callback(self, button, interaction): - if (self.current_page + 1) * 9 < len(self.bot.all_slash_commands if hasattr(self.bot, 'all_slash_commands') else []): + if (self.current_page + 1) * 9 < len( + self.bot.all_slash_commands + if hasattr(self.bot, "all_slash_commands") + else [] + ): self.current_page += 1 await self.update_help_message(interaction) @@ -53,15 +71,41 @@ async def help(self, interaction: nextcord.Interaction): @nextcord.slash_command(description="Information about the Palworld bot.") async def about(self, interaction: nextcord.Interaction): - embed = nextcord.Embed(title="Palworld Bot", color=nextcord.Color.blue(), url=constants.TITLE_URL) + embed = nextcord.Embed( + title="Palworld Bot", color=nextcord.Color.blue(), url=constants.TITLE_URL + ) embed.set_footer(text=constants.FOOTER_TEXT, icon_url=constants.FOOTER_IMAGE) - embed.add_field(name="About", value="The bot is an open-source project available [here](https://github.com/dkoz/palworld-bot). You can find more info on our readme. I'm always looking for code contributions and support! If there is something wrong with the bot itself, please let me know!", inline=False) - embed.add_field(name="Creator", value="This bot was created by [Kozejin](https://kozejin.dev). Feel free to add `koz#1337` on discord if you have any questions.", inline=False) - embed.add_field(name="Support", value="If you wish to support the bot, you can join our [Discord](https://discord.gg/3HUq8cJSrX).", inline=False) - - discord_button = Button(label="Discord", url="https://discord.gg/3HUq8cJSrX", style=nextcord.ButtonStyle.link) - github_button = Button(label="GitHub", url="https://github.com/dkoz/palworld-bot", style=nextcord.ButtonStyle.link) - kofi_button = Button(label="Support", url="https://ko-fi.com/kozejin", style=nextcord.ButtonStyle.link) + embed.add_field( + name="About", + value="The bot is an open-source project available [here](https://github.com/dkoz/palworld-bot). You can find more info on our readme. I'm always looking for code contributions and support! If there is something wrong with the bot itself, please let me know!", + inline=False, + ) + embed.add_field( + name="Creator", + value="This bot was created by [Kozejin](https://kozejin.dev). Feel free to add `koz#1337` on discord if you have any questions.", + inline=False, + ) + embed.add_field( + name="Support", + value="If you wish to support the bot, you can join our [Discord](https://discord.gg/3HUq8cJSrX).", + inline=False, + ) + + discord_button = Button( + label="Discord", + url="https://discord.gg/3HUq8cJSrX", + style=nextcord.ButtonStyle.link, + ) + github_button = Button( + label="GitHub", + url="https://github.com/dkoz/palworld-bot", + style=nextcord.ButtonStyle.link, + ) + kofi_button = Button( + label="Support", + url="https://ko-fi.com/kozejin", + style=nextcord.ButtonStyle.link, + ) view = View() view.add_item(discord_button) diff --git a/cogs/kits.py b/cogs/kits.py index 6328778..b9dfaa5 100644 --- a/cogs/kits.py +++ b/cogs/kits.py @@ -17,15 +17,33 @@ def load_config(self): config = json.load(config_file) self.servers = config["PALWORLD_SERVERS"] - async def autocomplete_server(self, interaction: nextcord.Interaction, current: str): - choices = [server for server in self.servers if current.lower() in server.lower()] + async def autocomplete_server( + self, interaction: nextcord.Interaction, current: str + ): + choices = [ + server for server in self.servers if current.lower() in server.lower() + ] await interaction.response.send_autocomplete(choices) - @nextcord.slash_command(name="kit", description="Give a kit to a player.", default_member_permissions=nextcord.Permissions(administrator=True)) - async def givekit(self, interaction: nextcord.Interaction, steamid: str = nextcord.SlashOption(description="SteamID/UID of the player."), kit_name: str = nextcord.SlashOption(description="The name of the kit.", autocomplete=True), server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + @nextcord.slash_command( + name="kit", + description="Give a kit to a player.", + default_member_permissions=nextcord.Permissions(administrator=True), + ) + async def givekit( + self, + interaction: nextcord.Interaction, + steamid: str = nextcord.SlashOption(description="SteamID/UID of the player."), + kit_name: str = nextcord.SlashOption( + description="The name of the kit.", autocomplete=True + ), + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer() - packages_path = os.path.join('gamedata', 'kits.json') + packages_path = os.path.join("gamedata", "kits.json") with open(packages_path) as packages_file: kits = json.load(packages_file) @@ -39,17 +57,23 @@ async def givekit(self, interaction: nextcord.Interaction, steamid: str = nextco asyncio.create_task(self.rcon_util.rcon_command(server, command)) await asyncio.sleep(1) - embed = nextcord.Embed(title=f"Package Delivery - {server}", color=nextcord.Color.green()) + embed = nextcord.Embed( + title=f"Package Delivery - {server}", color=nextcord.Color.green() + ) embed.description = f"Delivering {kit_name} kit to {steamid}." await interaction.followup.send(embed=embed) @givekit.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) @givekit.on_autocomplete("kit_name") - async def on_autocomplete_packages(self, interaction: nextcord.Interaction, current: str): - packages_path = os.path.join('gamedata', 'kits.json') + async def on_autocomplete_packages( + self, interaction: nextcord.Interaction, current: str + ): + packages_path = os.path.join("gamedata", "kits.json") with open(packages_path) as packages_file: kits = json.load(packages_file) choices = [name for name in kits if current.lower() in name.lower()][:25] diff --git a/cogs/palcon.py b/cogs/palcon.py index 0d83f15..5cbf477 100644 --- a/cogs/palcon.py +++ b/cogs/palcon.py @@ -17,136 +17,252 @@ def load_config(self): config = json.load(config_file) self.servers = config["PALWORLD_SERVERS"] - async def autocomplete_server(self, interaction: nextcord.Interaction, current: str): - choices = [server for server in self.servers if current.lower() in server.lower()] + async def autocomplete_server( + self, interaction: nextcord.Interaction, current: str + ): + choices = [ + server for server in self.servers if current.lower() in server.lower() + ] await interaction.response.send_autocomplete(choices) - @nextcord.slash_command(default_member_permissions=nextcord.Permissions(administrator=True)) + @nextcord.slash_command( + default_member_permissions=nextcord.Permissions(administrator=True) + ) async def palcon(self, interaction: nextcord.Interaction): pass @palcon.subcommand(description="Send a remote command to your Palworld server.") - async def command(self, interaction: nextcord.Interaction, command: str, server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + async def command( + self, + interaction: nextcord.Interaction, + command: str, + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) response = await self.rcon_util.rcon_command(server, command) embed = nextcord.Embed(title=server, color=nextcord.Color.green()) embed.description = f"**Response:** {response}" - embed.set_footer(text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", icon_url=constants.FOOTER_IMAGE) + embed.set_footer( + text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", + icon_url=constants.FOOTER_IMAGE, + ) await interaction.followup.send(embed=embed) @command.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) @palcon.subcommand(description="Show the current player list for a server.") - async def showplayers(self, interaction: nextcord.Interaction, server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + async def showplayers( + self, + interaction: nextcord.Interaction, + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) response = await self.rcon_util.rcon_command(server, "ShowPlayers") - embed = nextcord.Embed(title=f"Player List: {server}", color=nextcord.Color.red()) + embed = nextcord.Embed( + title=f"Player List: {server}", color=nextcord.Color.red() + ) embed.description = f"{response}" - embed.set_footer(text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", icon_url=constants.FOOTER_IMAGE) + embed.set_footer( + text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", + icon_url=constants.FOOTER_IMAGE, + ) await interaction.followup.send(embed=embed) @showplayers.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) @palcon.subcommand(description="Kick a player from a server using their SteamID.") - async def kickplayer(self, interaction: nextcord.Interaction, steamid: str, server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + async def kickplayer( + self, + interaction: nextcord.Interaction, + steamid: str, + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) response = await self.rcon_util.rcon_command(server, f"KickPlayer {steamid}") - embed = nextcord.Embed(title=f"KickPlayer Command - {server}", color=nextcord.Color.orange()) + embed = nextcord.Embed( + title=f"KickPlayer Command - {server}", color=nextcord.Color.orange() + ) embed.add_field(name="Server", value=server, inline=True) embed.add_field(name="SteamID", value=steamid, inline=True) embed.add_field(name="Response", value=response, inline=False) - embed.set_footer(text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", icon_url=constants.FOOTER_IMAGE) + embed.set_footer( + text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", + icon_url=constants.FOOTER_IMAGE, + ) await interaction.followup.send(embed=embed) @kickplayer.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) @palcon.subcommand(description="Ban a player from a server using their SteamID.") - async def banplayer(self, interaction: nextcord.Interaction, steamid: str, server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + async def banplayer( + self, + interaction: nextcord.Interaction, + steamid: str, + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) response = await self.rcon_util.rcon_command(server, f"BanPlayer {steamid}") - embed = nextcord.Embed(title=f"BanPlayer Command - {server}", color=nextcord.Color.red()) + embed = nextcord.Embed( + title=f"BanPlayer Command - {server}", color=nextcord.Color.red() + ) embed.add_field(name="Server", value=server, inline=True) embed.add_field(name="SteamID", value=steamid, inline=True) embed.add_field(name="Response", value=response, inline=False) - embed.set_footer(text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", icon_url=constants.FOOTER_IMAGE) + embed.set_footer( + text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", + icon_url=constants.FOOTER_IMAGE, + ) await interaction.followup.send(embed=embed) @banplayer.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) @palcon.subcommand(description="Show information about the server.") - async def info(self, interaction: nextcord.Interaction, server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + async def info( + self, + interaction: nextcord.Interaction, + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) response = await self.rcon_util.rcon_command(server, f"Info") embed = nextcord.Embed(title=f"Info - {server}", color=nextcord.Color.blue()) embed.description = f"**Response:** {response}" - embed.set_footer(text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", icon_url=constants.FOOTER_IMAGE) + embed.set_footer( + text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", + icon_url=constants.FOOTER_IMAGE, + ) await interaction.followup.send(embed=embed) @info.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) @palcon.subcommand(description="Shutdown the server.") - async def shutdown(self, interaction: nextcord.Interaction, time: str = nextcord.SlashOption(description="Time for the shutdown"), reason: str = nextcord.SlashOption(description="Reason for the shutdown"), server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + async def shutdown( + self, + interaction: nextcord.Interaction, + time: str = nextcord.SlashOption(description="Time for the shutdown"), + reason: str = nextcord.SlashOption(description="Reason for the shutdown"), + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) reason_format = reason.replace(" ", "\u001f") - response = await self.rcon_util.rcon_command(server, f"Shutdown {time} {reason_format}") - embed = nextcord.Embed(title=f"Shutdown - {server}", color=nextcord.Color.blue()) + response = await self.rcon_util.rcon_command( + server, f"Shutdown {time} {reason_format}" + ) + embed = nextcord.Embed( + title=f"Shutdown - {server}", color=nextcord.Color.blue() + ) embed.description = f"**Response:** {response}" - embed.set_footer(text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", icon_url=constants.FOOTER_IMAGE) + embed.set_footer( + text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", + icon_url=constants.FOOTER_IMAGE, + ) await interaction.followup.send(embed=embed) @shutdown.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) @palcon.subcommand(description="Save the server.") - async def save(self, interaction: nextcord.Interaction, server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + async def save( + self, + interaction: nextcord.Interaction, + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) response = await self.rcon_util.rcon_command(server, f"Save") embed = nextcord.Embed(title=f"Save - {server}", color=nextcord.Color.blue()) embed.description = f"**Response:** {response}" - embed.set_footer(text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", icon_url=constants.FOOTER_IMAGE) + embed.set_footer( + text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", + icon_url=constants.FOOTER_IMAGE, + ) await interaction.followup.send(embed=embed) @save.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) @palcon.subcommand(description="Broadcast a message to the server.") - async def broadcast(self, interaction: nextcord.Interaction, message: str, server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + async def broadcast( + self, + interaction: nextcord.Interaction, + message: str, + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) message_format = message.replace(" ", "\u001f") - response = await self.rcon_util.rcon_command(server, f"Broadcast {message_format}") - embed = nextcord.Embed(title=f"Broadcast - {server}", color=nextcord.Color.blue()) + response = await self.rcon_util.rcon_command( + server, f"Broadcast {message_format}" + ) + embed = nextcord.Embed( + title=f"Broadcast - {server}", color=nextcord.Color.blue() + ) embed.description = f"**Response:** {response}" - embed.set_footer(text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", icon_url=constants.FOOTER_IMAGE) + embed.set_footer( + text=f"{constants.FOOTER_TEXT} • {datetime.datetime.now().strftime('%m-%d at %I:%M %p')}", + icon_url=constants.FOOTER_IMAGE, + ) await interaction.followup.send(embed=embed) @broadcast.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) def setup(bot): cog = PalconCog(bot) bot.add_cog(cog) - if not hasattr(bot, 'all_slash_commands'): + if not hasattr(bot, "all_slash_commands"): bot.all_slash_commands = [] - bot.all_slash_commands.extend([ - cog.command, - cog.showplayers, - cog.kickplayer, - cog.banplayer, - cog.info, - cog.shutdown, - cog.save, - cog.broadcast - ]) \ No newline at end of file + bot.all_slash_commands.extend( + [ + cog.command, + cog.showplayers, + cog.kickplayer, + cog.banplayer, + cog.info, + cog.shutdown, + cog.save, + cog.broadcast, + ] + ) \ No newline at end of file diff --git a/cogs/palguard.py b/cogs/palguard.py index d6f4c30..4081ed1 100644 --- a/cogs/palguard.py +++ b/cogs/palguard.py @@ -22,149 +22,255 @@ def load_config(self): self.servers = config["PALWORLD_SERVERS"] def load_pals(self): - pals_path = os.path.join('gamedata', 'pals.json') + pals_path = os.path.join("gamedata", "pals.json") with open(pals_path) as pals_file: self.pals = json.load(pals_file)["creatures"] def load_items(self): - items_path = os.path.join('gamedata', 'items.json') + items_path = os.path.join("gamedata", "items.json") with open(items_path) as items_file: self.items = json.load(items_file)["items"] def load_eggs(self): - eggs_path = os.path.join('gamedata', 'eggs.json') + eggs_path = os.path.join("gamedata", "eggs.json") with open(eggs_path) as eggs_file: self.eggs = json.load(eggs_file)["eggs"] - async def autocomplete_server(self, interaction: nextcord.Interaction, current: str): - choices = [server for server in self.servers if current.lower() in server.lower()] + async def autocomplete_server( + self, interaction: nextcord.Interaction, current: str + ): + choices = [ + server for server in self.servers if current.lower() in server.lower() + ] await interaction.response.send_autocomplete(choices) async def autocomplete_palid(self, interaction: nextcord.Interaction, current: str): - choices = [pal["name"] for pal in self.pals if current.lower() in pal["name"].lower()][:25] + choices = [ + pal["name"] for pal in self.pals if current.lower() in pal["name"].lower() + ][:25] await interaction.response.send_autocomplete(choices) - async def autocomplete_itemid(self, interaction: nextcord.Interaction, current: str): - choices = [item["name"] for item in self.items if current.lower() in item["name"].lower()][:25] + async def autocomplete_itemid( + self, interaction: nextcord.Interaction, current: str + ): + choices = [ + item["name"] + for item in self.items + if current.lower() in item["name"].lower() + ][:25] await interaction.response.send_autocomplete(choices) async def autocomplete_eggid(self, interaction: nextcord.Interaction, current: str): - choices = [egg["name"] for egg in self.eggs if current.lower() in egg["name"].lower()][:25] + choices = [ + egg["name"] for egg in self.eggs if current.lower() in egg["name"].lower() + ][:25] await interaction.response.send_autocomplete(choices) - @nextcord.slash_command(default_member_permissions=nextcord.Permissions(administrator=True)) + @nextcord.slash_command( + default_member_permissions=nextcord.Permissions(administrator=True) + ) async def palguard(self, _interaction: nextcord.Interaction): pass - @palguard.subcommand(name="reload" ,description="Reload server configuration.") - async def reloadcfg(self, interaction: nextcord.Interaction, server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + @palguard.subcommand(name="reload", description="Reload server configuration.") + async def reloadcfg( + self, + interaction: nextcord.Interaction, + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) response = await self.rcon_util.rcon_command(server, "reloadcfg") await interaction.followup.send(f"**Response:** {response}") @reloadcfg.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) @palguard.subcommand(description="Give a Pal to a player.") - async def givepal(self, interaction: nextcord.Interaction, steamid: str = nextcord.SlashOption(description="SteamID/UID of the player."), palid: str = nextcord.SlashOption(description="The ID of the Pal.", autocomplete=True), level: str = nextcord.SlashOption(description="Level of the Pal"), server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + async def givepal( + self, + interaction: nextcord.Interaction, + steamid: str = nextcord.SlashOption(description="SteamID/UID of the player."), + palid: str = nextcord.SlashOption( + description="The ID of the Pal.", autocomplete=True + ), + level: str = nextcord.SlashOption(description="Level of the Pal"), + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) pal_id = next((pal["id"] for pal in self.pals if pal["name"] == palid), None) if not pal_id: await interaction.followup.send("Pal ID not found.", ephemeral=True) return - asyncio.create_task(self.rcon_util.rcon_command(server, f"givepal {steamid} {pal_id} {level}")) - embed = nextcord.Embed(title=f"Palguard Pal - {server}", color=nextcord.Color.blue()) + asyncio.create_task( + self.rcon_util.rcon_command(server, f"givepal {steamid} {pal_id} {level}") + ) + embed = nextcord.Embed( + title=f"Palguard Pal - {server}", color=nextcord.Color.blue() + ) embed.description = f"Giving {palid} to {steamid}." await interaction.followup.send(embed=embed) @givepal.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) @givepal.on_autocomplete("palid") - async def on_autocomplete_pals(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_pals( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_palid(interaction, current) @palguard.subcommand(description="Give an item to a player.") - async def giveitem(self, interaction: nextcord.Interaction, steamid: str = nextcord.SlashOption(description="SteamID/UID of the player."), itemid: str = nextcord.SlashOption(description="The ID of the Item.", autocomplete=True), amount: str = nextcord.SlashOption(description="Item amount"), server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + async def giveitem( + self, + interaction: nextcord.Interaction, + steamid: str = nextcord.SlashOption(description="SteamID/UID of the player."), + itemid: str = nextcord.SlashOption( + description="The ID of the Item.", autocomplete=True + ), + amount: str = nextcord.SlashOption(description="Item amount"), + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) - item_id = next((item["id"] for item in self.items if item["name"] == itemid), None) + item_id = next( + (item["id"] for item in self.items if item["name"] == itemid), None + ) if not item_id: await interaction.followup.send("Item ID not found.", ephemeral=True) return - asyncio.create_task(self.rcon_util.rcon_command(server, f"give {steamid} {item_id} {amount}")) - embed = nextcord.Embed(title=f"Palguard Item - {server}", color=nextcord.Color.blue()) + asyncio.create_task( + self.rcon_util.rcon_command(server, f"give {steamid} {item_id} {amount}") + ) + embed = nextcord.Embed( + title=f"Palguard Item - {server}", color=nextcord.Color.blue() + ) embed.description = f"Giving {itemid} to {steamid}." await interaction.followup.send(embed=embed) @giveitem.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) @giveitem.on_autocomplete("itemid") - async def on_autocomplete_items(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_items( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_itemid(interaction, current) @palguard.subcommand(description="Give experience to a player.") - async def giveexp(self, interaction: nextcord.Interaction, steamid: str = nextcord.SlashOption(description="SteamID/UID of the player."), amount: str = nextcord.SlashOption(description="Experience amount"), server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + async def giveexp( + self, + interaction: nextcord.Interaction, + steamid: str = nextcord.SlashOption(description="SteamID/UID of the player."), + amount: str = nextcord.SlashOption(description="Experience amount"), + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) - asyncio.create_task(self.rcon_util.rcon_command(server, f"give_exp {steamid} {amount}")) - embed = nextcord.Embed(title=f"Palguard Experience - {server}", color=nextcord.Color.blue()) + asyncio.create_task( + self.rcon_util.rcon_command(server, f"give_exp {steamid} {amount}") + ) + embed = nextcord.Embed( + title=f"Palguard Experience - {server}", color=nextcord.Color.blue() + ) embed.description = f"Giving {amount} experience to {steamid}." await interaction.followup.send(embed=embed) @giveexp.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) @palguard.subcommand(description="Give an egg to a player.") - async def giveegg(self, interaction: nextcord.Interaction, steamid: str = nextcord.SlashOption(description="SteamID/UID of the player."), eggid: str = nextcord.SlashOption(description="The ID of the Egg.", autocomplete=True), server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + async def giveegg( + self, + interaction: nextcord.Interaction, + steamid: str = nextcord.SlashOption(description="SteamID/UID of the player."), + eggid: str = nextcord.SlashOption( + description="The ID of the Egg.", autocomplete=True + ), + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) egg_id = next((egg["id"] for egg in self.eggs if egg["name"] == eggid), None) if not egg_id: await interaction.followup.send("Egg ID not found.", ephemeral=True) return - asyncio.create_task(self.rcon_util.rcon_command(server, f"giveegg {steamid} {egg_id}")) - embed = nextcord.Embed(title=f"Palguard Egg - {server}", color=nextcord.Color.blue()) + asyncio.create_task( + self.rcon_util.rcon_command(server, f"giveegg {steamid} {egg_id}") + ) + embed = nextcord.Embed( + title=f"Palguard Egg - {server}", color=nextcord.Color.blue() + ) embed.description = f"Giving {eggid} to {steamid}." await interaction.followup.send(embed=embed) @giveegg.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) @giveegg.on_autocomplete("eggid") - async def on_autocomplete_eggs(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_eggs( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_eggid(interaction, current) - @palguard.subcommand(name="help" ,description="List of Palguard commands.") - async def palguardhelp(self, interaction: nextcord.Interaction, server: str = nextcord.SlashOption(description="Select a server", autocomplete=True)): + @palguard.subcommand(name="help", description="List of Palguard commands.") + async def palguardhelp( + self, + interaction: nextcord.Interaction, + server: str = nextcord.SlashOption( + description="Select a server", autocomplete=True + ), + ): await interaction.response.defer(ephemeral=True) response = await self.rcon_util.rcon_command(server, "getrconcmds") await interaction.followup.send(f"{response}") @palguardhelp.on_autocomplete("server") - async def on_autocomplete_rcon(self, interaction: nextcord.Interaction, current: str): + async def on_autocomplete_rcon( + self, interaction: nextcord.Interaction, current: str + ): await self.autocomplete_server(interaction, current) def setup(bot): config_path = "config.json" with open(config_path) as config_file: config = json.load(config_file) - + if config.get("PALGUARD_ACTIVE", False): cog = PalguardCog(bot) bot.add_cog(cog) - if not hasattr(bot, 'all_slash_commands'): + if not hasattr(bot, "all_slash_commands"): bot.all_slash_commands = [] - bot.all_slash_commands.extend([ - cog.palguard, - cog.reloadcfg, - cog.givepal, - cog.giveitem, - cog.giveexp, - cog.giveegg - ]) + bot.all_slash_commands.extend( + [ + cog.palguard, + cog.reloadcfg, + cog.givepal, + cog.giveitem, + cog.giveexp, + cog.giveegg, + ] + ) else: print("Palguard disabled by default. Please enable it in config.json") \ No newline at end of file diff --git a/cogs/query.py b/cogs/query.py index 73c5adc..1ab548f 100644 --- a/cogs/query.py +++ b/cogs/query.py @@ -24,13 +24,13 @@ def load_config(self): self.servers = config["PALWORLD_SERVERS"] def load_message_ids(self): - ids_path = os.path.join('data', 'server_status.json') + ids_path = os.path.join("data", "server_status.json") if os.path.exists(ids_path): with open(ids_path) as ids_file: self.message_ids = json.load(ids_file) - + def save_message_ids(self): - with open(os.path.join('data', 'server_status.json'), 'w') as file: + with open(os.path.join("data", "server_status.json"), "w") as file: json.dump(self.message_ids, file, indent=4) def create_task(self): @@ -40,7 +40,7 @@ def create_task(self): # Split player list into chunks def split_players(self, lst, chunk_size): for i in range(0, len(lst), chunk_size): - yield lst[i:i + chunk_size] + yield lst[i : i + chunk_size] async def server_status_check(self, server_name): await self.bot.wait_until_ready() @@ -50,27 +50,58 @@ async def server_status_check(self, server_name): channel = self.bot.get_channel(channel_id) if channel: status = await self.check_server_status(server_name) - player_count = await self.get_player_count(server_name) if status == "Online" else 0 - players = await self.get_player_names(server_name) if status == "Online" else [] + player_count = ( + await self.get_player_count(server_name) + if status == "Online" + else 0 + ) + players = ( + await self.get_player_names(server_name) + if status == "Online" + else [] + ) version, description = await self.extract_server_info(server_name) server_config = self.servers[server_name] - max_players = server_config.get('SERVER_SLOTS', 32) - - embed = nextcord.Embed(title=f"{server_name} Status", description=description, - color=nextcord.Color.green() if status == "Online" else nextcord.Color.red()) + max_players = server_config.get("SERVER_SLOTS", 32) + + embed = nextcord.Embed( + title=f"{server_name} Status", + description=description, + color=( + nextcord.Color.green() + if status == "Online" + else nextcord.Color.red() + ), + ) embed.add_field(name="Status", value=status, inline=True) embed.add_field(name="Version", value=version, inline=True) - embed.add_field(name="Players", value=f"{player_count}/{max_players}", inline=False) - embed.add_field(name="Connection Info", value=f"```{server_config['RCON_HOST']}:{server_config['SERVER_PORT']}```", inline=False) - embed.set_footer(text=constants.FOOTER_TEXT, icon_url=constants.FOOTER_IMAGE) + embed.add_field( + name="Players", + value=f"{player_count}/{max_players}", + inline=False, + ) + embed.add_field( + name="Connection Info", + value=f"```{server_config['RCON_HOST']}:{server_config['SERVER_PORT']}```", + inline=False, + ) + embed.set_footer( + text=constants.FOOTER_TEXT, icon_url=constants.FOOTER_IMAGE + ) players_chunks = list(self.split_players(players, 11)) - players_embed = nextcord.Embed(title=f"Players Online", color=nextcord.Color.blue()) - + players_embed = nextcord.Embed( + title=f"Players Online", color=nextcord.Color.blue() + ) + for chunk in players_chunks: - players_list = '\n'.join(chunk) if chunk else "No players online." - players_embed.add_field(name="\u200b", value=players_list, inline=True) + players_list = ( + "\n".join(chunk) if chunk else "No players online." + ) + players_embed.add_field( + name="\u200b", value=players_list, inline=True + ) message_key = f"{server_name}_{channel_id}" message_id = self.message_ids.get(message_key) @@ -89,7 +120,9 @@ async def server_status_check(self, server_name): player_message_id = self.message_ids.get(player_message_key) if player_message_id: try: - player_message = await channel.fetch_message(player_message_id) + player_message = await channel.fetch_message( + player_message_id + ) await player_message.edit(embed=players_embed) except nextcord.NotFound: player_message = await channel.send(embed=players_embed) @@ -115,7 +148,9 @@ async def check_server_status(self, server_name): async def get_player_count(self, server_name): try: - players_output = await self.rcon_util.rcon_command(server_name, "ShowPlayers") + players_output = await self.rcon_util.rcon_command( + server_name, "ShowPlayers" + ) if players_output: return len(self.parse_players(players_output)) return 0 @@ -124,7 +159,9 @@ async def get_player_count(self, server_name): async def get_player_names(self, server_name): try: - players_output = await self.rcon_util.rcon_command(server_name, "ShowPlayers") + players_output = await self.rcon_util.rcon_command( + server_name, "ShowPlayers" + ) if players_output: return self.parse_players(players_output) return [] @@ -133,13 +170,13 @@ async def get_player_names(self, server_name): def parse_players(self, players_output): players = [] - lines = players_output.split('\n') + lines = players_output.split("\n") for line in lines[1:]: - parts = line.split(',') + parts = line.split(",") if len(parts) >= 3: players.append(parts[0]) return players - + async def extract_server_info(self, server_name): try: response = await self.rcon_util.rcon_command(server_name, "Info") diff --git a/cogs/restart.py b/cogs/restart.py index dd67003..eb3e143 100644 --- a/cogs/restart.py +++ b/cogs/restart.py @@ -33,11 +33,15 @@ async def shutdown_schedule(self): now_local = now_utc.astimezone(self.timezone) for shutdown_time_str in self.shutdown_config.get("times", []): shutdown_time = datetime.strptime(shutdown_time_str, "%H:%M").time() - shutdown_datetime_local = datetime.now(self.timezone).replace(hour=shutdown_time.hour, minute=shutdown_time.minute, second=0, microsecond=0) + shutdown_datetime_local = datetime.now(self.timezone).replace( + hour=shutdown_time.hour, + minute=shutdown_time.minute, + second=0, + microsecond=0, + ) if shutdown_datetime_local < now_local: shutdown_datetime_local += timedelta(days=1) time_until_shutdown = (shutdown_datetime_local - now_local).total_seconds() - if 300 <= time_until_shutdown < 360: await self.broadcast_warning("Server restart in 5 minutes") @@ -52,7 +56,9 @@ async def broadcast_warning(self, message): for server_name in self.servers: try: message_format = message.replace(" ", "\u001f") - await self.rcon_util.rcon_command(server_name, f"Broadcast {message_format}") + await self.rcon_util.rcon_command( + server_name, f"Broadcast {message_format}" + ) print(f"Broadcasted to {server_name}: {message}") except Exception as e: print(f"Error broadcasting to {server_name}: {e}") @@ -82,7 +88,11 @@ async def announce_restart(self, server_name): now = datetime.now(self.timezone) timestamp = now.strftime("%m-%d-%Y %H:%M:%S") timestamp_desc = now.strftime("%I:%M %p") - embed = nextcord.Embed(title="Server Restart", description=f"The {server_name} server has been restarted at {timestamp_desc}.", color=nextcord.Color.blurple()) + embed = nextcord.Embed( + title="Server Restart", + description=f"The {server_name} server has been restarted at {timestamp_desc}.", + color=nextcord.Color.blurple(), + ) embed.set_footer(text=f"Time: {timestamp}") await channel.send(embed=embed) else: diff --git a/cogs/statustracker.py b/cogs/statustracker.py index 2055af3..fde1d04 100644 --- a/cogs/statustracker.py +++ b/cogs/statustracker.py @@ -24,16 +24,24 @@ async def update_status(self): await self.bot.wait_until_ready() while not self.bot.is_closed() and self.config.get("STATUS_TRACKING", False): total_players = await self.get_total_players() - max_players = sum(server.get("SERVER_SLOTS", 32) for server in self.servers.values()) + max_players = sum( + server.get("SERVER_SLOTS", 32) for server in self.servers.values() + ) status_message = f"{total_players}/{max_players} players" - await self.bot.change_presence(activity=nextcord.Activity(type=nextcord.ActivityType.watching, name=status_message)) + await self.bot.change_presence( + activity=nextcord.Activity( + type=nextcord.ActivityType.watching, name=status_message + ) + ) await asyncio.sleep(60) async def get_total_players(self): total_players = 0 for server_name in self.servers: try: - players_output = await self.rcon_util.rcon_command(server_name, "ShowPlayers") + players_output = await self.rcon_util.rcon_command( + server_name, "ShowPlayers" + ) players = self.parse_players(players_output) total_players += len(players) except Exception as e: @@ -42,9 +50,9 @@ async def get_total_players(self): def parse_players(self, players_output): players = [] - lines = players_output.split('\n') + lines = players_output.split("\n") for line in lines[1:]: - parts = line.split(',') + parts = line.split(",") if len(parts) >= 3: players.append(parts[0]) return players diff --git a/cogs/steam.py b/cogs/steam.py index d5c0712..204d76b 100644 --- a/cogs/steam.py +++ b/cogs/steam.py @@ -10,7 +10,11 @@ def __init__(self, bot): self.bot = bot @nextcord.slash_command(name="steam", description="Search up a steam profile URL.") - async def steam(self, interaction: Interaction, profile_url: str = SlashOption(description="The full URL to the Steam profile")): + async def steam( + self, + interaction: Interaction, + profile_url: str = SlashOption(description="The full URL to the Steam profile"), + ): await interaction.response.defer() try: @@ -20,48 +24,74 @@ async def steam(self, interaction: Interaction, profile_url: str = SlashOption(d if vanity_url: steamid64 = await steam_protocol.resolve_vanity_url(vanity_url) if not steamid64: - await interaction.followup.send("Could not resolve Steam profile URL.") + await interaction.followup.send( + "Could not resolve Steam profile URL." + ) return else: await interaction.followup.send("Invalid Steam profile URL.") return - summary_data, bans_data = await steam_protocol.fetch_steam_profile(steamid64) + summary_data, bans_data = await steam_protocol.fetch_steam_profile( + steamid64 + ) except steam_protocol.InvalidSteamAPIKeyException: - await interaction.followup.send("Error: Invalid Steam API Key. Please configure a valid API key.") + await interaction.followup.send( + "Error: Invalid Steam API Key. Please configure a valid API key." + ) return await self.display_steam_profile(interaction, summary_data, bans_data) async def display_steam_profile(self, interaction, summary_data, bans_data): - player = summary_data['response']['players'][0] if summary_data['response']['players'] else None - ban_info = bans_data['players'][0] if bans_data['players'] else None - + player = ( + summary_data["response"]["players"][0] + if summary_data["response"]["players"] + else None + ) + ban_info = bans_data["players"][0] if bans_data["players"] else None + if player and ban_info: - embed = Embed(title=player.get('personaname'), url=f"https://steamcommunity.com/profiles/{player.get('steamid')}", color=0x1b2838) - embed.set_thumbnail(url=player.get('avatarfull')) + embed = Embed( + title=player.get("personaname"), + url=f"https://steamcommunity.com/profiles/{player.get('steamid')}", + color=0x1B2838, + ) + embed.set_thumbnail(url=player.get("avatarfull")) + + if player.get("realname"): + embed.add_field(name="Name", value=player.get("realname"), inline=False) + + if player.get("gameextrainfo"): + embed.add_field( + name="Playing", value=player.get("gameextrainfo"), inline=False + ) - if player.get('realname'): - embed.add_field(name="Name", value=player.get('realname'), inline=False) - - if player.get('gameextrainfo'): - embed.add_field(name="Playing", value=player.get('gameextrainfo'), inline=False) + embed.add_field( + name="SteamID64", value=f"```{player.get('steamid')}```", inline=False + ) - embed.add_field(name="SteamID64", value=f"```{player.get('steamid')}```", inline=False) - utc_zone = pytz.utc - local_zone = pytz.timezone('UTC') - creation_timestamp = datetime.datetime.fromtimestamp(player.get('timecreated'), tz=utc_zone) if player.get('timecreated') else None + local_zone = pytz.timezone("UTC") + creation_timestamp = ( + datetime.datetime.fromtimestamp(player.get("timecreated"), tz=utc_zone) + if player.get("timecreated") + else None + ) if creation_timestamp: - creation_date = creation_timestamp.astimezone(local_zone).strftime('%Y-%m-%d') + creation_date = creation_timestamp.astimezone(local_zone).strftime( + "%Y-%m-%d" + ) else: - creation_date = 'Not available' + creation_date = "Not available" embed.add_field(name="Account Creation", value=creation_date, inline=False) - - if player.get('loccountrycode'): - embed.add_field(name="Country", value=player.get('loccountrycode'), inline=False) - - vac_banned = "Yes" if ban_info['VACBanned'] else "No" + + if player.get("loccountrycode"): + embed.add_field( + name="Country", value=player.get("loccountrycode"), inline=False + ) + + vac_banned = "Yes" if ban_info["VACBanned"] else "No" embed.add_field(name="VAC Banned", value=vac_banned, inline=False) await interaction.followup.send(embed=embed) diff --git a/cogs/whitelist.py b/cogs/whitelist.py index de1308d..110f9e2 100644 --- a/cogs/whitelist.py +++ b/cogs/whitelist.py @@ -11,8 +11,8 @@ class PlayerInfoCog(commands.Cog): def __init__(self, bot): self.bot = bot - self.data_folder = 'data' - self.player_data_file = os.path.join(self.data_folder, 'players.json') + self.data_folder = "data" + self.player_data_file = os.path.join(self.data_folder, "players.json") self.servers = self.load_servers_config() self.rcon_util = RconUtility(self.servers) self.ensure_data_file() @@ -26,7 +26,7 @@ def load_servers_config(self): def ensure_data_file(self): if not os.path.exists(self.player_data_file): - with open(self.player_data_file, 'w') as file: + with open(self.player_data_file, "w") as file: json.dump({}, file) async def run_showplayers_command(self, server_name): @@ -47,7 +47,7 @@ async def update_players(self): player_data = await self.run_showplayers_command(server_name) if player_data: self.process_and_save_player_data(server_name, player_data) - if self.servers[server_name].get('WHITELIST_ENABLED', False): + if self.servers[server_name].get("WHITELIST_ENABLED", False): await self.whitelist_check(server_name, player_data) await asyncio.sleep(15) @@ -55,94 +55,169 @@ async def whitelist_check(self, server_name, player_data): with open(self.player_data_file) as file: players = json.load(file) - for line in player_data.split('\n')[1:]: + for line in player_data.split("\n")[1:]: if line.strip(): - _, playeruid, steamid = line.split(',')[:3] - if not any(info.get("whitelist", False) for player, info in players.items() if info.get("playeruid") == playeruid): + _, playeruid, steamid = line.split(",")[:3] + if not any( + info.get("whitelist", False) + for player, info in players.items() + if info.get("playeruid") == playeruid + ): await self.kick_player(server_name, steamid, playeruid=playeruid) - async def kick_player(self, server_name, steamid, playeruid=None, reason="not being whitelisted"): + async def kick_player( + self, server_name, steamid, playeruid=None, reason="not being whitelisted" + ): try: - identifier = steamid if steamid and self.is_valid_steamid(steamid) else playeruid + identifier = ( + steamid if steamid and self.is_valid_steamid(steamid) else playeruid + ) command = f"KickPlayer {identifier}" - + result = await self.rcon_util.rcon_command(server_name, command) - + if "Failed" in result: - print(f"Failed to execute kick command for {identifier} on {server_name}: {result}") + print( + f"Failed to execute kick command for {identifier} on {server_name}: {result}" + ) else: - print(f"Successfully executed kick command for {identifier} on {server_name}: {result}") - + print( + f"Successfully executed kick command for {identifier} on {server_name}: {result}" + ) + server_info = self.servers[server_name] if "CONNECTION_CHANNEL" in server_info: channel = self.bot.get_channel(server_info["CONNECTION_CHANNEL"]) if channel: - embed_description = f"Player `{identifier}` kicked for {reason}." if "Failed" not in result else f"Failed to kick Player `{identifier}`: {reason}." - embed = nextcord.Embed(title="Whitelist Check", description=embed_description, color=nextcord.Color.red() if "Failed" in result else nextcord.Color.green()) - embed.set_footer(text=f"Timestamp: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + embed_description = ( + f"Player `{identifier}` kicked for {reason}." + if "Failed" not in result + else f"Failed to kick Player `{identifier}`: {reason}." + ) + embed = nextcord.Embed( + title="Whitelist Check", + description=embed_description, + color=( + nextcord.Color.red() + if "Failed" in result + else nextcord.Color.green() + ), + ) + embed.set_footer( + text=f"Timestamp: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + ) await channel.send(embed=embed) except Exception as e: - print(f"Exception during kick command for {identifier} on {server_name}: {e}") + print( + f"Exception during kick command for {identifier} on {server_name}: {e}" + ) def is_valid_steamid(self, steamid): - return bool(re.match(r'^7656119[0-9]{10}$', steamid)) + return bool(re.match(r"^7656119[0-9]{10}$", steamid)) def process_and_save_player_data(self, server_name, data): if data.strip(): - with open(self.player_data_file, 'r+') as file: + with open(self.player_data_file, "r+") as file: existing_players = json.load(file) - for line in data.split('\n')[1:]: + for line in data.split("\n")[1:]: if line.strip(): - name, playeruid, steamid = [part.strip() for part in line.split(',')[:3]] + name, playeruid, steamid = [ + part.strip() for part in line.split(",")[:3] + ] if self.is_valid_steamid(steamid): - existing_players[steamid] = {"name": name, "playeruid": playeruid, "whitelist": existing_players.get(steamid, {}).get("whitelist", False)} + existing_players[steamid] = { + "name": name, + "playeruid": playeruid, + "whitelist": existing_players.get(steamid, {}).get( + "whitelist", False + ), + } file.seek(0) json.dump(existing_players, file) file.truncate() - @nextcord.slash_command(description="Search the user database.", default_member_permissions=nextcord.Permissions(administrator=True)) + @nextcord.slash_command( + description="Search the user database.", + default_member_permissions=nextcord.Permissions(administrator=True), + ) async def paldb(self, interaction: nextcord.Interaction): pass - async def steamid_autocomplete(self, interaction: nextcord.Interaction, current: str): - with open(self.player_data_file, 'r') as file: + async def steamid_autocomplete( + self, interaction: nextcord.Interaction, current: str + ): + with open(self.player_data_file, "r") as file: players = json.load(file) matches = [steamid for steamid in players if current.lower() in steamid.lower()] return matches[:25] @paldb.subcommand(name="steam", description="Find player by SteamID") - async def search(self, interaction: nextcord.Interaction, steamid: str = nextcord.SlashOption(description="Enter SteamID", autocomplete=True)): - with open(self.player_data_file, 'r') as file: + async def search( + self, + interaction: nextcord.Interaction, + steamid: str = nextcord.SlashOption( + description="Enter SteamID", autocomplete=True + ), + ): + with open(self.player_data_file, "r") as file: players = json.load(file) player_info = players.get(steamid) if player_info: - embed = nextcord.Embed(title="Player Information", color=nextcord.Color.blue()) - embed.add_field(name="Name", value=f"```{player_info['name']}```", inline=False) - embed.add_field(name="Player UID", value=f"```{player_info['playeruid']}```", inline=False) + embed = nextcord.Embed( + title="Player Information", color=nextcord.Color.blue() + ) + embed.add_field( + name="Name", value=f"```{player_info['name']}```", inline=False + ) + embed.add_field( + name="Player UID", + value=f"```{player_info['playeruid']}```", + inline=False, + ) embed.add_field(name="SteamID", value=f"```{steamid}```", inline=False) - embed.add_field(name="Whitelist", value=f"```{str(player_info['whitelist'])}```", inline=False) - embed.set_footer(text=constants.FOOTER_TEXT, icon_url=constants.FOOTER_IMAGE) + embed.add_field( + name="Whitelist", + value=f"```{str(player_info['whitelist'])}```", + inline=False, + ) + embed.set_footer( + text=constants.FOOTER_TEXT, icon_url=constants.FOOTER_IMAGE + ) await interaction.response.send_message(embed=embed) else: - await interaction.response.send_message(f"No player found with SteamID {steamid}", ephemeral=True) + await interaction.response.send_message( + f"No player found with SteamID {steamid}", ephemeral=True + ) @search.on_autocomplete("steamid") - async def on_steamid_autocomplete(self, interaction: nextcord.Interaction, current: str): + async def on_steamid_autocomplete( + self, interaction: nextcord.Interaction, current: str + ): choices = await self.steamid_autocomplete(interaction, current) await interaction.response.send_autocomplete(choices) async def name_autocomplete(self, interaction: nextcord.Interaction, current: str): - with open(self.player_data_file, 'r') as file: + with open(self.player_data_file, "r") as file: players = json.load(file) - matches = [player["name"] for steamid, player in players.items() if player["name"] and current.lower() in player["name"].lower()] + matches = [ + player["name"] + for steamid, player in players.items() + if player["name"] and current.lower() in player["name"].lower() + ] return matches[:25] @paldb.subcommand(name="name", description="Find player by name") - async def searchname(self, interaction: nextcord.Interaction, name: str = nextcord.SlashOption(description="Enter player name", autocomplete=True)): - with open(self.player_data_file, 'r') as file: + async def searchname( + self, + interaction: nextcord.Interaction, + name: str = nextcord.SlashOption( + description="Enter player name", autocomplete=True + ), + ): + with open(self.player_data_file, "r") as file: players = json.load(file) player_info = None @@ -154,41 +229,81 @@ async def searchname(self, interaction: nextcord.Interaction, name: str = nextco break if player_info and player_steamid: - embed = nextcord.Embed(title="Player Information", color=nextcord.Color.blue()) - embed.add_field(name="Name", value=f"```{player_info['name']}```", inline=False) - embed.add_field(name="Player UID", value=f"```{player_info['playeruid']}```", inline=False) - embed.add_field(name="SteamID", value=f"```{player_steamid}```", inline=False) - embed.add_field(name="Whitelist", value=f"```{str(player_info['whitelist'])}```", inline=False) - embed.set_footer(text=constants.FOOTER_TEXT, icon_url=constants.FOOTER_IMAGE) + embed = nextcord.Embed( + title="Player Information", color=nextcord.Color.blue() + ) + embed.add_field( + name="Name", value=f"```{player_info['name']}```", inline=False + ) + embed.add_field( + name="Player UID", + value=f"```{player_info['playeruid']}```", + inline=False, + ) + embed.add_field( + name="SteamID", value=f"```{player_steamid}```", inline=False + ) + embed.add_field( + name="Whitelist", + value=f"```{str(player_info['whitelist'])}```", + inline=False, + ) + embed.set_footer( + text=constants.FOOTER_TEXT, icon_url=constants.FOOTER_IMAGE + ) await interaction.response.send_message(embed=embed) else: - await interaction.response.send_message(f"No player found with name '{name}'", ephemeral=True) + await interaction.response.send_message( + f"No player found with name '{name}'", ephemeral=True + ) @searchname.on_autocomplete("name") - async def on_name_autocomplete(self, interaction: nextcord.Interaction, current: str): + async def on_name_autocomplete( + self, interaction: nextcord.Interaction, current: str + ): choices = await self.name_autocomplete(interaction, current) await interaction.response.send_autocomplete(choices) - @nextcord.slash_command(description="Manage the whitelist", default_member_permissions=nextcord.Permissions(administrator=True)) + @nextcord.slash_command( + description="Manage the whitelist", + default_member_permissions=nextcord.Permissions(administrator=True), + ) async def whitelist(self, interaction: nextcord.Interaction): pass @whitelist.subcommand(name="add", description="Add player to whitelist") - async def whitelist_add(self, interaction: nextcord.Interaction, steamid: str = nextcord.SlashOption(description="Enter SteamID", required=True), playeruid: str = nextcord.SlashOption(description="Enter PlayerUID", required=False)): + async def whitelist_add( + self, + interaction: nextcord.Interaction, + steamid: str = nextcord.SlashOption(description="Enter SteamID", required=True), + playeruid: str = nextcord.SlashOption( + description="Enter PlayerUID", required=False + ), + ): if not steamid and not playeruid: - await interaction.response.send_message("Please provide either a SteamID or PlayerUID.", ephemeral=True) + await interaction.response.send_message( + "Please provide either a SteamID or PlayerUID.", ephemeral=True + ) return identifier = steamid if steamid else playeruid - with open(self.player_data_file, 'r+') as file: + with open(self.player_data_file, "r+") as file: players = json.load(file) if identifier not in players: - players[identifier] = {"name": None, "playeruid": playeruid if playeruid else None, "whitelist": True} + players[identifier] = { + "name": None, + "playeruid": playeruid if playeruid else None, + "whitelist": True, + } file.seek(0) json.dump(players, file) file.truncate() - message = f"Player {identifier} added to whitelist and will be fully registered upon joining." if steamid else f"Player with PlayerUID {playeruid} added to whitelist and will be fully registered upon joining." + message = ( + f"Player {identifier} added to whitelist and will be fully registered upon joining." + if steamid + else f"Player with PlayerUID {playeruid} added to whitelist and will be fully registered upon joining." + ) else: players[identifier]["whitelist"] = True if playeruid: @@ -196,31 +311,37 @@ async def whitelist_add(self, interaction: nextcord.Interaction, steamid: str = file.seek(0) json.dump(players, file) file.truncate() - message = f"Player {identifier} added to whitelist." if steamid else f"Player with PlayerUID {playeruid} added to whitelist." + message = ( + f"Player {identifier} added to whitelist." + if steamid + else f"Player with PlayerUID {playeruid} added to whitelist." + ) await interaction.response.send_message(message, ephemeral=True) @whitelist.subcommand(name="remove", description="Remove player from whitelist") async def whitelist_remove(self, interaction: nextcord.Interaction, steamid: str): - with open(self.player_data_file, 'r') as file: + with open(self.player_data_file, "r") as file: players = json.load(file) if steamid in players and players[steamid]["whitelist"]: players[steamid]["whitelist"] = False - with open(self.player_data_file, 'w') as file: + with open(self.player_data_file, "w") as file: json.dump(players, file) - await interaction.response.send_message(f"Player {steamid} removed from whitelist.", ephemeral=True) + await interaction.response.send_message( + f"Player {steamid} removed from whitelist.", ephemeral=True + ) else: - await interaction.response.send_message(f"Player {steamid} not found or not on whitelist.", ephemeral=True) + await interaction.response.send_message( + f"Player {steamid} not found or not on whitelist.", ephemeral=True + ) + def setup(bot): cog = PlayerInfoCog(bot) bot.add_cog(cog) - if not hasattr(bot, 'all_slash_commands'): + if not hasattr(bot, "all_slash_commands"): bot.all_slash_commands = [] - bot.all_slash_commands.extend([ - cog.search, - cog.searchname, - cog.whitelist_add, - cog.whitelist_remove - ]) \ No newline at end of file + bot.all_slash_commands.extend( + [cog.search, cog.searchname, cog.whitelist_add, cog.whitelist_remove] + ) \ No newline at end of file diff --git a/gamedata/kits.json b/gamedata/kits.json.template similarity index 100% rename from gamedata/kits.json rename to gamedata/kits.json.template diff --git a/main.py b/main.py index beda488..cd83ab6 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,9 @@ import importlib.util intents = nextcord.Intents.all() -bot = commands.Bot(command_prefix=settings.bot_prefix, intents=intents, help_command=None) +bot = commands.Bot( + command_prefix=settings.bot_prefix, intents=intents, help_command=None +) @bot.event async def on_ready(): @@ -18,15 +20,17 @@ async def on_ready(): \/ \/ """ print(ascii_art) - print(f'{bot.user} is ready! Created by koz') - activity = nextcord.Activity(type=nextcord.ActivityType.playing, name=settings.bot_activity) + print(f"{bot.user} is ready! Created by koz") + activity = nextcord.Activity( + type=nextcord.ActivityType.playing, name=settings.bot_activity + ) await bot.change_presence(activity=activity) # Error Handling @bot.event async def on_command_error(ctx, error): if isinstance(error, commands.CommandNotFound): - await ctx.send('This command does not exist.') + await ctx.send("This command does not exist.") elif isinstance(error, commands.MissingPermissions): await ctx.send("You don't have the required permissions to use this command.") elif isinstance(error, commands.MissingRequiredArgument): @@ -34,11 +38,11 @@ async def on_command_error(ctx, error): elif isinstance(error, commands.CommandOnCooldown): await ctx.send("This command is on cooldown. Please try again later.") else: - await ctx.send(f'An error occured: {error}') + await ctx.send(f"An error occured: {error}") @bot.command() async def ping(ctx): - await ctx.send(f'Pong! {round(bot.latency * 1000)}ms') + await ctx.send(f"Pong! {round(bot.latency * 1000)}ms") def has_setup_function(module_name): module_spec = importlib.util.find_spec(module_name) @@ -46,19 +50,19 @@ def has_setup_function(module_name): return False module = importlib.util.module_from_spec(module_spec) module_spec.loader.exec_module(module) - return hasattr(module, 'setup') + return hasattr(module, "setup") for entry in os.listdir("cogs"): - if entry.endswith('.py'): + if entry.endswith(".py"): module_name = f"cogs.{entry[:-3]}" if has_setup_function(module_name): bot.load_extension(module_name) elif os.path.isdir(f"cogs/{entry}"): for filename in os.listdir(f"cogs/{entry}"): - if filename.endswith('.py'): + if filename.endswith(".py"): module_name = f"cogs.{entry}.{filename[:-3]}" if has_setup_function(module_name): bot.load_extension(module_name) -if __name__ == '__main__': +if __name__ == "__main__": bot.run(settings.bot_token) \ No newline at end of file diff --git a/settings.py b/settings.py index f8b5ab4..4806b5a 100644 --- a/settings.py +++ b/settings.py @@ -2,7 +2,7 @@ from dotenv import load_dotenv load_dotenv() -bot_token = os.getenv('BOT_TOKEN', "No token found") -bot_prefix = os.getenv('BOT_PREFIX', "!") -bot_activity = os.getenv('BOT_ACTIVITY', "Palworld") -steam_api_key = os.getenv('STEAM_API_KEY', "No key found") \ No newline at end of file +bot_token = os.getenv("BOT_TOKEN", "No token found") +bot_prefix = os.getenv("BOT_PREFIX", "!") +bot_activity = os.getenv("BOT_ACTIVITY", "Palworld") +steam_api_key = os.getenv("STEAM_API_KEY", "No key found") \ No newline at end of file diff --git a/util/rconutility.py b/util/rconutility.py index 5d64bee..cf8ed8c 100644 --- a/util/rconutility.py +++ b/util/rconutility.py @@ -1,6 +1,12 @@ import asyncio import base64 -from gamercon_async import GameRCON, GameRCONBase64, ClientError, TimeoutError, InvalidPassword +from gamercon_async import ( + GameRCON, + GameRCONBase64, + ClientError, + TimeoutError, + InvalidPassword, +) import time class RconUtility: @@ -13,30 +19,40 @@ def __init__(self, servers, timeout=30, encoding_info_ttl=50): async def check_encoding(self, server_name): current_time = time.time() encoding_info = self.memory_encoding.get(server_name) - - if encoding_info and (current_time - encoding_info['timestamp'] < self.encoding_info_ttl): - return encoding_info['needs_base64'] + + if encoding_info and ( + current_time - encoding_info["timestamp"] < self.encoding_info_ttl + ): + return encoding_info["needs_base64"] server = self.servers.get(server_name) if not server: raise ValueError(f"Server '{server_name}' not found.") try: - async with GameRCON(server["RCON_HOST"], server["RCON_PORT"], server["RCON_PASS"], self.timeout) as rcon: + async with GameRCON( + server["RCON_HOST"], + server["RCON_PORT"], + server["RCON_PASS"], + self.timeout, + ) as rcon: response = await rcon.send("Info") needs_base64 = self.base64_encoded(response) except (ClientError, TimeoutError, InvalidPassword) as e: print(f"Error connecting to server {server_name}: {e}") needs_base64 = False - self.memory_encoding[server_name] = {'needs_base64': needs_base64, 'timestamp': current_time} + self.memory_encoding[server_name] = { + "needs_base64": needs_base64, + "timestamp": current_time, + } return needs_base64 def base64_encoded(self, s): try: if not s: return False - return base64.b64encode(base64.b64decode(s)).decode('utf-8') == s + return base64.b64encode(base64.b64decode(s)).decode("utf-8") == s except Exception: return False @@ -49,10 +65,15 @@ async def rcon_command(self, server_name, command): needs_base64 = await self.check_encoding(server_name) ProtocolClass = GameRCONBase64 if needs_base64 else GameRCON - async with ProtocolClass(server["RCON_HOST"], server["RCON_PORT"], server["RCON_PASS"], self.timeout) as rcon: + async with ProtocolClass( + server["RCON_HOST"], + server["RCON_PORT"], + server["RCON_PASS"], + self.timeout, + ) as rcon: response = await rcon.send(command) if needs_base64 and self.base64_encoded(response): - response = base64.b64decode(response).decode('utf-8') + response = base64.b64decode(response).decode("utf-8") return response except (ClientError, TimeoutError, InvalidPassword) as e: return f"Failed to execute command on server {server_name}: {e}" diff --git a/util/steam_protocol.py b/util/steam_protocol.py index ed39bab..089c45c 100644 --- a/util/steam_protocol.py +++ b/util/steam_protocol.py @@ -3,7 +3,6 @@ import asyncio from settings import steam_api_key -# exceptions class InvalidSteamAPIKeyException(Exception): pass @@ -14,26 +13,25 @@ async def resolve_vanity_url(vanity_url): if response.status == 403: raise InvalidSteamAPIKeyException("Invalid Steam API Key.") data = await response.json() - if data['response']['success'] == 1: - return data['response']['steamid'] + if data["response"]["success"] == 1: + return data["response"]["steamid"] return None def extract_steamid64(url): - match = re.search(r'steamcommunity\.com/profiles/(\d+)', url) + match = re.search(r"steamcommunity\.com/profiles/(\d+)", url) return match.group(1) if match else None def extract_vanity_url(url): - match = re.search(r'steamcommunity\.com/id/([^/]+)', url) + match = re.search(r"steamcommunity\.com/id/([^/]+)", url) return match.group(1) if match else None async def fetch_steam_profile(steamid64): summary_url = f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v2/?key={steam_api_key}&steamids={steamid64}" bans_url = f"https://api.steampowered.com/ISteamUser/GetPlayerBans/v1/?key={steam_api_key}&steamids={steamid64}" - + async with aiohttp.ClientSession() as session: summary_response, bans_response = await asyncio.gather( - session.get(summary_url), - session.get(bans_url) + session.get(summary_url), session.get(bans_url) ) if summary_response.status == 403 or bans_response.status == 403: raise InvalidSteamAPIKeyException("Invalid Steam API Key.")