Skip to content

Commit

Permalink
Copy internal cog updates from Python
Browse files Browse the repository at this point in the history
Signed-off-by: GitHub <noreply@github.com>
  • Loading branch information
shenanigansd authored Apr 15, 2024
1 parent 415f414 commit 06047c4
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 65 deletions.
3 changes: 1 addition & 2 deletions src/bot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class _BaseURLs(EnvConfig, env_prefix="urls_"):

github_bot_repo: str = "https://github.com/letsbuilda/anubis"

paste: str = "https://paste.pythondiscord.com"
paste_url: str = "https://paste.pythondiscord.com"


BaseURLs = _BaseURLs()
Expand All @@ -130,7 +130,6 @@ class _URLs(_BaseURLs):
connect_max_retries: int = 3
connect_cooldown: int = 5

paste_service: str = f"{BaseURLs.paste}/{{key}}"
site_logs_view: str = "https://pythondiscord.com/staff/bot/logs"


Expand Down
109 changes: 46 additions & 63 deletions src/bot/exts/utilities/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,25 @@
import traceback
from collections import Counter
from io import StringIO
from typing import Any, Self
from typing import Any

import arrow
import discord
from discord.ext.commands import Cog, Context, group, has_any_role, is_owner
from pydis_core.utils.paste_service import PasteFile, PasteTooLongError, PasteUploadError, send_to_paste_service

from bot.bot import Bot
from bot.constants import DEBUG_MODE, Roles
from bot.constants import BaseURLs, DEBUG_MODE, Roles
from bot.log import get_logger
from bot.utils import (
PasteTooLongError,
PasteUploadError,
find_nth_occurrence,
send_to_paste_service,
)
from bot.utils import find_nth_occurrence

log = get_logger(__name__)


class Internal(Cog):
"""Administrator and Core Developer commands."""

def __init__(self: Self, bot: Bot) -> None:
def __init__(self, bot: Bot):
self.bot = bot
self.env = {}
self.ln = 0
Expand All @@ -44,16 +40,12 @@ def __init__(self: Self, bot: Bot) -> None:
self.eval.add_check(is_owner().predicate)

@Cog.listener()
async def on_socket_event_type(self: Self, event_type: str) -> None:
async def on_socket_event_type(self, event_type: str) -> None:
"""When a websocket event is received, increase our counters."""
self.socket_event_total += 1
self.socket_events[event_type] += 1

def _format( # noqa: PLR0912 - double check this
self: Self,
inp: str,
out: Any, # noqa: ANN401 - double check this
) -> tuple[str, discord.Embed | None]:
def _format(self, inp: str, out: Any) -> tuple[str, discord.Embed | None]:
"""Format the eval output into a string & attempt to format it into an Embed."""
self._ = out

