main.py
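"""Periodically scrape the configured stores for every item in SEARCH_LIST and
notify the admins on Telegram (through NekoGram) about new results and scrapper failures."""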
import asyncio

from aiogram import exceptions as aiogram_exc
from NekoGram import Menu

import menus
import scrappers
from const import SEARCH_LIST, ADMIN_IDS, RUN_INTERVAL, NEKO, STORAGE
from loggers import main_logger


async def send_data(data: Menu, user: int):
    """Deliver a built menu to a user, retrying when Telegram asks to slow down."""
    while True:
        try:
            await NEKO.bot.send_message(
                chat_id=user,
                text=data.text,
                reply_markup=data.markup,
                parse_mode=data.parse_mode
            )
            break
        except aiogram_exc.RetryAfter as e:
            # Flood limit hit: wait the time Telegram requested, then retry
            await asyncio.sleep(e.timeout)
        except aiogram_exc.TelegramAPIError:
            # Any other API error (blocked bot, unknown chat, etc.) is not retried
            break
    # Short pause between consecutive messages to stay under the rate limit
    await asyncio.sleep(.2)


async def notify_error(scrapper: str):
    """Notify every admin that a scrapper failed."""
    main_logger.error(f'Error in {scrapper}')
    for user in ADMIN_IDS:
        data = await NEKO.build_menu(name='notification_failed', obj=None, user_id=user, auto_build=False)
        await data.build(text_format={'scrapper': scrapper})
        await send_data(data=data, user=user)


async def notify_found(item_name: str, store_name: str, url: str):
    """Notify every admin that an item was found in a store."""
    for user in ADMIN_IDS:
        data = await NEKO.build_menu(name='notification_found', obj=None, user_id=user, auto_build=False)
        await data.build(text_format={'item': item_name, 'store': store_name}, markup_format={'url': url})
        await send_data(data=data, user=user)


async def scrape_urls():
    """Periodically run every scrapper over the search list and report new URLs."""
    while True:
        for item in SEARCH_LIST:
            main_logger.info(f'Looking up {item}')
            # Scrapper classes are exported under capitalised names
            for scrapper in [s for s in dir(scrappers) if s[0].isupper()]:
                scrapper_class = getattr(scrappers, scrapper)
                r = await scrapper_class.search(item)
                main_logger.info(f'Results: {r}')
                if r is None:
                    # The scrapper failed: notify the admins and skip it
                    await notify_error(scrapper=scrapper)
                    continue
                for result in r:
                    # Report only URLs that are not in the cache yet
                    if not await STORAGE.check('SELECT url FROM search_cache WHERE url = ?', result):
                        await STORAGE.apply('INSERT INTO search_cache (url) VALUES (?)', result)
                        await notify_found(item_name=item, store_name=scrapper_class.NAME, url=result)
        await asyncio.sleep(RUN_INTERVAL)


async def startup(_):
    NEKO.attach_router(menus.util.formatters.ROUTER)
    await NEKO.storage.acquire_pool()
    # Import database structure
    with open('db.sql', 'r') as f:
        for statement in f.read().split('--'):
            await NEKO.storage.apply(statement)
    # Run the scraping loop in the background
    asyncio.get_event_loop().create_task(scrape_urls())


async def shutdown(_):
    await NEKO.storage.close_pool()


if __name__ == '__main__':
    NEKO.start_polling(on_startup=startup, on_shutdown=shutdown)