Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to control multiple PIDs using different addresses #4

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 42 additions & 30 deletions metlinkpid.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,15 +345,15 @@ class Message(ABC):

@classmethod
@abstractmethod
def marker(cls) -> bytes:
def marker(cls, address: bytes) -> bytes:
"""
The :class:`bytes` that a raw byte representation must start with
in order to possibly be an instance of this :class:`Message` subclass.
"""

@classmethod
@abstractmethod
def from_bytes(cls, bytes_in: bytes) -> 'Message':
def from_bytes(cls, bytes_in: bytes, address: bytes) -> 'Message':
"""
Construct an instance of this :class:`Message` subclass from a raw byte representation
(not including the CRC-checksumming and packet-framing required for transmission).
Expand Down Expand Up @@ -384,23 +384,24 @@ class PingMessage(Message):
"""

unspecified_byte: int = attr.ib(default=0x6F)
address: bytes = attr.ib(default=b'\x01')

@classmethod
def marker(cls) -> bytes:
return b'\x01\x50'
def marker(cls, address: bytes) -> bytes:
return address + b'\x50'

@classmethod
def from_bytes(cls, bytes_in: bytes) -> 'PingMessage':
if not bytes_in.startswith(cls.marker()):
def from_bytes(cls, bytes_in: bytes, address: bytes) -> 'PingMessage':
if not bytes_in.startswith(cls.marker(address)):
raise ValueError('incorrect header for PingMessage')
if len(bytes_in) < 3:
raise ValueError('unexpected end of data')
if len(bytes_in) > 3:
raise ValueError('unexpected data')
return PingMessage(unspecified_byte=bytes_in[2])
return PingMessage(unspecified_byte=bytes_in[2], address=address)

def to_bytes(self) -> bytes:
return self.marker() + bytes([self.unspecified_byte])
return self.marker(self.address) + bytes([self.unspecified_byte])


@attr.s(frozen=True)
Expand All @@ -422,25 +423,26 @@ class ResponseMessage(Message):
"""

unspecified_byte: int = attr.ib()
address: bytes = attr.ib(default=b'\x01')

@classmethod
def marker(cls) -> bytes:
return b'\x01\x52'
def marker(cls, address: bytes) -> bytes:
return address + b'\x52'

@classmethod
def from_bytes(cls, bytes_in: bytes) -> 'ResponseMessage':
if not bytes_in.startswith(cls.marker()):
def from_bytes(cls, bytes_in: bytes, address: bytes) -> 'ResponseMessage':
if not bytes_in.startswith(cls.marker(address)):
raise ValueError('incorrect header for ResponseMessage')
if len(bytes_in) < 4:
raise ValueError('unexpected end of data')
if len(bytes_in) > 4:
raise ValueError('unexpected data')
if not bytes_in.endswith(b'\x00'):
raise ValueError('unexpected value at offset 3: {!r}'.format(bytes([bytes_in[-1]])))
return ResponseMessage(unspecified_byte=bytes_in[2])
return ResponseMessage(unspecified_byte=bytes_in[2], address=address)

def to_bytes(self) -> bytes:
return self.marker() + bytes([self.unspecified_byte]) + b'\x00'
return self.marker(self.address) + bytes([self.unspecified_byte]) + b'\x00'


@attr.s(frozen=True, repr=False)
Expand All @@ -461,15 +463,16 @@ class DisplayMessage(Message):
a :class:`tuple` of :class:`Page` objects comprising the message.
"""
pages: Sequence[Page] = attr.ib(converter=tuple)
address: bytes = attr.ib(default=b'\x01')

_PAGE_SEP = '|'

@classmethod
def marker(cls) -> bytes:
return b'\x01\x44\x00'
def marker(cls, address: bytes) -> bytes:
return address + b'\x44\x00'

