From 4abd850a84180d62c609b9b7355db800134c0222 Mon Sep 17 00:00:00 2001 From: Valerii Date: Wed, 10 Jun 2020 17:35:52 +0300 Subject: [PATCH] Updated telethon version and the application was rewritten to support asyncio --- .env.example | 11 ++++ monitor.py | 168 ----------------------------------------------- requirements.txt | 5 +- src/helpers.py | 138 ++++++++++++++++++++++++++++++++++++++ src/monitor.py | 50 ++++++++++++++ 5 files changed, 202 insertions(+), 170 deletions(-) delete mode 100644 monitor.py create mode 100644 src/helpers.py create mode 100644 src/monitor.py diff --git a/.env.example b/.env.example index 8b69fca..d30cdff 100644 --- a/.env.example +++ b/.env.example @@ -5,3 +5,14 @@ TELEGRAM_API_HASH = 0123456789abcdef0123456789abcdef # Logging level, available values: https://docs.python.org/3/library/logging.html#levels LOGGING_LEVEL = CRITICAL + +# Correct values: +# '' – You will not be notified about deleted ongoing messages +# '1' – You will be notified about deleted ongoing messages +# +# Enabled option is useful, when your companion deletes the bunch of his and your messages +NOTIFY_ONGOING_MESSAGES='1' + +# How many days messages will be stored in the SQLite database +# Warning: Database is not constrained by memory it'll occupy, you need to monitor your free disk space manually +MESSAGES_TTL_DAYS = 14 \ No newline at end of file diff --git a/monitor.py b/monitor.py deleted file mode 100644 index 0223bba..0000000 --- a/monitor.py +++ /dev/null @@ -1,168 +0,0 @@ -import getpass -import logging -import os -import pickle -import signal -import sqlite3 -import sys -import time - -from datetime import datetime, timedelta -from pathlib import Path -from typing import List -from dotenv import load_dotenv - -from telethon import events, TelegramClient -from telethon.errors import SessionPasswordNeededError -from telethon.tl.functions.users import GetFullUserRequest -from telethon.tl.types import Message - -MINUTES_PER_HOUR = 60 -SECONDS_PER_MINUTE = 60 - -# Loading environment variables -env_path = Path(".") / ".env" - -if os.path.isfile(env_path): - load_dotenv(dotenv_path=env_path) - -# Configure logging level -logging.basicConfig(level=os.getenv("LOGGING_LEVEL", logging.INFO)) - -# Cleaning database interval -# 2 days by documentation and + 6 hours by practice -MESSAGE_SAVING_PERIOD_SECONDS = 54 * MINUTES_PER_HOUR * SECONDS_PER_MINUTE - -if os.getenv("TELEGRAM_API_ID") is None or os.getenv("TELEGRAM_API_HASH") is None: - print('Please, read `README.md` and create `.env` file with telegram API credentials') - exit(1) - -# Telegram API -TELEGRAM_API_ID = os.getenv("TELEGRAM_API_ID") -TELEGRAM_API_HASH = os.getenv("TELEGRAM_API_HASH") - -client = TelegramClient("db/user", TELEGRAM_API_ID, TELEGRAM_API_HASH) -assert client.connect() - -# Auth -if len(sys.argv) > 1 and sys.argv[1] == 'auth': - if client.is_user_authorized(): - confirmation = input('Do you really want to delete current session and authorize new? [y/n]: ') - if confirmation.lower() != 'y': - exit(0) - - phone_number = input("Enter phone number: ") - client.send_code_request(phone_number) - - try: - client.sign_in(phone_number, input("Enter code: ")) - except SessionPasswordNeededError: - client.sign_in(password=getpass.getpass()) - - if client.is_user_authorized(): - print('Something went wrong, please, retry') - exit(1) - - exit(0) - -# Daemon -if not client.is_user_authorized(): - print('Please, execute `auth` command before starting the daemon (see README.md file)') - exit(1) - -# Database connection, table and indices creation -conn = sqlite3.connect("db/messages.db", check_same_thread=False) -c = conn.cursor() - -c.execute("""CREATE TABLE IF NOT EXISTS messages - (message_id INTEGER PRIMARY KEY, message BLOB, created DATETIME)""") - -c.execute("CREATE INDEX IF NOT EXISTS messages_created_index ON messages (created DESC)") - -conn.commit() - -# Configure handlers -client.updates.workers = 1 - - -@client.on(events.NewMessage(incoming=True)) -def handler(event: events.NewMessage.Event): - c.execute("INSERT INTO messages (message_id, message, created) VALUES (?, ?, ?)", - (event.message.id, sqlite3.Binary(pickle.dumps(event.message)), str(datetime.now()))) - conn.commit() - - -@client.on(events.MessageDeleted()) -def handler(event: events.MessageDeleted.Event): - db_result = c.execute("SELECT message_id, message FROM messages WHERE message_id IN ({0})".format( - ",".join(str(e) for e in event.deleted_ids))).fetchall() - - messages: List[Message] = [pickle.loads(i[1]) for i in db_result] - - log_deleted_usernames = [] - - for message in messages: - user_request = client(GetFullUserRequest(message.from_id)) - user = user_request.user - - if user.first_name or user.last_name: - mention_username = \ - (user.first_name + " " if user.first_name else "") + \ - (user.last_name if user.last_name else "") - elif user.username: - mention_username = user.username - elif user.phone: - mention_username = user.phone - else: - mention_username = user.id - - log_deleted_usernames.append(mention_username + "(" + str(user.id) + ")") - - text = "** Deleted message from: **[{username}](tg://user?id={id})\n".format( - username=mention_username, id=user.id) - - if message.message: - text += "** Message: **" + message.message - - client.send_message( - "me", - text, - file=message.media - ) - - logging.info( - "Got {deleted_messages_count} deleted messages. Has in DB {db_messages_count}. Users: {users}".format( - deleted_messages_count=str(len(event.deleted_ids)), - db_messages_count=str(len(messages)), - users=", ".join(log_deleted_usernames)) - ) - - -client.start() - - -class GracefulKiller: - kill_now = False - - def __init__(self): - signal.signal(signal.SIGINT, self.exit_gracefully) - signal.signal(signal.SIGTERM, self.exit_gracefully) - - def exit_gracefully(self, signum, frame): - self.kill_now = True - - -if __name__ == "__main__": - killer = GracefulKiller() - while True: - time.sleep(1) - - # Every minute clean DB - if int(datetime.now().timestamp()) % SECONDS_PER_MINUTE == 0: - delete_from_time = str(datetime.now() - timedelta(seconds=MESSAGE_SAVING_PERIOD_SECONDS)) - c.execute("DELETE FROM messages WHERE created < ?", (delete_from_time,)) - logging.info( - "Deleted {count} messages older than {time} from DB".format(count=c.rowcount, time=delete_from_time)) - - if killer.kill_now: - break diff --git a/requirements.txt b/requirements.txt index c807e2a..73cccd5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ -Telethon==0.19.0.1 -python-dotenv==v0.8.2 \ No newline at end of file +Telethon==1.14.0 +python-dotenv==v0.13.0 +pylint==2.5.3 \ No newline at end of file diff --git a/src/helpers.py b/src/helpers.py new file mode 100644 index 0000000..28fd664 --- /dev/null +++ b/src/helpers.py @@ -0,0 +1,138 @@ +import logging +import os +import pickle +import sqlite3 +import asyncio +from datetime import datetime, timedelta +from pathlib import Path +from typing import List + +from dotenv import load_dotenv +from telethon.events import NewMessage, MessageDeleted +from telethon import TelegramClient +from telethon.hints import Entity +from telethon.tl.types import Message + +CLEAN_OLD_MESSAGES_EVERY_SECONDS = 60 # 1 minute + + +def load_env(dot_env_folder): + env_path = Path(dot_env_folder) / ".env" + + if os.path.isfile(env_path): + load_dotenv(dotenv_path=env_path) + logging.debug('`.env` file is loaded') + else: + logging.debug('`.env` file is absent, using system environment variables') + + +def initialize_messages_db(): + connection = sqlite3.connect("db/messages_v2.db") + cursor = connection.cursor() + + cursor.execute("""CREATE TABLE IF NOT EXISTS messages + (message_id INTEGER PRIMARY KEY, message_from_id INTEGER, message TEXT, media BLOB, created DATETIME)""") + + cursor.execute("CREATE INDEX IF NOT EXISTS messages_created_index ON messages (created DESC)") + + connection.commit() + + return cursor, connection + + +sqlite_cursor, sqlite_connection = initialize_messages_db() + + +def get_on_new_message(client: TelegramClient): + async def on_new_message(event: NewMessage.Event): + sqlite_cursor.execute( + "INSERT INTO messages (message_id, message_from_id, message, media, created) VALUES (?, ?, ?, ?, ?)", + ( + event.message.id, + event.message.from_id, + event.message.message, + sqlite3.Binary(pickle.dumps(event.message.media)), + str(datetime.now()))) + sqlite_connection.commit() + + return on_new_message + + +def load_messages_from_event(event: MessageDeleted.Event) -> List[Message]: + sql_message_ids = ",".join(str(deleted_id) for deleted_id in event.deleted_ids) + + db_results = sqlite_cursor.execute( + f"SELECT message_id, message_from_id, message, media FROM messages WHERE message_id IN ({sql_message_ids})" + ).fetchall() + + messages = [] + for db_result in db_results: + messages.append({ + "id": db_result[0], + "message_from_id": db_result[1], + "message": db_result[2], + "media": pickle.loads(db_result[3]), + }) + + return messages + + +async def get_mention_username(user: Entity): + if user.first_name or user.last_name: + mention_username = \ + (user.first_name + " " if user.first_name else "") + \ + (user.last_name if user.last_name else "") + elif user.username: + mention_username = user.username + elif user.phone: + mention_username = user.phone + else: + mention_username = user.id + + return mention_username + + +def get_on_message_deleted(client: TelegramClient): + async def on_message_deleted(event: MessageDeleted.Event): + messages = load_messages_from_event(event) + + log_deleted_usernames = [] + + for message in messages: + user = await client.get_entity(message['message_from_id']) + mention_username = await get_mention_username(user) + + log_deleted_usernames.append(mention_username + " (" + str(user.id) + ")") + text = "🔥🔥🔥🤫🤐🤭🙊🔥🔥🔥\n**Deleted message from: **[{username}](tg://user?id={id})\n".format( + username=mention_username, id=user.id) + + if message['message']: + text += "**Message:** " + message['message'] + + await client.send_message( + "me", + text, + file=message['media'] + ) + + logging.info( + "Got {deleted_messages_count} deleted messages. Has in DB {db_messages_count}. Users: {users}".format( + deleted_messages_count=str(len(event.deleted_ids)), + db_messages_count=str(len(messages)), + users=", ".join(log_deleted_usernames)) + ) + + return on_message_deleted + + +async def cycled_clean_old_messages(): + messages_ttl_days = int(os.getenv('MESSAGES_TTL_DAYS', 14)) + + while True: + delete_from_time = str(datetime.now() - timedelta(days=messages_ttl_days)) + sqlite_cursor.execute("DELETE FROM messages WHERE created < ?", (delete_from_time,)) + logging.info( + f"Deleted {sqlite_cursor.rowcount} messages older than {delete_from_time} from DB" + ) + + await asyncio.sleep(CLEAN_OLD_MESSAGES_EVERY_SECONDS) diff --git a/src/monitor.py b/src/monitor.py new file mode 100644 index 0000000..db04b6e --- /dev/null +++ b/src/monitor.py @@ -0,0 +1,50 @@ +import logging +import os +import pathlib +import sys + +from telethon import TelegramClient, events +from helpers import load_env, get_on_new_message, get_on_message_deleted, cycled_clean_old_messages + +BASE_DIR = (pathlib.Path(__file__).parent / '..').absolute() + +# Configure logging level, based on the system environment variables +logging.basicConfig(level=os.getenv("LOGGING_LEVEL", logging.INFO)) + +# Loading environment variables +load_env(BASE_DIR) + +# Configure logging level, based on the `.env` file and on the system environment variables +logging.basicConfig(level=os.getenv("LOGGING_LEVEL", logging.INFO)) + +if os.getenv("TELEGRAM_API_ID") is None or os.getenv("TELEGRAM_API_HASH") is None: + logging.critical('Please, read `README.md` and set-up environment variables (you can create a copy of ' + '`.env.example` file with new name `.env` and fill correct values') + exit(1) + + +async def main(): + if len(sys.argv) > 1 and sys.argv[1] == 'auth': + # TODO: perform logout in the code, in case the user use `auth` argument + logging.critical('You successfully authorized, please, run the same command without `auth` argument to ' + 'start monitoring your messages. If you want to log-out, remove the file `db/user.session`, ' + 'to log-out and re-execute this command') + exit(0) + + if not await client.is_user_authorized(): + logger.critical('Please, execute `auth` command before starting the daemon (see `README.md` file)') + exit(1) + + if bool(os.getenv('NOTIFY_ONGOING_MESSAGES', '1')): + new_message_event = events.NewMessage() + else: + new_message_event = events.NewMessage(incoming=True, outgoing=False) + + client.add_event_handler(get_on_new_message(client), new_message_event) + client.add_event_handler(get_on_message_deleted(client), events.MessageDeleted()) + + await cycled_clean_old_messages() + + +with TelegramClient('db/user', os.getenv("TELEGRAM_API_ID"), os.getenv("TELEGRAM_API_HASH")) as client: + client.loop.run_until_complete(main())