Expand All @@ -70,7 +62,7 @@ def _format( # noqa: PLR0912 - double check this

# Create the input dialog
for i, line in enumerate(lines):
if i == 0: # - spread out for docs
if i == 0:
# Start dialog
start = f"In [{self.ln}]: "

Expand All @@ -93,19 +85,20 @@ def _format( # noqa: PLR0912 - double check this
# to indent it.
start = "...: ".rjust(len(str(self.ln)) + 7)

if i == len(lines) - 2 and line.startswith("return"):
line = line[6:].strip() # noqa: PLW2901 - is literally fine
if i == len(lines) - 2:
if line.startswith("return"):
line = line[6:].strip()

# Combine everything
res += start + line + "\n"
res += (start + line + "\n")

self.stdout.seek(0)
text = self.stdout.read()
self.stdout.close()
self.stdout = StringIO()

if text:
res += text + "\n"
res += (text + "\n")

if out is None:
# No output, return the input statement
Expand All @@ -119,13 +112,14 @@ def _format( # noqa: PLR0912 - double check this
res = (res, out)

else:
if isinstance(out, str) and out.startswith(
"Traceback (most recent call last):\n",
):
if (isinstance(out, str) and out.startswith("Traceback (most recent call last):\n")):
# Leave out the traceback message
out = "\n" + "\n".join(out.split("\n")[1:])

pretty = out if isinstance(out, str) else pprint.pformat(out, compact=True, width=60)
if isinstance(out, str):
pretty = out
else:
pretty = pprint.pformat(out, compact=True, width=60)

if pretty != str(out):
# We're using the pretty version, start on the next line
Expand All @@ -135,19 +129,17 @@ def _format( # noqa: PLR0912 - double check this
# Text too long, shorten
li = pretty.split("\n")

pretty = (
"\n".join(li[:3]) # First 3 lines
+ "\n ...\n" # Ellipsis to indicate removed lines
+ "\n".join(li[-3:])
) # last 3 lines
pretty = ("\n".join(li[:3]) # First 3 lines
+ "\n ...\n" # Ellipsis to indicate removed lines
+ "\n".join(li[-3:])) # last 3 lines

# Add the output
res += pretty
res = (res, None)

return res # Return (text, embed)

async def _eval(self: Self, ctx: Context, code: str) -> discord.Message | None:
async def _eval(self, ctx: Context, code: str) -> discord.Message | None:
"""Eval the input code string & send an embed to the invoking context."""
self.ln += 1

Expand All @@ -162,11 +154,11 @@ async def _eval(self: Self, ctx: Context, code: str) -> discord.Message | None:
"channel": ctx.channel,
"guild": ctx.guild,
"ctx": ctx,
"self": Self,
"self": self,
"bot": self.bot,
"inspect": inspect,
"discord": discord,
"contextlib": contextlib,
"contextlib": contextlib
}

self.env.update(env)
Expand All @@ -183,16 +175,14 @@ async def func(): # (None,) -> Any
return _
finally:
self.env.update(locals())
""".format(
textwrap.indent(code, " "),
)
""".format(textwrap.indent(code, " "))

try:
exec(code_, self.env) # noqa: S102
func = self.env["func"]
res = await func()

except Exception: # noqa: BLE001 - eh...
except Exception:
res = traceback.format_exc()

out, embed = self._format(code, res)
Expand All @@ -207,22 +197,24 @@ async def func(): # (None,) -> Any
truncate_index = newline_truncate_index

if len(out) > truncate_index:
file = PasteFile(content=out)
try:
paste_link = await send_to_paste_service(
self.bot.http_session,
out,
extension="py",
resp = await send_to_paste_service(
files=[file],
http_session=self.bot.http_session,
paste_url=BaseURLs.paste_url,
)
except PasteTooLongError:
paste_text = "too long to upload to paste service."
except PasteUploadError:
paste_text = "failed to upload contents to paste service."
else:
paste_text = f"full contents at {paste_link}"
paste_text = f"full contents at {resp.link}"

await ctx.send(
f"```py\n{out[:truncate_index]}\n```... response truncated; {paste_text}",
embed=embed,
f"```py\n{out[:truncate_index]}\n```"
f"... response truncated; {paste_text}",
embed=embed
)
return None

Expand All @@ -231,39 +223,30 @@ async def func(): # (None,) -> Any

@group(name="internal", aliases=("int",))
@has_any_role(Roles.administrators, Roles.core_developers)
async def internal_group(self: Self, ctx: Context) -> None:
"""Internal commands. Top secret!.""" # noqa: D401 - formatting
async def internal_group(self, ctx: Context) -> None:
"""Internal commands. Top secret!"""
if not ctx.invoked_subcommand:
await ctx.send_help(ctx.command)

@internal_group.command(name="eval", aliases=("e",))
@has_any_role(Roles.administrators)
async def eval(
self: Self,
ctx: Context,
*,
code: str,
) -> None: # - uh... good point
async def eval(self, ctx: Context, *, code: str) -> None:
"""Run eval in a REPL-like format."""
code = code.strip("`")
if re.match("py(thon)?\n", code):
code = "\n".join(code.split("\n")[1:])

if (
not re.search( # Check if it's an expression
r"^(return|import|for|while|def|class|from|exit|[a-zA-Z0-9]+\s*=)",
code,
re.MULTILINE,
)
and len(code.split("\n")) == 1
):
code += "_ = "
if not re.search( # Check if it's an expression
r"^(return|import|for|while|def|class|"
r"from|exit|[a-zA-Z0-9]+\s*=)", code, re.M) and len(
code.split("\n")) == 1:
code = "_ = " + code

await self._eval(ctx, code)

@internal_group.command(name="socketstats", aliases=("socket", "stats"))
@has_any_role(Roles.administrators, Roles.core_developers)
async def socketstats(self: Self, ctx: Context) -> None:
@has_any_role(Roles.administrators, Roles.core_developers)
async def socketstats(self, ctx: Context) -> None:
"""Fetch information on the socket events received from Discord."""
running_s = (arrow.utcnow() - self.socket_since).total_seconds()

Expand All @@ -272,7 +255,7 @@ async def socketstats(self: Self, ctx: Context) -> None:
stats_embed = discord.Embed(
title="WebSocket statistics",
description=f"Receiving {per_s:0.2f} events per second.",
color=discord.Color.og_blurple(),
color=discord.Color.og_blurple()
)

for event_type, count in self.socket_events.most_common(25):
Expand Down

0 comments on commit 06047c4

Please sign in to comment.