-
Notifications
You must be signed in to change notification settings - Fork 33
/
neteasedown.py
322 lines (288 loc) · 12.5 KB
/
neteasedown.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
# -*- coding: utf-8 -*-
import base64
from re import findall
from asyncio import sleep
from os import sep, remove, listdir
from os.path import isfile, exists, getsize
from time import strftime, localtime
from pagermaid import version
from pagermaid.listener import listener
from pagermaid.utils import alias_command, execute, pip_install
pip_install("pyncm")
pip_install("mutagen")
from mutagen.mp3 import EasyMP3
from mutagen.id3 import ID3, APIC
from mutagen.flac import FLAC, Picture
from mutagen.oggvorbis import OggVorbis
from pyncm import GetCurrentSession, apis, DumpSessionAsString, SetCurrentSession, LoadSessionFromString
from pyncm.utils.helper import TrackHelper
from pyncm.apis import LoginFailedException
from pyncm.apis.login import LoginLogout
from telethon.tl.types import DocumentAttributeAudio
def download_by_url(url, dest):
# Downloads generic content
response = GetCurrentSession().get(url, stream=True)
with open(dest, 'wb') as f:
for chunk in response.iter_content(1024 * 2 ** 10):
f.write(chunk) # write every 1MB read
return dest
def gen_author(song_info: dict) -> str:
data = []
for i in song_info["songs"][0]["ar"]:
data.append(i["name"])
return " ".join(data)
def get_duration(song_info: dict, track_info: dict) -> int:
if track_info["data"][0]["freeTrialInfo"]:
return track_info["data"][0]["freeTrialInfo"]["end"] - track_info["data"][0]["freeTrialInfo"]["start"]
else:
return int(song_info["songs"][0]["dt"] / 1000)
def get_file_size(path: str) -> float:
return round(getsize(path) / float(1024 * 1024), 2)
def if_outgoing() -> bool:
if exists(f"data{sep}ned.out"):
return True
else:
return False
def get_music_id(url: str) -> int:
if ("music.163.com" in url) and ("playlist" not in url):
data = findall(r'\?id=([0-9]*)', url)
if len(data) >= 1:
return int(data[0])
else:
return 0
else:
return 0
def tag_audio(track, file: str, cover_img: str = ''):
def write_keys(song):
# Write trackdatas
song['title'] = track.TrackName
song['artist'] = track.Artists
song['album'] = track.AlbumName
song['tracknumber'] = str(track.TrackNumber)
song['date'] = str(track.TrackPublishTime)
song.save()
def mp3():
song = EasyMP3(file)
write_keys(song)
if exists(cover_img):
song = ID3(file)
song.update_to_v23() # better compatibility over v2.4
song.add(APIC(encoding=3, mime='image/jpeg', type=3, desc='',
data=open(cover_img, 'rb').read()))
song.save(v2_version=3)
def flac():
song = FLAC(file)
write_keys(song)
if exists(cover_img):
pic = Picture()
pic.data = open(cover_img, 'rb').read()
pic.mime = 'image/jpeg'
song.add_picture(pic)
song.save()
def ogg():
song = OggVorbis(file)
write_keys(song)
if exists(cover_img):
pic = Picture()
pic.data = open(cover_img, 'rb').read()
pic.mime = 'image/jpeg'
song["metadata_block_picture"] = [base64.b64encode(pic.write()).decode('ascii')]
song.save()
format_ = file.split('.')[-1].upper()
for ext, method in [({'MP3'}, mp3), ({'FLAC'}, flac), ({'OGG', 'OGV'}, ogg)]:
if format_ in ext:
return method() or True
return False
async def netease_down(track_info: dict, song_info: dict, song) -> str:
if not isfile(f'data{sep}{song_info["songs"][0]["name"]}.{track_info["data"][0]["type"]}'):
# Downloding source audio
download_by_url(track_info["data"][0]["url"],
f'data{sep}{song_info["songs"][0]["name"]}.{track_info["data"][0]["type"]}')
# Downloading cover
if not isfile(f'data{sep}{song_info["songs"][0]["name"]}.jpg'):
download_by_url(song.AlbumCover,
f'data{sep}{song_info["songs"][0]["name"]}.jpg')
# 设置标签
tag_audio(song, f'data{sep}{song_info["songs"][0]["name"]}.{track_info["data"][0]["type"]}',
f'data{sep}{song_info["songs"][0]["name"]}.jpg')
# 返回
return f'data{sep}{song_info["songs"][0]["name"]}.{track_info["data"][0]["type"]}'
ned_help_msg = f"""
网易云搜/点歌。
i.e.
`-{alias_command('ned')} 失眠飞行 兔籽鲸 / 雨客Yoker` # 通过歌曲名称+歌手(可选)点歌
`-{alias_command('ned')} see you again -f` # 通过 -f 参数点播 flac 最高音质
`-{alias_command('ned')} 1430702717` # 通过歌曲 ID 点歌
`-{alias_command('ned')} outgoing` # 开/关自动识别发送的音乐链接
`-{alias_command('ned')} login` # 显示登录信息
`-{alias_command('ned')} login 手机号码 密码` # 登录账号
`-{alias_command('ned')} logout` # 登出
`-{alias_command('ned')} clear` # 手动清除缓存
"""
@listener(is_plugin=True, outgoing=True, command=alias_command("ned"),
description=ned_help_msg,
parameters="{关键词/id}/{login <账号> <密码>}/{clear}")
async def ned(context):
if len(context.parameter) < 1:
# 使用方法
await context.edit(ned_help_msg)
return
# 加载登录信息
if isfile(f"data{sep}session.ncm"):
with open(f"data{sep}session.ncm") as f:
SetCurrentSession(LoadSessionFromString(f.read()))
# 海外用户
GetCurrentSession().headers['X-Real-IP'] = '118.88.88.88'
# 处理账号登录
if context.parameter[0] == "login":
# 显示登录信息
if len(context.parameter) == 1:
login_info = GetCurrentSession().login_info
if login_info["success"]:
# 获取VIP类型
if login_info['content']['account']['vipType'] != 0:
vip_type = "**VIP**"
else:
vip_type = "**普通**"
# 获取账号创建时间
time = strftime("%Y-%m-%d %H:%M:%S", localtime(login_info['content']['account']['createTime'] / 1000))
if context.is_group:
await context.edit(f"[ned] 已登录{vip_type}账号,账号创建时间:`{time}`")
else:
await context.edit(f"[ned] 已登录{vip_type}账号:`{login_info['content']['profile']['nickname']}`,"
f"账号创建时间:`{time}`")
else:
await context.edit(f"[ned] **未登录/登录失败**,额外信息:`{login_info['content']}`")
return
# 过滤空参数
if len(context.parameter) == 2:
# 登录命令格式错误
await context.edit(f"**使用方法:** `-{alias_command('ned')} <账号> <密码>`")
return
# 开始登录
try:
apis.login.LoginViaCellphone(context.parameter[1], context.parameter[2])
except LoginFailedException:
await context.edit("**登录失败**,请检查账号密码是否正确。")
return
# 获取登录信息
login_info = GetCurrentSession().login_info
# 获取VIP类型
if login_info['content']['account']['vipType'] != 0:
vip_type = "**VIP**"
else:
vip_type = "**普通**"
# 获取账号创建时间
time = strftime("%Y-%m-%d %H:%M:%S", localtime(login_info['content']['account']['createTime'] / 1000))
if context.is_group:
await context.edit(f"[ned] **登录成功**,已登录{vip_type}账号,账号创建时间:`{time}`")
else:
await context.edit(f"[ned] **登录成功**,已登录{vip_type}账号:`{login_info['content']['profile']['nickname']}`,"
f"账号创建时间:`{time}`")
# 保存登录信息
with open(f"data{sep}session.ncm", 'w+') as f:
f.write(DumpSessionAsString(GetCurrentSession()))
return
elif context.parameter[0] == "logout":
# 登出
LoginLogout()
if isfile(f"data{sep}session.ncm"):
remove(f"data{sep}session.ncm")
return await context.edit("[ned] 账号登出成功。")
elif context.parameter[0] == "clear":
# 清除歌曲缓存
for i in listdir("data"):
if i.find(".mp3") != -1 or i.find(".jpg") != -1 or i.find(".flac") != -1 or i.find(".ogg") != -1:
remove(f"data{sep}{i}")
await context.edit("[ned] **已清除缓存**")
return
elif context.parameter[0] == "outgoing":
# 开关自动识别音乐链接
if if_outgoing():
remove(f"data{sep}ned.out")
return await context.edit("[ned] 已关闭自动识别音乐链接")
else:
with open(f"data{sep}ned.out", 'w') as f:
f.write('1')
return await context.edit("[ned] 已开启自动识别音乐链接")
# 搜索歌曲
# 判断是否使用最高比特率解析
flac_mode = True if context.arguments.find("-f") != -1 else False
song_id = context.arguments.replace("-f", "").replace("\u200b", "").strip()
# id
if song_id.isdigit():
song_id = int(song_id)
else:
search_data = apis.cloudsearch.GetSearchResult(song_id, 1, 1)
if search_data.get("result", {}).get("songCount", 0) >= 1:
song_id = search_data["result"]["songs"][0]["id"]
else:
await context.edit(f"**没有找到歌曲**,请检查歌曲名称是否正确。")
return
await start_download(context, song_id, flac_mode)
@listener(outgoing=True, ignore_edited=True)
async def auto_process_ned(context):
if not if_outgoing():
return
# 仅限群组和私聊
if not (context.is_group or context.is_private):
return
if not context.text:
return
data = get_music_id(context.text)
if not data:
return
reply = await context.reply("[ned]")
await start_download(reply, data, True, True, context)
async def start_download(context, song_id: int, flac_mode, outgoing=False, reply=None):
# 获取歌曲质量是否大于 320k HQ
track_info = apis.track.GetTrackAudio([song_id], bitrate=3200 * 1000 if flac_mode else 320000)
# 获取歌曲详情
song_info = apis.track.GetTrackDetail([song_id])
if track_info["data"][0]["code"] == 404:
msg = await context.edit(f"**没有找到歌曲**,请检查歌曲id是否正确。")
if outgoing:
await sleep(5)
await msg.delete()
return
await context.edit(f"正在下载歌曲:**{song_info['songs'][0]['name']} - {gen_author(song_info)}** "
f"{round(track_info['data'][0]['size'] / 1000 / 1000, 2)} MB")
# 下载歌曲并且设置歌曲标签
song = TrackHelper(song_info['songs'][0])
# 转义
for char in song_info["songs"][0]["name"]:
if char in ['/', '\\', ':', '*', '?', '"', '<', '>', '|']:
song_info["songs"][0]["name"] = song_info["songs"][0]["name"].replace(char, '')
path = await netease_down(track_info, song_info, song)
await context.edit("正在上传歌曲。。。")
# 上传歌曲
cap_ = ""
# 提醒登录VIP账号
if track_info["data"][0]["freeTrialInfo"]:
cap_ = f"**非VIP,正在试听 {track_info['data'][0]['freeTrialInfo']['start']}s ~ \n" \
f"{track_info['data'][0]['freeTrialInfo']['end']}s**\n"
cap = f"「**{song_info['songs'][0]['name']}**」\n" \
f"{gen_author(song_info)}\n" \
f"文件大小:{get_file_size(path)} MB\n" \
f"\n{cap_}" \
f"#netease #{int(track_info['data'][0]['br'] / 1000)}kbps #{track_info['data'][0]['type']}"
await context.client.send_file(
context.chat_id,
path,
reply_to=context.message.reply_to_msg_id if not outgoing else reply.id,
caption=cap,
link_preview=False,
force_document=False,
thumb=f'data{sep}{song_info["songs"][0]["name"]}.jpg',
attributes=(DocumentAttributeAudio(
get_duration(song_info, track_info), False, song_info['songs'][0]['name'], gen_author(song_info)),)
)
await context.delete()
# 过多文件自动清理
if len(listdir("data")) > 100:
for i in listdir("data"):
if i.find(".mp3") != -1 or i.find(".jpg") != -1 or i.find(".flac") != -1 or i.find(".ogg") != -1:
remove(f"data{sep}{i}")
msg = await context.respond("[ned] **已自动清除缓存**")
await sleep(3)
await msg.delete()