@classmethod
def from_str(cls, string: str) -> 'DisplayMessage':
def from_str(cls, string: str, address: bytes) -> 'DisplayMessage':
"""
Construct a :class:`DisplayMessage` object from a string representation.

Expand Down Expand Up @@ -513,10 +516,10 @@ def from_str(cls, string: str) -> 'DisplayMessage':
default_animate=PageAnimate.VSCROLL if index == 0 else PageAnimate.HSCROLL
)
for index, page_str in enumerate(string.split(cls._PAGE_SEP))
))
), address=address)

@classmethod
def from_bytes(cls, bytes_in: bytes) -> 'DisplayMessage':
def from_bytes(cls, bytes_in: bytes, address: bytes) -> 'DisplayMessage':
"""
Construct a :class:`DisplayMessage` object from a raw byte representation
(not including the CRC-checksumming and packet-framing required for transmission).
Expand All @@ -527,8 +530,8 @@ def from_bytes(cls, bytes_in: bytes) -> 'DisplayMessage':
:raise ValueError:
if the bytes could not be understood.
"""
if not bytes_in.startswith(cls.marker()):
raise ValueError(f'data must start with {cls.marker()!r}')
if not bytes_in.startswith(cls.marker(address)):
raise ValueError(f'data must start with {cls.marker(address)!r}')
index = len(cls.marker())
pages = []
while True:
Expand All @@ -544,7 +547,7 @@ def from_bytes(cls, bytes_in: bytes) -> 'DisplayMessage':
index = end + 1
if index == len(bytes_in):
break
return DisplayMessage(pages=pages)
return DisplayMessage(pages=pages, address=address)

def __str__(self) -> str:
"""
Expand All @@ -560,7 +563,7 @@ def to_bytes(self) -> bytes:
The raw byte representation of the :class:`DisplayMessage` as understood by the display
(not including the CRC-checksumming and packet-framing required for transmission).
"""
return self.marker() + b'\x0D\x01'.join(page.to_bytes() for page in self.pages) + b'\x0D'
return self.marker(self.address) + b'\x0D\x01'.join(page.to_bytes() for page in self.pages) + b'\x0D'

def __repr__(self) -> str:
return 'DisplayMessage.from_str({!r})'.format(str(self))
Expand Down Expand Up @@ -602,13 +605,18 @@ class PID(AbstractContextManager):
whenever :meth:`PID.send` is called.
Defaults to ``False``.

:param address:
the address of the PID. Allows for one controller
to control multiple PIDs.

"""

serial: Serial = attr.ib()
ignore_responses: bool = attr.ib(False)
address: bytes = attr.ib(default=b'\x01')

@classmethod
def for_device(cls, port: str, ignore_responses: bool = False) -> 'PID':
def for_device(cls, port: str, ignore_responses: bool = False, address: bytes = b'\x01') -> 'PID':
"""
Construct a :class:`PID` object connected to the specified serial device
with a correctly configured :class:`serial.Serial` object.
Expand All @@ -631,10 +639,14 @@ def for_device(cls, port: str, ignore_responses: bool = False) -> 'PID':
whenever :meth:`PID.send` is called.
Defaults to ``False``.

:param address:
the address of the PID. Allows for one controller
to control multiple PIDs.

:raise serial.SerialException:
if the serial device can't be found or can't be configured.
"""
return PID(serial=Serial(port=port, timeout=0.5), ignore_responses=ignore_responses)
return PID(serial=Serial(port=port, timeout=0.5), ignore_responses=ignore_responses, address=address)

def send(self, data: Union[str, Message, bytes]) -> None:
"""
Expand Down Expand Up @@ -667,7 +679,7 @@ def send(self, data: Union[str, Message, bytes]) -> None:
such as the serial port being closed.
"""
if isinstance(data, str):
data = DisplayMessage.from_str(data)
data = DisplayMessage.from_str(data, self.address)
if isinstance(data, Message):
data = data.to_bytes()
# noinspection PyUnusedLocal
Expand All @@ -682,7 +694,7 @@ def send(self, data: Union[str, Message, bytes]) -> None:
response = _uncrc(dlestxetx.read(self.serial))
if self.serial.in_waiting:
raise NotImplementedError('more data came back from the PID device than expected')
if not isinstance(inspect(response), ResponseMessage):
if not isinstance(inspect(response, self.address), ResponseMessage):
raise NotImplementedError('unexpected response from the PID device: {!r}'.format(response))

def ping(self) -> None:
Expand All @@ -706,7 +718,7 @@ def ping(self) -> None:
if any other serial device error occurs,
such as the serial port being closed.
"""
self.send(PingMessage())
self.send(PingMessage(address=self.address))

def close(self) -> None:
"""
Expand Down Expand Up @@ -738,7 +750,7 @@ def __exit__(self, exc_type, exc_value, traceback) -> bool:
return False


def inspect(bytes_in: bytes) -> Message:
def inspect(bytes_in: bytes, address: bytes) -> Message:
"""
The :func:`inspect` function is used
to determine how the display would interpret an arbitrary sequence of bytes.
Expand Down Expand Up @@ -769,7 +781,7 @@ def inspect(bytes_in: bytes) -> Message:
# noinspection PyUnusedLocal
cls: Type[Message]
for cls in Message.__subclasses__():
if bytes_in.startswith(cls.marker()):
if bytes_in.startswith(cls.marker(address)):
return cls.from_bytes(bytes_in)
raise ValueError('unrecognised byte sequence')

Expand Down