-
-
Notifications
You must be signed in to change notification settings - Fork 5
/
twistmoe.py
162 lines (126 loc) · 4.42 KB
/
twistmoe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import requests
import hashlib
import base64
import time
import os
import args
from tqdm import tqdm
from pathlib import Path
from Crypto.Cipher import AES
from Crypto.Util import Padding
AES_KEY = b"267041df55ca2b36f2e322d05ee2c9cf"
MAX_RETRY_COUNT = args.parsed_args.scrape_max_retry or 10
def download_episodes(slug):
episodes = get_episodes(slug)
episodes_list = []
for episode in episodes:
source = episode["source"]
file_name = Path(source).name
video_path = f"./episodes/{file_name}"
headers = {
"referer": "https://twist.moe/",
"x-access-token": "0df14814b9e590a1f26d3071a4ed7974"
}
video_headers = requests.head(source, headers=headers)
if video_headers.status_code != 200:
if args.parsed_args.verbose:
print(f"[twistmoe.py] [WARNING] Episode {source} not reachable! (status code {video_headers.status_code})")
continue
content_length = int(video_headers.headers["content-length"] or 0)
video_file = open(video_path, "wb")
downloaded_bytes = 0
retries = 0
if args.parsed_args.verbose:
progress_bar = tqdm(total=content_length, unit='iB', unit_scale=True)
progress_bar.set_description(f"[twistmoe.py] [INFO] Downloading {file_name}")
while downloaded_bytes < content_length:
try:
response = requests.get(source, timeout=5, stream=True, headers={"Range": "bytes=%d-" % downloaded_bytes, **headers})
for chunk in response.iter_content(chunk_size=1024*1024):
chunk_len = len(chunk)
downloaded_bytes += chunk_len
if args.parsed_args.verbose:
progress_bar.update(chunk_len)
video_file.write(chunk)
# debug
#percent = int(downloaded_bytes * 100. // content_length)
#print(f"Downloaded {downloaded_bytes}/{content_length} ({percent}%)")
except requests.RequestException:
# If killed, just wait a second or skip
retries += 1
if retries >= MAX_RETRY_COUNT:
if args.parsed_args.verbose:
print(f"[twistmoe.py] [WARNING] Max retries hit. Skipping episode")
progress_bar.close()
break
if args.parsed_args.verbose:
print(f"[twistmoe.py] [WARNING] Error while downloading episode. Continuing in one second ({retries}/{MAX_RETRY_COUNT} retries)")
time.sleep(1)
if args.parsed_args.verbose:
progress_bar.close()
video_file.close()
if retries >= MAX_RETRY_COUNT:
os.remove(video_path)
continue
episodes_list.append({
"episode_number": episode["number"],
"video_path": video_path
})
return episodes_list
def get_episodes(slug):
headers = {
"referer": "https://twist.moe/",
"x-access-token": "0df14814b9e590a1f26d3071a4ed7974"
}
response = requests.get(f"https://twist.moe/api/anime/{slug}/sources", headers=headers)
if response.status_code != 200:
if args.parsed_args.verbose:
print(f"[twistmoe.py] [WARNING] No sources found for {slug}")
return []
encrypted_episodes = response.json()
episodes = []
for episode in encrypted_episodes:
episodes.append({
"source": decrypt_source(episode["source"]),
"number": episode["number"]
})
return episodes
def decrypt_source(encrypted_source):
decoded_source = base64.b64decode(encrypted_source.encode())
decoded_source = decoded_source[8:]
salt = decoded_source[:8]
decoded_source = decoded_source[8:]
key_derivation = evpKDF(AES_KEY, salt, key_size=12)
crypto_data = key_derivation["key"]
key = crypto_data[:len(crypto_data)-16]
iv = crypto_data[len(crypto_data)-16:]
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = cipher.decrypt(decoded_source)
unpadded = Padding.unpad(decrypted, 16)
return f"https://cdn.twist.moe{unpadded.decode()}"
def evpKDF(passwd, salt, key_size=8, iv_size=4, iterations=1, hash_algorithm="md5"):
"""
https://github.com/Shani-08/ShaniXBMCWork2/blob/master/plugin.video.serialzone/jscrypto.py
"""
target_key_size = key_size + iv_size
derived_bytes = b""
number_of_derived_words = 0
block = None
hasher = hashlib.new(hash_algorithm)
while number_of_derived_words < target_key_size:
if block is not None:
hasher.update(block)
hasher.update(passwd)
hasher.update(salt)
block = hasher.digest()
hasher = hashlib.new(hash_algorithm)
for i in range(1, iterations):
hasher.update(block)
block = hasher.digest()
hasher = hashlib.new(hash_algorithm)
derived_bytes += block[0: min(len(block), (target_key_size - number_of_derived_words) * 4)]
number_of_derived_words += len(block)/4
return {
"key": derived_bytes[0: key_size * 4],
"iv": derived_bytes[key_size * 4:]
}