Skip to content
This repository has been archived by the owner on Jun 6, 2023. It is now read-only.

Commit

Permalink
Update danmu protocol
Browse files Browse the repository at this point in the history
采用 protocol ver 2 连接弹幕服务器,适配压缩弹幕的读取 (fix #289 )
  • Loading branch information
yawwwwwn committed Jul 26, 2020
1 parent 093768e commit ed5e67b
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 64 deletions.
154 changes: 91 additions & 63 deletions bilibiliCilent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import asyncio
import random
import struct
import zlib
import json
import sys

Expand Down Expand Up @@ -71,9 +72,10 @@ class bilibiliClient():

def __init__(self, roomid, area):
self.bilibili = bilibili()
self._protocol_version = self.bilibili.dic_bilibili['_protocolversion']
self._reader = None
self._writer = None
self._uid = None
self._uid = int(100000000000000 + 200000000000000 * random.random())
self.connected = False
self._UserCount = 0
self.dic_bulletin = {
Expand All @@ -91,15 +93,13 @@ def close_connection(self):

async def connectServer(self):
try:
reader, writer = await asyncio.open_connection(self.bilibili.dic_bilibili['_ChatHost'],
self.bilibili.dic_bilibili['_ChatPort'])
self._reader, self._writer = await asyncio.open_connection(self.bilibili.dic_bilibili['_ChatHost'],
self.bilibili.dic_bilibili['_ChatPort'])
except:
print("连接无法建立,请检查本地网络状况")
await asyncio.sleep(5)
return
self._reader = reader
self._writer = writer
if (await self.SendJoinChannel(self._roomId) == True):
if await self.SendJoinChannel():
self.connected = True
Printer().printer(f'[{self.area}分区] 连接 {self._roomId} 弹幕服务器成功', "Info", "green")
await self.ReceiveMessageLoop()
Expand All @@ -109,33 +109,45 @@ async def HeartbeatLoop(self):
await asyncio.sleep(0.5)

while self.connected:
await self.SendSocketData(0, 16, self.bilibili.dic_bilibili['_protocolversion'], 2, 1, "")
await self.SendSocketData(ver=self._protocol_version, action=2, body="")
await asyncio.sleep(30)

async def SendJoinChannel(self, channelId):
self._uid = (int)(100000000000000.0 + 200000000000000.0 * random.random())
body = '{"roomid":%s,"uid":%s}' % (channelId, self._uid)
await self.SendSocketData(0, 16, self.bilibili.dic_bilibili['_protocolversion'], 7, 1, body)
return True
async def SendJoinChannel(self):
body = json.dumps({
"roomid": self._roomId,
"uid": self._uid,
# "protover": 2,
# "key": token,
}, separators=(',', ':'))
return await self.SendSocketData(ver=self._protocol_version, action=7, body=body)

async def SendSocketData(self, packetlength, magic, ver, action, param, body):
async def SendSocketData(self, ver, action, body):
bytearr = body.encode('utf-8')
if packetlength == 0:
packetlength = len(bytearr) + 16
sendbytes = struct.pack('!IHHII', packetlength, magic, ver, action, param)
if len(bytearr) != 0:
sendbytes = sendbytes + bytearr
header_len = 16
body_len = len(bytearr)
packet_len = 16 + body_len
sequence = 1
sendbytes = struct.pack(f'!IHHII{body_len}s',
packet_len, header_len, ver, action, sequence, bytearr)
try:
self._writer.write(sendbytes)
except:
Printer().printer(f"Error when self._writer.write(sendbytes): {sys.exc_info()[0]}, {sys.exc_info()[1]}","Error","red")
except Exception:
Printer().printer(f"Error when self._writer.write(sendbytes): {sys.exc_info()[0]}, {sys.exc_info()[1]}",
"Error", "red")
self.connected = False
return False
try:
await self._writer.drain()
except ConnectionError:
pass
except (ConnectionResetError, ConnectionAbortedError) as e:
# [WinError 10054] 远程主机强迫关闭了一个现有的连接。
# [WinError 10053] 你的主机中的软件中止了一个已建立的连接。
Printer().printer(f"Failed @ self._writer.drain(): {repr(e)}", "Error", "red")
return False
except Exception:
Printer().printer(f"Error when self._writer.drain(): {sys.exc_info()[0]}, {sys.exc_info()[1]}","Error","red")
Printer().printer(f"Error when self._writer.drain(): {sys.exc_info()[0]}, {sys.exc_info()[1]}",
"Error", "red")
return False
return True

async def ReadSocketData(self, len_wanted):
bytes_data = b''
Expand Down Expand Up @@ -175,65 +187,81 @@ async def ReadSocketData(self, len_wanted):
self.close_connection()
return None

if not tmp:
if not len(tmp):
Printer().printer(f"主动关闭或者远端主动发来FIN @[{self.area}分区]{self._roomId}","Error","red")
self.close_connection()
await asyncio.sleep(1)
return None
else:
bytes_data = bytes_data + tmp
len_remain = len_remain - len(tmp)
bytes_data += tmp
len_remain -= len(tmp)

return bytes_data

async def ReceiveMessageLoop(self):
while self.connected == True:
tmp = await self.ReadSocketData(16)
if tmp is None:
while self.connected:
length = await self.ReadSocketData(4)
if length is None:
break

expr, = struct.unpack('!I', tmp[:4])
packet_len, = struct.unpack('!I', length)
body = await self.ReadSocketData(packet_len-4)
if body is None:
break

num, = struct.unpack('!I', tmp[8:12])
await self.parse_packet(packet_len, body)

num2 = expr - 16
async def parse_packet(self, packet_len: int, body: bytes):
"""
len(body) == packet_len - 4
:param packet_len: length of whole packet
:param body: header except packet_len + subsequent main body
"""
# 基础假设是每个packet都有16字节header的保守序列,没有另外再先读取header_len然后读header
header_len, ver, action, sequence = struct.unpack('!HHII', body[:12])
body = body[12:]
# print(packet_len, header_len, ver, action, sequence, body)

tmp = await self.ReadSocketData(num2)
if tmp is None:
break
if action == 3:
# 3 人气值,数据不是JSON,是4字节整数
self._UserCount, = struct.unpack('!I', body)
else:
try:
dic = json.loads(body)
except UnicodeDecodeError:
inner_packet = zlib.decompress(body)
# print(f'{packet_len} 字节解压出 {inner_packet.count(b"cmd")} 条消息')

if num2 != 0:
num -= 1
if num == 0 or num == 1 or num == 2:
num3, = struct.unpack('!I', tmp)
self._UserCount = num3
continue
elif num == 3 or num == 4:
try:
messages = tmp.decode('utf-8')
except:
continue
try:
await self.parseDanMu(messages)
except Exception:
Printer().printer(f'Failed when parsing: {messages}\n{traceback.format_exc()}', "Error", "red")
continue
elif num == 5 or num == 6 or num == 7:
continue
else:
if num != 16:
pass
else:
continue
pack_p = 0
packs_len = len(inner_packet)
while pack_p < packs_len:
pack_len, = struct.unpack('!I', inner_packet[pack_p:pack_p+4])
await self.parse_packet(pack_len, inner_packet[pack_p+4:pack_p+pack_len])
pack_p += pack_len
return
except json.JSONDecodeError as e:
Printer().printer(f"{repr(e)} when json decode: {body}", "Error", "red")
return
except Exception:
Printer().printer(f"Failed when parse_packet: {body}\n{traceback.format_exc()}", "Error", "red")
return

async def parseDanMu(self, messages):
try:
dic = json.loads(messages)
except:
return
if action == 5:
try:
await self.parseDanMu(dic)
except Exception:
Printer().printer(f"Failed when parsing: {dic}\n{traceback.format_exc()}", "Error", "red")
elif action == 8:
# 26, 16, 2, 8, 1, b'{"code":0}'
pass
else:
# lyyyuna原版有action=17就不去请求body的逻辑
Printer().printer(f"异常action值: {packet_len, header_len, ver, action, sequence, dic}", "Warning", "red")

async def parseDanMu(self, dic):
cmd = dic.get('cmd')
if cmd is None:
Printer().printer(f"No cmd: {dic}", "Warning", "red")
return

if cmd == 'LIVE':
Expand Down
2 changes: 1 addition & 1 deletion conf/bilibili.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ platform = android
app_secret = 560c52ccd288fed045859ed18bffd973
_CIDInfoUrl = http://live.bilibili.com/api/player?id=cid:
_ChatPort = 2243
_protocolversion = 1
_protocolversion = 2
_ChatHost = livecmt-2.bilibili.com
activity_name =
access_key =
Expand Down

0 comments on commit ed5e67b

Please sign in to comment.