From abf57aa038eab42976710fdea938ab0ba691baa7 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 24 Aug 2023 12:49:48 -0400 Subject: [PATCH] Add TransportClientMultiplexer, MuxStreamClient. Adding to TransportClientMultiplexer as multiplexing wrapper that can spawn specialized MuxStreamClient subtype of TransportLayerClient. --- .../dmod/communication/client.py | 193 +++++++++++++++++- 1 file changed, 192 insertions(+), 1 deletion(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index dd05b1cec..8a30bb610 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -7,7 +7,7 @@ from asyncio import AbstractEventLoop from deprecated import deprecated from pathlib import Path -from typing import Generic, Optional, Type, TypeVar, Union +from typing import Dict, Generic, List, Optional, Type, TypeVar, Union import websockets @@ -504,6 +504,197 @@ def is_new_session(self) -> Optional[bool]: return self._is_new_session +class TransportClientMultiplexer: + """ + Wrapper around a ::class:`TransportLayerClient` that allows for multiplexing over that client's connection. + + Type supports spawning attached ::class:`MuxStreamClient` instances which then utilize this type's multiplexing + capabilities. + """ + + def __init__(self, wrapped_client: TransportLayerClient, len_mux_ids: int = 8, *args, **kwargs): + super().__init__(*args, **kwargs) + self._wrapped_client: TransportLayerClient = wrapped_client + self._recv_queues: Dict[str, List[str]] = dict() + """ Per-mux-stream queues ::method:`mux_recv` to hold data via ::method:`async_recv` for other streams. """ + self._active_mux_clients: Dict[str, MuxStreamClient] = dict() + """ Clients spawned by ::method:`spawn_mux_client` and not yet retired with ::method:`retire_mux_client`. """ + self._perform_recv_lock: asyncio.Lock = asyncio.Lock() + """ A lock to avoid getting async things out of order due to poorly time queuing in ::method:`mux_recv`. """ + self._int_mux_id = -1 + """ Int basis for str mux ids, initially ``-1`` so later logic increments and uses ``0`` as 1st base value. """ + + self._len_mux_id = len_mux_ids + + async def mux_recv(self, mux_id: str) -> str: + """ + Receive and return data for a particular mux stream from server over a multiplexed connection. + + Parameters + ---------- + mux_id : str + A unique identifier for the specific isolated data stream to receive from. + + Returns + ------- + str + The data received from the server, as a string. + """ + while True: + # Lock the connection to avoid trouble if two consecutive messages are received AFTER we check the queue, + # but a different function call beats "this" one's async await to the first message (and then "this" one + # end up with the second message out of order) + async with self._perform_recv_lock: + # First check if we already have something + if self._recv_queues.get(mux_id, None): + return self._recv_queues[mux_id].pop(0) + + # Then try receiving + mux_encoded_data = await self._wrapped_client.async_recv() + received_mux_id, data = mux_encoded_data[:self._len_mux_id], mux_encoded_data[self._len_mux_id:] + + if mux_id == received_mux_id: + return data + elif received_mux_id in self._recv_queues: + self._recv_queues[received_mux_id].append(data) + else: + # FIXME: is this the best thing to do? Should we instead just throw this out? Or save elsewhere? + msg = f"{self.__class__.__name__} received data on unexpected mux stream {received_mux_id}" + raise RuntimeError(msg) + + async def mux_send(self, data: Union[str, bytearray, bytes], mux_id: str): + """ + Send data to server on a particular mux stream. + + Parameters + ---------- + data: Union[str, bytearray, bytes] + The data to send. + mux_id : str + A unique identifier for an isolated data stream. + """ + if len(mux_id) != self._len_mux_id: + msg = f"{self.__class__.__name__} can't send using mux id {mux_id} (expected length {self._len_mux_id!s})" + raise ValueError(msg) + await self._wrapped_client.async_send(f"{mux_id}{data}") + + def retire_mux_client(self, mux_client: 'MuxStreamClient') -> bool: + """ + Retire the mux_id and associated, spawned client reference, making the id eligible for reuse. + + By "retire," this instance simple removes the entry with the provided mux id key from its internal dictionary + of previously-spawned clients (and queue received data), assuming this particular client was present. + + The function will not "retire" the client and will return ``False`` if there is received data queued for this + particular client that was received during a ::method:`mux_recv` for a different client. Also, a + ::class:`ValueError` will be raised if this instance does not recognize the mux client as active. + + Parameters + ---------- + mux_client : MuxStreamClient + The client to "retire." + + Returns + ------- + bool + Whether this particular client has now been "retired." + """ + lookup_client = self._active_mux_clients.get(mux_client.mux_id, None) + + if lookup_client is None: + raise ValueError(f"{self.__class__.__name__} can't retire unrecognized {mux_client.__class__.__name__} " + f"with id {mux_client.mux_id}") + if lookup_client is not mux_client: + raise ValueError(f"{self.__class__.__name__} can't retire {mux_client.__class__.__name__} that is different" + f" than internal reference for client with id {mux_client.mux_id}") + # Don't retire if there is still data queued for the client + if len(self._recv_queues[mux_client.mux_id]) > 0: + return False + else: + self._active_mux_clients.pop(mux_client.mux_id) + self._recv_queues.pop(mux_client.mux_id) + return True + + def spawn_mux_client(self) -> 'MuxStreamClient': + """ + Create and return a ::class:`MuxStreamClient` backed by this instances. + + Returns + ------- + MuxStreamClient + A transport client object backed by a particular mux stream of this instance's shared connection. + """ + # Loop through possible mux_id values until we find a free one, then use that to build and return a client + while True: + # Increment first, unless we overflow + self._int_mux_id = (self._int_mux_id + 1) if (self._int_mux_id + 1) < (10 ** self._len_mux_id) else 0 + + # Use that new mux_int for the id (padded with 0s) + mux_id = str(self._int_mux_id).zfill(self._len_mux_id) + + # Of course, only exit the loop and return if that mux_id isn't in use + if mux_id not in self._active_mux_clients: + # Do these first here to make sure the keys are in the dict, or else an exception should raise when + # initializing the client + self._active_mux_clients[mux_id] = None + self._recv_queues[mux_id] = list() + + client = MuxStreamClient(mux_id=mux_id, parent=self) + self._active_mux_clients[mux_id] = client + return client + + +class MuxStreamClient(TransportLayerClient): + """ + Subtype of ::class:`TransportLayerClient` for working over a mux stream from a ::class:`TransportClientMultiplexer`. + + Class is tightly coupled to ::class:`TransportClientMultiplexer`. Instances should only be created using + ::method:`TransportClientMultiplexer.spawn_mux_client`. + """ + + def __init__(self, mux_id: str, parent: TransportClientMultiplexer, *args, **kwargs): + # To ensure clients are only created by "spawning" from a parent, do a few ... interesting checks + # These all are consistent with what should happen within TransportClientMultiplexer.spawn_mux_client() before a + # new stream client object is created + if (mux_id not in parent._active_mux_clients + or parent._active_mux_clients[mux_id] is not None # yes, should be None; changed right after this __init__ + or not isinstance(parent._recv_queues.get(mux_id), list) + or len(parent._recv_queues[mux_id]) != 0): + raise RuntimeError(f"Do not create {self.__class__.__name__} except by using `spawn_mux_client`") + + super().__init__(*args, **kwargs) + self._mux_id = mux_id + self._parent = parent + self._is_retired = False + + async def async_send(self, data: Union[str, bytearray, bytes], await_response: bool = False) -> Optional[str]: + await self._parent.mux_send(data=data, mux_id=self._mux_id) + if await_response: + return await self._parent.mux_recv(mux_id=self._mux_id) + + async def async_recv(self) -> str: + return await self._parent.mux_recv(mux_id=self._mux_id) + + @property + def endpoint_uri(self) -> str: + return self._parent._wrapped_client.endpoint_uri + + @property + def mux_id(self) -> str: + return self._mux_id + + def retire(self) -> bool: + """ + Have this instances parent retire it from use. + + Returns + ------- + bool + Whether retiring was successful. + """ + return self._parent.retire_mux_client(mux_client=self) + + class RequestClient: """ Simple DMOD service client, dealing with DMOD request message and response objects.