-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbot.py
220 lines (168 loc) · 6.84 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
"""The main file of the bot, basically sets up and starts the bot """
import asyncio
import json
import logging
import os
import sqlite3
from datetime import datetime
import aiohttp_client_cache
from aiohttp import ClientTimeout
from dotenv import load_dotenv
import hikari as hk
import lightbulb as lb
import miru
from lightbulb.ext import tasks
from functions.help import BotHelpCommand
from functions.utils import verbose_timedelta
load_dotenv()
# TODO
# 1. Nox based testing
def make_prefix(app, message: hk.Message) -> list:
cfg = open("config.json", encoding="utf-8")
cfg = json.loads(cfg.read())
guild_prefix_map = cfg["GUILD_PREFIX_MAP"]
prefixes = guild_prefix_map.get(str(message.guild_id), ["-"])
# print("Guild prefixes", prefixes, type(prefixes))
return prefixes
# The following snippet is borrowed from:
# https://github.com/Nereg/ARKMonitorBot/
def setup_logging() -> None:
"""Set up the logging of the events to log.txt (for debugging) [Level-1]"""
# get root logger
root_logger = logging.getLogger("")
# create a rotating file handler with 1 backup file and 1 megabyte size
file_handler = logging.handlers.RotatingFileHandler(
"./logs/log.txt", "w+", 1_000_000, 1, "UTF-8"
)
# create a default console handler
console_handler = logging.StreamHandler()
# create a formatting style (modified from hikari)
formatter = logging.Formatter(
fmt="%(levelname)-1.1s %(asctime)23.23s %(name)s @ %(lineno)d: %(message)s"
)
# add the formatter to both handlers
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
# add both handlers to the root logger
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
# set logging level to debug
root_logger.setLevel(logging.DEBUG)
bot = lb.BotApp(
token=os.getenv("BOT_TOKEN"),
intents=hk.Intents.ALL_UNPRIVILEGED
# | hk.Intents.ALL_PRIVILEGED,
| hk.Intents.MESSAGE_CONTENT | hk.Intents.GUILD_MEMBERS,
prefix=lb.when_mentioned_or(make_prefix),
logs="INFO",
allow_color=False,
owner_ids=[701090852243505212],
)
miru.install(bot)
tasks.load(bot)
bot.load_extensions_from("./extensions/")
@bot.listen()
async def on_starting(event: hk.StartingEvent) -> None:
"""Code which is executed once when the bot starts"""
bot.d.aio_session = aiohttp_client_cache.CachedSession(
cache_name="cache_db.db",
expire_after=24 * 60 * 60,
urls_expire_after={
"*.mangadex.org": 15 * 60,
"*.steampowered.com": 1 * 60 * 60,
},
allowed_codes=(200, 403, 404), # Cache responses with these status codes
allowed_methods=["GET", "POST"], # Cache requests with these HTTP methods
include_headers=True,
ignored_params=[
"auth_token"
], # Keep using the cached response even if this param changes
timeout=ClientTimeout(total=10),
)
bot.d.timeup = datetime.now().astimezone()
bot.d.chapter_info = {}
bot.d.update_channels = ["1127609035374461070"]
bot.d.con = sqlite3.connect("akane_db.db")
os.makedirs("pictures", exist_ok=True)
os.makedirs("logs", exist_ok=True)
with open("./logs/log.txt", "w+", encoding="UTF-8"):
pass
setup_logging()
@bot.listen()
async def on_stopping(event: hk.StoppingEvent) -> None:
"""Code which is executed once when the bot stops"""
await bot.rest.create_message(
1129030476695343174,
f"Bot closed with {verbose_timedelta(datetime.now().astimezone()-bot.d.timeup)} uptime",
)
@bot.command
@lb.command("ping", description="The bot's ping", ephemeral=True)
@lb.implements(lb.PrefixCommand, lb.SlashCommand)
async def ping(ctx: lb.Context) -> None:
"""Check the latency of the bot
Args:
ctx (lb.Context): The event context (irrelevant to the user)
"""
await ctx.respond(f"Pong! Latency: {bot.heartbeat_latency*1000:.2f}ms")
@bot.listen(lb.CommandErrorEvent)
async def on_error(event: lb.CommandErrorEvent) -> None:
"""The base function to listen for all errors
Args:
event (lb.CommandErrorEvent): The event context
Raises:
event.exception: Base exception probably
"""
# Unwrap the exception to get the original cause
exception = event.exception.__cause__ or event.exception
if isinstance(exception, lb.CommandInvocationError):
await event.context.respond(
f"Something went wrong during invocation of command `{event.context.command.name}`."
)
raise event.exception
if isinstance(exception, lb.NotOwner):
await event.context.event.message.add_reaction("❌")
elif isinstance(exception, lb.CommandIsOnCooldown):
await event.context.respond(
f"The command is on cooldown, you can use it after {int(exception.retry_after)}s",
delete_after=min(15, int(exception.retry_after)),
)
elif isinstance(exception, lb.MissingRequiredPermission):
await event.context.respond(
"You do not have the necessary permissions to use the command",
flags=hk.MessageFlag.EPHEMERAL,
)
elif isinstance(exception, lb.BotMissingRequiredPermission):
await event.context.respond("I don't have the permissions to do this 😔")
elif isinstance(exception, lb.NotEnoughArguments):
try:
ctx = event.context
command = ctx.command
if command.hidden:
return
await ctx.respond("Missing arguments, initializing command help...")
await asyncio.sleep(0.3)
helper = BotHelpCommand(ctx.bot)
await helper.send_command_help(ctx=ctx, command=command)
except Exception as e:
await event.context.respond(f"Stop, {e}")
elif isinstance(exception, lb.OnlyInGuild):
await event.context.respond("This command can't be invoked in DMs")
elif isinstance(exception, lb.ConverterFailure):
await event.context.respond(f"The argument `{exception.raw_value}` is invalid")
elif isinstance(exception, lb.MaxConcurrencyLimitReached):
await event.context.respond(
"Too many people are using the command, please try again later"
)
elif isinstance(exception, lb.CommandNotFound):
pass
# To move the fuzzy matching here
if __name__ == "__main__":
if os.name == "nt":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
else:
import uvloop
uvloop.install()
bot.run(
status=hk.Status.IDLE,
activity=hk.Activity(name="『Idol』 | -help", type=hk.ActivityType.LISTENING),
)