From e77afb8902bf4133431d85a44572a4bacbb9c448 Mon Sep 17 00:00:00 2001 From: Eyeball Edward Date: Thu, 22 Feb 2024 21:45:13 +1100 Subject: [PATCH] Add in address to control multiple PIDs --- metlinkpid.py | 72 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/metlinkpid.py b/metlinkpid.py index 8b65403..34d8559 100644 --- a/metlinkpid.py +++ b/metlinkpid.py @@ -345,7 +345,7 @@ class Message(ABC): @classmethod @abstractmethod - def marker(cls) -> bytes: + def marker(cls, address: bytes) -> bytes: """ The :class:`bytes` that a raw byte representation must start with in order to possibly be an instance of this :class:`Message` subclass. @@ -353,7 +353,7 @@ def marker(cls) -> bytes: @classmethod @abstractmethod - def from_bytes(cls, bytes_in: bytes) -> 'Message': + def from_bytes(cls, bytes_in: bytes, address: bytes) -> 'Message': """ Construct an instance of this :class:`Message` subclass from a raw byte representation (not including the CRC-checksumming and packet-framing required for transmission). @@ -384,23 +384,24 @@ class PingMessage(Message): """ unspecified_byte: int = attr.ib(default=0x6F) + address: bytes = attr.ib(default=b'\x01') @classmethod - def marker(cls) -> bytes: - return b'\x01\x50' + def marker(cls, address: bytes) -> bytes: + return address + b'\x50' @classmethod - def from_bytes(cls, bytes_in: bytes) -> 'PingMessage': - if not bytes_in.startswith(cls.marker()): + def from_bytes(cls, bytes_in: bytes, address: bytes) -> 'PingMessage': + if not bytes_in.startswith(cls.marker(address)): raise ValueError('incorrect header for PingMessage') if len(bytes_in) < 3: raise ValueError('unexpected end of data') if len(bytes_in) > 3: raise ValueError('unexpected data') - return PingMessage(unspecified_byte=bytes_in[2]) + return PingMessage(unspecified_byte=bytes_in[2], address=address) def to_bytes(self) -> bytes: - return self.marker() + bytes([self.unspecified_byte]) + return self.marker(self.address) + bytes([self.unspecified_byte]) @attr.s(frozen=True) @@ -422,14 +423,15 @@ class ResponseMessage(Message): """ unspecified_byte: int = attr.ib() + address: bytes = attr.ib(default=b'\x01') @classmethod - def marker(cls) -> bytes: - return b'\x01\x52' + def marker(cls, address: bytes) -> bytes: + return address + b'\x52' @classmethod - def from_bytes(cls, bytes_in: bytes) -> 'ResponseMessage': - if not bytes_in.startswith(cls.marker()): + def from_bytes(cls, bytes_in: bytes, address: bytes) -> 'ResponseMessage': + if not bytes_in.startswith(cls.marker(address)): raise ValueError('incorrect header for ResponseMessage') if len(bytes_in) < 4: raise ValueError('unexpected end of data') @@ -437,10 +439,10 @@ def from_bytes(cls, bytes_in: bytes) -> 'ResponseMessage': raise ValueError('unexpected data') if not bytes_in.endswith(b'\x00'): raise ValueError('unexpected value at offset 3: {!r}'.format(bytes([bytes_in[-1]]))) - return ResponseMessage(unspecified_byte=bytes_in[2]) + return ResponseMessage(unspecified_byte=bytes_in[2], address=address) def to_bytes(self) -> bytes: - return self.marker() + bytes([self.unspecified_byte]) + b'\x00' + return self.marker(self.address) + bytes([self.unspecified_byte]) + b'\x00' @attr.s(frozen=True, repr=False) @@ -461,15 +463,16 @@ class DisplayMessage(Message): a :class:`tuple` of :class:`Page` objects comprising the message. """ pages: Sequence[Page] = attr.ib(converter=tuple) + address: bytes = attr.ib(default=b'\x01') _PAGE_SEP = '|' @classmethod - def marker(cls) -> bytes: - return b'\x01\x44\x00' + def marker(cls, address: bytes) -> bytes: + return address + b'\x44\x00' @classmethod - def from_str(cls, string: str) -> 'DisplayMessage': + def from_str(cls, string: str, address: bytes) -> 'DisplayMessage': """ Construct a :class:`DisplayMessage` object from a string representation. @@ -513,10 +516,10 @@ def from_str(cls, string: str) -> 'DisplayMessage': default_animate=PageAnimate.VSCROLL if index == 0 else PageAnimate.HSCROLL ) for index, page_str in enumerate(string.split(cls._PAGE_SEP)) - )) + ), address=address) @classmethod - def from_bytes(cls, bytes_in: bytes) -> 'DisplayMessage': + def from_bytes(cls, bytes_in: bytes, address: bytes) -> 'DisplayMessage': """ Construct a :class:`DisplayMessage` object from a raw byte representation (not including the CRC-checksumming and packet-framing required for transmission). @@ -527,8 +530,8 @@ def from_bytes(cls, bytes_in: bytes) -> 'DisplayMessage': :raise ValueError: if the bytes could not be understood. """ - if not bytes_in.startswith(cls.marker()): - raise ValueError(f'data must start with {cls.marker()!r}') + if not bytes_in.startswith(cls.marker(address)): + raise ValueError(f'data must start with {cls.marker(address)!r}') index = len(cls.marker()) pages = [] while True: @@ -544,7 +547,7 @@ def from_bytes(cls, bytes_in: bytes) -> 'DisplayMessage': index = end + 1 if index == len(bytes_in): break - return DisplayMessage(pages=pages) + return DisplayMessage(pages=pages, address=address) def __str__(self) -> str: """ @@ -560,7 +563,7 @@ def to_bytes(self) -> bytes: The raw byte representation of the :class:`DisplayMessage` as understood by the display (not including the CRC-checksumming and packet-framing required for transmission). """ - return self.marker() + b'\x0D\x01'.join(page.to_bytes() for page in self.pages) + b'\x0D' + return self.marker(self.address) + b'\x0D\x01'.join(page.to_bytes() for page in self.pages) + b'\x0D' def __repr__(self) -> str: return 'DisplayMessage.from_str({!r})'.format(str(self)) @@ -602,13 +605,18 @@ class PID(AbstractContextManager): whenever :meth:`PID.send` is called. Defaults to ``False``. + :param address: + the address of the PID. Allows for one controller + to control multiple PIDs. + """ serial: Serial = attr.ib() ignore_responses: bool = attr.ib(False) + address: bytes = attr.ib(default=b'\x01') @classmethod - def for_device(cls, port: str, ignore_responses: bool = False) -> 'PID': + def for_device(cls, port: str, ignore_responses: bool = False, address: bytes = b'\x01') -> 'PID': """ Construct a :class:`PID` object connected to the specified serial device with a correctly configured :class:`serial.Serial` object. @@ -631,10 +639,14 @@ def for_device(cls, port: str, ignore_responses: bool = False) -> 'PID': whenever :meth:`PID.send` is called. Defaults to ``False``. + :param address: + the address of the PID. Allows for one controller + to control multiple PIDs. + :raise serial.SerialException: if the serial device can't be found or can't be configured. """ - return PID(serial=Serial(port=port, timeout=0.5), ignore_responses=ignore_responses) + return PID(serial=Serial(port=port, timeout=0.5), ignore_responses=ignore_responses, address=address) def send(self, data: Union[str, Message, bytes]) -> None: """ @@ -667,7 +679,7 @@ def send(self, data: Union[str, Message, bytes]) -> None: such as the serial port being closed. """ if isinstance(data, str): - data = DisplayMessage.from_str(data) + data = DisplayMessage.from_str(data, self.address) if isinstance(data, Message): data = data.to_bytes() # noinspection PyUnusedLocal @@ -682,7 +694,7 @@ def send(self, data: Union[str, Message, bytes]) -> None: response = _uncrc(dlestxetx.read(self.serial)) if self.serial.in_waiting: raise NotImplementedError('more data came back from the PID device than expected') - if not isinstance(inspect(response), ResponseMessage): + if not isinstance(inspect(response, self.address), ResponseMessage): raise NotImplementedError('unexpected response from the PID device: {!r}'.format(response)) def ping(self) -> None: @@ -706,7 +718,7 @@ def ping(self) -> None: if any other serial device error occurs, such as the serial port being closed. """ - self.send(PingMessage()) + self.send(PingMessage(address=self.address)) def close(self) -> None: """ @@ -738,7 +750,7 @@ def __exit__(self, exc_type, exc_value, traceback) -> bool: return False -def inspect(bytes_in: bytes) -> Message: +def inspect(bytes_in: bytes, address: bytes) -> Message: """ The :func:`inspect` function is used to determine how the display would interpret an arbitrary sequence of bytes. @@ -769,7 +781,7 @@ def inspect(bytes_in: bytes) -> Message: # noinspection PyUnusedLocal cls: Type[Message] for cls in Message.__subclasses__(): - if bytes_in.startswith(cls.marker()): + if bytes_in.startswith(cls.marker(address)): return cls.from_bytes(bytes_in) raise ValueError('unrecognised byte sequence')