From 86e302d68aa58515a39ad6c6e77ce18d912bcc00 Mon Sep 17 00:00:00 2001 From: douglascdev Date: Thu, 14 Dec 2023 15:33:47 -0300 Subject: [PATCH] Make a separate file for the irc message parser --- hasherino/__main__.py | 3 +- hasherino/factory.py | 2 +- hasherino/parse_irc.py | 346 +++++++++++++++++++++++++++++++++ hasherino/twitch_websocket.py | 347 +--------------------------------- 4 files changed, 350 insertions(+), 348 deletions(-) create mode 100644 hasherino/parse_irc.py diff --git a/hasherino/__main__.py b/hasherino/__main__.py index ba28ed2..b2890e4 100644 --- a/hasherino/__main__.py +++ b/hasherino/__main__.py @@ -16,6 +16,7 @@ from hasherino.components.settings_view import LOG_PATH from hasherino.factory import message_factory from hasherino.hasherino_dataclasses import Emote, HasherinoUser +from hasherino.parse_irc import Command, ParsedMessage from hasherino.pubsub import PubSub from hasherino.storage import ( AsyncKeyValueStorage, @@ -23,7 +24,7 @@ PersistentStorage, get_default_os_settings_path, ) -from hasherino.twitch_websocket import Command, ParsedMessage, TwitchWebsocket +from hasherino.twitch_websocket import TwitchWebsocket class Hasherino: diff --git a/hasherino/factory.py b/hasherino/factory.py index 48f4f1e..0505293 100644 --- a/hasherino/factory.py +++ b/hasherino/factory.py @@ -1,5 +1,5 @@ from hasherino.hasherino_dataclasses import Emote, HasherinoUser, Message -from hasherino.twitch_websocket import ParsedMessage +from hasherino.parse_irc import ParsedMessage def message_factory( diff --git a/hasherino/parse_irc.py b/hasherino/parse_irc.py new file mode 100644 index 0000000..915a3dc --- /dev/null +++ b/hasherino/parse_irc.py @@ -0,0 +1,346 @@ +import logging +from collections import defaultdict +from enum import Enum, auto + +from hasherino.hasherino_dataclasses import Badge, Emote + + +class Command(Enum): + PRIVMSG = auto() + USERSTATE = auto() + GLOBALUSERSTATE = auto() + OTHER = auto() + + +class ParsedMessage: + def __init__(self, message: str): + self.source = self.tags = self.parameters = self.command = None + + raw_components = self._get_raw_components(message) + if not raw_components: + return None + + self.command = self._parse_command(raw_components["raw_command"]) + + if self.command is None: + return None + else: + if raw_components["raw_tags"] is not None: + self.tags = self._parse_tags(raw_components["raw_tags"]) + + self.source = self._parse_source(raw_components["raw_source"]) + self.parameters = raw_components["raw_parameters"] + + def get_badges(self, ttv_badges: dict) -> list[Badge]: + def get_badge(set_id: str, version: str) -> dict | None: + try: + id_match = next((s for s in ttv_badges if s["set_id"] == set_id)) + version_match = next( + (s for s in id_match["versions"] if s["id"] == version) + ) + return version_match + except: + return None + + badges = [] + + if not self.tags or not self.tags.get("badges"): + return badges + + try: + for id, version in self.tags["badges"].items(): + badge = get_badge(id, version) + if badge: + badges.append(Badge(id, badge["title"], badge["image_url_4x"])) + except Exception as e: + logging.exception(f"Error {e}. Failed to get badges from message: {self}") + return [] + + return badges + + def get_author_chat_color(self) -> str: + result = "ffffff" + + if self.tags and self.tags["color"]: + result = ( + self.tags["color"][1:] + if self.tags["color"][0] == "#" + else self.tags["color"] + ) + + assert len(result) <= 7, f"Returned invalid color: {result}" + assert result[0] != "#" + + return f"#{result}" + + def get_author_displayname(self) -> str: + if not self.tags or not self.tags.get("display-name"): + logging.warning(f"Failed to get author display-name fo message: {self}") + return "" + + return self.tags.get("display-name") + + def get_command(self) -> Command: + result = Command.OTHER + + if not self.command or not self.command.get("command"): + return result + + match self.command["command"]: + case "PRIVMSG": + result = Command.PRIVMSG + case "USERSTATE": + result = Command.USERSTATE + case "GLOBALUSERSTATE": + result = Command.GLOBALUSERSTATE + case _: + result = Command.OTHER + + return result + + def is_me(self) -> bool: + """ + Messages sent with /me, coloring the whole line with the user's chat color + """ + return ( + self.get_command() is Command.PRIVMSG + and self.parameters[:7] == "\x01ACTION" + ) + + def get_message_text(self) -> str: + # Remove \r\n from end of text + result = "" if len(self.parameters) <= 2 else self.parameters[:-2] + + if self.is_me(): + # parameters: '\x01ACTION asd\x01\r\n' + result = result[8:-1] + + return result + + def get_emote_sets(self) -> list[str]: + result = [] + + if self.tags and self.tags.get("emote-sets"): + result = [tag for tag in self.tags["emote-sets"].split(",")] + + return result + + def get_emote_map(self) -> dict[str, Emote]: + """ + Returns map of emote name to emote object for twitch emotes included in the message tags + """ + if not self.tags: + return {} + + emote_name_to_id_and_url: dict[str, Emote] = {} + + if self.tags.get("emotes"): + for emote_id, list_of_index_tuples in self.tags["emotes"].items(): + first_starting_index, first_ending_index = map( + int, list_of_index_tuples[0] + ) + emote_name = self.get_message_text()[ + first_starting_index : first_ending_index + 1 + ] + emote_name_to_id_and_url[emote_name] = Emote( + emote_name, + emote_id, + f"https://static-cdn.jtvnw.net/emoticons/v2/{emote_id}/default/dark/2.0", + ) + + return emote_name_to_id_and_url + + def __str__(self) -> str: + return str(self.__dict__) + + def _get_raw_components(self, message: str) -> dict[str, str]: + if not message: + return None + + raw_tags = raw_source = raw_command = raw_parameters = "" + + # Start index + idx = 0 + + # Get tags + if message[idx] == "@": + end_idx = message.find(" ") + raw_tags = message[1:end_idx] + idx = end_idx + 1 + + # Get source(nick and host) + if message[idx] == ":": + idx += 1 + end_idx = message.find(" ", idx) + raw_source = message[idx:end_idx] + idx = end_idx + 1 + + # Command + end_idx = message.find(":", idx) + if -1 == end_idx: + end_idx = len(message) + + raw_command = message[idx:end_idx].strip() + + # Parameters + if end_idx != len(message): + idx = end_idx + 1 + raw_parameters = message[idx:] + + return { + "raw_tags": raw_tags, + "raw_source": raw_source, + "raw_command": raw_command, + "raw_parameters": raw_parameters, + } + + def _parse_command(self, raw_command: str) -> dict: + parsed_command = None + command_parts = raw_command.split(" ") + + match command_parts[0]: + case "JOIN" | "PART" | "NOTICE" | "CLEARCHAT" | "HOSTTARGET": + pass + case "PRIVMSG": + parsed_command = { + "command": command_parts[0], + "channel": command_parts[1], + } + case "PING": + parsed_command = {"command": command_parts[0]} + case "CAP": + """ + The parameters part of the messages contains the + enabled capabilities. + """ + parsed_command = { + "command": command_parts[0], + "isCapRequestEnabled": command_parts[2] == "ACK", + } + case "GLOBALUSERSTATE": + """ + Included only if you request the /commands capability. + But it has no meaning without also including the /tags capability. + """ + parsed_command = {"command": command_parts[0]} + case "USERSTATE": + """ + Included only if you request the /commands capability. + But it has no meaning without also including the /tags capability. + """ + parsed_command = {"command": command_parts[0]} + case "ROOMSTATE": + """ + Included only if you request the /commands capability. + But it has no meaning without also including the /tags capability. + """ + parsed_command = { + "command": command_parts[0], + "channel": command_parts[1], + } + case "RECONNECT": + logging.info( + "The Twitch IRC server is about to terminate the connection for maintenance." + ) + parsed_command = {"command": command_parts[0]} + case "421": + logging.warning(f"Unsupported IRC command: {command_parts[2]}") + return None + case "001": + # Logged in (successfully authenticated) + parsed_command = { + "command": command_parts[0], + "channel": command_parts[1], + } + case "002" | "003" | "004" | "353" | "366" | "372" | "375": + """ + Ignoring all other numeric messages. + 353 tells you who else is in the chat room you're joining. + """ + pass + case "376": + logging.info(f"Numeric message: {command_parts[0]}") + return None + case _: + logging.warning(f"Unexpected command: {command_parts[0]}") + return None + + return parsed_command + + def _parse_source(self, raw_source: str) -> None | dict[str, str]: + if not raw_source: + return None + else: + source_parts = raw_source.split("!") + return { + "nick": source_parts[0] if len(source_parts) == 2 else None, + "host": source_parts[1] if len(source_parts) == 2 else source_parts[0], + } + + def _parse_tags(self, raw_tags: str): + dict_parsed_tags = {} + + if not raw_tags: + return dict_parsed_tags + + for tag in raw_tags.split(";"): + tag_key, tag_value = tag.split("=") + + match tag_key: + case "badges-info": + """ + Contains metadata related to the chat badges in the badges tag. + Currently, this tag contains metadata only for subscriber badges, to indicate the number of months the user has been a subscriber. + """ + pass + case "badges": + # badges=staff/1,broadcaster/1,turbo/1; + if tag_value: + badges = dict() + for badge_and_version in tag_value.split(","): + badge, version = badge_and_version.split("/") + badges[badge] = version + else: + badges = None + + dict_parsed_tags[tag_key] = badges + + case "emotes": + """ + emotes=25:0-4,12-16/1902:6-10 + emotes=emotesv2_c51307f86f6241bc8cd8385efd7c7509:0-9/emotesv2_d9f1e820ca8e42bab70fc2f22dea0d5a:31-44 + + Comma-delimited list of emotes and their positions in the message. + Each emote is in the form, :- + """ + if tag_value: + id_to_positions = defaultdict(list) + + for emote_id_and_pos in tag_value.split("/"): + emote_id, positions = emote_id_and_pos.split(":") + + for start_end in positions.split(","): + start, end = start_end.split("-") + + id_to_positions[emote_id].append((start, end)) + + dict_parsed_tags[tag_key] = dict(id_to_positions) + else: + dict_parsed_tags[tag_key] = None + + case "color": + dict_parsed_tags["color"] = tag_value[1:] + + case "user-id": + dict_parsed_tags["user-id"] = tag_value + + case "display-name": + dict_parsed_tags["display-name"] = tag_value + + case "emote-sets": + dict_parsed_tags["emote-sets"] = tag_value + + case _: + pass + + return dict_parsed_tags diff --git a/hasherino/twitch_websocket.py b/hasherino/twitch_websocket.py index ea737ea..e76a44d 100644 --- a/hasherino/twitch_websocket.py +++ b/hasherino/twitch_websocket.py @@ -1,357 +1,12 @@ import logging import ssl -from collections import defaultdict -from enum import Enum, auto from typing import Awaitable import certifi import websockets from websockets.exceptions import ConnectionClosedError -from hasherino.hasherino_dataclasses import Badge, Emote - -__all__ = ["TwitchWebsocket", "ParsedMessage"] - - -class Command(Enum): - PRIVMSG = auto() - USERSTATE = auto() - GLOBALUSERSTATE = auto() - OTHER = auto() - - -class ParsedMessage: - def __init__(self, message: str): - self.source = self.tags = self.parameters = self.command = None - - raw_components = self._get_raw_components(message) - if not raw_components: - return None - - self.command = self._parse_command(raw_components["raw_command"]) - - if self.command is None: - return None - else: - if raw_components["raw_tags"] is not None: - self.tags = self._parse_tags(raw_components["raw_tags"]) - - self.source = self._parse_source(raw_components["raw_source"]) - self.parameters = raw_components["raw_parameters"] - - def get_badges(self, ttv_badges: dict) -> list[Badge]: - def get_badge(set_id: str, version: str) -> dict | None: - try: - id_match = next((s for s in ttv_badges if s["set_id"] == set_id)) - version_match = next( - (s for s in id_match["versions"] if s["id"] == version) - ) - return version_match - except: - return None - - badges = [] - - if not self.tags or not self.tags.get("badges"): - return badges - - try: - for id, version in self.tags["badges"].items(): - badge = get_badge(id, version) - if badge: - badges.append(Badge(id, badge["title"], badge["image_url_4x"])) - except Exception as e: - logging.exception(f"Error {e}. Failed to get badges from message: {self}") - return [] - - return badges - - def get_author_chat_color(self) -> str: - result = "ffffff" - - if self.tags and self.tags["color"]: - result = ( - self.tags["color"][1:] - if self.tags["color"][0] == "#" - else self.tags["color"] - ) - - assert len(result) <= 7, f"Returned invalid color: {result}" - assert result[0] != "#" - - return f"#{result}" - - def get_author_displayname(self) -> str: - if not self.tags or not self.tags.get("display-name"): - logging.warning(f"Failed to get author display-name fo message: {self}") - return "" - - return self.tags.get("display-name") - - def get_command(self) -> Command: - result = Command.OTHER - - if not self.command or not self.command.get("command"): - return result - - match self.command["command"]: - case "PRIVMSG": - result = Command.PRIVMSG - case "USERSTATE": - result = Command.USERSTATE - case "GLOBALUSERSTATE": - result = Command.GLOBALUSERSTATE - case _: - result = Command.OTHER - - return result - - def is_me(self) -> bool: - """ - Messages sent with /me, coloring the whole line with the user's chat color - """ - return ( - self.get_command() is Command.PRIVMSG - and self.parameters[:7] == "\x01ACTION" - ) - - def get_message_text(self) -> str: - # Remove \r\n from end of text - result = "" if len(self.parameters) <= 2 else self.parameters[:-2] - - if self.is_me(): - # parameters: '\x01ACTION asd\x01\r\n' - result = result[8:-1] - - return result - - def get_emote_sets(self) -> list[str]: - result = [] - - if self.tags and self.tags.get("emote-sets"): - result = [tag for tag in self.tags["emote-sets"].split(",")] - - return result - - def get_emote_map(self) -> dict[str, Emote]: - """ - Returns map of emote name to emote object for twitch emotes included in the message tags - """ - if not self.tags: - return {} - - emote_name_to_id_and_url: dict[str, Emote] = {} - - if self.tags.get("emotes"): - for emote_id, list_of_index_tuples in self.tags["emotes"].items(): - first_starting_index, first_ending_index = map( - int, list_of_index_tuples[0] - ) - emote_name = self.get_message_text()[ - first_starting_index : first_ending_index + 1 - ] - emote_name_to_id_and_url[emote_name] = Emote( - emote_name, - emote_id, - f"https://static-cdn.jtvnw.net/emoticons/v2/{emote_id}/default/dark/2.0", - ) - - return emote_name_to_id_and_url - - def __str__(self) -> str: - return str(self.__dict__) - - def _get_raw_components(self, message: str) -> dict[str, str]: - if not message: - return None - - raw_tags = raw_source = raw_command = raw_parameters = "" - - # Start index - idx = 0 - - # Get tags - if message[idx] == "@": - end_idx = message.find(" ") - raw_tags = message[1:end_idx] - idx = end_idx + 1 - - # Get source(nick and host) - if message[idx] == ":": - idx += 1 - end_idx = message.find(" ", idx) - raw_source = message[idx:end_idx] - idx = end_idx + 1 - - # Command - end_idx = message.find(":", idx) - if -1 == end_idx: - end_idx = len(message) - - raw_command = message[idx:end_idx].strip() - - # Parameters - if end_idx != len(message): - idx = end_idx + 1 - raw_parameters = message[idx:] - - return { - "raw_tags": raw_tags, - "raw_source": raw_source, - "raw_command": raw_command, - "raw_parameters": raw_parameters, - } - - def _parse_command(self, raw_command: str) -> dict: - parsed_command = None - command_parts = raw_command.split(" ") - - match command_parts[0]: - case "JOIN" | "PART" | "NOTICE" | "CLEARCHAT" | "HOSTTARGET": - pass - case "PRIVMSG": - parsed_command = { - "command": command_parts[0], - "channel": command_parts[1], - } - case "PING": - parsed_command = {"command": command_parts[0]} - case "CAP": - """ - The parameters part of the messages contains the - enabled capabilities. - """ - parsed_command = { - "command": command_parts[0], - "isCapRequestEnabled": command_parts[2] == "ACK", - } - case "GLOBALUSERSTATE": - """ - Included only if you request the /commands capability. - But it has no meaning without also including the /tags capability. - """ - parsed_command = {"command": command_parts[0]} - case "USERSTATE": - """ - Included only if you request the /commands capability. - But it has no meaning without also including the /tags capability. - """ - parsed_command = {"command": command_parts[0]} - case "ROOMSTATE": - """ - Included only if you request the /commands capability. - But it has no meaning without also including the /tags capability. - """ - parsed_command = { - "command": command_parts[0], - "channel": command_parts[1], - } - case "RECONNECT": - logging.info( - "The Twitch IRC server is about to terminate the connection for maintenance." - ) - parsed_command = {"command": command_parts[0]} - case "421": - logging.warning(f"Unsupported IRC command: {command_parts[2]}") - return None - case "001": - # Logged in (successfully authenticated) - parsed_command = { - "command": command_parts[0], - "channel": command_parts[1], - } - case "002" | "003" | "004" | "353" | "366" | "372" | "375": - """ - Ignoring all other numeric messages. - 353 tells you who else is in the chat room you're joining. - """ - pass - case "376": - logging.info(f"Numeric message: {command_parts[0]}") - return None - case _: - logging.warning(f"Unexpected command: {command_parts[0]}") - return None - - return parsed_command - - def _parse_source(self, raw_source: str) -> None | dict[str, str]: - if not raw_source: - return None - else: - source_parts = raw_source.split("!") - return { - "nick": source_parts[0] if len(source_parts) == 2 else None, - "host": source_parts[1] if len(source_parts) == 2 else source_parts[0], - } - - def _parse_tags(self, raw_tags: str): - dict_parsed_tags = {} - - if not raw_tags: - return dict_parsed_tags - - for tag in raw_tags.split(";"): - tag_key, tag_value = tag.split("=") - - match tag_key: - case "badges-info": - """ - Contains metadata related to the chat badges in the badges tag. - Currently, this tag contains metadata only for subscriber badges, to indicate the number of months the user has been a subscriber. - """ - pass - case "badges": - # badges=staff/1,broadcaster/1,turbo/1; - if tag_value: - badges = dict() - for badge_and_version in tag_value.split(","): - badge, version = badge_and_version.split("/") - badges[badge] = version - else: - badges = None - - dict_parsed_tags[tag_key] = badges - - case "emotes": - """ - emotes=25:0-4,12-16/1902:6-10 - emotes=emotesv2_c51307f86f6241bc8cd8385efd7c7509:0-9/emotesv2_d9f1e820ca8e42bab70fc2f22dea0d5a:31-44 - - Comma-delimited list of emotes and their positions in the message. - Each emote is in the form, :- - """ - if tag_value: - id_to_positions = defaultdict(list) - - for emote_id_and_pos in tag_value.split("/"): - emote_id, positions = emote_id_and_pos.split(":") - - for start_end in positions.split(","): - start, end = start_end.split("-") - - id_to_positions[emote_id].append((start, end)) - - dict_parsed_tags[tag_key] = dict(id_to_positions) - else: - dict_parsed_tags[tag_key] = None - - case "color": - dict_parsed_tags["color"] = tag_value[1:] - - case "user-id": - dict_parsed_tags["user-id"] = tag_value - - case "display-name": - dict_parsed_tags["display-name"] = tag_value - - case "emote-sets": - dict_parsed_tags["emote-sets"] = tag_value - - case _: - pass - - return dict_parsed_tags +from hasherino.parse_irc import ParsedMessage class TwitchWebsocket: