From ed5e67b9160d816f8b1bc66dad621241a3934ce1 Mon Sep 17 00:00:00 2001 From: yawwwwwn <40122222+yawwwwwn@users.noreply.github.com> Date: Sun, 26 Jul 2020 17:40:55 +0900 Subject: [PATCH] Update danmu protocol MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 采用 protocol ver 2 连接弹幕服务器,适配压缩弹幕的读取 (fix #289 ) --- bilibiliCilent.py | 154 ++++++++++++++++++++++++++------------------- conf/bilibili.conf | 2 +- 2 files changed, 92 insertions(+), 64 deletions(-) diff --git a/bilibiliCilent.py b/bilibiliCilent.py index e304641..b8718cb 100644 --- a/bilibiliCilent.py +++ b/bilibiliCilent.py @@ -8,6 +8,7 @@ import asyncio import random import struct +import zlib import json import sys @@ -71,9 +72,10 @@ class bilibiliClient(): def __init__(self, roomid, area): self.bilibili = bilibili() + self._protocol_version = self.bilibili.dic_bilibili['_protocolversion'] self._reader = None self._writer = None - self._uid = None + self._uid = int(100000000000000 + 200000000000000 * random.random()) self.connected = False self._UserCount = 0 self.dic_bulletin = { @@ -91,15 +93,13 @@ def close_connection(self): async def connectServer(self): try: - reader, writer = await asyncio.open_connection(self.bilibili.dic_bilibili['_ChatHost'], - self.bilibili.dic_bilibili['_ChatPort']) + self._reader, self._writer = await asyncio.open_connection(self.bilibili.dic_bilibili['_ChatHost'], + self.bilibili.dic_bilibili['_ChatPort']) except: print("连接无法建立,请检查本地网络状况") await asyncio.sleep(5) return - self._reader = reader - self._writer = writer - if (await self.SendJoinChannel(self._roomId) == True): + if await self.SendJoinChannel(): self.connected = True Printer().printer(f'[{self.area}分区] 连接 {self._roomId} 弹幕服务器成功', "Info", "green") await self.ReceiveMessageLoop() @@ -109,33 +109,45 @@ async def HeartbeatLoop(self): await asyncio.sleep(0.5) while self.connected: - await self.SendSocketData(0, 16, self.bilibili.dic_bilibili['_protocolversion'], 2, 1, "") + await self.SendSocketData(ver=self._protocol_version, action=2, body="") await asyncio.sleep(30) - async def SendJoinChannel(self, channelId): - self._uid = (int)(100000000000000.0 + 200000000000000.0 * random.random()) - body = '{"roomid":%s,"uid":%s}' % (channelId, self._uid) - await self.SendSocketData(0, 16, self.bilibili.dic_bilibili['_protocolversion'], 7, 1, body) - return True + async def SendJoinChannel(self): + body = json.dumps({ + "roomid": self._roomId, + "uid": self._uid, + # "protover": 2, + # "key": token, + }, separators=(',', ':')) + return await self.SendSocketData(ver=self._protocol_version, action=7, body=body) - async def SendSocketData(self, packetlength, magic, ver, action, param, body): + async def SendSocketData(self, ver, action, body): bytearr = body.encode('utf-8') - if packetlength == 0: - packetlength = len(bytearr) + 16 - sendbytes = struct.pack('!IHHII', packetlength, magic, ver, action, param) - if len(bytearr) != 0: - sendbytes = sendbytes + bytearr + header_len = 16 + body_len = len(bytearr) + packet_len = 16 + body_len + sequence = 1 + sendbytes = struct.pack(f'!IHHII{body_len}s', + packet_len, header_len, ver, action, sequence, bytearr) try: self._writer.write(sendbytes) - except: - Printer().printer(f"Error when self._writer.write(sendbytes): {sys.exc_info()[0]}, {sys.exc_info()[1]}","Error","red") + except Exception: + Printer().printer(f"Error when self._writer.write(sendbytes): {sys.exc_info()[0]}, {sys.exc_info()[1]}", + "Error", "red") self.connected = False + return False try: await self._writer.drain() - except ConnectionError: - pass + except (ConnectionResetError, ConnectionAbortedError) as e: + # [WinError 10054] 远程主机强迫关闭了一个现有的连接。 + # [WinError 10053] 你的主机中的软件中止了一个已建立的连接。 + Printer().printer(f"Failed @ self._writer.drain(): {repr(e)}", "Error", "red") + return False except Exception: - Printer().printer(f"Error when self._writer.drain(): {sys.exc_info()[0]}, {sys.exc_info()[1]}","Error","red") + Printer().printer(f"Error when self._writer.drain(): {sys.exc_info()[0]}, {sys.exc_info()[1]}", + "Error", "red") + return False + return True async def ReadSocketData(self, len_wanted): bytes_data = b'' @@ -175,65 +187,81 @@ async def ReadSocketData(self, len_wanted): self.close_connection() return None - if not tmp: + if not len(tmp): Printer().printer(f"主动关闭或者远端主动发来FIN @[{self.area}分区]{self._roomId}","Error","red") self.close_connection() await asyncio.sleep(1) return None else: - bytes_data = bytes_data + tmp - len_remain = len_remain - len(tmp) + bytes_data += tmp + len_remain -= len(tmp) return bytes_data async def ReceiveMessageLoop(self): - while self.connected == True: - tmp = await self.ReadSocketData(16) - if tmp is None: + while self.connected: + length = await self.ReadSocketData(4) + if length is None: break - expr, = struct.unpack('!I', tmp[:4]) + packet_len, = struct.unpack('!I', length) + body = await self.ReadSocketData(packet_len-4) + if body is None: + break - num, = struct.unpack('!I', tmp[8:12]) + await self.parse_packet(packet_len, body) - num2 = expr - 16 + async def parse_packet(self, packet_len: int, body: bytes): + """ + len(body) == packet_len - 4 + :param packet_len: length of whole packet + :param body: header except packet_len + subsequent main body + """ + # 基础假设是每个packet都有16字节header的保守序列,没有另外再先读取header_len然后读header + header_len, ver, action, sequence = struct.unpack('!HHII', body[:12]) + body = body[12:] + # print(packet_len, header_len, ver, action, sequence, body) - tmp = await self.ReadSocketData(num2) - if tmp is None: - break + if action == 3: + # 3 人气值,数据不是JSON,是4字节整数 + self._UserCount, = struct.unpack('!I', body) + else: + try: + dic = json.loads(body) + except UnicodeDecodeError: + inner_packet = zlib.decompress(body) + # print(f'{packet_len} 字节解压出 {inner_packet.count(b"cmd")} 条消息') - if num2 != 0: - num -= 1 - if num == 0 or num == 1 or num == 2: - num3, = struct.unpack('!I', tmp) - self._UserCount = num3 - continue - elif num == 3 or num == 4: - try: - messages = tmp.decode('utf-8') - except: - continue - try: - await self.parseDanMu(messages) - except Exception: - Printer().printer(f'Failed when parsing: {messages}\n{traceback.format_exc()}', "Error", "red") - continue - elif num == 5 or num == 6 or num == 7: - continue - else: - if num != 16: - pass - else: - continue + pack_p = 0 + packs_len = len(inner_packet) + while pack_p < packs_len: + pack_len, = struct.unpack('!I', inner_packet[pack_p:pack_p+4]) + await self.parse_packet(pack_len, inner_packet[pack_p+4:pack_p+pack_len]) + pack_p += pack_len + return + except json.JSONDecodeError as e: + Printer().printer(f"{repr(e)} when json decode: {body}", "Error", "red") + return + except Exception: + Printer().printer(f"Failed when parse_packet: {body}\n{traceback.format_exc()}", "Error", "red") + return - async def parseDanMu(self, messages): - try: - dic = json.loads(messages) - except: - return + if action == 5: + try: + await self.parseDanMu(dic) + except Exception: + Printer().printer(f"Failed when parsing: {dic}\n{traceback.format_exc()}", "Error", "red") + elif action == 8: + # 26, 16, 2, 8, 1, b'{"code":0}' + pass + else: + # lyyyuna原版有action=17就不去请求body的逻辑 + Printer().printer(f"异常action值: {packet_len, header_len, ver, action, sequence, dic}", "Warning", "red") + async def parseDanMu(self, dic): cmd = dic.get('cmd') if cmd is None: + Printer().printer(f"No cmd: {dic}", "Warning", "red") return if cmd == 'LIVE': diff --git a/conf/bilibili.conf b/conf/bilibili.conf index d82e97b..3c922ce 100644 --- a/conf/bilibili.conf +++ b/conf/bilibili.conf @@ -8,7 +8,7 @@ platform = android app_secret = 560c52ccd288fed045859ed18bffd973 _CIDInfoUrl = http://live.bilibili.com/api/player?id=cid: _ChatPort = 2243 -_protocolversion = 1 +_protocolversion = 2 _ChatHost = livecmt-2.bilibili.com activity_name = access_key =