From 3d7ec79be63d39a27114a6fbf5b0b3ea6873da4b Mon Sep 17 00:00:00 2001 From: hypergonial Date: Wed, 5 Jun 2024 00:15:52 +0200 Subject: [PATCH] Wrap up rewriting extensions --- pyproject.toml | 7 +- src/extensions/automod.py | 44 +------ src/extensions/fun.py | 8 +- src/extensions/misc.py | 119 +++++++++--------- src/extensions/moderation.py | 206 +++++++++++++++---------------- src/extensions/settings.py | 27 +--- src/extensions/tags.py | 4 +- src/extensions/troubleshooter.py | 7 +- src/models/mod_actions.py | 63 ++++++++++ src/utils/dictionaryapi.py | 6 +- 10 files changed, 246 insertions(+), 245 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d264309..2577f4b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,9 @@ [tool.ruff] exclude = ["examples", "docs", "build"] +line-length = 120 +target-version = "py312" + +[tool.ruff.lint] select = [ "E", "F", @@ -19,8 +23,7 @@ select = [ ] ignore = ["F405", "F403", "E501", "D203", "D205", "D213", "RUF001"] fixable = ["I", "TCH", "D"] -line-length = 120 -target-version = "py312" + [tool.mypy] ignore_errors = true # I use pyright only because mypy dumb diff --git a/src/extensions/automod.py b/src/extensions/automod.py index 8876fd3..389d2df 100644 --- a/src/extensions/automod.py +++ b/src/extensions/automod.py @@ -1,6 +1,5 @@ import datetime import enum -import json import logging import re import typing as t @@ -11,7 +10,7 @@ import src.utils as utils from src.etc import const -from src.etc.settings_static import default_automod_policies, notices +from src.etc.settings_static import notices from src.models.client import SnedClient, SnedPlugin from src.models.events import AutoModMessageFlagEvent from src.utils import helpers @@ -61,45 +60,6 @@ class AutoModState(enum.Enum): PERMABAN = "permaban" -# TODO: Purge this cursed abomination -async def get_policies(guild: hikari.SnowflakeishOr[hikari.Guild]) -> dict[str, t.Any]: - """Return auto-moderation policies for the specified guild. - - Parameters - ---------- - guild : hikari.SnowflakeishOr[hikari.Guild] - The guild to get policies for. - - Returns - ------- - dict[str, t.Any] - The guild's auto-moderation policies. - """ - guild_id = hikari.Snowflake(guild) - - records = await plugin.client.db_cache.get(table="mod_config", guild_id=guild_id) - - policies = json.loads(records[0]["automod_policies"]) if records else default_automod_policies - - for key in default_automod_policies: - if key not in policies: - policies[key] = default_automod_policies[key] - - for nested_key in default_automod_policies[key]: - if nested_key not in policies[key]: - policies[key][nested_key] = default_automod_policies[key][nested_key] - - invalid = [] - for key in policies: - if key not in default_automod_policies: - invalid.append(key) - - for key in invalid: - policies.pop(key) - - return policies - - def can_automod_punish(me: hikari.Member, offender: hikari.Member) -> bool: """Determine if automod can punish a member. This checks all required permissions and if the member is a cool person or not. @@ -628,7 +588,7 @@ async def scan_messages(event: hikari.GuildMessageCreateEvent | hikari.GuildMess if not message.member or message.member.is_bot: return - policies = await get_policies(message.guild_id) + policies = await plugin.client.mod.get_automod_policies(message.guild_id) if isinstance(event, hikari.GuildMessageUpdateEvent): all( diff --git a/src/extensions/fun.py b/src/extensions/fun.py index abd8823..64ca742 100644 --- a/src/extensions/fun.py +++ b/src/extensions/fun.py @@ -504,11 +504,9 @@ async def calc( @arc.slash_command("tictactoe", "Play tic tac toe with someone!") async def tictactoe( ctx: SnedContext, - user: arc.Option[hikari.User, arc.UserParams("The user to play tic tac toe with!")], + user: arc.Option[hikari.Member, arc.MemberParams("The user to play tic tac toe with!")], size: arc.Option[int, arc.IntParams("The size of the board. Default is 3.", choices=[3, 4, 5])] = 3, ) -> None: - if not helpers.is_member(user): - return assert ctx.member is not None if user.id == ctx.author.id: @@ -728,11 +726,9 @@ async def urban_lookup(ctx: SnedContext, word: arc.Option[str, arc.StrParams("Th @arc.slash_command("avatar", "Displays a user's avatar for your viewing pleasure.") async def avatar( ctx: SnedContext, - user: arc.Option[hikari.User | None, arc.UserParams("The user to show the avatar for.")] = None, + user: arc.Option[hikari.Member | None, arc.MemberParams("The user to show the avatar for.")] = None, show_global: arc.Option[bool, arc.BoolParams("To show the global avatar or not, if applicable.")] = False, ) -> None: - if user and not helpers.is_member(user): - return member = user or ctx.member assert member is not None diff --git a/src/extensions/misc.py b/src/extensions/misc.py index a13f0e9..5287ef8 100644 --- a/src/extensions/misc.py +++ b/src/extensions/misc.py @@ -313,7 +313,7 @@ async def edit( assert ctx.interaction.app_permissions is not None - channel = ctx.app.cache.get_guild_channel(message.channel_id) or await ctx.app.rest.fetch_channel( + channel = ctx.client.cache.get_guild_channel(message.channel_id) or await ctx.client.rest.fetch_channel( message.channel_id ) @@ -321,13 +321,11 @@ async def edit( ctx.interaction.app_permissions, hikari.Permissions.SEND_MESSAGES | hikari.Permissions.VIEW_CHANNEL | hikari.Permissions.READ_MESSAGE_HISTORY, ): - raise lightbulb.BotMissingRequiredPermission( - perms=hikari.Permissions.SEND_MESSAGES - | hikari.Permissions.VIEW_CHANNEL - | hikari.Permissions.READ_MESSAGE_HISTORY + raise arc.BotMissingPermissionsError( + hikari.Permissions.SEND_MESSAGES | hikari.Permissions.VIEW_CHANNEL | hikari.Permissions.READ_MESSAGE_HISTORY ) - if message.author.id != ctx.app.user_id: + if message.author.id != ctx.client.user_id: await ctx.respond( embed=hikari.Embed( title="❌ Not Authored", @@ -349,7 +347,8 @@ async def edit( max_length=2000, ) ) - await modal.send(ctx.interaction) + await ctx.respond_with_builder(modal.build_response(ctx.client.miru)) + ctx.client.miru.start_modal(modal) await modal.wait() if not modal.last_context: return @@ -362,18 +361,16 @@ async def edit( ) -@plugin.command -@lightbulb.app_command_permissions(None, dm_enabled=False) -@lightbulb.add_checks( - bot_has_permissions( +@plugin.include +@arc.with_hook( + arc.bot_has_permissions( hikari.Permissions.SEND_MESSAGES | hikari.Permissions.VIEW_CHANNEL | hikari.Permissions.READ_MESSAGE_HISTORY ) ) -@lightbulb.command("Raw Content", "Show raw content for this message.", pass_options=True) -@lightbulb.implements(lightbulb.MessageCommand) -async def raw(ctx: SnedMessageContext, target: hikari.Message) -> None: - if target.content: - await ctx.respond(f"```{target.content[:1990]}```", flags=hikari.MessageFlag.EPHEMERAL) +@arc.message_command("Raw Content") +async def raw(ctx: SnedContext, message: hikari.Message) -> None: + if message.content: + await ctx.respond(f"```{message.content[:1990]}```", flags=hikari.MessageFlag.EPHEMERAL) else: await ctx.respond( embed=hikari.Embed( @@ -385,14 +382,21 @@ async def raw(ctx: SnedMessageContext, target: hikari.Message) -> None: ) -@plugin.command -@lightbulb.app_command_permissions(None, dm_enabled=False) -@lightbulb.option("timezone", "The timezone to set as your default. Example: 'Europe/Kiev'", autocomplete=True) -@lightbulb.command( - "timezone", "Sets your preferred timezone for other time-related commands to use.", pass_options=True -) -@lightbulb.implements(lightbulb.SlashCommand) -async def set_timezone(ctx: SnedSlashContext, timezone: str) -> None: +async def tz_autocomplete(data: arc.AutocompleteData[SnedClient, str]) -> list[str]: + if data.focused_value: + return get_close_matches(data.focused_value.title(), pytz.common_timezones, 25) + return [] + + +@plugin.include +@arc.slash_command("timezone", "Sets your preferred timezone for other time-related commands to use.") +async def set_timezone( + ctx: SnedContext, + timezone: arc.Option[ + str, + arc.StrParams("The timezone to set as your default. Example: 'Europe/Kiev'", autocomplete_with=tz_autocomplete), + ], +) -> None: if timezone.title() not in pytz.common_timezones: await ctx.respond( embed=hikari.Embed( @@ -404,16 +408,16 @@ async def set_timezone(ctx: SnedSlashContext, timezone: str) -> None: ) return - await ctx.app.db.execute( + await ctx.client.db.execute( """ - INSERT INTO preferences (user_id, timezone) - VALUES ($1, $2) - ON CONFLICT (user_id) DO - UPDATE SET timezone = $2""", + INSERT INTO preferences (user_id, timezone) + VALUES ($1, $2) + ON CONFLICT (user_id) DO + UPDATE SET timezone = $2""", ctx.user.id, timezone.title(), ) - await ctx.app.db_cache.refresh(table="preferences", user_id=ctx.user.id, timezone=timezone.title()) + await ctx.client.db_cache.refresh(table="preferences", user_id=ctx.user.id, timezone=timezone.title()) await ctx.respond( embed=hikari.Embed( @@ -425,40 +429,31 @@ async def set_timezone(ctx: SnedSlashContext, timezone: str) -> None: ) -@set_timezone.autocomplete("timezone") -async def tz_opts( - option: hikari.AutocompleteInteractionOption, interaction: hikari.AutocompleteInteraction -) -> list[str]: - if option.value: - assert isinstance(option.value, str) - return get_close_matches(option.value.title(), pytz.common_timezones, 25) - return [] - - -@plugin.command -@lightbulb.app_command_permissions(None, dm_enabled=False) -@lightbulb.option( - "style", - "Timestamp style.", - choices=[ - "t - Short time", - "T - Long time", - "d - Short date", - "D - Long Date", - "f - Short Datetime", - "F - Long Datetime", - "R - Relative", +@plugin.include +@arc.slash_command("timestamp", "Create a Discord timestamp from human-readable time formats and dates.") +async def timestamp_gen( + ctx: SnedContext, + time: arc.Option[ + str, arc.StrParams("The time to create the timestamp from. Examples: 'in 20 minutes', '2022-04-03', '21:43'") ], - required=False, -) -@lightbulb.option("time", "The time to create the timestamp from. Examples: 'in 20 minutes', '2022-04-03', '21:43'") -@lightbulb.command( - "timestamp", "Create a Discord timestamp from human-readable time formats and dates.", pass_options=True -) -@lightbulb.implements(lightbulb.SlashCommand) -async def timestamp_gen(ctx: SnedSlashContext, time: str, style: str | None = None) -> None: + style: arc.Option[ + str | None, + arc.StrParams( + "Timestamp style.", + choices=[ + "t - Short time", + "T - Long time", + "d - Short date", + "D - Long Date", + "f - Short Datetime", + "F - Long Datetime", + "R - Relative", + ], + ), + ] = None, +) -> None: try: - converted_time = await ctx.app.scheduler.convert_time( + converted_time = await ctx.client.scheduler.convert_time( time, conversion_mode=ConversionMode.ABSOLUTE, user=ctx.user ) except ValueError as error: diff --git a/src/extensions/moderation.py b/src/extensions/moderation.py index ea93516..4041c42 100644 --- a/src/extensions/moderation.py +++ b/src/extensions/moderation.py @@ -54,11 +54,9 @@ async def whois_user_command( @arc.slash_subcommand("add", "Add a role to the target user.") async def role_add( ctx: SnedContext, - user: arc.Option[hikari.User, arc.UserParams("The user to add the role to.")], + user: arc.Option[hikari.Member, arc.MemberParams("The user to add the role to.")], role: arc.Option[hikari.Role, arc.RoleParams("The role to add.")], ) -> None: - if not helpers.is_member(user): - return assert ctx.guild_id and ctx.member me = ctx.client.cache.get_member(ctx.guild_id, ctx.client.user_id) @@ -100,10 +98,11 @@ async def role_add( await ctx.client.rest.add_role_to_member( ctx.guild_id, user, role, reason=f"{ctx.member} ({ctx.member.id}): Added role via Sned" ) - await ctx.mod_respond( + await ctx.respond( embed=hikari.Embed( title="✅ Role added", description=f"Added role {role.mention} to `{user}`.", color=const.EMBED_GREEN - ) + ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -111,11 +110,9 @@ async def role_add( @arc.slash_subcommand("remove", "Remove a role from the target user.") async def role_del( ctx: SnedContext, - user: arc.Option[hikari.User, arc.UserParams("The user to remove the role from.")], + user: arc.Option[hikari.Member, arc.MemberParams("The user to remove the role from.")], role: arc.Option[hikari.Role, arc.RoleParams("The role to remove.")], ) -> None: - if not helpers.is_member(user): - return assert ctx.guild_id and ctx.member me = ctx.client.cache.get_member(ctx.guild_id, ctx.client.user_id) @@ -157,10 +154,11 @@ async def role_del( await ctx.client.rest.remove_role_from_member( ctx.guild_id, user, role, reason=f"{ctx.member} ({ctx.member.id}): Removed role via Sned" ) - await ctx.mod_respond( + await ctx.respond( embed=hikari.Embed( title="✅ Role removed", description=f"Removed role {role.mention} from `{user}`.", color=const.EMBED_GREEN - ) + ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -189,7 +187,7 @@ async def purge( ] = None, ) -> None: channel = ctx.get_channel() or await ctx.client.rest.fetch_channel(ctx.channel_id) - assert isinstance(channel, hikari.TextableGuildChannel) + assert isinstance(channel, hikari.TextableGuildChannel) and ctx.guild_id predicates = [ # Ignore deferred typing indicator so it doesn't get deleted lmfao @@ -275,7 +273,7 @@ async def purge( color=const.ERROR_COLOR, ) - await ctx.mod_respond(embed=embed) + await ctx.respond(embed=embed, flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id))) @plugin.include @@ -287,36 +285,36 @@ async def purge( ) async def deobfuscate( ctx: SnedContext, - user: arc.Option[hikari.User, arc.UserParams("The user who's nickname should be deobfuscated.")], + user: arc.Option[hikari.Member, arc.MemberParams("The user who's nickname should be deobfuscated.")], strict: arc.Option[ bool, arc.BoolParams("If enabled, uses stricter filtering and may filter out certain valid letters.") ] = True, ) -> None: - if not helpers.is_member(user): - return - + assert ctx.guild_id is not None new_nick = helpers.normalize_string(user.display_name, strict=strict) if not new_nick: new_nick = "Blessed by Sned" if new_nick == user.display_name: - await ctx.mod_respond( + await ctx.respond( embed=hikari.Embed( title="ℹ️ No action taken", description=f"The nickname of **{user.display_name}** is already deobfuscated or contains nothing to deobfuscate.", color=const.EMBED_BLUE, - ) + ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) return await user.edit(nickname=new_nick, reason=f"{ctx.author} ({ctx.author.id}): Deobfuscated nickname") - await ctx.mod_respond( + await ctx.respond( embed=hikari.Embed( title="✅ Deobfuscated!", description=f"{user.mention}'s nickname is now: `{new_nick}`", color=const.EMBED_GREEN, - ) + ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -328,23 +326,25 @@ async def deobfuscate( @journal.include @arc.slash_subcommand("get", "Retrieve the journal for the specified user.") async def journal_get( - ctx: SnedContext, user: arc.Option[hikari.User, arc.UserParams("The user to retrieve the journal for.")] + ctx: SnedContext, user: arc.Option[hikari.Member, arc.MemberParams("The user to retrieve the journal for.")] ) -> None: assert ctx.guild_id is not None journal = await JournalEntry.fetch_journal(user, ctx.guild_id) if journal: - navigator = models.AuthorOnlyNavigator(ctx, pages=helpers.build_journal_pages(journal)) # type: ignore + navigator = models.AuthorOnlyNavigator(ctx.author, pages=helpers.build_journal_pages(journal)) # type: ignore ephemeral = bool((await ctx.client.mod.get_settings(ctx.guild_id)).flags & ModerationFlags.IS_EPHEMERAL) - await navigator.send(ctx.interaction, ephemeral=ephemeral) + await ctx.respond_with_builder(await navigator.build_response_async(ctx.client.miru, ephemeral=ephemeral)) + ctx.client.miru.start_view(navigator) else: - await ctx.mod_respond( + await ctx.respond( embed=hikari.Embed( title="📒 Journal entries for this user:", - description=f"There are no journal entries for this user yet. Any moderation-actions will leave an entry here, or you can set one manually with `/journal add {ctx.options.user}`", + description=f"There are no journal entries for this user yet. Any moderation-actions will leave an entry here, or you can set one manually with `/journal add {user}`", color=const.EMBED_BLUE, - ) + ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -365,12 +365,13 @@ async def journal_add( created_at=helpers.utcnow(), ).update() - await ctx.mod_respond( + await ctx.respond( embed=hikari.Embed( title="✅ Journal entry added!", - description=f"Added a new journal entry to user **{user}**. You can view this user's journal via the command `/journal get {ctx.options.user}`.", + description=f"Added a new journal entry to user **{user}**. You can view this user's journal via the command `/journal get {user}`.", color=const.EMBED_GREEN, - ) + ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -383,20 +384,19 @@ async def journal_add( ) async def warn_cmd( ctx: SnedContext, - user: arc.Option[hikari.User, arc.UserParams("The user to be warned.")], + user: arc.Option[hikari.Member, arc.MemberParams("The user to be warned.")], reason: arc.Option[str | None, arc.StrParams("The reason for this warn")] = None, ) -> None: - if not helpers.is_member(user): - return - assert ctx.member is not None + assert ctx.member is not None and ctx.guild_id is not None embed = await ctx.client.mod.warn(user, ctx.member, reason=reason) - await ctx.mod_respond( + await ctx.respond( embed=embed, components=miru.View().add_item( miru.Button( label="View Journal", custom_id=f"JOURNAL:{user.id}:{ctx.member.id}", style=hikari.ButtonStyle.SECONDARY ) ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -406,9 +406,8 @@ async def warn_cmd( @warns.include @arc.slash_subcommand("list", "List the current warning count for a user.") async def warns_list( - ctx: SnedContext, user: arc.Option[hikari.User, arc.UserParams("The user to show the warning count for.")] + ctx: SnedContext, user: arc.Option[hikari.Member, arc.MemberParams("The user to show the warning count for.")] ) -> None: - helpers.is_member(user) assert ctx.guild_id is not None db_user = await DatabaseUser.fetch(user.id, ctx.guild_id) @@ -419,7 +418,7 @@ async def warns_list( color=const.WARN_COLOR, ) embed.set_thumbnail(user.display_avatar_url) - await ctx.mod_respond(embed=embed) + await ctx.respond(embed=embed, flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id))) @warns.include @@ -427,21 +426,19 @@ async def warns_list( @arc.slash_subcommand("clear", "Clear warnings for the specified user.") async def warns_clear( ctx: SnedContext, - user: arc.Option[hikari.User, arc.UserParams("The user to clear warnings for.")], + user: arc.Option[hikari.Member, arc.MemberParams("The user to clear warnings for.")], reason: arc.Option[str | None, arc.StrParams("The reason for clearing this user's warns.")] = None, ) -> None: - if not helpers.is_member(user): - return - assert ctx.guild_id is not None and ctx.member is not None embed = await ctx.client.mod.clear_warns(user, ctx.member, reason=reason) - await ctx.mod_respond( + await ctx.respond( embed=embed, components=miru.View().add_item( miru.Button( label="View Journal", custom_id=f"JOURNAL:{user.id}:{ctx.member.id}", style=hikari.ButtonStyle.SECONDARY ) ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -450,22 +447,20 @@ async def warns_clear( @arc.slash_subcommand("remove", "Remove a single warning from the specified user.") async def warns_remove( ctx: SnedContext, - user: arc.Option[hikari.User, arc.UserParams("The user to remove a warning from.")], + user: arc.Option[hikari.Member, arc.MemberParams("The user to remove a warning from.")], reason: arc.Option[str | None, arc.StrParams("The reason for removing this user's warn.")] = None, ) -> None: - if not helpers.is_member(user): - return - assert ctx.guild_id is not None and ctx.member is not None embed = await ctx.client.mod.remove_warn(user, ctx.member, reason=reason) - await ctx.mod_respond( + await ctx.respond( embed=embed, components=miru.View().add_item( miru.Button( label="View Journal", custom_id=f"JOURNAL:{user.id}:{ctx.member.id}", style=hikari.ButtonStyle.SECONDARY ) ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -480,17 +475,15 @@ async def warns_remove( ) async def timeout_cmd( ctx: SnedContext, - user: arc.Option[hikari.User, arc.UserParams("The user to time out.")], + user: arc.Option[hikari.Member, arc.MemberParams("The user to time out.")], duration: arc.Option[ str, arc.StrParams("The duration to time the user out for. Example: '10 minutes', '2022-03-01', 'tomorrow 20:00'"), ], reason: arc.Option[str | None, arc.StrParams("The reason for timing out this user.")] = None, ) -> None: - if not helpers.is_member(user): - return + assert ctx.member is not None and ctx.guild_id is not None reason = helpers.format_reason(reason, max_length=1024) - assert ctx.member is not None if user.communication_disabled_until() is not None: await ctx.respond( @@ -519,13 +512,14 @@ async def timeout_cmd( embed = await ctx.client.mod.timeout(user, ctx.member, communication_disabled_until, reason) - await ctx.mod_respond( + await ctx.respond( embed=embed, components=miru.View().add_item( miru.Button( label="View Journal", custom_id=f"JOURNAL:{user.id}:{ctx.member.id}", style=hikari.ButtonStyle.SECONDARY ) ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -541,14 +535,12 @@ async def timeout_cmd( @arc.slash_subcommand("remove", "Remove timeout from a user.") async def timeouts_remove_cmd( ctx: SnedContext, - user: arc.Option[hikari.User, arc.UserParams("The user to time out.")], + user: arc.Option[hikari.Member, arc.MemberParams("The user to time out.")], reason: arc.Option[str | None, arc.StrParams("The reason for timing out this user.")] = None, ) -> None: - if not helpers.is_member(user): - return reason = helpers.format_reason(reason, max_length=1024) - assert ctx.member is not None + assert ctx.member is not None and ctx.guild_id is not None if user.communication_disabled_until() is None: await ctx.respond( @@ -563,7 +555,7 @@ async def timeouts_remove_cmd( await ctx.client.mod.remove_timeout(user, ctx.member, reason) - await ctx.mod_respond( + await ctx.respond( embed=hikari.Embed( title="🔉 " + "Timeout removed", description=f"**{user}**'s timeout was removed.\n**Reason:** ```{reason}```", @@ -574,6 +566,7 @@ async def timeouts_remove_cmd( label="View Journal", custom_id=f"JOURNAL:{user.id}:{ctx.member.id}", style=hikari.ButtonStyle.SECONDARY ) ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -604,7 +597,7 @@ async def ban_cmd( ), ] = None, ) -> None: - assert ctx.member is not None + assert ctx.member is not None and ctx.guild_id is not None if duration: try: @@ -629,7 +622,7 @@ async def ban_cmd( days_to_delete=days_to_delete or 0, reason=reason, ) - await ctx.mod_respond( + await ctx.respond( embed=embed, components=( miru.View() @@ -646,6 +639,7 @@ async def ban_cmd( ) ) ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -660,7 +654,7 @@ async def ban_cmd( ) async def softban_cmd( ctx: SnedContext, - user: arc.Option[hikari.User, arc.UserParams("The user to softban")], + user: arc.Option[hikari.Member, arc.MemberParams("The user to softban")], days_to_delete: arc.Option[ int, arc.IntParams( @@ -670,8 +664,7 @@ async def softban_cmd( ], reason: arc.Option[str | None, arc.StrParams("The reason why this softban was performed")] = None, ) -> None: - helpers.is_member(user) - assert ctx.member is not None + assert ctx.member is not None and ctx.guild_id is not None await ctx.respond(hikari.ResponseType.DEFERRED_MESSAGE_CREATE) embed = await ctx.client.mod.ban( @@ -681,7 +674,7 @@ async def softban_cmd( days_to_delete=days_to_delete, reason=reason, ) - await ctx.mod_respond( + await ctx.respond( embed=embed, components=( miru.View().add_item( @@ -692,6 +685,7 @@ async def softban_cmd( ) ) ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -707,16 +701,17 @@ async def unban_cmd( user: arc.Option[hikari.User, arc.UserParams("The user to unban")], reason: arc.Option[str | None, arc.StrParams("The reason for performing this unban")] = None, ) -> None: - assert ctx.member is not None + assert ctx.member is not None and ctx.guild_id is not None embed = await ctx.client.mod.unban(user, ctx.member, reason=reason) - await ctx.mod_respond( + await ctx.respond( embed=embed, components=miru.View().add_item( miru.Button( label="View Journal", custom_id=f"JOURNAL:{user.id}:{ctx.member.id}", style=hikari.ButtonStyle.SECONDARY ) ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -727,22 +722,20 @@ async def unban_cmd( @arc.slash_command("kick", "Kick a user from this server.", default_permissions=hikari.Permissions.KICK_MEMBERS) async def kick_cmd( ctx: SnedContext, - user: arc.Option[hikari.User, arc.UserParams("The user to kick")], + user: arc.Option[hikari.Member, arc.MemberParams("The user to kick")], reason: arc.Option[str | None, arc.StrParams("The reason for performing this kick")] = None, ) -> None: - if not helpers.is_member(user): - return - - assert ctx.member is not None + assert ctx.member is not None and ctx.guild_id is not None embed = await ctx.client.mod.kick(user, ctx.member, reason=reason) - await ctx.mod_respond( + await ctx.respond( embed=embed, components=miru.View().add_item( miru.Button( label="View Journal", custom_id=f"JOURNAL:{user.id}:{ctx.member.id}", style=hikari.ButtonStyle.SECONDARY ) ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -759,13 +752,16 @@ async def slowmode_mcd( int, arc.IntParams("The slowmode interval in seconds, use 0 to disable it.", min=0, max=21600) ], ) -> None: + assert ctx.guild_id is not None + await ctx.client.rest.edit_channel(ctx.channel_id, rate_limit_per_user=interval) - await ctx.mod_respond( + await ctx.respond( embed=hikari.Embed( title="✅ Slowmode updated", description=f"{const.EMOJI_SLOWMODE} Slowmode is now set to 1 message per `{interval}` seconds.", color=const.EMBED_GREEN, - ) + ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) @@ -783,10 +779,10 @@ async def slowmode_mcd( async def massban( ctx: SnedContext, joined_after: arc.Option[ - hikari.User | None, arc.UserParams("Only match users that joined after this user.", name="joined-after") + hikari.Member | None, arc.MemberParams("Only match users that joined after this user.", name="joined-after") ] = None, joined_before: arc.Option[ - hikari.User | None, arc.UserParams("Only match users that joined before this user.", name="joined-before") + hikari.Member | None, arc.MemberParams("Only match users that joined before this user.", name="joined-before") ] = None, joined: arc.Option[ int | None, arc.IntParams("Only match users that joined this server x minutes before.", min=1) @@ -811,11 +807,6 @@ async def massban( ), ] = None, ) -> None: - if joined_before: - helpers.is_member(joined_before) - if joined_after: - helpers.is_member(joined_after) - guild = ctx.get_guild() assert guild is not None @@ -851,37 +842,44 @@ async def massban( now = helpers.utcnow() - if created: # why ruff add blank line below :( + if created: - def created_(member: hikari.User, offset=now - datetime.timedelta(minutes=created)) -> bool: + def created_pred(member: hikari.User, offset=now - datetime.timedelta(minutes=created)) -> bool: return member.created_at > offset - predicates.append(created_) + predicates.append(created_pred) if joined: - def joined_(member: hikari.User, offset=now - datetime.timedelta(minutes=joined)) -> bool: + def joined_pred(member: hikari.User, offset=now - datetime.timedelta(minutes=joined)) -> bool: if not isinstance(member, hikari.Member): return True else: - return member.joined_at and member.joined_at > offset + return member.joined_at is not None and member.joined_at > offset - predicates.append(joined_) + predicates.append(joined_pred) - # TODO: these functions are gonna have to be renamed as they overwrite the function params - if joined_after and helpers.is_member(joined_after): + if joined_after: - def joined_after_(member: hikari.Member, joined_after=joined_after) -> bool: - return member.joined_at and joined_after.joined_at and member.joined_at > joined_after.joined_at + def joined_after_pred(member: hikari.Member, joined_after=joined_after) -> bool: + return ( + member.joined_at is not None + and joined_after.joined_at is not None + and member.joined_at > joined_after.joined_at + ) - predicates.append(joined_after_) + predicates.append(joined_after_pred) - if joined_before and helpers.is_member(joined_before): + if joined_before: - def joined_before_(member: hikari.Member, joined_before=joined_before) -> bool: - return member.joined_at and joined_before.joined_at and member.joined_at < joined_before.joined_at + def joined_before_pred(member: hikari.Member, joined_before=joined_before) -> bool: + return ( + member.joined_at is not None + and joined_before.joined_at is not None + and member.joined_at < joined_before.joined_at + ) - predicates.append(joined_before_) + predicates.append(joined_before_pred) if len(predicates) == 3: await ctx.respond( @@ -921,7 +919,10 @@ def joined_before_(member: hikari.Member, joined_before=joined_before) -> bool: file = hikari.Bytes(content.encode("utf-8"), "members_to_ban.txt") if show is True: - await ctx.mod_respond(attachment=file) + await ctx.respond( + attachment=file, + flags=(await ctx.client.mod.get_msg_flags(guild.id)), + ) return reason = reason or "No reason provided." @@ -956,10 +957,7 @@ def joined_before_(member: hikari.Member, joined_before=joined_before) -> bool: if not confirmed: return - # FIXME: This abomination needs to be utterly destroyed - userlog = ctx.client.get_plugin("Logging") - if userlog: - await userlog.d.actions.freeze_logging(guild.id) + await ctx.client.userlogger.freeze_logging(guild.id) count = 0 @@ -978,16 +976,16 @@ def joined_before_(member: hikari.Member, joined_before=joined_before) -> bool: MassBanEvent(ctx.client.app, ctx.guild_id, ctx.member, len(to_ban), count, file, reason) ) - await ctx.mod_respond( + await ctx.respond( embed=hikari.Embed( title="✅ Massban finished", description=f"Banned **{count}/{len(to_ban)}** users.", color=const.EMBED_GREEN, - ) + ), + flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id)), ) - if userlog: - await userlog.d.actions.unfreeze_logging(ctx.guild_id) + await ctx.client.userlogger.unfreeze_logging(ctx.guild_id) @arc.loader diff --git a/src/extensions/settings.py b/src/extensions/settings.py index 7875dec..54d44e0 100644 --- a/src/extensions/settings.py +++ b/src/extensions/settings.py @@ -510,10 +510,7 @@ async def settings_logging(self) -> None: """Show and handle Logging menu.""" assert self.last_context is not None and self.last_context.guild_id is not None - userlog = self.arc_client.get_plugin("Logging") - assert userlog is not None - - log_channels = await userlog.d.actions.get_log_channel_ids_view(self.last_context.guild_id) + log_channels = await self.arc_client.userlogger.get_log_channel_ids_view(self.last_context.guild_id) embed = hikari.Embed( title="Logging Settings", @@ -541,7 +538,7 @@ async def settings_logging(self) -> None: options.append(miru.SelectOption(label=log_event_strings[log_category], value=log_category)) self.select_screen(OptionsTextSelect(options=options, placeholder="Select a category..."), parent="Main") - is_color = await userlog.d.actions.is_color_enabled(self.last_context.guild_id) + is_color = await self.arc_client.userlogger.is_color_enabled(self.last_context.guild_id) self.add_item(BooleanButton(state=is_color, label="Color logs")) await self.last_context.edit_response(embed=embed, components=self, flags=self.flags) @@ -593,9 +590,7 @@ async def settings_logging(self) -> None: return channel = self.value.channels[0] if self.value.channels else None - userlog = self.arc_client.get_plugin("Logging") - assert userlog is not None - await userlog.d.actions.set_log_channel( + await self.arc_client.userlogger.set_log_channel( LogEvent(log_event), self.last_context.guild_id, channel.id if channel else None ) @@ -605,11 +600,7 @@ async def settings_automod(self) -> None: """Open and handle automoderation main menu.""" assert self.last_context is not None and self.last_context.guild_id is not None - automod = self.arc_client.get_plugin("Auto-Moderation") - - assert automod is not None - - policies = await automod.d.actions.get_policies(self.last_context.guild_id) + policies = await self.arc_client.mod.get_automod_policies(self.last_context.guild_id) embed = hikari.Embed( title="Automoderation Settings", description="Below you can see a summary of the current automoderation settings. To see more details about a specific entry or change their settings, select it below!", @@ -641,11 +632,7 @@ async def settings_automod_policy(self, policy: str | None = None) -> None: if not policy: return await self.settings_automod() - automod = self.arc_client.get_plugin("Auto-Moderation") - - assert automod is not None - - policies: dict[str, t.Any] = await automod.d.actions.get_policies(self.last_context.guild_id) + policies: dict[str, t.Any] = await self.arc_client.mod.get_automod_policies(self.last_context.guild_id) policy_data = policies[policy] embed = hikari.Embed( title=f"Options for: {policy_strings[policy]['name']}", @@ -671,9 +658,7 @@ async def settings_automod_policy(self, policy: str | None = None) -> None: ) elif state in ["flag", "notice"]: - userlog = self.arc_client.get_plugin("Logging") - assert userlog is not None - channel_id = await userlog.d.actions.get_log_channel_id(LogEvent.FLAGS, self.last_context.guild_id) + channel_id = await self.arc_client.userlogger.get_log_channel_id(LogEvent.FLAGS, self.last_context.guild_id) if not channel_id: embed.add_field( name="⚠️ Warning:", diff --git a/src/extensions/tags.py b/src/extensions/tags.py index 8f51cee..6f728d0 100644 --- a/src/extensions/tags.py +++ b/src/extensions/tags.py @@ -66,14 +66,14 @@ async def callback(self, ctx: miru.ModalContext) -> None: self.tag_content = ctx.get_value_by_id("content") -async def tag_name_ac(data: arc.AutocompleteData) -> list[str]: +async def tag_name_ac(data: arc.AutocompleteData[SnedClient, str]) -> list[str]: """Autocomplete for tag names.""" if data.focused_value and data.guild_id: return (await Tag.fetch_closest_names(str(data.focused_value), data.guild_id)) or [] return [] -async def tag_owned_name_ac(data: arc.AutocompleteData) -> list[str]: +async def tag_owned_name_ac(data: arc.AutocompleteData[SnedClient, str]) -> list[str]: """Autocomplete for tag names that the user owns.""" if data.focused_value and data.guild_id: return (await Tag.fetch_closest_owned_names(str(data.focused_value), data.guild_id, data.user.id)) or [] diff --git a/src/extensions/troubleshooter.py b/src/extensions/troubleshooter.py index 86a95a1..9260b7a 100644 --- a/src/extensions/troubleshooter.py +++ b/src/extensions/troubleshooter.py @@ -73,9 +73,10 @@ is_dm_enabled=False, ) async def troubleshoot(ctx: SnedContext) -> None: - assert ctx.interaction.app_permissions is not None + assert ctx.app_permissions is not None + assert ctx.guild_id is not None - missing_perms = ~ctx.interaction.app_permissions & REQUIRED_PERMISSIONS + missing_perms = ~ctx.app_permissions & REQUIRED_PERMISSIONS content_list = [] if missing_perms is not hikari.Permissions.NONE: @@ -98,7 +99,7 @@ async def troubleshoot(ctx: SnedContext) -> None: color=const.ERROR_COLOR, ) - await ctx.mod_respond(embed=embed) + await ctx.respond(embed=embed, flags=(await ctx.client.mod.get_msg_flags(ctx.guild_id))) @arc.loader diff --git a/src/models/mod_actions.py b/src/models/mod_actions.py index 82a0787..64c457d 100644 --- a/src/models/mod_actions.py +++ b/src/models/mod_actions.py @@ -2,6 +2,7 @@ import datetime import enum +import json import logging import typing as t from contextlib import suppress @@ -14,6 +15,7 @@ from miru.abc import ViewItem from src.etc import const +from src.etc.settings_static import default_automod_policies from src.models.db_user import DatabaseUser, DatabaseUserFlag from src.models.errors import DMFailedError, RoleHierarchyError from src.models.events import TimerCompleteEvent, WarnCreateEvent, WarnRemoveEvent, WarnsClearEvent @@ -96,6 +98,67 @@ async def get_settings(self, guild: hikari.SnowflakeishOr[hikari.PartialGuild]) return ModerationSettings() + async def get_msg_flags(self, guild: hikari.SnowflakeishOr[hikari.PartialGuild]) -> hikari.MessageFlag: + """Get the message flags for a guild. + + Parameters + ---------- + guild : hikari.SnowflakeishOr[hikari.PartialGuild] + The guild to get message flags for. + + Returns + ------- + hikari.MessageFlag + The guild's message flags. + """ + return ( + hikari.MessageFlag.EPHEMERAL + if (await self.is_ephemeral(hikari.Snowflake(guild))) + else hikari.MessageFlag.NONE + ) + + async def is_ephemeral(self, guild: hikari.SnowflakeishOr[hikari.PartialGuild]) -> bool: + """Check if responses to moderation actions should be done ephemerally.""" + return bool((await self.get_settings(hikari.Snowflake(guild))).flags & ModerationFlags.IS_EPHEMERAL) + + # TODO: Purge this cursed abomination + async def get_automod_policies(self, guild: hikari.SnowflakeishOr[hikari.Guild]) -> dict[str, t.Any]: + """Return auto-moderation policies for the specified guild. + + Parameters + ---------- + guild : hikari.SnowflakeishOr[hikari.Guild] + The guild to get policies for. + + Returns + ------- + dict[str, t.Any] + The guild's auto-moderation policies. + """ + guild_id = hikari.Snowflake(guild) + + records = await self._client.db_cache.get(table="mod_config", guild_id=guild_id) + + policies = json.loads(records[0]["automod_policies"]) if records else default_automod_policies + + for key in default_automod_policies: + if key not in policies: + policies[key] = default_automod_policies[key] + + for nested_key in default_automod_policies[key]: + if nested_key not in policies[key]: + policies[key][nested_key] = default_automod_policies[key][nested_key] + + invalid = [] + for key in policies: + if key not in default_automod_policies: + invalid.append(key) + + for key in invalid: + policies.pop(key) + + return policies + async def pre_mod_actions( self, guild: hikari.SnowflakeishOr[hikari.Guild], diff --git a/src/utils/dictionaryapi.py b/src/utils/dictionaryapi.py index ad23ae5..f8932dc 100644 --- a/src/utils/dictionaryapi.py +++ b/src/utils/dictionaryapi.py @@ -110,7 +110,7 @@ class DictionaryEntry: @classmethod def from_dict(cls, data: dict[str, t.Any]) -> DictionaryEntry: - et = data.get("et", None) + et = data.get("et") try: if et and et[0][0] == "text": et = re.sub(r"[{]\S+[}]", "", et[0][1]) @@ -121,10 +121,10 @@ def from_dict(cls, data: dict[str, t.Any]) -> DictionaryEntry: id=data["meta"]["id"], word=data["meta"]["id"].split(":")[0], definitions=data["shortdef"], - functional_label=data.get("fl", None), + functional_label=data.get("fl"), offensive=data["meta"].get("offensive") or False, etymology=et, - date=data.get("date", None), + date=data.get("date"), )