diff --git a/conn/manager/internal/connection_manager.py b/conn/manager/internal/connection_manager.py index e630e13..6130b9b 100644 --- a/conn/manager/internal/connection_manager.py +++ b/conn/manager/internal/connection_manager.py @@ -1,3 +1,4 @@ +import asyncio from fastapi.websockets import WebSocket from conn import Conn from message import Message @@ -65,9 +66,14 @@ def generate_conn_id(): @staticmethod async def receive_broadcast_event(message: Message): overwrite_event(message) + + coroutines = [] + for id in ConnectionManager.conns: conn = ConnectionManager.conns[id] - await conn.send(message) + coroutines.append(conn.send(message)) + + await asyncio.gather(*coroutines) @EventBroker.add_receiver("multicast") @staticmethod @@ -75,12 +81,17 @@ async def receive_multicast_event(message: Message): overwrite_event(message) if "target_conns" not in message.header: raise DumbHumanException() + + coroutines = [] + for conn_id in message.header["target_conns"]: conn = ConnectionManager.get_conn(conn_id) if not conn: raise DumbHumanException() - await conn.send(message) + coroutines.append(conn.send(message)) + + await asyncio.gather(*coroutines) @staticmethod async def handle_message(message: Message): diff --git a/event/internal/event_broker.py b/event/internal/event_broker.py index 6301391..df78933 100644 --- a/event/internal/event_broker.py +++ b/event/internal/event_broker.py @@ -1,4 +1,5 @@ from __future__ import annotations +import asyncio from typing import Callable, Generic from message import Message from .exceptions import NoMatchingReceiverException @@ -78,10 +79,14 @@ async def publish(message: Message): if message.event not in EventBroker.event_dict: raise NoMatchingReceiverException(message.event) + coroutines = [] + receiver_ids = EventBroker.event_dict[message.event] for id in receiver_ids: receiver = Receiver.get_receiver(id) - await receiver(message) + coroutines.append(receiver(message)) + + await asyncio.gather(*coroutines) def _debug(message: Message): print(message.to_str(del_header=False))