post_scrapper.py
"""Module to scrap new posts from TG channel."""
from bs4 import BeautifulSoup as BS
import html2text
import logger
import requests
import re
MAX_TRIES = 15 # how many posts to check to find the last one (needed because of posts deletion)
def _test():
    """Test function for this module."""
    logger.init()
    import database_manager

    db = database_manager.database()
    data = db.channels
    x = data[0]
    posts, last_post_id, _ = scrap_channel(x.name, x.last_post_id)
    for post in posts:
        # print(post)
        pass
    print(last_post_id)
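
# How scrap_channel() works: Telegram serves a public embed page for each post of a
# public channel at https://t.me/<channel>/<post_id>?embed=1&mode=tme, so the function
# walks post IDs upward from last_post_id + 1 and gives up after MAX_TRIES consecutive
# misses (gaps appear because posts can be deleted). A minimal sketch of a call, where
# 'example_channel' is a hypothetical public channel name:
#
#   posts, last_id, base = scrap_channel('example_channel', 0)
#   new_texts = [p.get_text() for p in posts]  # BeautifulSoup objects, one per live post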


def scrap_channel(channel_name: str, last_post_id: int) -> tuple[list, int, BS | None]:
    """Look for the latest posts in the specified channel.
    Returns the found posts, the last found post ID and a BS object of the base post."""
    found_posts = []
    current_try = 0
    last_found_post_id = last_post_id
    current_post_id = last_found_post_id + 1
    logger.log.write('SCRAPPER - PARSING CHANNEL...')
    base_post_response, base_post_ok = _get_response(f'https://t.me/{channel_name}')
    if not base_post_ok:  # connection error
        logger.log.warning('SCRAPPER - UNABLE TO GET BASE POST.')
        return [], last_post_id, None
    base_post = BS(base_post_response.text, 'html.parser')
    while True:
        url = f'https://t.me/{channel_name}/{current_post_id}?embed=1&mode=tme'
        r, is_ok = _get_response(url)
        if is_ok:
            post_html = BS(r.text, 'html.parser')
            if _validate_post(post_html):
                last_found_post_id = current_post_id
                current_try = 0
                found_posts.append(post_html)
            else:  # the ID exists but the post was deleted
                current_try += 1
        else:  # failed fetches also count as tries, so a persistent error cannot loop forever
            current_try += 1
        if current_try > MAX_TRIES:
            break
        current_post_id += 1
    # some logging stuff (IDs probed past the last existing post are give-up attempts, not real posts):
    all_posts_amount = last_found_post_id - last_post_id
    deleted_posts = all_posts_amount - len(found_posts)
    logger.log.write(f'SCRAPPER - {all_posts_amount} POSTS FOUND: {len(found_posts)} EXIST, {deleted_posts} DELETED.')
    logger.log.write(f'SCRAPPER - LAST FOUND POST ID: {last_found_post_id}.')
    # note that the post with last_post_id itself was already handled in the previous run
    return found_posts, last_found_post_id, base_post


def scrap_private_channel(last_post_id: int) -> tuple[list, int]:
    # TODO: finish this
    pass


def _get_response(url: str) -> tuple[requests.Response | None, bool]:
    """Handle request errors. Returns a Response object (or None) and an OK flag."""
    try:
        # an explicit timeout is required, otherwise requests never raises Timeout
        r = requests.get(url, timeout=30)
    except requests.exceptions.Timeout:
        logger.log.warning('SCRAPPER - TIMEOUT ERROR.')
    except requests.exceptions.ConnectionError:
        logger.log.warning('SCRAPPER - CHECK CONNECTION.')
    except Exception as e:
        logger.log.error(e)
    else:
        if r.status_code == 200:
            return r, True
        # some error logging (status codes are ints, so the keys must be ints too):
        bad_codes = {
            404: 'SCRAPPER - PAGE NOT FOUND.',
            500: 'SCRAPPER - INTERNAL SERVER ERROR.',
            503: 'SCRAPPER - SERVICE UNAVAILABLE.',
        }
        logger.log.warning(bad_codes.get(r.status_code, f'SCRAPPER - UNKNOWN STATUS CODE: {r.status_code}.'))
    return None, False
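
# _get_response() contract: callers only get (Response, True) on HTTP 200; any other
# outcome (non-200 status, timeout, connection error) yields (None, False). E.g.:
#
#   r, ok = _get_response('https://t.me/telegram')            # (Response, True) if reachable
#   r, ok = _get_response('https://t.me/definitely_missing')  # likely (None, False)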


def _validate_post(post_html: BS) -> bool:
    """Make sure that the post is 'online' (exists and is not deleted).
    Returns True if the post is found."""
    def _html_to_text(html):
        """Returns the text of the post."""
        h = html2text.HTML2Text()
        h.decode_errors = 'ignore'  # ignore Unicode decoding errors
        text = h.handle(html)
        text = re.sub(r'\*+', '', text)  # remove asterisks
        text = re.sub(r'^[ \t]*[\\`]', '', text, flags=re.MULTILINE)  # remove leading \ or `
        return text
    # find() returns None when the embed page has no message-text div, and str(None) is 'None'
    content = _html_to_text(str(post_html.find('div', {'class': 'tgme_widget_message_text js-message_text', 'dir': 'auto'})))
    return content.strip() != 'None'
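
# A minimal illustration of the check above (a sketch with made-up HTML, not a real
# t.me response): a live post carries the message-text div, a deleted one does not.
#
#   >>> live = BS('<div class="tgme_widget_message_text js-message_text" dir="auto">hi</div>', 'html.parser')
#   >>> _validate_post(live)  # the div is found, so its text is not 'None'
#   True
#   >>> _validate_post(BS('<div>Post not found</div>', 'html.parser'))
#   False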


if __name__ == '__main__':
    _test()