-
Notifications
You must be signed in to change notification settings - Fork 0
/
discord_bot.py
334 lines (275 loc) · 12.1 KB
/
discord_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
import os
from discord.ext import commands, tasks
from email_crawler import fetch_articles_from_days
from discord import Intents
from dotenv import load_dotenv
from config_manager import get_cron_frequency, get_min_relevancy_score, get_search_criteria
import logging
import functools
import discord
from discord import app_commands
import sys
import signal
import asyncio
from flask import Flask, jsonify
import threading
# Logging: every record is timestamped and written both to a log file and to
# the console stream.
logging.basicConfig(
    handlers=[
        logging.FileHandler("newsletter_bot.log"),
        logging.StreamHandler(),
    ],
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Load variables from a .env file, then read the bot token from the
# environment. TOKEN is None when DISCORD_TOKEN is not set.
load_dotenv()
TOKEN = os.environ.get('DISCORD_TOKEN')

# Initial bot object: default intents plus message content, '?' prefix.
# NOTE(review): `bot` is rebound to a NewsletterBot instance further down in
# this file, so this first instance is not the one that ends up being run.
intents = Intents.default()
intents.message_content = True
bot = commands.Bot(intents=intents, command_prefix='?')
# Small Flask app that exposes a health-check endpoint next to the bot.
app = Flask(__name__)


@app.route('/healthz')
def health_check():
    """Answer health probes with a static JSON body and HTTP 200."""
    return jsonify({"message": "ok"}), 200


def run_flask():
    """Serve the Flask app on all interfaces, port 5000.

    `app.run` blocks the calling thread, which is why it is started in a
    dedicated thread below.
    """
    app.run(host='0.0.0.0', port=5000)


# Start Flask in a separate thread.
# daemon=True: a non-daemon thread blocked in app.run() would keep the
# interpreter alive after the bot has shut down (or after sys.exit), so the
# process would never terminate on its own.
flask_thread = threading.Thread(target=run_flask, daemon=True)
flask_thread.start()
def command_error_handler(func):
    """Decorate an interaction callback so failures are logged and reported.

    The returned coroutine function awaits ``func(interaction, *args,
    **kwargs)`` and returns its result. If ``func`` raises any ``Exception``,
    the error is logged with its traceback and a generic error message is sent
    to the user; the wrapper then returns ``None`` instead of propagating.

    The message is sent through ``interaction.followup`` when the interaction
    has already been responded to (e.g. after ``defer()``), and through
    ``interaction.response`` otherwise, so an error raised before the
    interaction was acknowledged is still reported.
    """
    error_message = "Something went wrong. Please try again later."

    @functools.wraps(func)
    # The annotation is quoted (a forward reference); functools.wraps copies
    # the wrapped callback's own metadata onto the wrapper anyway.
    async def wrapper(interaction: "discord.Interaction", *args, **kwargs):
        try:
            return await func(interaction, *args, **kwargs)
        except Exception as e:
            logging.exception("Error in command %s: %s", func.__name__, e)
            try:
                if interaction.response.is_done():
                    await interaction.followup.send(error_message)
                else:
                    # Nothing has been sent yet: a followup would be rejected,
                    # so answer the interaction directly.
                    await interaction.response.send_message(error_message)
            except Exception:
                # Reporting is best effort; never let it mask the original
                # error or escape the handler.
                logging.exception(
                    "Could not report the error in command %s to the user",
                    func.__name__,
                )
    return wrapper
class NewsletterBot(commands.Bot):
    """Bot subclass that syncs its application commands on startup.

    Uses the '?' command prefix and the default intents with message content
    enabled.
    """

    def __init__(self):
        intents = discord.Intents.default()
        intents.message_content = True
        super().__init__(command_prefix='?', intents=intents)

    async def setup_hook(self):
        """Sync the application command tree."""
        await self.tree.sync()

    async def on_ready(self):
        """Announce the logged-in user once the bot is ready.

        Defined on the class so the handler belongs to the bot instance that
        is actually run; registering it with ``@bot.event`` before ``bot`` is
        rebound below would attach it to an instance that is never started.
        """
        print(f'Logged in as {self.user}')


# The slash commands defined below are wrapped individually with
# @command_error_handler, so no blanket wrapping of `bot.commands` is done.
bot = NewsletterBot()
class ArticlePaginator(discord.ui.View):
    """View that pages through a list of articles, ten per embed.

    ``articles`` is a sequence of objects exposing ``title``, ``url`` and
    ``description``; ``days`` is only used in the embed title. The view is
    created with a 300 second timeout. When everything fits on one page the
    Previous/Next buttons are removed from the view.
    """

    # Discord rejects embed fields whose name/value exceed these lengths.
    _FIELD_NAME_LIMIT = 256
    _FIELD_VALUE_LIMIT = 1024

    def __init__(self, articles, days):
        super().__init__(timeout=300)
        self.articles = articles
        self.days = days
        self.current_page = 0
        self.per_page = 10
        # Ceiling division; clamped to 1 so an empty list still renders as
        # "Page 1/1" rather than "Page 1/0".
        self.max_pages = max(1, (len(self.articles) - 1) // self.per_page + 1)
        if self.max_pages <= 1:
            # Nothing to page through: drop both buttons from the view.
            self.clear_items()

    @staticmethod
    def _clip(text, limit):
        """Return ``text`` shortened to at most ``limit`` characters."""
        if len(text) <= limit:
            return text
        return text[:limit - 3] + '...'

    @discord.ui.button(label="Previous", style=discord.ButtonStyle.gray, disabled=True)
    async def previous_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Step back one page (never below the first) and redraw."""
        self.current_page = max(0, self.current_page - 1)
        await self.update_message(interaction)

    @discord.ui.button(label="Next", style=discord.ButtonStyle.gray)
    async def next_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Step forward one page (never past the last) and redraw."""
        self.current_page = min(self.max_pages - 1, self.current_page + 1)
        await self.update_message(interaction)

    async def update_message(self, interaction: discord.Interaction):
        """Edit the original message with the current page and button states."""
        embed = self.create_embed()
        self.update_button_states()
        await interaction.response.edit_message(embed=embed, view=self)

    def create_embed(self):
        """Build the embed for ``self.current_page``.

        Each article becomes one field: a numbered title, then the URL and the
        description cut to 160 characters (with a trailing ellipsis when cut).
        """
        start = self.current_page * self.per_page
        end = start + self.per_page
        current_articles = self.articles[start:end]
        embed = discord.Embed(title=f"Articles from the last {self.days} days", color=0xFFA500)
        embed.set_footer(text=f"Page {self.current_page + 1}/{self.max_pages}")
        for i, article in enumerate(current_articles, start=start + 1):
            if len(article.description) > 160:
                snippet = article.description[:160] + '...'
            else:
                snippet = article.description
            embed.add_field(
                name=self._clip(f"{i}. {article.title}", self._FIELD_NAME_LIMIT),
                value=self._clip(f"{article.url}\n{snippet}", self._FIELD_VALUE_LIMIT),
                inline=False
            )
        return embed

    def update_button_states(self):
        """Disable Previous on the first page and Next on the last page."""
        if self.max_pages <= 1:
            return  # Buttons were removed; there is nothing to update.
        self.previous_button.disabled = (self.current_page == 0)
        self.next_button.disabled = (self.current_page == self.max_pages - 1)
def group_criteria(criteria):
    """Bucket criteria names under the configured search-criteria groups.

    Iterates the groups returned by ``get_search_criteria()``; each group is a
    string that is lower-cased and split on ``', '`` to get its members.
    Returns a dict mapping every group (as given) to the lower-cased entries
    of ``criteria`` that are members of that group, in their original order.
    """
    buckets = {}
    for group in get_search_criteria():
        members = group.lower().split(', ')
        buckets[group] = [name.lower() for name in criteria if name.lower() in members]
    return buckets
@bot.tree.command(name="fr", description="Fetch recent articles")
@app_commands.describe(
    days="Number of days to fetch articles from (default: 7)",
    all="Fetch all articles, including those below the relevancy threshold (0 or 1, default: 0)",
    criteria="Filter articles by criteria (default: None)"
)
@command_error_handler
async def fr(interaction: discord.Interaction, days: int = 7, all: int = 0, criteria: str = None):
    """Slash command: list recent articles in a paginated embed.

    Validates the inputs, fetches the articles for the last `days` days
    (optionally for a single criteria), drops articles with no criterion
    score at or above the configured minimum unless `all` is 1, sorts them by
    relevancy and replies with an ArticlePaginator view. Every reply goes
    through the followup webhook because the interaction is deferred first.
    """
    await interaction.response.defer()

    if days < 1:
        await interaction.followup.send("Please provide a valid number of days (greater than 0).")
        return
    if all not in (0, 1):
        await interaction.followup.send("The 'all' parameter must be either 0 or 1.")
        return

    # Flatten the configured groups ("a, b, c") into individual lower-cased names.
    valid_criteria = [c.lower() for group in get_search_criteria() for c in group.split(', ')]
    if criteria:
        criteria = criteria.lower()
        if criteria not in valid_criteria:
            grouped_criteria = group_criteria(valid_criteria)
            criteria_list = "\n".join([f"{', '.join(items)}" for group, items in grouped_criteria.items() if items])
            await interaction.followup.send(f"Invalid criteria. Please choose from:\n{criteria_list}")
            return

    # NOTE(review): this is a synchronous call inside a coroutine; if it does
    # network or disk I/O it blocks the event loop while it runs - confirm
    # whether it should be moved off the loop.
    articles = fetch_articles_from_days(days, criteria)
    min_relevancy = get_min_relevancy_score()
    if all == 0:
        articles = [
            a for a in articles
            if any(criterion['score'] >= min_relevancy for criterion in a.criteria)
        ]

    if not articles:
        await interaction.followup.send("No articles found for the specified period.")
        return

    # Sort by relevancy score. If criteria is provided, sort by THAT criteria score only.
    if criteria:
        articles.sort(
            key=lambda x: next(
                (criterion['score'] for criterion in x.criteria if criterion['name'].lower() == criteria),
                0,
            ),
            reverse=True,
        )
    else:
        # Use the highest score across all of the article's criteria (0 when
        # it has none) instead of whichever criterion happens to be first.
        articles.sort(
            key=lambda x: max((criterion['score'] for criterion in x.criteria), default=0),
            reverse=True,
        )

    paginator = ArticlePaginator(articles, days)
    embed = paginator.create_embed()
    await interaction.followup.send(embed=embed, view=paginator)
@bot.tree.command(name="memo-drafts", description="Generate memo drafts for recent articles")
@app_commands.describe(
    days="Number of days to fetch articles from (default: 7)",
    criteria="Generate memo for a specific criteria (default: None)"
)
@command_error_handler
async def memo_drafts(interaction: discord.Interaction, days: int = 7, criteria: str = None):
    """Slash command: build memo-draft embeds from recent relevant articles.

    With a criteria, a single memo is generated for it. Without one, a memo is
    generated for each of the first three configured criteria plus an "Other"
    memo covering the remaining ones; an article is used in at most one memo.
    Memos without articles are skipped.
    """
    await interaction.response.defer()

    if days < 1:
        await interaction.followup.send("Please provide a valid number of days (greater than 0).")
        return

    # Flatten the configured groups ("a, b, c") into individual lower-cased names.
    valid_criteria = [c.lower() for group in get_search_criteria() for c in group.split(', ')]
    if criteria:
        criteria = criteria.lower()
        if criteria not in valid_criteria:
            grouped_criteria = group_criteria(valid_criteria)
            criteria_list = "\n".join([f"{', '.join(items)}" for group, items in grouped_criteria.items() if items])
            await interaction.followup.send(f"Invalid criteria. Please choose from:\n{criteria_list}")
            return

    articles = fetch_articles_from_days(days, criteria)
    min_relevancy = get_min_relevancy_score()
    # Filter articles by minimum relevancy score
    articles = [
        a for a in articles
        if any(criterion['score'] >= min_relevancy for criterion in a.criteria)
    ]
    if not articles:
        await interaction.followup.send("No relevant articles found for the specified period.")
        return

    # Generate memo drafts. `used_articles` is threaded through every call so
    # the same article is not repeated across memos.
    used_articles = set()
    drafts = []
    if criteria:
        memo_draft, used_articles = generate_memo_draft(articles, criteria, used_articles=used_articles)
        drafts.append(memo_draft)
    else:
        for c in valid_criteria[:3]:
            memo_draft, used_articles = generate_memo_draft(articles, c, used_articles=used_articles)
            drafts.append(memo_draft)
        other_memo_draft, used_articles = generate_memo_draft(
            articles, "Other", valid_criteria[3:], used_articles=used_articles
        )
        drafts.append(other_memo_draft)

    # Create and send embeds
    embeds = [create_memo_embed(memo) for memo in drafts if memo['articles']]
    if not embeds:
        await interaction.followup.send("No memo drafts could be generated for the specified criteria and period.")
        return

    # A single message may carry at most 10 embeds and 6000 characters across
    # all of them, so split the embeds over several messages when needed
    # instead of risking a rejected request.
    max_embeds, max_chars = 10, 6000
    batch, batch_chars = [], 0
    for embed in embeds:
        size = len(embed)  # total number of characters in the embed
        if batch and (len(batch) >= max_embeds or batch_chars + size > max_chars):
            await interaction.followup.send(embeds=batch)
            batch, batch_chars = [], 0
        batch.append(embed)
        batch_chars += size
    await interaction.followup.send(embeds=batch)
def generate_memo_draft(articles, criteria, other_criteria=None, used_articles=None):
    """Pick the top articles for a memo and return ``(memo, used_articles)``.

    Args:
        articles: Iterable of objects with ``title``, ``description``, ``url``
            and ``criteria`` (a list of dicts with ``'name'`` and ``'score'``).
        criteria: Memo label. It is also the criterion name to match
            (case-insensitively) when ``other_criteria`` is empty.
        other_criteria: Optional list of criterion names. When non-empty, an
            article matches if it carries any of them, and ``criteria`` is
            used only as the label.
        used_articles: Set of URLs already placed in earlier memos. Those
            articles are skipped, and the set is updated in place with the
            URLs selected here. A new set is created when omitted.

    Returns:
        A tuple ``(memo, used_articles)`` where ``memo`` is
        ``{'criteria': criteria, 'articles': [{'title', 'description', 'url'}, ...]}``
        holding at most eight articles, ordered by descending score.
    """
    if used_articles is None:
        used_articles = set()

    if other_criteria:
        targets = {name.lower() for name in other_criteria}
    else:
        targets = {criteria.lower()}

    def best_score(article):
        # Rank by the best score among the criteria this memo is about. For a
        # grouped memo the label (e.g. "Other") is not a criterion name, so
        # scoring against the label would give 0 for every article.
        return max(
            (c['score'] for c in article.criteria if c['name'].lower() in targets),
            default=0,
        )

    candidates = [
        a for a in articles
        if a.url not in used_articles
        and any(c['name'].lower() in targets for c in a.criteria)
    ]
    # list.sort is stable, so equally scored articles keep their input order.
    candidates.sort(key=best_score, reverse=True)

    selected_articles = candidates[:8]  # Top 8 articles
    used_articles.update(article.url for article in selected_articles)

    memo = {
        'criteria': criteria,
        'articles': [
            {
                'title': article.title,
                'description': article.description,
                'url': article.url,
            }
            for article in selected_articles
        ],
    }
    return memo, used_articles
def create_memo_embed(memo):
    """Render a memo dict as a Discord embed.

    ``memo`` has a ``'criteria'`` label and an ``'articles'`` list of dicts
    with ``'title'``, ``'description'`` and ``'url'``. The first three
    articles get a field each (description plus a "Read more" link); articles
    four to eight are listed as links in a single "Quick links" field.

    Text is shortened to stay within Discord's embed limits (256 characters
    for the title and for field names, 1024 for field values), because an
    over-long field makes Discord reject the whole message.
    """
    name_limit, value_limit = 256, 1024

    def clip(text, limit):
        # Shorten `text` to at most `limit` characters, marking the cut.
        text = str(text)
        if len(text) <= limit:
            return text
        if limit <= 3:
            return text[:limit]
        return text[:limit - 3] + '...'

    embed = discord.Embed(
        title=clip(f"Memo Draft: {memo['criteria']}", name_limit),
        color=0xFFFF00,
    )

    for article in memo['articles'][:3]:
        link = f"\n[Read more]({article['url']})"
        # Shorten the description rather than the link so the link stays intact.
        room = max(0, value_limit - len(link))
        embed.add_field(
            name=clip(article['title'], name_limit),
            value=clip(article['description'], room) + link,
            inline=False
        )

    # Add whole link lines while they fit; cutting a markdown link in the
    # middle would leave it broken.
    lines, used = [], 0
    for a in memo['articles'][3:8]:
        line = f"- [{a['title']}]({a['url']})"
        extra = len(line) + (1 if lines else 0)  # +1 for the joining newline
        if used + extra > value_limit:
            break
        lines.append(line)
        used += extra
    quick_links = "\n".join(lines)
    if quick_links:
        embed.add_field(name="Quick links", value=quick_links, inline=False)
    return embed
async def exit_handler(signum, frame):
    """Close the bot, cancel every other task on its loop and stop the loop.

    ``signum`` and ``frame`` are the values received by the signal handler;
    they are accepted for symmetry with it and not used here.
    """
    print("Received signal to exit. Shutting down...")
    await bot.close()

    # Exclude this coroutine's own task: cancelling or awaiting it from inside
    # itself would make the shutdown wait on itself.
    current = asyncio.current_task()
    pending = [task for task in asyncio.all_tasks(loop=bot.loop) if task is not current]
    for task in pending:
        task.cancel()
    # return_exceptions=True so the CancelledError of each task is collected
    # instead of being raised here.
    await asyncio.gather(*pending, return_exceptions=True)
    bot.loop.stop()
def signal_handler(signum, frame):
    """Schedule a graceful shutdown of the bot when a signal arrives.

    A handler installed with ``signal.signal`` runs outside of the event
    loop's normal callback processing, so the shutdown task is handed to the
    loop with ``call_soon_threadsafe``, which also wakes the loop up. If no
    event loop is running (signal received before the bot started or after it
    stopped), there is nothing to shut down and the process exits with the
    conventional ``128 + signum`` status.
    """
    try:
        loop = bot.loop
        loop_running = loop.is_running()
    except AttributeError:
        # The bot's loop is not available outside of a running bot.
        loop_running = False

    if not loop_running:
        sys.exit(128 + signum)

    loop.call_soon_threadsafe(
        lambda: loop.create_task(exit_handler(signum, frame))
    )


# Register the exit handler
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def run_bot():
    """Run the bot until it stops; exit with status 1 on failure.

    Exits early with a clear message when no token is configured. Any
    exception escaping ``bot.run`` is logged with its traceback, printed, and
    turned into exit status 1. If the bot is still not closed afterwards, it
    is closed on a fresh event loop as a best effort.
    """
    if not TOKEN:
        print("Unhandled exception: DISCORD_TOKEN is not set")
        sys.exit(1)

    try:
        bot.run(TOKEN)
    except Exception as e:
        logging.exception("Unhandled exception while running the bot")
        print(f"Unhandled exception: {e}")
        sys.exit(1)
    finally:
        if not bot.is_closed():
            # The loop used by bot.run() is no longer usable at this point,
            # so close the bot on a new loop rather than on bot.loop.
            try:
                asyncio.run(bot.close())
            except Exception:
                logging.exception("Failed to close the bot cleanly")


if __name__ == "__main__":
    run_bot()