From 33c72867513ecbd49fe237888350931fa4083db5 Mon Sep 17 00:00:00 2001 From: Thanos <111999343+Sachaa-Thanasius@users.noreply.github.com> Date: Mon, 15 Apr 2024 13:27:32 -0400 Subject: [PATCH] A few things: - Retire ai_generation cog. - Move inspire_me command to misc cog. - Upgrade python-requires, linting checks, and typing checks to >=3.11. - Change version to CalVer for convenience. --- core/utils/custom_logging.py | 6 +- core/utils/embeds.py | 7 +- core/utils/misc.py | 29 +-- core/utils/pagination.py | 7 +- exts/ai_generation/__init__.py | 15 -- exts/ai_generation/ai_generation.py | 270 ------------------------ exts/ai_generation/utils.py | 220 ------------------- exts/dice.py | 13 +- exts/emoji_ops.py | 23 +- exts/lol.py | 8 +- exts/misc.py | 58 ++++- exts/notifications/rss_notifications.py | 12 +- exts/patreon.py | 14 +- exts/snowball/utils.py | 8 +- exts/story_search.py | 6 +- exts/timing.py | 8 +- exts/todo.py | 7 +- pyproject.toml | 9 +- 18 files changed, 89 insertions(+), 631 deletions(-) delete mode 100644 exts/ai_generation/__init__.py delete mode 100644 exts/ai_generation/ai_generation.py delete mode 100644 exts/ai_generation/utils.py diff --git a/core/utils/custom_logging.py b/core/utils/custom_logging.py index 75c24ca..32b8c5f 100644 --- a/core/utils/custom_logging.py +++ b/core/utils/custom_logging.py @@ -13,17 +13,15 @@ import logging from logging.handlers import RotatingFileHandler from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Self from discord.utils import _ColourFormatter as ColourFormatter, stream_supports_colour # type: ignore # Because color. if TYPE_CHECKING: from types import TracebackType - - from typing_extensions import Self else: - TracebackType = Self = object + TracebackType = object __all__ = ("LoggingManager",) diff --git a/core/utils/embeds.py b/core/utils/embeds.py index db104a3..37f7aad 100644 --- a/core/utils/embeds.py +++ b/core/utils/embeds.py @@ -7,16 +7,11 @@ import itertools import logging from collections.abc import Iterable, Sequence -from typing import TYPE_CHECKING, TypeAlias +from typing import Self, TypeAlias import discord -if TYPE_CHECKING: - from typing_extensions import Self -else: - Self = object - AnyEmoji: TypeAlias = discord.Emoji | discord.PartialEmoji | str diff --git a/core/utils/misc.py b/core/utils/misc.py index ec1e383..abd9e2d 100644 --- a/core/utils/misc.py +++ b/core/utils/misc.py @@ -6,36 +6,11 @@ import logging import time -from asyncio import iscoroutinefunction -from collections.abc import Awaitable, Callable, Coroutine -from typing import TYPE_CHECKING, Any, ParamSpec, TypeGuard, TypeVar - - -if TYPE_CHECKING: - from typing_extensions import Self -else: - Self = object - -T = TypeVar("T") -P = ParamSpec("P") -Coro = Coroutine[Any, Any, T] __all__ = ("catchtime",) -def is_awaitable_func(func: Callable[P, T] | Callable[P, Awaitable[T]]) -> TypeGuard[Callable[P, Awaitable[T]]]: - return iscoroutinefunction(func) - - -def is_coroutine_func(func: Callable[P, T] | Callable[P, Coro[T]]) -> TypeGuard[Callable[P, Coro[T]]]: - return iscoroutinefunction(func) - - -def is_not_coroutine_func(func: Callable[P, T] | Callable[P, Coro[T]]) -> TypeGuard[Callable[P, T]]: - return not iscoroutinefunction(func) - - class catchtime: """A context manager class that times what happens within it. @@ -47,10 +22,10 @@ class catchtime: The logging channel to send the time to, if relevant. Optional. """ - def __init__(self, logger: logging.Logger | None = None) -> None: + def __init__(self, logger: logging.Logger | None = None): self.logger = logger - def __enter__(self) -> Self: + def __enter__(self): self.total_time = time.perf_counter() return self diff --git a/core/utils/pagination.py b/core/utils/pagination.py index 37f57ad..d08e1d9 100644 --- a/core/utils/pagination.py +++ b/core/utils/pagination.py @@ -8,16 +8,11 @@ import asyncio from abc import ABC, abstractmethod from collections.abc import Sequence -from typing import TYPE_CHECKING, Any, Generic, TypeVar +from typing import Any, Generic, Self, TypeVar import discord -if TYPE_CHECKING: - from typing_extensions import Self -else: - Self = object - _LT = TypeVar("_LT") diff --git a/exts/ai_generation/__init__.py b/exts/ai_generation/__init__.py deleted file mode 100644 index 455ccd3..0000000 --- a/exts/ai_generation/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -from .ai_generation import AIGenerationCog - - -if TYPE_CHECKING: - from core import Beira - - -async def setup(bot: Beira) -> None: - """Connects cog to bot.""" - - await bot.add_cog(AIGenerationCog(bot)) diff --git a/exts/ai_generation/ai_generation.py b/exts/ai_generation/ai_generation.py deleted file mode 100644 index 5e4f44a..0000000 --- a/exts/ai_generation/ai_generation.py +++ /dev/null @@ -1,270 +0,0 @@ -""" -ai_generation.py: A cog with commands for doing fun AI things with OpenAI and other AI APIs, like generating images -and morphs. -""" - -from __future__ import annotations - -import asyncio -import logging -from io import BytesIO -from time import perf_counter -from typing import Literal - -import discord -from discord.ext import commands -from PIL import Image - -import core - -from .utils import create_completion, create_image, create_inspiration, create_morph, get_image, process_image - - -LOGGER = logging.getLogger(__name__) - -INSPIROBOT_ICON_URL = "https://pbs.twimg.com/profile_images/815624354876760064/zPmAZWP4_400x400.jpg" - - -class DownloadButtonView(discord.ui.View): - """A small view that adds download buttons to a message based on the given labels and download urls.""" - - def __init__(self, *button_links: tuple[str, str], timeout: int | None = 180) -> None: - super().__init__(timeout=timeout) - for link in button_links: - self.add_item(discord.ui.Button(style=discord.ButtonStyle.blurple, label=link[0], url=link[1])) - - -class AIGenerationCog(commands.Cog, name="AI Generation"): - """A cog with commands for doing fun AI things with OpenAI's API, like generating images and morphs. - - Note: This is all Athena's fault. - """ - - def __init__(self, bot: core.Beira) -> None: - self.bot = bot - - @property - def cog_emoji(self) -> discord.PartialEmoji: - """:class:`discord.PartialEmoji`: A partial emoji representing this cog.""" - - return discord.PartialEmoji(name="\N{ROBOT FACE}") - - async def cog_command_error(self, ctx: core.Context, error: Exception) -> None: # type: ignore # Narrowing - """Handles any errors within this cog.""" - - assert ctx.command is not None - - # Extract the original error. - error = getattr(error, "original", error) - if ctx.interaction: - error = getattr(error, "original", error) - - embed = discord.Embed(color=0x5E9A40) - - if isinstance(error, ConnectionError | KeyError): - LOGGER.warning("OpenAI Response error.", exc_info=error) - embed.title = "OpenAI Response Error" - embed.description = "There's a connection issue with OpenAI's API. Please try again in a minute or two." - ctx.command.reset_cooldown(ctx) - elif isinstance(error, commands.CommandOnCooldown): - embed.title = "Command on Cooldown!" - embed.description = f"Please wait {error.retry_after:.2f} seconds before trying this command again." - else: - embed.title = f'Error with "{ctx.command}"' - embed.description = "You've triggered an error with this command. Please try again in a minute or two." - LOGGER.exception("Unknown command error in AIGenerationCog.", exc_info=error) - - await ctx.send(embed=embed, ephemeral=True, delete_after=10) - - async def morph_user(self, target: discord.User, prompt: str) -> tuple[str, BytesIO]: - """Does the morph process. - - Parameters - ---------- - target: :class:`discord.User` - The person whose avatar will be morphed. - prompt: :class:`str` - The text that the AI will use. - """ - - # Save the avatar to a bytes buffer. - avatar_buffer = BytesIO() - await target.display_avatar.replace(size=256, format="png", static_format="png").save(avatar_buffer) - - # Verify what the new size is. - with Image.open(avatar_buffer) as avatar_image: - file_size = avatar_image.size - - ai_url = await create_image(self.bot.web_session, prompt, file_size) - ai_bytes = await get_image(self.bot.web_session, ai_url) - ai_buffer = await asyncio.to_thread(process_image, ai_bytes) - gif_buffer = await create_morph(avatar_buffer, ai_buffer) - - return ai_url, gif_buffer - - @commands.hybrid_group() - async def openai(self, ctx: core.Context) -> None: - """A group of commands using OpenAI's API. Includes morphing, image generation, and text generation.""" - - await ctx.send_help(ctx.command) - - @openai.command(name="pigeonify") - @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) - async def morph_athena(self, ctx: core.Context, target: discord.User | None = None) -> None: - """Turn Athena (or someone else) into the pigeon she is at heart. - - Parameters - ---------- - ctx: :class:`core.Context` - The invocation context. - target: :class:`discord.User`, optional - The user whose avatar will be pigeonified. Defaults to Athena. - """ - - async with ctx.typing(): - default_target: discord.User = self.bot.get_user(self.bot.special_friends["athenahope"]) # type: ignore - act_target = target or default_target - prompt = "an anxious, dumb, insane, crazy-looking cartoon pigeon" - - log_start_time = perf_counter() - ai_img_url, result_gif = await self.morph_user(act_target, prompt) - log_end_time = perf_counter() - - morph_time = log_end_time - log_start_time - - # Create and send an embed that holds the generated morph. - gif_file = discord.File(result_gif, filename="pigeonlord.gif") - embed = ( - discord.Embed( - color=0x5D6E7F, - title=f"{act_target.display_name}'s True Form", - description="***Behold!***", - ) - .set_image(url="attachment://pigeonlord.gif") - .set_footer(text=f"Generated using the OpenAI API | Total Generation Time: {morph_time:.3f}s") - ) - - sent_message = await ctx.send(embed=embed, file=gif_file) - - # Create two download buttons. - morph_url = sent_message.embeds[0].image.url or " " - buttons_view = DownloadButtonView(("Download Morph", morph_url), ("Download Final Image", ai_img_url)) - await sent_message.edit(view=buttons_view) - - @openai.command(name="morph") - @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) - async def morph_general(self, ctx: core.Context, target: discord.User, *, prompt: str) -> None: - """Create a morph gif with a user's avatar and a prompt-based AI image. - - Parameters - ---------- - ctx: :class:`core.Context` - The invocation context. - target: :class:`discord.User` - The user whose avatar will be morphed. - prompt: :class:`str` - The text that the AI will use. - """ - - async with ctx.typing(): - log_start_time = perf_counter() - ai_img_url, result_gif = await self.morph_user(target, prompt) - log_end_time = perf_counter() - - morph_time = log_end_time - log_start_time - - # Create and send an embed that holds the generated morph. - gif_file = discord.File(result_gif, filename="morph.gif") - embed = ( - discord.Embed(color=0x5D6E7F, title=f"Morph of {target.display_name}", description="—+—+—+—+—+—+—") - .add_field(name="Prompt", value=prompt) - .set_image(url="attachment://morph.gif") - .set_footer(text=f"Generated using the OpenAI API | Total Generation Time: {morph_time:.3f}s") - ) - - LOGGER.info("Total morph time: %.5fs", morph_time) - - sent_message = await ctx.send(embed=embed, file=gif_file) - - # Create two download buttons. - morph_url = sent_message.embeds[0].image.url or " " - buttons_view = DownloadButtonView(("Download Morph", morph_url), ("Download Final Image", ai_img_url)) - await sent_message.edit(view=buttons_view) - - @openai.command() - @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) - async def generate( - self, - ctx: core.Context, - generation_type: Literal["text", "image"] = "image", - *, - prompt: str, - ) -> None: - """Create and send AI-generated images or text based on a given prompt. - - Parameters - ---------- - ctx: :class:`core.Context` - The invocation context. - generation_type: Literal["text", "image"], default="image" - What the AI is generating. - prompt: :class:`str` - The text that the AI will use. - """ - - async with ctx.typing(): - embed = discord.Embed(color=0x5D6E7F, title="AI-Generated", description="—+—+—+—+—+—+—") - assert (embed.title is not None) and (embed.description is not None) # For typing. - - if generation_type == "image": - log_start_time = perf_counter() - ai_url = await create_image(ctx.session, prompt, (512, 512)) - ai_bytes = await get_image(ctx.session, ai_url) - ai_buffer = await asyncio.to_thread(process_image, ai_bytes) - creation_time = perf_counter() - log_start_time - - # Send the generated text in an embed. - ai_img_file = discord.File(ai_buffer, filename="ai_image.png") - embed.title += " Image" - ( - embed.add_field(name="Prompt", value=prompt) - .set_image(url="attachment://ai_image.png") - .set_footer(text=f"Generated using the OpenAI API | Total Generation Time: {creation_time:.3f}s") - ) - sent_message = await ctx.send(embed=embed, file=ai_img_file) - - # Create a download button. - image_url = sent_message.embeds[0].image.url or " " - await sent_message.edit(view=DownloadButtonView(("Download Image", image_url))) - - elif generation_type == "text": - log_start_time = perf_counter() - ai_text = await create_completion(ctx.session, prompt) - creation_time = perf_counter() - log_start_time - - # Send the generated image in an embed. - embed.title += " Text" - ( - embed.add_field(name="Prompt", value=prompt, inline=False) - .add_field(name="Result", value=ai_text, inline=False) - .set_footer(text=f"Generated using the OpenAI API | Total Generation Time: {creation_time:.3f}s") - ) - await ctx.send(embed=embed) - - else: - embed.title += " Error" - embed.description += "\nPlease enter the type of output — `image` or `text` — before your prompt." - await ctx.send(embed=embed) - - @commands.hybrid_command() - async def inspire_me(self, ctx: core.Context) -> None: - """Generate a random inspirational poster with InspiroBot.""" - - async with ctx.typing(): - image_url = await create_inspiration(ctx.session) - embed = ( - discord.Embed(color=0xE04206) - .set_image(url=image_url) - .set_footer(text="Generated with InspiroBot at https://inspirobot.me/", icon_url=INSPIROBOT_ICON_URL) - ) - await ctx.send(embed=embed) diff --git a/exts/ai_generation/utils.py b/exts/ai_generation/utils.py deleted file mode 100644 index aefcf46..0000000 --- a/exts/ai_generation/utils.py +++ /dev/null @@ -1,220 +0,0 @@ -import asyncio -import logging -import tempfile -from collections.abc import Generator -from contextlib import contextmanager -from io import BytesIO -from pathlib import Path - -import aiohttp -from PIL import Image - -import core - - -__all__ = ( - "get_image", - "process_image", - "create_completion", - "create_image", - "create_inspiration", - "temp_file_names", - "create_morph", -) - -LOGGER = logging.getLogger(__name__) - -FFMPEG = Path("C:/ffmpeg/bin/ffmpeg.exe") # Depends on the machine and its PATH. -INSPIROBOT_API_URL = "https://inspirobot.me/api" - - -async def get_image(session: aiohttp.ClientSession, url: str) -> bytes: - """Asynchronously load the bytes of an image from a url. - - Parameters - ---------- - session: :class:`aiohttp.ClientSession` - The web session with which to retrieve the image data. - url: :class:`str` - The url to retrieve the image from. - - Returns - ------- - :class:`bytes` - The image data. - """ - - async with session.get(url) as resp: - return await resp.read() - - -def process_image(image_bytes: bytes) -> BytesIO: - """Processes image data with PIL.""" - - with Image.open(BytesIO(image_bytes)) as new_image: - output_buffer = BytesIO() - new_image.save(output_buffer, "png") - output_buffer.seek(0) - - return output_buffer - - -async def create_completion(session: aiohttp.ClientSession, prompt: str) -> str: - """Makes a call to OpenAI's API to generate text based on given input. - - Parameters - ---------- - prompt: :class:`str` - The text OpenAI will generatively complete. - - Returns - ------- - text: :class:`str` - The generated text completion, or an empty string if it failed. - """ - - async with session.post( - "https://api.openai.com/v1/completions", - headers={"Authorization": f"Bearer {core.CONFIG}"}, - json={ - "model": "gpt-3.5-turbo-instruct", - "prompt": prompt, - "max_tokens": 150, - "temperature": 0, - }, - ) as response: - result = await response.json() - try: - return result["choices"][0]["text"] or "" - except (KeyError, IndexError): - return "" - - -async def create_image(session: aiohttp.ClientSession, prompt: str, size: tuple[int, int] = (256, 256)) -> str: - """Makes a call to OpenAI's API to generate an image based on given inputs. - - Parameters - ---------- - prompt: :class:`str` - The text OpenAI will use to generate the image. - size: tuple[:class:`int`, :class:`int`] - The dimensions of the resulting image. - - Returns - ------- - url: :class:`str` - The url of the generated image, or an empty string if it failed. - """ - - async with session.post( - "https://api.openai.com/v1/images/generations", - headers={"Authorization": f"Bearer {core.CONFIG}"}, - json={ - "model": "dall-e-2", - "prompt": prompt, - "n": 1, - "size": f"{size[0]}x{size[1]}", # FIXME: Find a way to pass in a literal. - "response_format": "url", - }, - ) as response: - result = await response.json() - try: - return result["data"][0]["url"] or "" - except (KeyError, IndexError): - return "" - - -async def create_inspiration(session: aiohttp.ClientSession) -> str: - """Makes a call to InspiroBot's API to generate an inspirational poster. - - Parameters - ---------- - session: :class:`aiohttp.ClientSession` - The web session used to access the API. - - Returns - ------- - :class:`str` - The url for the generated poster. - """ - - async with session.get(url=INSPIROBOT_API_URL, params={"generate": "true"}) as response: - response.raise_for_status() - return await response.text() - - -@contextmanager -def temp_file_names(*extensions: str) -> Generator[tuple[Path, ...], None, None]: - """Create temporary filesystem paths to generated filenames in a temporary folder. - - Upon completion, the folder is removed. - - Parameters - ---------- - *extensions: tuple[:class:`str`] - The file extensions that the generated filenames should have, e.g. py, txt, doc. - - Yields - ------ - temp_paths: tuple[:class:`Path`] - Filepaths with random filenames with the given file extensions, in order. - """ - - with tempfile.TemporaryDirectory() as temp_dir: - temp_paths = tuple(Path(temp_dir).joinpath(f"temp_output{i}." + ext) for i, ext in enumerate(extensions)) - yield temp_paths - - -async def create_morph(before_img_buffer: BytesIO, after_img_buffer: BytesIO) -> BytesIO: - """Create a morph gif between two images using ffmpeg. - - Parameters - ---------- - before_img_buffer: :class:`BytesIO` - The starting image loaded as bytes in a buffer. - after_img_buffer: :class:`BytesIO` - The ending image loaded as bytes in a buffer. - - Returns - ------- - gif_buffer: :class:`BytesIO` - The gif loaded as bytes in a buffer. - - References - ---------- - Source of the ffmpeg command: https://stackoverflow.com/questions/71178068/video-morph-between-two-images-ffmpeg-minterpolate - """ - - with temp_file_names("png", "png", "mp4", "gif") as (avatar_temp, ai_temp, mp4_temp, gif_temp): - # Save the input images to temporary files. - with avatar_temp.open("wb") as file: - file.write(before_img_buffer.getvalue()) - - with ai_temp.open("wb") as file: - file.write(after_img_buffer.getvalue()) - - # Run an ffmpeg command to create and save the morph mp4 from the temp images. - # fmt: off - cmd1_list = [ - f"{FFMPEG}", "-nostdin", "-y", "-r", "0.3", "-stream_loop", "1", "-i", f"{avatar_temp}", - "-r", "0.3", "-stream_loop", "2", "-i", f"{ai_temp}", - "-filter_complex", - "[0][1]concat=n=2:v=1:a=0[v];[v]minterpolate=fps=24:scd=none,trim=3:7,setpts=PTS-STARTPTS", - "-pix_fmt", "yuv420p", f"{mp4_temp}", - ] - # fmt: on - process1 = await asyncio.create_subprocess_exec(*cmd1_list) - await process1.wait() - LOGGER.info("MP4 creation completed") - - # Run another ffmpeg command to convert the morph mp4 into a gif. - cmd2_list = [f"{FFMPEG}", "-i", f"{mp4_temp}", "-f", "gif", f"{gif_temp}"] - process2 = await asyncio.create_subprocess_exec(*cmd2_list) - await process2.wait() - LOGGER.info("GIF creation completed.") - - # Save the gif to a buffer. - gif_buffer = BytesIO(gif_temp.read_bytes()) - gif_buffer.seek(0) - - return gif_buffer diff --git a/exts/dice.py b/exts/dice.py index 0b5ff10..f3d3d50 100644 --- a/exts/dice.py +++ b/exts/dice.py @@ -2,7 +2,6 @@ dice.py: The extension that holds a die roll command and all the associated utility classes. TODO: Consider adding more elements from https://wiki.roll20.net/Dice_Reference. -TODO: Consider switching to or extending Sinbad's dicelib: https://github.com/mikeshardmind/dicelib. """ from __future__ import annotations @@ -14,7 +13,7 @@ import textwrap from collections.abc import Callable from io import StringIO -from typing import TYPE_CHECKING, Any, cast +from typing import Any, Self import discord import msgspec @@ -25,12 +24,6 @@ from core.utils import EMOJI_STOCK -if TYPE_CHECKING: - from typing_extensions import Self -else: - Self = object - - LOGGER = logging.getLogger(__name__) @@ -582,7 +575,9 @@ async def run_expression(self, interaction: discord.Interaction, _: ui.Button[Se send_kwargs["embed"] = DiceEmbed(expression_info=(self.expression, filled_in_expression, result)) send_kwargs["view"] = ui.View().add_item(RerollButton(expression=self.expression)) else: - dice_select = cast(DiceSelect, discord.utils.get(self.children, custom_id="dice:select")) + dice_select = discord.utils.get(self.children, custom_id="dice:select") + assert isinstance(dice_select, DiceSelect) # Known at runtime. + if dice_select.values: dice_info = {int(val): self.num_rolls for val in dice_select.values} roll_info = roll_basic_dice(dice_info) diff --git a/exts/emoji_ops.py b/exts/emoji_ops.py index 3ad50f6..889889f 100644 --- a/exts/emoji_ops.py +++ b/exts/emoji_ops.py @@ -10,23 +10,29 @@ import logging import re import unicodedata -from typing import TYPE_CHECKING, Any +from io import BytesIO +from typing import Any, Self import discord from discord import app_commands from discord.ext import commands +from PIL import Image import core -from .ai_generation.utils import get_image, process_image +LOGGER = logging.getLogger(__name__) -if TYPE_CHECKING: - from typing_extensions import Self -else: - Self = object -LOGGER = logging.getLogger(__name__) +def process_image(image_bytes: bytes) -> BytesIO: + """Processes image data with PIL.""" + + with Image.open(BytesIO(image_bytes)) as new_image: + output_buffer = BytesIO() + new_image.save(output_buffer, "png") + output_buffer.seek(0) + + return output_buffer class GuildStickerFlags(commands.FlagConverter): @@ -403,7 +409,8 @@ async def emoji_add( return # Attempt to read the input as an image url. - emoji_bytes = await get_image(ctx.session, entity) + async with ctx.session.get(entity) as resp: + emoji_bytes = await resp.read() else: # Attempt to convert and read the input as an emoji normally. emoji_bytes = await converted_emoji.read() diff --git a/exts/lol.py b/exts/lol.py index a9d9b95..c9746a7 100644 --- a/exts/lol.py +++ b/exts/lol.py @@ -10,7 +10,7 @@ import itertools import logging from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import Any, Self from urllib.parse import quote, urljoin import aiohttp @@ -23,12 +23,6 @@ from core.utils import StatsEmbed -if TYPE_CHECKING: - from typing_extensions import Self -else: - Self = object - - LOGGER = logging.getLogger(__name__) GECKODRIVER = Path().resolve().joinpath("drivers/geckodriver/geckodriver.exe") diff --git a/exts/misc.py b/exts/misc.py index 8aecef6..f32cc7a 100644 --- a/exts/misc.py +++ b/exts/misc.py @@ -13,8 +13,10 @@ import random import re import tempfile +import time from io import BytesIO, StringIO +import aiohttp import discord import openpyxl import openpyxl.styles @@ -22,11 +24,13 @@ from discord.ext import commands import core -from core.utils import catchtime LOGGER = logging.getLogger(__name__) +INSPIROBOT_API_URL = "https://inspirobot.me/api" +INSPIROBOT_ICON_URL = "https://pbs.twimg.com/profile_images/815624354876760064/zPmAZWP4_400x400.jpg" + def capitalize_meow(word: str, reference: str) -> str: """Capitalize the meow-ified version of a word based on the original word's capitalization.""" @@ -100,7 +104,7 @@ def color_step(r: int, g: int, b: int, repetitions: int = 1) -> tuple[int, int, def process_color_data(role_data: list[tuple[str, discord.Colour]]) -> BytesIO: - """Sort colors, format them in an excel sheet, and return that sheet as a bytes stream.""" + """Format role names and colors in an excel sheet and return that sheet as a bytes stream.""" headers = ["Role Name", "Role Color (Hex)"] workbook = openpyxl.Workbook() @@ -124,6 +128,25 @@ def process_color_data(role_data: list[tuple[str, discord.Colour]]) -> BytesIO: return BytesIO(tmp.read()) +async def create_inspiration(session: aiohttp.ClientSession) -> str: + """Makes a call to InspiroBot's API to generate an inspirational poster. + + Parameters + ---------- + session: :class:`aiohttp.ClientSession` + The web session used to access the API. + + Returns + ------- + :class:`str` + The url for the generated poster. + """ + + async with session.get(url=INSPIROBOT_API_URL, params={"generate": "true"}) as response: + response.raise_for_status() + return await response.text() + + class MiscCog(commands.Cog, name="Misc"): """A cog with some basic commands, originally used for testing slash and hybrid command functionality.""" @@ -224,17 +247,17 @@ async def ping_(self, ctx: core.Context) -> None: ws_ping = self.bot.latency * 1000 - with catchtime() as ct: - await ctx.typing() - typing_ping = ct.total_time * 1000 + start_time = time.perf_counter() + await ctx.typing() + typing_ping = (time.perf_counter() - start_time) * 1000 - with catchtime() as ct: - await self.bot.db_pool.fetch("""SELECT * FROM guilds;""") - db_ping = ct.total_time * 1000 + start_time = time.perf_counter() + await self.bot.db_pool.fetch("""SELECT * FROM guilds;""") + db_ping = (time.perf_counter() - start_time) * 1000 - with catchtime() as ct: - message = await ctx.send(embed=discord.Embed(title="Ping...")) - msg_ping = ct.total_time * 1000 + start_time = time.perf_counter() + message = await ctx.send(embed=discord.Embed(title="Ping...")) + msg_ping = (time.perf_counter() - start_time) * 1000 pong_embed = ( discord.Embed(title="Pong! \N{TABLE TENNIS PADDLE AND BALL}") @@ -284,6 +307,19 @@ def color_key(item: tuple[str, discord.Colour]) -> tuple[int, int, int]: disc_file = discord.File(processed_data, f"{ctx.guild.name}-roles-sheet.xlsx") await ctx.send("Created Excel sheet with roles.", file=disc_file) + @commands.hybrid_command() + async def inspire_me(self, ctx: core.Context) -> None: + """Generate a random inspirational poster with InspiroBot.""" + + async with ctx.typing(): + image_url = await create_inspiration(ctx.session) + embed = ( + discord.Embed(color=0xE04206) + .set_image(url=image_url) + .set_footer(text="Generated with InspiroBot at https://inspirobot.me/", icon_url=INSPIROBOT_ICON_URL) + ) + await ctx.send(embed=embed) + async def setup(bot: core.Beira) -> None: """Connects cog to bot.""" diff --git a/exts/notifications/rss_notifications.py b/exts/notifications/rss_notifications.py index 19b60f8..e9b1029 100644 --- a/exts/notifications/rss_notifications.py +++ b/exts/notifications/rss_notifications.py @@ -1,8 +1,9 @@ from __future__ import annotations import asyncio -from typing import TYPE_CHECKING +from typing import Self +import aiohttp import asyncpg import discord import msgspec @@ -11,13 +12,6 @@ import core -if TYPE_CHECKING: - from aiohttp import ClientSession - from typing_extensions import Self -else: - ClientSession = Self = object - - class NotificationRecord(msgspec.Struct): id: int url: str @@ -25,7 +19,7 @@ class NotificationRecord(msgspec.Struct): webhook: discord.Webhook @classmethod - def from_record(cls, record: asyncpg.Record, *, session: ClientSession) -> Self: + def from_record(cls, record: asyncpg.Record, *, session: aiohttp.ClientSession) -> Self: webhook = discord.Webhook.from_url(record["notification_webhook"], session=session) return cls(record["notification_id"], record["notification_url"], record["last_notification"], webhook) diff --git a/exts/patreon.py b/exts/patreon.py index 5a35b4d..dbbf8ee 100644 --- a/exts/patreon.py +++ b/exts/patreon.py @@ -9,8 +9,9 @@ import logging import textwrap import urllib.parse -from typing import TYPE_CHECKING, Any +from typing import Any, Self +import asyncpg import discord import msgspec from discord.ext import commands, tasks @@ -19,13 +20,6 @@ from core.utils import PaginatedSelectView -if TYPE_CHECKING: - from asyncpg import Record - from typing_extensions import Self -else: - Record = Self = object - - LOGGER = logging.getLogger(__name__) CAMPAIGN_BASE = "https://www.patreon.com/api/oauth2/v2/campaigns" @@ -54,7 +48,7 @@ class PatreonTierInfo(msgspec.Struct): color: discord.Colour = discord.Colour.default() @classmethod - def from_record(cls, record: Record) -> Self: + def from_record(cls, record: asyncpg.Record) -> Self: attrs_ = ("creator_name", "tier_name", "tier_value", "tier_info", "discord_guild", "tier_role") return cls(*(record[attr] for attr in attrs_), emoji=discord.PartialEmoji.from_str(record["tier_emoji"])) @@ -150,7 +144,7 @@ async def _get_patreon_roles(self) -> None: await self.bot.wait_until_ready() query = """SELECT * FROM patreon_creators WHERE creator_name = 'ACI100' ORDER BY tier_value;""" - records: list[Record] = await self.bot.db_pool.fetch(query) + records: list[asyncpg.Record] = await self.bot.db_pool.fetch(query) self.patreon_tiers_info = [PatreonTierInfo.from_record(record) for record in records] temp_guild_id = self.patreon_tiers_info[0].guild_id diff --git a/exts/snowball/utils.py b/exts/snowball/utils.py index fc2cbf2..092f0eb 100644 --- a/exts/snowball/utils.py +++ b/exts/snowball/utils.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import Self import asyncpg import discord @@ -11,12 +11,6 @@ from core.utils.db import Connection_alias, Pool_alias, upsert_guilds, upsert_users -if TYPE_CHECKING: - from typing_extensions import Self -else: - Self = object - - __all__ = ( "SnowballRecord", "GuildSnowballSettings", diff --git a/exts/story_search.py b/exts/story_search.py index 4d2b278..4363187 100644 --- a/exts/story_search.py +++ b/exts/story_search.py @@ -14,7 +14,7 @@ from bisect import bisect_left from functools import lru_cache from pathlib import Path -from typing import TYPE_CHECKING, Any, ClassVar +from typing import Any, ClassVar, Self import aiohttp import asyncpg @@ -32,10 +32,6 @@ else: import importlib_resources -if TYPE_CHECKING: - from typing_extensions import Self -else: - Self = object LOGGER = logging.getLogger(__name__) diff --git a/exts/timing.py b/exts/timing.py index d83091f..0ec8a91 100644 --- a/exts/timing.py +++ b/exts/timing.py @@ -1,7 +1,7 @@ from __future__ import annotations import datetime -from typing import TYPE_CHECKING, TypedDict +from typing import NotRequired, TypedDict from zoneinfo import ZoneInfo, ZoneInfoNotFoundError import aiohttp @@ -12,12 +12,6 @@ import core -if TYPE_CHECKING: - from typing_extensions import NotRequired -else: - NotRequired = object - - # The supposed schedule table right now. """ CREATE TABLE IF NOT EXISTS scheduled_dispatches ( diff --git a/exts/todo.py b/exts/todo.py index a2bf837..4b7399f 100644 --- a/exts/todo.py +++ b/exts/todo.py @@ -8,7 +8,7 @@ import logging import textwrap from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Any +from typing import Any, Self import asyncpg import discord @@ -20,11 +20,6 @@ from core.utils.db import Connection_alias, Pool_alias -if TYPE_CHECKING: - from typing_extensions import Self -else: - Self = object - LOGGER = logging.getLogger(__name__) diff --git a/pyproject.toml b/pyproject.toml index 224dc3b..822489e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,10 +1,10 @@ [project] name = "Beira" -version = "0.0.1" +version = "2024.04.15" description = "An personal Discord bot made in Python." readme = "README.md" license = { file = "LICENSE" } -requires-python = ">=3.10" +requires-python = ">=3.11" authors = [ { name = "Sachaa-Thanasius", email = "111999343+Sachaa-Thanasius@users.noreply.github.com" }, ] @@ -15,7 +15,7 @@ Homepage = "https://github.com/Sachaa-Thanasius/Beira" [tool.ruff] include = ["main.py", "core/*", "exts/*", "**/pyproject.toml"] line-length = 120 -target-version = "py310" +target-version = "py311" [tool.ruff.lint] select = [ @@ -92,9 +92,10 @@ combine-as-imports = true [tool.pyright] include = ["main.py", "core", "exts"] -pythonVersion = "3.10" +pythonVersion = "3.11" typeCheckingMode = "strict" # reportImportCycles = "warning" reportPropertyTypeMismatch = "warning" reportUnnecessaryTypeIgnoreComment = "warning" +enableExperimentalFeatures = true \ No newline at end of file