From 8ed588a91e8d1b8e52790e0ae00fb34d25bc60aa Mon Sep 17 00:00:00 2001 From: Chris Lovering Date: Thu, 6 Jun 2024 20:18:22 +0100 Subject: [PATCH 1/5] Rename utils to syncer_utils for clarity --- .../{_utils.py => _syncer_utils.py} | 3 +++ .../exts/event_listeners/guild_listeners.py | 16 ++++++++-------- .../exts/event_listeners/message_listeners.py | 4 ++-- 3 files changed, 13 insertions(+), 10 deletions(-) rename metricity/exts/event_listeners/{_utils.py => _syncer_utils.py} (94%) diff --git a/metricity/exts/event_listeners/_utils.py b/metricity/exts/event_listeners/_syncer_utils.py similarity index 94% rename from metricity/exts/event_listeners/_utils.py rename to metricity/exts/event_listeners/_syncer_utils.py index 4006ea2..8e8c54f 100644 --- a/metricity/exts/event_listeners/_utils.py +++ b/metricity/exts/event_listeners/_syncer_utils.py @@ -1,8 +1,11 @@ import discord +from pydis_core.utils import logging from sqlalchemy.ext.asyncio import AsyncSession from metricity import models +log = logging.get_logger(__name__) + def insert_thread(thread: discord.Thread, sess: AsyncSession) -> None: """Insert the given thread to the database session.""" diff --git a/metricity/exts/event_listeners/guild_listeners.py b/metricity/exts/event_listeners/guild_listeners.py index 79cd8f4..3b1053b 100644 --- a/metricity/exts/event_listeners/guild_listeners.py +++ b/metricity/exts/event_listeners/guild_listeners.py @@ -12,7 +12,7 @@ from metricity.bot import Bot from metricity.config import BotConfig from metricity.database import async_session -from metricity.exts.event_listeners import _utils +from metricity.exts.event_listeners import _syncer_utils log = logging.get_logger(__name__) @@ -29,10 +29,10 @@ async def sync_guild(self) -> None: await self.bot.wait_until_guild_available() guild = self.bot.get_guild(self.bot.guild_id) - await self.sync_channels(guild) + await _syncer_utils.sync_channels(self.bot, guild) log.info("Beginning thread archive state synchronisation process") - await self.sync_thread_archive_state(guild) + await _syncer_utils.sync_thread_archive_state(guild) log.info("Beginning user synchronisation process") async with async_session() as sess: @@ -210,7 +210,7 @@ async def on_guild_channel_create(self, channel: discord.abc.GuildChannel) -> No if channel.guild.id != BotConfig.guild_id: return - await self.sync_channels(channel.guild) + await _syncer_utils.sync_channels(self.bot, channel.guild) @commands.Cog.listener() async def on_guild_channel_delete(self, channel: discord.abc.GuildChannel) -> None: @@ -218,7 +218,7 @@ async def on_guild_channel_delete(self, channel: discord.abc.GuildChannel) -> No if channel.guild.id != BotConfig.guild_id: return - await self.sync_channels(channel.guild) + await _syncer_utils.sync_channels(self.bot, channel.guild) @commands.Cog.listener() async def on_guild_channel_update( @@ -230,7 +230,7 @@ async def on_guild_channel_update( if channel.guild.id != BotConfig.guild_id: return - await self.sync_channels(channel.guild) + await _syncer_utils.sync_channels(self.bot, channel.guild) @commands.Cog.listener() async def on_thread_create(self, thread: discord.Thread) -> None: @@ -238,7 +238,7 @@ async def on_thread_create(self, thread: discord.Thread) -> None: if thread.guild.id != BotConfig.guild_id: return - await self.sync_channels(thread.guild) + await _syncer_utils.sync_channels(self.bot, thread.guild) @commands.Cog.listener() async def on_thread_update(self, _before: discord.Thread, thread: discord.Thread) -> None: @@ -246,7 +246,7 @@ async def on_thread_update(self, _before: discord.Thread, thread: discord.Thread if thread.guild.id != BotConfig.guild_id: return - await self.sync_channels(thread.guild) + await _syncer_utils.sync_channels(self.bot, thread.guild) @commands.Cog.listener() async def on_guild_available(self, guild: discord.Guild) -> None: diff --git a/metricity/exts/event_listeners/message_listeners.py b/metricity/exts/event_listeners/message_listeners.py index a71e53f..917b13c 100644 --- a/metricity/exts/event_listeners/message_listeners.py +++ b/metricity/exts/event_listeners/message_listeners.py @@ -7,7 +7,7 @@ from metricity.bot import Bot from metricity.config import BotConfig from metricity.database import async_session -from metricity.exts.event_listeners import _utils +from metricity.exts.event_listeners import _syncer_utils from metricity.models import Message, User @@ -44,7 +44,7 @@ async def on_message(self, message: discord.Message) -> None: return from_thread = isinstance(message.channel, discord.Thread) - await _utils.sync_message(message, sess, from_thread=from_thread) + await _syncer_utils.sync_message(message, sess, from_thread=from_thread) await sess.commit() From 312a7bc2954f3c2a7c99295841121219d0c655ce Mon Sep 17 00:00:00 2001 From: Chris Lovering Date: Thu, 6 Jun 2024 20:21:10 +0100 Subject: [PATCH 2/5] Move channel & thread syncer to sync utils file --- .../exts/event_listeners/_syncer_utils.py | 111 ++++++++++++++++++ .../exts/event_listeners/guild_listeners.py | 106 ----------------- 2 files changed, 111 insertions(+), 106 deletions(-) diff --git a/metricity/exts/event_listeners/_syncer_utils.py b/metricity/exts/event_listeners/_syncer_utils.py index 8e8c54f..258a165 100644 --- a/metricity/exts/event_listeners/_syncer_utils.py +++ b/metricity/exts/event_listeners/_syncer_utils.py @@ -1,8 +1,12 @@ import discord from pydis_core.utils import logging +from sqlalchemy import update from sqlalchemy.ext.asyncio import AsyncSession from metricity import models +from metricity.bot import Bot +from metricity.config import BotConfig +from metricity.database import async_session log = logging.get_logger(__name__) @@ -39,3 +43,110 @@ async def sync_message(message: discord.Message, sess: AsyncSession, *, from_thr args["thread_id"] = str(thread.id) sess.add(models.Message(**args)) + + +async def sync_channels(bot: Bot, guild: discord.Guild) -> None: + """Sync channels and categories with the database.""" + bot.channel_sync_in_progress.clear() + + log.info("Beginning category synchronisation process") + + async with async_session() as sess: + for channel in guild.channels: + if isinstance(channel, discord.CategoryChannel): + if existing_cat := await sess.get(models.Category, str(channel.id)): + existing_cat.name = channel.name + else: + sess.add(models.Category(id=str(channel.id), name=channel.name, deleted=False)) + + await sess.commit() + + log.info("Category synchronisation process complete, synchronising deleted categories") + + async with async_session() as sess: + await sess.execute( + update(models.Category) + .where(~models.Category.id.in_( + [str(channel.id) for channel in guild.channels if isinstance(channel, discord.CategoryChannel)], + )) + .values(deleted=True), + ) + await sess.commit() + + log.info("Deleted category synchronisation process complete, synchronising channels") + + async with async_session() as sess: + for channel in guild.channels: + if channel.category and channel.category.id in BotConfig.ignore_categories: + continue + + if not isinstance(channel, discord.CategoryChannel): + category_id = str(channel.category.id) if channel.category else None + # Cast to bool so is_staff is False if channel.category is None + is_staff = channel.id in BotConfig.staff_channels or bool( + channel.category and channel.category.id in BotConfig.staff_categories, + ) + if db_chan := await sess.get(models.Channel, str(channel.id)): + db_chan.name = channel.name + else: + sess.add(models.Channel( + id=str(channel.id), + name=channel.name, + category_id=category_id, + is_staff=is_staff, + deleted=False, + )) + + await sess.commit() + + log.info("Channel synchronisation process complete, synchronising deleted channels") + + async with async_session() as sess: + await sess.execute( + update(models.Channel) + .where(~models.Channel.id.in_([str(channel.id) for channel in guild.channels])) + .values(deleted=True), + ) + await sess.commit() + + log.info("Deleted channel synchronisation process complete, synchronising threads") + + async with async_session() as sess: + for thread in guild.threads: + if thread.parent and thread.parent.category: + if thread.parent.category.id in BotConfig.ignore_categories: + continue + else: + # This is a forum channel, not currently supported by Discord.py. Ignore it. + continue + + if db_thread := await sess.get(models.Thread, str(thread.id)): + db_thread.name = thread.name + db_thread.archived = thread.archived + db_thread.auto_archive_duration = thread.auto_archive_duration + db_thread.locked = thread.locked + db_thread.type = thread.type.name + else: + insert_thread(thread, sess) + await sess.commit() + + log.info("Thread synchronisation process complete, finished synchronising guild.") + bot.channel_sync_in_progress.set() + + +async def sync_thread_archive_state(guild: discord.Guild) -> None: + """Sync the archive state of all threads in the database with the state in guild.""" + active_thread_ids = [str(thread.id) for thread in guild.threads] + + async with async_session() as sess: + await sess.execute( + update(models.Thread) + .where(models.Thread.id.in_(active_thread_ids)) + .values(archived=False), + ) + await sess.execute( + update(models.Thread) + .where(~models.Thread.id.in_(active_thread_ids)) + .values(archived=True), + ) + await sess.commit() diff --git a/metricity/exts/event_listeners/guild_listeners.py b/metricity/exts/event_listeners/guild_listeners.py index 3b1053b..a60c291 100644 --- a/metricity/exts/event_listeners/guild_listeners.py +++ b/metricity/exts/event_listeners/guild_listeners.py @@ -98,112 +98,6 @@ async def sync_guild(self) -> None: self.bot.sync_process_complete.set() - @staticmethod - async def sync_thread_archive_state(guild: discord.Guild) -> None: - """Sync the archive state of all threads in the database with the state in guild.""" - active_thread_ids = [str(thread.id) for thread in guild.threads] - - async with async_session() as sess: - await sess.execute( - update(models.Thread) - .where(models.Thread.id.in_(active_thread_ids)) - .values(archived=False), - ) - await sess.execute( - update(models.Thread) - .where(~models.Thread.id.in_(active_thread_ids)) - .values(archived=True), - ) - await sess.commit() - - async def sync_channels(self, guild: discord.Guild) -> None: - """Sync channels and categories with the database.""" - self.bot.channel_sync_in_progress.clear() - - log.info("Beginning category synchronisation process") - - async with async_session() as sess: - for channel in guild.channels: - if isinstance(channel, discord.CategoryChannel): - if existing_cat := await sess.get(models.Category, str(channel.id)): - existing_cat.name = channel.name - else: - sess.add(models.Category(id=str(channel.id), name=channel.name, deleted=False)) - - await sess.commit() - - log.info("Category synchronisation process complete, synchronising deleted categories") - - async with async_session() as sess: - await sess.execute( - update(models.Category) - .where(~models.Category.id.in_( - [str(channel.id) for channel in guild.channels if isinstance(channel, discord.CategoryChannel)], - )) - .values(deleted=True), - ) - await sess.commit() - - log.info("Deleted category synchronisation process complete, synchronising channels") - - async with async_session() as sess: - for channel in guild.channels: - if channel.category and channel.category.id in BotConfig.ignore_categories: - continue - - if not isinstance(channel, discord.CategoryChannel): - category_id = str(channel.category.id) if channel.category else None - # Cast to bool so is_staff is False if channel.category is None - is_staff = channel.id in BotConfig.staff_channels or bool( - channel.category and channel.category.id in BotConfig.staff_categories, - ) - if db_chan := await sess.get(models.Channel, str(channel.id)): - db_chan.name = channel.name - else: - sess.add(models.Channel( - id=str(channel.id), - name=channel.name, - category_id=category_id, - is_staff=is_staff, - deleted=False, - )) - - await sess.commit() - - log.info("Channel synchronisation process complete, synchronising deleted channels") - - async with async_session() as sess: - await sess.execute( - update(models.Channel) - .where(~models.Channel.id.in_([str(channel.id) for channel in guild.channels])) - .values(deleted=True), - ) - await sess.commit() - - log.info("Deleted channel synchronisation process complete, synchronising threads") - - async with async_session() as sess: - for thread in guild.threads: - if thread.parent and thread.parent.category: - if thread.parent.category.id in BotConfig.ignore_categories: - continue - else: - # This is a forum channel, not currently supported by Discord.py. Ignore it. - continue - - if db_thread := await sess.get(models.Thread, str(thread.id)): - db_thread.name = thread.name - db_thread.archived = thread.archived - db_thread.auto_archive_duration = thread.auto_archive_duration - db_thread.locked = thread.locked - db_thread.type = thread.type.name - else: - _utils.insert_thread(thread, sess) - await sess.commit() - - log.info("Thread synchronisation process complete, finished synchronising guild.") - self.bot.channel_sync_in_progress.set() - @commands.Cog.listener() async def on_guild_channel_create(self, channel: discord.abc.GuildChannel) -> None: """Sync the channels when one is created.""" From 4568d3fde613a6a4968d8daf3a223879df2612b5 Mon Sep 17 00:00:00 2001 From: Chris Lovering Date: Thu, 6 Jun 2024 20:21:23 +0100 Subject: [PATCH 3/5] Add cog that syncs the guild on startup --- .../exts/event_listeners/guild_listeners.py | 94 +------------- .../exts/event_listeners/startup_sync.py | 115 ++++++++++++++++++ 2 files changed, 116 insertions(+), 93 deletions(-) create mode 100644 metricity/exts/event_listeners/startup_sync.py diff --git a/metricity/exts/event_listeners/guild_listeners.py b/metricity/exts/event_listeners/guild_listeners.py index a60c291..db976b2 100644 --- a/metricity/exts/event_listeners/guild_listeners.py +++ b/metricity/exts/event_listeners/guild_listeners.py @@ -1,17 +1,11 @@ """An ext to listen for guild (and guild channel) events and syncs them to the database.""" -import math - import discord from discord.ext import commands -from pydis_core.utils import logging, scheduling -from sqlalchemy import column, update -from sqlalchemy.dialects.postgresql import insert +from pydis_core.utils import logging -from metricity import models from metricity.bot import Bot from metricity.config import BotConfig -from metricity.database import async_session from metricity.exts.event_listeners import _syncer_utils log = logging.get_logger(__name__) @@ -22,81 +16,6 @@ class GuildListeners(commands.Cog): def __init__(self, bot: Bot) -> None: self.bot = bot - scheduling.create_task(self.sync_guild()) - - async def sync_guild(self) -> None: - """Sync all channels and members in the guild.""" - await self.bot.wait_until_guild_available() - - guild = self.bot.get_guild(self.bot.guild_id) - await _syncer_utils.sync_channels(self.bot, guild) - - log.info("Beginning thread archive state synchronisation process") - await _syncer_utils.sync_thread_archive_state(guild) - - log.info("Beginning user synchronisation process") - async with async_session() as sess: - await sess.execute(update(models.User).values(in_guild=False)) - await sess.commit() - - users = [ - { - "id": str(user.id), - "name": user.name, - "avatar_hash": getattr(user.avatar, "key", None), - "guild_avatar_hash": getattr(user.guild_avatar, "key", None), - "joined_at": user.joined_at, - "created_at": user.created_at, - "is_staff": BotConfig.staff_role_id in [role.id for role in user.roles], - "bot": user.bot, - "in_guild": True, - "public_flags": dict(user.public_flags), - "pending": user.pending, - } - for user in guild.members - ] - - user_chunks = discord.utils.as_chunks(users, 500) - created = 0 - updated = 0 - total_users = len(users) - - log.info("Performing bulk upsert of %d rows in %d chunks", total_users, math.ceil(total_users / 500)) - - async with async_session() as sess: - for chunk in user_chunks: - qs = insert(models.User).returning(column("xmax")).values(chunk) - - update_cols = [ - "name", - "avatar_hash", - "guild_avatar_hash", - "joined_at", - "is_staff", - "bot", - "in_guild", - "public_flags", - "pending", - ] - - res = await sess.execute(qs.on_conflict_do_update( - index_elements=[models.User.id], - set_={k: getattr(qs.excluded, k) for k in update_cols}, - )) - - objs = list(res) - - created += [obj[0] == 0 for obj in objs].count(True) - updated += [obj[0] != 0 for obj in objs].count(True) - - log.info("User upsert: inserted %d rows, updated %d rows, done %d rows, %d rows remaining", - created, updated, created + updated, total_users - (created + updated)) - - await sess.commit() - - log.info("User upsert complete") - - self.bot.sync_process_complete.set() @commands.Cog.listener() async def on_guild_channel_create(self, channel: discord.abc.GuildChannel) -> None: @@ -142,17 +61,6 @@ async def on_thread_update(self, _before: discord.Thread, thread: discord.Thread await _syncer_utils.sync_channels(self.bot, thread.guild) - @commands.Cog.listener() - async def on_guild_available(self, guild: discord.Guild) -> None: - """Synchronize the user table with the Discord users.""" - log.info("Received guild available for %d", guild.id) - - if guild.id != BotConfig.guild_id: - log.info("Guild was not the configured guild, discarding event") - return - - await self.sync_guild() - async def setup(bot: Bot) -> None: """Load the GuildListeners cog.""" diff --git a/metricity/exts/event_listeners/startup_sync.py b/metricity/exts/event_listeners/startup_sync.py new file mode 100644 index 0000000..7ff5026 --- /dev/null +++ b/metricity/exts/event_listeners/startup_sync.py @@ -0,0 +1,115 @@ +"""An ext to sync the guild when the bot starts up.""" + +import math + +import discord +from discord.ext import commands +from pydis_core.utils import logging, scheduling +from sqlalchemy import column, update +from sqlalchemy.dialects.postgresql import insert + +from metricity import models +from metricity.bot import Bot +from metricity.config import BotConfig +from metricity.database import async_session +from metricity.exts.event_listeners import _syncer_utils + +log = logging.get_logger(__name__) + + +class StartupSyncer(commands.Cog): + """Sync the guild on bot startup.""" + + def __init__(self, bot: Bot) -> None: + self.bot = bot + scheduling.create_task(self.sync_guild()) + + async def sync_guild(self) -> None: + """Sync all channels and members in the guild.""" + await self.bot.wait_until_guild_available() + + guild = self.bot.get_guild(self.bot.guild_id) + await _syncer_utils.sync_channels(self.bot, guild) + + log.info("Beginning thread archive state synchronisation process") + await _syncer_utils.sync_thread_archive_state(guild) + + log.info("Beginning user synchronisation process") + async with async_session() as sess: + await sess.execute(update(models.User).values(in_guild=False)) + await sess.commit() + + users = [ + { + "id": str(user.id), + "name": user.name, + "avatar_hash": getattr(user.avatar, "key", None), + "guild_avatar_hash": getattr(user.guild_avatar, "key", None), + "joined_at": user.joined_at, + "created_at": user.created_at, + "is_staff": BotConfig.staff_role_id in [role.id for role in user.roles], + "bot": user.bot, + "in_guild": True, + "public_flags": dict(user.public_flags), + "pending": user.pending, + } + for user in guild.members + ] + + user_chunks = discord.utils.as_chunks(users, 500) + created = 0 + updated = 0 + total_users = len(users) + + log.info("Performing bulk upsert of %d rows in %d chunks", total_users, math.ceil(total_users / 500)) + + async with async_session() as sess: + for chunk in user_chunks: + qs = insert(models.User).returning(column("xmax")).values(chunk) + + update_cols = [ + "name", + "avatar_hash", + "guild_avatar_hash", + "joined_at", + "is_staff", + "bot", + "in_guild", + "public_flags", + "pending", + ] + + res = await sess.execute(qs.on_conflict_do_update( + index_elements=[models.User.id], + set_={k: getattr(qs.excluded, k) for k in update_cols}, + )) + + objs = list(res) + + created += [obj[0] == 0 for obj in objs].count(True) + updated += [obj[0] != 0 for obj in objs].count(True) + + log.info("User upsert: inserted %d rows, updated %d rows, done %d rows, %d rows remaining", + created, updated, created + updated, total_users - (created + updated)) + + await sess.commit() + + log.info("User upsert complete") + + self.bot.sync_process_complete.set() + + @commands.Cog.listener() + async def on_guild_available(self, guild: discord.Guild) -> None: + """Synchronize the user table with the Discord users.""" + log.info("Received guild available for %d", guild.id) + + if guild.id != BotConfig.guild_id: + log.info("Guild was not the configured guild, discarding event") + return + + await self.sync_guild() + + +async def setup(bot: Bot) -> None: + """Load the GuildListeners cog.""" + await bot.add_cog(StartupSyncer(bot)) From e302fe21cae062ee62ef525f3f31817e3196a5db Mon Sep 17 00:00:00 2001 From: Chris Lovering Date: Thu, 6 Jun 2024 20:25:09 +0100 Subject: [PATCH 4/5] Bump metricity version to 2.6.0 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index e817933..5c5e61c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metricity" -version = "2.5.1" +version = "2.6.0" description = "Advanced metric collection for the Python Discord server" authors = ["Joe Banks "] license = "MIT" From a03c02c94c9d514db872fafe88146a0e7ec81ea4 Mon Sep 17 00:00:00 2001 From: Chris Lovering Date: Thu, 6 Jun 2024 20:29:01 +0100 Subject: [PATCH 5/5] Use a generator instead of a list to store the members to sync This reduces the memory required on bot startup --- metricity/exts/event_listeners/startup_sync.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metricity/exts/event_listeners/startup_sync.py b/metricity/exts/event_listeners/startup_sync.py index 7ff5026..0f6264f 100644 --- a/metricity/exts/event_listeners/startup_sync.py +++ b/metricity/exts/event_listeners/startup_sync.py @@ -39,7 +39,7 @@ async def sync_guild(self) -> None: await sess.execute(update(models.User).values(in_guild=False)) await sess.commit() - users = [ + users = ( { "id": str(user.id), "name": user.name, @@ -54,12 +54,12 @@ async def sync_guild(self) -> None: "pending": user.pending, } for user in guild.members - ] + ) user_chunks = discord.utils.as_chunks(users, 500) created = 0 updated = 0 - total_users = len(users) + total_users = len(guild.members) log.info("Performing bulk upsert of %d rows in %d chunks", total_users, math.ceil(total_users / 500))