From 4d15123e5ac6220c315f681b47518b778fcb9bd7 Mon Sep 17 00:00:00 2001 From: Pier-Yves Lessard Date: Wed, 3 Jan 2024 11:47:10 -0500 Subject: [PATCH] Support asymmetric addresses (#107) --- doc/source/isotp/addressing.rst | 65 +++++ doc/source/isotp/examples.rst | 24 +- isotp/__init__.py | 2 +- isotp/address.py | 483 +++++++++++++++++++++++--------- isotp/protocol.py | 56 ++-- isotp/tpsock/__init__.py | 29 +- test/test_addressing_modes.py | 280 +++++++++++++++++- test/test_layer_vs_socket.py | 35 ++- test/test_socket.py | 10 + test/test_transport_layer.py | 98 +++++++ 10 files changed, 891 insertions(+), 191 deletions(-) diff --git a/doc/source/isotp/addressing.rst b/doc/source/isotp/addressing.rst index 0c60997..1522a56 100755 --- a/doc/source/isotp/addressing.rst +++ b/doc/source/isotp/addressing.rst @@ -158,3 +158,68 @@ Example : 0x18CE55AA [8] 99 10 0A 00 01 02 03 // First frame 0x18CEAA55 [5] 99 30 00 08 00 // Flow control 0x18CE55AA [8] 99 21 04 05 06 07 08 09 // consecutive frame + +------ + + +Asymmetric addresses +-------------------- + +It is possible to send and receive with different address schemes. The :class:`AsymmetricAddress` serves that purpose + +.. autoclass:: isotp.AsymmetricAddress + +When using an asymmetric, both ``tx_addr`` and ``rx_addr`` must be partial addresses, meaning that either ``tx_only=True`` or ``rx_only=True`` is set. +Address object instantiated with ``rx_only=True`` will not expect parameter meant for transmission and conversely, when instantiated with ``tx_only=True`` +parameters required for reception won't be needed. + + +Example : + + - Transmission (``NormalFixed_29bits``): + + - source_address : 0x55 + - target_address : 0xAA + + - Reception (``Mixed_11bits``) + + - rxid : 0x123 + - address_extension : 0x99 + +.. code-block:: python + + import isotp + address = isotp.AsymmetricAddress( + tx_addr=isotp.Address(isotp.AddressingMode.NormalFixed_29bits, target_address=ta, source_address=sa, tx_only=True), + rx_addr=isotp.Address(isotp.AddressingMode.Mixed_11bits, rxid=0x123, address_extension=0x99, rx_only=True) # txid is not required + ) + + +:: + + // Reception of a 10 bytes payload + 0x123 [8] 99 10 0A 00 01 02 03 04 // First frame + 0x18DAAA55 [4] 30 00 08 00 // Flow control + 0x123 [7] 99 21 05 06 07 08 09 // Consecutive frame + + +The following table indicates the required parameter to construct a :class:`Address` object for all possible scenario + +.. csv-table:: :class:`Address` required parameters + :header: "Addressing mode", "Full address", "Partial Tx (``tx_only=True``)", "Partial Rx (``rx_only=True``)" + + "Normal_11bits", "``rxid`` ``txid``", "``txid``", "``rxid``" + "Normal_29bits", "``rxid`` ``txid``", "``txid``", "``rxid``" + "NormalFixed_29bits", "``source_address`` ``target_address``", "``source_address`` ``target_address``", "``source_address`` ``target_address``" + "Extended_11bits", "``txid`` ``target_address`` ``rxid`` ``source_address``", "``txid`` ``target_address``", "``rxid`` ``source_address``" + "Extended_29bits", "``txid`` ``target_address`` ``rxid`` ``source_address``", "``txid`` ``target_address``", "``rxid`` ``source_address``" + "Mixed_11bits", "``rxid`` ``txid`` ``address_extension``", "``txid`` ``address_extension``", "``rxid`` ``address_extension``" + "Mixed_29bits", "``source_address`` ``target_address`` ``address_extension``", "``source_address`` ``target_address`` ``address_extension``", "``source_address`` ``target_address`` ``address_extension``" + + + + + + + + diff --git a/doc/source/isotp/examples.rst b/doc/source/isotp/examples.rst index 7202dfb..e7afd13 100755 --- a/doc/source/isotp/examples.rst +++ b/doc/source/isotp/examples.rst @@ -81,15 +81,21 @@ Different type of addresses .. code-block:: python - import isotp - - isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x123, txid=0x456) - isotp.Address(isotp.AddressingMode.Normal_29bits, rxid=0x123456, txid=0x789ABC) - isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0x11, target_address=0x22) - isotp.Address(isotp.AddressingMode.Extended_11bits, rxid=0x123, txid=0x456, source_address=0x55, target_address=0xAA) - isotp.Address(isotp.AddressingMode.Extended_29bits, rxid=0x123456, txid=0x789ABC, source_address=0x55, target_address=0xAA) - isotp.Address(isotp.AddressingMode.Mixed_11bits, rxid=0x123, txid=0x456, address_extension=0x99) - isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=0x11, target_address=0x22, address_extension=0x99) + import isotp + + isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x123, txid=0x456) + isotp.Address(isotp.AddressingMode.Normal_29bits, rxid=0x123456, txid=0x789ABC) + isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0x11, target_address=0x22) + isotp.Address(isotp.AddressingMode.Extended_11bits, rxid=0x123, txid=0x456, source_address=0x55, target_address=0xAA) + isotp.Address(isotp.AddressingMode.Extended_29bits, rxid=0x123456, txid=0x789ABC, source_address=0x55, target_address=0xAA) + isotp.Address(isotp.AddressingMode.Mixed_11bits, rxid=0x123, txid=0x456, address_extension=0x99) + isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=0x11, target_address=0x22, address_extension=0x99) + + # Asymmetric Addresses + isotp.AsymmetricAddress( + tx_addr=isotp.Address(isotp.AddressingMode.NormalFixed_29bits, target_address=ta, source_address=sa, tx_only=True), + rx_addr=isotp.Address(isotp.AddressingMode.Mixed_11bits, rxid=0x123, address_extension=0x99, rx_only=True) # txid is not required + ) ------ diff --git a/isotp/__init__.py b/isotp/__init__.py index aaf813c..2aecf6c 100755 --- a/isotp/__init__.py +++ b/isotp/__init__.py @@ -2,6 +2,6 @@ from isotp.errors import * from isotp.can_message import CanMessage -from isotp.address import AddressingMode, TargetAddressType, Address +from isotp.address import AddressingMode, TargetAddressType, Address, AsymmetricAddress from isotp.protocol import TransportLayerLogic, TransportLayer, CanStack, NotifierBasedCanStack from isotp.tpsock import socket diff --git a/isotp/address.py b/isotp/address.py index 0fa7d15..e5f80d9 100755 --- a/isotp/address.py +++ b/isotp/address.py @@ -2,6 +2,7 @@ from enum import Enum from isotp import CanMessage +import abc from typing import Optional, Any, List, Callable, Dict, Tuple, Union @@ -25,7 +26,58 @@ class TargetAddressType(Enum): Functional = 1 # 1 to n communication -class Address: +class AbstractAddress(abc.ABC): + + @abc.abstractmethod + def get_tx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: + raise NotImplementedError("Abstract method") + + @abc.abstractmethod + def get_rx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: + raise NotImplementedError("Abstract method") + + @abc.abstractmethod + def requires_tx_extension_byte(self) -> bool: + raise NotImplementedError("Abstract method") + + @abc.abstractmethod + def requires_rx_extension_byte(self) -> bool: + raise NotImplementedError("Abstract method") + + @abc.abstractmethod + def get_tx_extension_byte(self) -> Optional[int]: + raise NotImplementedError("Abstract method") + + @abc.abstractmethod + def get_rx_extension_byte(self) -> Optional[int]: + raise NotImplementedError("Abstract method") + + @abc.abstractmethod + def is_tx_29bits(self) -> bool: + raise NotImplementedError("Abstract method") + + @abc.abstractmethod + def is_rx_29bits(self) -> bool: + raise NotImplementedError("Abstract method") + + @abc.abstractmethod + def is_for_me(self, msg: CanMessage) -> bool: + raise NotImplementedError("Abstract method") + + @abc.abstractmethod + def get_rx_prefix_size(self) -> int: + raise NotImplementedError("Abstract method") + + @abc.abstractmethod + def get_tx_payload_prefix(self) -> bytes: + raise NotImplementedError("Abstract method") + + @abc.abstractmethod + def is_partial_address(self) -> bool: + raise NotImplementedError("Abstract method") + + +class Address(AbstractAddress): """ Represents the addressing information (N_AI) of the IsoTP layer. Will define what messages will be received and how to craft transmitted message to reach a specific party. @@ -56,22 +108,30 @@ class Address: :param address_extension: Address extension (N_AE) used in ``Mixed_11bits``, ``Mixed_29bits`` addressing mode :type address_extension: int | None + + :param rx_only: When using :class:`AsymmetricAddress`, indicates that this address is the RX part, disabling validation of TX part + :type rx_only: bool + + :param tx_only: When using :class:`AsymmetricAddress`, indicates that this address is the TX part, disabling validation of RX part + :type tx_only: bool + """ - addressing_mode: AddressingMode - target_address: Optional[int] - source_address: Optional[int] - address_extension: Optional[int] - txid: Optional[int] - rxid: Optional[int] - is_29bits: bool - tx_arbitration_id_physical: int - tx_arbitration_id_functional: int - rx_arbitration_id_physical: int - rx_arbitration_id_functional: int - tx_payload_prefix: bytes - rx_prefix_size: int - is_for_me: Callable[[CanMessage], bool] + _addressing_mode: AddressingMode + _target_address: Optional[int] + _source_address: Optional[int] + _address_extension: Optional[int] + _txid: Optional[int] + _rxid: Optional[int] + _is_29bits: bool + _tx_arbitration_id_physical: int + _tx_arbitration_id_functional: int + _rx_arbitration_id_physical: int + _rx_arbitration_id_functional: int + _tx_payload_prefix: bytes + _rx_prefix_size: int + _rx_only: bool + _tx_only: bool def __init__(self, addressing_mode: AddressingMode = AddressingMode.Normal_11bits, @@ -82,220 +142,367 @@ def __init__(self, physical_id: Optional[int] = None, functional_id: Optional[int] = None, address_extension: Optional[int] = None, - **kwargs + rx_only: bool = False, + tx_only: bool = False ): - self.addressing_mode = addressing_mode - self.target_address = target_address - self.source_address = source_address - self.address_extension = address_extension - self.txid = txid - self.rxid = rxid - self.is_29bits = True if self.addressing_mode in [ + self._rx_only = rx_only + self._tx_only = tx_only + self._addressing_mode = addressing_mode + self._target_address = target_address + self._source_address = source_address + self._address_extension = address_extension + self._txid = txid + self._rxid = rxid + self._is_29bits = True if self._addressing_mode in [ AddressingMode.Normal_29bits, AddressingMode.NormalFixed_29bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_29bits] else False - if self.addressing_mode == AddressingMode.NormalFixed_29bits: + if self._addressing_mode == AddressingMode.NormalFixed_29bits: self.physical_id = 0x18DA0000 if physical_id is None else physical_id & 0x1FFF0000 self.functional_id = 0x18DB0000 if functional_id is None else functional_id & 0x1FFF0000 - if self.addressing_mode == AddressingMode.Mixed_29bits: + if self._addressing_mode == AddressingMode.Mixed_29bits: self.physical_id = 0x18CE0000 if physical_id is None else physical_id & 0x1FFF0000 self.functional_id = 0x18CD0000 if functional_id is None else functional_id & 0x1FFF0000 self.validate() # From here, input is good. Do some precomputing for speed optimization without bothering about types or values - self.tx_arbitration_id_physical = self._get_tx_arbitration_id(TargetAddressType.Physical) - self.tx_arbitration_id_functional = self._get_tx_arbitration_id(TargetAddressType.Functional) - - self.rx_arbitration_id_physical = self._get_rx_arbitration_id(TargetAddressType.Physical) - self.rx_arbitration_id_functional = self._get_rx_arbitration_id(TargetAddressType.Functional) - - self.tx_payload_prefix = bytes() - self.rx_prefix_size = 0 - - if self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: - assert self.target_address is not None - self.tx_payload_prefix = bytes([self.target_address]) - self.rx_prefix_size = 1 - elif self.addressing_mode in [AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: - assert self.address_extension is not None - self.tx_payload_prefix = bytes([self.address_extension]) - self.rx_prefix_size = 1 - - if self.addressing_mode in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits]: - self.is_for_me = self._is_for_me_normal - elif self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: - self.is_for_me = self._is_for_me_extended - elif self.addressing_mode == AddressingMode.NormalFixed_29bits: - self.is_for_me = self._is_for_me_normal_fixed - elif self.addressing_mode == AddressingMode.Mixed_11bits: - self.is_for_me = self._is_for_me_mixed_11bits - elif self.addressing_mode == AddressingMode.Mixed_29bits: - self.is_for_me = self._is_for_me_mixed_29bits - else: - raise RuntimeError('This exception should never be raised.') + self._tx_payload_prefix = bytes() + self._rx_prefix_size = 0 + + if not self._tx_only: # Rx supported + self._rx_arbitration_id_physical = self._get_rx_arbitration_id(TargetAddressType.Physical) + self._rx_arbitration_id_functional = self._get_rx_arbitration_id(TargetAddressType.Functional) + + if self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: + self._rx_prefix_size = 1 + + if not self._rx_only: # Tx supported + self._tx_arbitration_id_physical = self._get_tx_arbitration_id(TargetAddressType.Physical) + self._tx_arbitration_id_functional = self._get_tx_arbitration_id(TargetAddressType.Functional) + + if self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: + assert self._target_address is not None + self._tx_payload_prefix = bytes([self._target_address]) + elif self._addressing_mode in [AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: + assert self._address_extension is not None + self._tx_payload_prefix = bytes([self._address_extension]) + + if not self._tx_only: + if self._addressing_mode in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits]: + setattr(self, 'is_for_me', self._is_for_me_normal) + elif self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: + setattr(self, 'is_for_me', self._is_for_me_extended) + elif self._addressing_mode == AddressingMode.NormalFixed_29bits: + setattr(self, 'is_for_me', self._is_for_me_normal_fixed) + elif self._addressing_mode == AddressingMode.Mixed_11bits: + setattr(self, 'is_for_me', self._is_for_me_mixed_11bits) + elif self._addressing_mode == AddressingMode.Mixed_29bits: + setattr(self, 'is_for_me', self._is_for_me_mixed_29bits) + else: + raise RuntimeError('This exception should never be raised.') + + def not_implemented_func_with_partial(*args, **kwargs): + raise NotImplementedError("Not possible with partial address") + + # Remove unavailable functions to be strict + if self._tx_only: + setattr(self, 'get_rx_arbitration_id', not_implemented_func_with_partial) + setattr(self, 'requires_rx_extension_byte', not_implemented_func_with_partial) + setattr(self, 'get_rx_extension_byte', not_implemented_func_with_partial) + setattr(self, 'is_rx_29bits', not_implemented_func_with_partial) + setattr(self, 'is_for_me', not_implemented_func_with_partial) + setattr(self, 'get_rx_prefix_size', not_implemented_func_with_partial) + + if self._rx_only: + setattr(self, 'get_tx_arbitration_id', not_implemented_func_with_partial) + setattr(self, 'requires_tx_extension_byte', not_implemented_func_with_partial) + setattr(self, 'get_tx_extension_byte', not_implemented_func_with_partial) + setattr(self, 'is_tx_29bits', not_implemented_func_with_partial) + setattr(self, 'get_tx_payload_prefix', not_implemented_func_with_partial) def validate(self): - if self.addressing_mode not in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits, AddressingMode.NormalFixed_29bits, AddressingMode.Extended_11bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: + if self._rx_only and self._tx_only: + raise ValueError("Address cannot be tx only and rx only") + + if self._addressing_mode not in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits, AddressingMode.NormalFixed_29bits, AddressingMode.Extended_11bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: raise ValueError('Addressing mode is not valid') - if self.addressing_mode in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits]: - if self.rxid is None or self.txid is None: - raise ValueError('txid and rxid must be specified for Normal addressing mode (11 or 29 bits ID)') - if self.rxid == self.txid: + if self._addressing_mode in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits]: + if self._rxid is None and not self._tx_only: + raise ValueError('rxid must be specified for Normal addressing mode (11 or 29 bits ID)') + if self._txid is None and not self._rx_only: + raise ValueError('txid must be specified for Normal addressing mode (11 or 29 bits ID)') + if self._rxid == self._txid: raise ValueError('txid and rxid must be different for Normal addressing mode') - elif self.addressing_mode == AddressingMode.NormalFixed_29bits: - if self.target_address is None or self.source_address is None: + elif self._addressing_mode == AddressingMode.NormalFixed_29bits: + if self._target_address is None or self._source_address is None: raise ValueError('target_address and source_address must be specified for Normal Fixed addressing (29 bits ID)') - elif self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: - if self.target_address is None or self.rxid is None or self.txid is None: - raise ValueError('target_address, rxid and txid must be specified for Extended addressing mode (11 or 29 bits ID)') - if self.rxid == self.txid: + elif self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: + if not self._rx_only: + if self._target_address is None or self._txid is None: + raise ValueError('target_address and txid must be specified for Extended addressing mode (11 or 29 bits ID)') + + if not self._tx_only: + if self._source_address is None or self._rxid is None: + raise ValueError('source_address and rxid must be specified for Extended addressing mode (11 or 29 bits ID)') + + if self._rxid == self._txid: raise ValueError('txid and rxid must be different') - elif self.addressing_mode == AddressingMode.Mixed_11bits: - if self.rxid is None or self.txid is None or self.address_extension is None: - raise ValueError('rxid, txid and address_extension must be specified for Mixed addressing mode (11 bits ID)') + elif self._addressing_mode == AddressingMode.Mixed_11bits: + if self._address_extension is None: + raise ValueError('address_extension must be specified for Mixed addressing mode (11 bits ID)') - elif self.addressing_mode == AddressingMode.Mixed_29bits: - if self.target_address is None or self.source_address is None or self.address_extension is None: + if self._rxid is None and not self._tx_only: + raise ValueError('rxid must be specified for Mixed addressing mode (11 bits ID)') + if self._txid is None and not self._rx_only: + raise ValueError('txid must be specified for Mixed addressing mode (11 bits ID)') + if self._rxid == self._txid: + raise ValueError('txid and rxid must be different for Mixed addressing mode (11 bits ID)') + + elif self._addressing_mode == AddressingMode.Mixed_29bits: + # partial or full address requires all 3 params. + if self._target_address is None or self._source_address is None or self._address_extension is None: raise ValueError('target_address, source_address and address_extension must be specified for Mixed addressing mode (29 bits ID)') - if self.target_address is not None: - if not isinstance(self.target_address, int): + if self._target_address is not None: + if not isinstance(self._target_address, int): raise ValueError('target_address must be an integer') - if self.target_address < 0 or self.target_address > 0xFF: + if self._target_address < 0 or self._target_address > 0xFF: raise ValueError('target_address must be an integer between 0x00 and 0xFF') - if self.source_address is not None: - if not isinstance(self.source_address, int): + if self._source_address is not None: + if not isinstance(self._source_address, int): raise ValueError('source_address must be an integer') - if self.source_address < 0 or self.source_address > 0xFF: + if self._source_address < 0 or self._source_address > 0xFF: raise ValueError('source_address must be an integer between 0x00 and 0xFF') - if self.address_extension is not None: - if not isinstance(self.address_extension, int): + if self._address_extension is not None: + if not isinstance(self._address_extension, int): raise ValueError('source_address must be an integer') - if self.address_extension < 0 or self.address_extension > 0xFF: + if self._address_extension < 0 or self._address_extension > 0xFF: raise ValueError('address_extension must be an integer between 0x00 and 0xFF') - if self.txid is not None: - if not isinstance(self.txid, int): + if self._txid is not None: + if not isinstance(self._txid, int): raise ValueError('txid must be an integer') - if self.txid < 0: + if self._txid < 0: raise ValueError('txid must be greater than 0') - if not self.is_29bits: - if self.txid > 0x7FF: + if not self._is_29bits: + if self._txid > 0x7FF: raise ValueError('txid must be smaller than 0x7FF for 11 bits identifier') - if self.rxid is not None: - if not isinstance(self.rxid, int): + if self._rxid is not None: + if not isinstance(self._rxid, int): raise ValueError('rxid must be an integer') - if self.rxid < 0: + if self._rxid < 0: raise ValueError('rxid must be greater than 0') - if not self.is_29bits: - if self.rxid > 0x7FF: + if not self._is_29bits: + if self._rxid > 0x7FF: raise ValueError('rxid must be smaller than 0x7FF for 11 bits identifier') + def is_partial_address(self) -> bool: + return self._tx_only or self._rx_only + + def is_tx_only(self) -> bool: + return self._tx_only + + def is_rx_only(self) -> bool: + return self._rx_only + + def get_rx_prefix_size(self) -> int: + return self._rx_prefix_size + + def get_tx_payload_prefix(self) -> bytes: + return self._tx_payload_prefix + + def is_for_me(self, msg: CanMessage) -> bool: + raise NotImplementedError("is_for_me should be overriden in constructor") + def get_tx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: if address_type == TargetAddressType.Physical: - return self.tx_arbitration_id_physical + return self._tx_arbitration_id_physical else: - return self.tx_arbitration_id_functional + return self._tx_arbitration_id_functional def get_rx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: if address_type == TargetAddressType.Physical: - return self.rx_arbitration_id_physical + return self._rx_arbitration_id_physical else: - return self.rx_arbitration_id_functional + return self._rx_arbitration_id_functional def _get_tx_arbitration_id(self, address_type: TargetAddressType) -> int: - if self.addressing_mode in (AddressingMode.Normal_11bits, - AddressingMode.Normal_29bits, - AddressingMode.Extended_11bits, - AddressingMode.Extended_29bits, - AddressingMode.Mixed_11bits): - assert self.txid is not None - return self.txid - elif self.addressing_mode in [AddressingMode.Mixed_29bits, AddressingMode.NormalFixed_29bits]: - assert self.target_address is not None - assert self.source_address is not None + if self._addressing_mode in (AddressingMode.Normal_11bits, + AddressingMode.Normal_29bits, + AddressingMode.Extended_11bits, + AddressingMode.Extended_29bits, + AddressingMode.Mixed_11bits): + assert self._txid is not None + return self._txid + elif self._addressing_mode in [AddressingMode.Mixed_29bits, AddressingMode.NormalFixed_29bits]: + assert self._target_address is not None + assert self._source_address is not None bits28_16 = self.physical_id if address_type == TargetAddressType.Physical else self.functional_id - return bits28_16 | (self.target_address << 8) | self.source_address + return bits28_16 | (self._target_address << 8) | self._source_address raise ValueError("Unsupported addressing mode") def _get_rx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: - if self.addressing_mode in (AddressingMode.Normal_11bits, - AddressingMode.Normal_29bits, - AddressingMode.Extended_11bits, - AddressingMode.Extended_29bits, - AddressingMode.Mixed_11bits): - assert self.rxid is not None - return self.rxid - elif self.addressing_mode in [AddressingMode.Mixed_29bits, AddressingMode.NormalFixed_29bits]: - assert self.target_address is not None - assert self.source_address is not None + if self._addressing_mode in (AddressingMode.Normal_11bits, + AddressingMode.Normal_29bits, + AddressingMode.Extended_11bits, + AddressingMode.Extended_29bits, + AddressingMode.Mixed_11bits): + assert self._rxid is not None + return self._rxid + elif self._addressing_mode in [AddressingMode.Mixed_29bits, AddressingMode.NormalFixed_29bits]: + assert self._target_address is not None + assert self._source_address is not None bits28_16 = self.physical_id if address_type == TargetAddressType.Physical else self.functional_id - return bits28_16 | (self.source_address << 8) | self.target_address + return bits28_16 | (self._source_address << 8) | self._target_address raise ValueError("Unsupported addressing mode") def _is_for_me_normal(self, msg: CanMessage) -> bool: - if self.is_29bits == msg.is_extended_id: - return msg.arbitration_id == self.rxid + if self._is_29bits == msg.is_extended_id: + return msg.arbitration_id == self._rxid return False def _is_for_me_extended(self, msg: CanMessage) -> bool: - if self.is_29bits == msg.is_extended_id: + if self._is_29bits == msg.is_extended_id: if msg.data is not None and len(msg.data) > 0: - return msg.arbitration_id == self.rxid and int(msg.data[0]) == self.source_address + return msg.arbitration_id == self._rxid and int(msg.data[0]) == self._source_address return False def _is_for_me_normal_fixed(self, msg: CanMessage) -> bool: - if self.is_29bits == msg.is_extended_id: - return (msg.arbitration_id & 0x1FFF0000 in [self.physical_id, self.functional_id]) and (msg.arbitration_id & 0xFF00) >> 8 == self.source_address and msg.arbitration_id & 0xFF == self.target_address + if self._is_29bits == msg.is_extended_id: + return (msg.arbitration_id & 0x1FFF0000 in [self.physical_id, self.functional_id]) and (msg.arbitration_id & 0xFF00) >> 8 == self._source_address and msg.arbitration_id & 0xFF == self._target_address return False def _is_for_me_mixed_11bits(self, msg: CanMessage) -> bool: - if self.is_29bits == msg.is_extended_id: + if self._is_29bits == msg.is_extended_id: if msg.data is not None and len(msg.data) > 0: - return msg.arbitration_id == self.rxid and int(msg.data[0]) == self.address_extension + return msg.arbitration_id == self._rxid and int(msg.data[0]) == self._address_extension return False def _is_for_me_mixed_29bits(self, msg: CanMessage) -> bool: - if self.is_29bits == msg.is_extended_id: + if self._is_29bits == msg.is_extended_id: if msg.data is not None and len(msg.data) > 0: - return (msg.arbitration_id & 0x1FFF0000) in [self.physical_id, self.functional_id] and (msg.arbitration_id & 0xFF00) >> 8 == self.source_address and msg.arbitration_id & 0xFF == self.target_address and int(msg.data[0]) == self.address_extension + return (msg.arbitration_id & 0x1FFF0000) in [self.physical_id, self.functional_id] and (msg.arbitration_id & 0xFF00) >> 8 == self._source_address and msg.arbitration_id & 0xFF == self._target_address and int(msg.data[0]) == self._address_extension return False - def requires_extension_byte(self) -> bool: - return True if self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits] else False + def _requires_extension_byte(self) -> bool: + return True if self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits] else False + + def requires_rx_extension_byte(self) -> bool: + return self._requires_extension_byte() + + def requires_tx_extension_byte(self) -> bool: + return self._requires_extension_byte() def get_tx_extension_byte(self) -> Optional[int]: - if self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: - return self.target_address - if self.addressing_mode in [AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: - return self.address_extension + if self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: + return self._target_address + if self._addressing_mode in [AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: + return self._address_extension return None def get_rx_extension_byte(self) -> Optional[int]: - if self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: - return self.source_address - if self.addressing_mode in [AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: - return self.address_extension + if self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: + return self._source_address + if self._addressing_mode in [AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: + return self._address_extension return None + def is_tx_29bits(self) -> bool: + return self._is_29bits + + def is_rx_29bits(self) -> bool: + return self._is_29bits + def get_content_str(self) -> str: val_dict = {} - keys = ['target_address', 'source_address', 'address_extension', 'txid', 'rxid'] + keys = ['_target_address', '_source_address', '_address_extension', '_txid', '_rxid'] for key in keys: val = getattr(self, key) + if key.startswith('_'): + key = key[1:] if val is not None: val_dict[key] = val vals_str = ', '.join(['%s:0x%02x' % (k, val_dict[k]) for k in val_dict]) - return '[%s - %s]' % (AddressingMode.get_name(self.addressing_mode), vals_str) + return '[%s - %s]' % (AddressingMode.get_name(self._addressing_mode), vals_str) def __repr__(self): return '' % (self.get_content_str(), id(self)) + + +class AsymmetricAddress(AbstractAddress): + """ + Address that uses independent addressing modes for transmission and reception. + + :param tx_addr: The Address object used for transmission + :type tx_addr: :class:`Address` with ``tx_only=True`` + + :param rx_addr: The Address object used for reception + :type rx_addr: :class:`Address` with ``rx_only=True`` + + """ + tx_addr: Address + rx_addr: Address + + def __init__(self, tx_addr: Address, rx_addr: Address): + if not isinstance(rx_addr, Address): + raise ValueError("rx_addr must be an isotp.Address instance") + + if not isinstance(tx_addr, Address): + raise ValueError("tx_addr must be an isotp.Address instance") + + if not tx_addr.is_tx_only(): + raise ValueError("tx_addr must be configured with tx_only=True") + + if not rx_addr.is_rx_only(): + raise ValueError("rx_addr must be configured with rx_only=True") + + self.tx_addr = tx_addr + self.rx_addr = rx_addr + + def get_tx_extension_byte(self) -> Optional[int]: + return self.tx_addr.get_tx_extension_byte() + + def get_rx_extension_byte(self) -> Optional[int]: + return self.rx_addr.get_rx_extension_byte() + + def is_for_me(self, msg: CanMessage) -> bool: + return self.rx_addr.is_for_me(msg) + + def get_tx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: + return self.tx_addr.get_tx_arbitration_id(address_type) + + def get_rx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: + return self.rx_addr.get_rx_arbitration_id(address_type) + + def is_tx_29bits(self) -> bool: + return self.tx_addr.is_tx_29bits() + + def is_rx_29bits(self) -> bool: + return self.rx_addr.is_rx_29bits() + + def requires_tx_extension_byte(self) -> bool: + return self.tx_addr.requires_tx_extension_byte() + + def requires_rx_extension_byte(self) -> bool: + return self.rx_addr.requires_rx_extension_byte() + + def get_rx_prefix_size(self) -> int: + return self.rx_addr.get_rx_prefix_size() + + def get_tx_payload_prefix(self) -> bytes: + return self.tx_addr.get_tx_payload_prefix() + + def is_partial_address(self) -> bool: + return False + + def __repr__(self) -> str: + return f'<{self.__class__.__name__} - RxAddr: {self.rx_addr.__repr__()} - TxAddr: {self.tx_addr.__repr__()}>' diff --git a/isotp/protocol.py b/isotp/protocol.py index 89aa7d2..001e41a 100755 --- a/isotp/protocol.py +++ b/isotp/protocol.py @@ -559,7 +559,7 @@ class ProcessTxReport: timings: Dict[Tuple[RxState, TxState], float] active_send_request: Optional[SendRequest] rx_buffer: bytearray - address: isotp.Address + address: isotp.address.AbstractAddress timer_rx_fc: Timer timer_rx_cf: Timer rate_limiter: RateLimiter @@ -685,7 +685,7 @@ def send(self, if target_address_type == isotp.address.TargetAddressType.Functional: length_bytes = 1 if self.params.tx_data_length == 8 else 2 - maxlen = self.params.tx_data_length - length_bytes - len(self.address.tx_payload_prefix) + maxlen = self.params.tx_data_length - length_bytes - len(self.address.get_tx_payload_prefix()) if send_request.generator.total_length() > maxlen: raise ValueError('Cannot send multi packet frame with Functional TargetAddressType') @@ -844,7 +844,7 @@ def _process_rx(self, msg: CanMessage) -> ProcessRxReport: """Process the reception of a CAN message. Moves the reception state machine accordingly and optionally""" # Decoding of message into PDU try: - pdu = PDU(msg, start_of_data=self.address.rx_prefix_size) + pdu = PDU(msg, start_of_data=self.address.get_rx_prefix_size()) except Exception as e: self._trigger_error(isotp.errors.InvalidCanDataError("Received invalid CAN frame. %s" % (str(e)))) self._stop_receiving() @@ -1015,20 +1015,20 @@ def _process_tx(self) -> ProcessTxReport: read_tx_queue = True # Read another frame from tx_queue self.active_send_request.complete(True) else: - size_on_first_byte = (self.active_send_request.generator.remaining_size() + len(self.address.tx_payload_prefix)) <= 7 + size_on_first_byte = (self.active_send_request.generator.remaining_size() + len(self.address.get_tx_payload_prefix())) <= 7 size_offset = 1 if size_on_first_byte else 2 try: # Single frame total_size = self.active_send_request.generator.total_length() - if total_size <= self.params.tx_data_length - size_offset - len(self.address.tx_payload_prefix): + if total_size <= self.params.tx_data_length - size_offset - len(self.address.get_tx_payload_prefix()): # Will raise if size is not what was requested payload = self.active_send_request.generator.consume(total_size, enforce_exact=True) if size_on_first_byte: - msg_data = self.address.tx_payload_prefix + bytearray([0x0 | len(payload)]) + payload + msg_data = self.address.get_tx_payload_prefix() + bytearray([0x0 | len(payload)]) + payload else: - msg_data = self.address.tx_payload_prefix + bytearray([0x0, len(payload)]) + payload + msg_data = self.address.get_tx_payload_prefix() + bytearray([0x0, len(payload)]) + payload arbitration_id = self.address.get_tx_arbitration_id(self.active_send_request.target_address_type) msg_temp = self._make_tx_msg(arbitration_id, msg_data) @@ -1044,14 +1044,14 @@ def _process_tx(self) -> ProcessTxReport: self.tx_frame_length = total_size encode_length_on_2_first_bytes = True if self.tx_frame_length <= 0xFFF else False if encode_length_on_2_first_bytes: - data_length = self.params.tx_data_length - 2 - len(self.address.tx_payload_prefix) + data_length = self.params.tx_data_length - 2 - len(self.address.get_tx_payload_prefix()) payload = self.active_send_request.generator.consume(data_length, enforce_exact=True) - msg_data = self.address.tx_payload_prefix + \ + msg_data = self.address.get_tx_payload_prefix() + \ bytearray([0x10 | ((self.tx_frame_length >> 8) & 0xF), self.tx_frame_length & 0xFF]) + payload else: - data_length = self.params.tx_data_length - 6 - len(self.address.tx_payload_prefix) + data_length = self.params.tx_data_length - 6 - len(self.address.get_tx_payload_prefix()) payload = self.active_send_request.generator.consume(data_length, enforce_exact=True) - msg_data = self.address.tx_payload_prefix + bytearray([0x10, 0x00, (self.tx_frame_length >> 24) & 0xFF, (self.tx_frame_length >> 16) & 0xFF, ( + msg_data = self.address.get_tx_payload_prefix() + bytearray([0x10, 0x00, (self.tx_frame_length >> 24) & 0xFF, (self.tx_frame_length >> 16) & 0xFF, ( self.tx_frame_length >> 8) & 0xFF, (self.tx_frame_length >> 0) & 0xFF]) + payload arbitration_id = self.address.get_tx_arbitration_id() @@ -1092,13 +1092,13 @@ def _process_tx(self) -> ProcessTxReport: assert self.remote_blocksize is not None assert self.active_send_request is not None if self.timer_tx_stmin.is_timed_out(): - data_length = self.params.tx_data_length - 1 - len(self.address.tx_payload_prefix) + data_length = self.params.tx_data_length - 1 - len(self.address.get_tx_payload_prefix()) payload_length = min(data_length, self.active_send_request.generator.remaining_size()) if payload_length <= allowed_bytes: # We may have less data than requested payload = self.active_send_request.generator.consume(payload_length, enforce_exact=False) if len(payload) > 0: # Corner case. If generator size is a multiple of ll_data_length, we will get an empty payload on last frame. - msg_data = self.address.tx_payload_prefix + bytearray([0x20 | self.tx_seqnum]) + payload + msg_data = self.address.get_tx_payload_prefix() + bytearray([0x20 | self.tx_seqnum]) + payload arbitration_id = self.address.get_tx_arbitration_id() output_msg = self._make_tx_msg(arbitration_id, msg_data) self.tx_seqnum = (self.tx_seqnum + 1) & 0xF @@ -1127,28 +1127,38 @@ def set_sleep_timing(self, idle: float, wait_fc: float) -> None: """ Sets values in seconds that can be passed to ``time.sleep()`` when the stack is processed in a different thread. - :param idle: - :param wait_fc: + :param idle: Time when rx state machine is idle + :type idle: float + + :param wait_fc: Time when rx state machine is waiting for a flow control message + :type wait_fc: float """ self.timings = { (self.RxState.IDLE, self.TxState.IDLE): idle, (self.RxState.IDLE, self.TxState.WAIT_FC): wait_fc, } - def set_address(self, address: isotp.address.Address): + def set_address(self, address: Union[isotp.address.Address, isotp.address.AsymmetricAddress]): """ - Sets the layer :class:`Address`. Can be set after initialization if needed. May cause a timeout if called while a transmission is active. + Sets the layer address. Can be set after initialization if needed. May cause a timeout if called while a transmission is active. + + :param address: Address to use + :type address: :class:`Address` or :class:`AsymmetricAddress` """ - if not isinstance(address, isotp.address.Address): + if not isinstance(address, (isotp.address.Address, isotp.address.AsymmetricAddress)): raise ValueError('address must be a valid Address instance') - self.address = address + if address.is_partial_address(): + raise ValueError('Cannot use a partially defined address. Either use a fully defined isotp.Address or an isotp.AsymmetricAddress') - if self.address.txid is not None and (self.address.txid > 0x7F4 and self.address.txid < 0x7F6 or self.address.txid > 0x7FA and self.address.txid < 0x7FB): + self.address = address + txid = self.address.get_tx_arbitration_id(isotp.TargetAddressType.Physical) + rxid = self.address.get_rx_arbitration_id(isotp.TargetAddressType.Physical) + if (txid > 0x7F4 and txid < 0x7F6 or txid > 0x7FA and txid < 0x7FB): self.logger.warning('Used txid overlaps the range of ID reserved by ISO-15765 (0x7F4-0x7F6 and 0x7FA-0x7FB)') - if self.address.rxid is not None and (self.address.rxid > 0x7F4 and self.address.rxid < 0x7F6 or self.address.rxid > 0x7FA and self.address.rxid < 0x7FB): + if (rxid > 0x7F4 and rxid < 0x7F6 or rxid > 0x7FA and rxid < 0x7FB): self.logger.warning('Used rxid overlaps the range of ID reserved by ISO-15765 (0x7F4-0x7F6 and 0x7FA-0x7FB)') def _pad_message_data(self, msg_data: bytes) -> bytes: @@ -1209,7 +1219,7 @@ def _make_tx_msg(self, arbitration_id: int, data: bytes) -> CanMessage: arbitration_id=arbitration_id, dlc=self._get_dlc(data, validate_tx=True), data=data, - extended_id=self.address.is_29bits, + extended_id=self.address.is_tx_29bits(), is_fd=self.params.can_fd, bitrate_switch=self.params.bitrate_switch ) @@ -1253,7 +1263,7 @@ def _make_flow_control(self, flow_status: int = PDU.FlowStatus.ContinueToSend, b stmin = self.params.stmin data = PDU.craft_flow_control_data(flow_status, blocksize, stmin) - return self._make_tx_msg(self.address.get_tx_arbitration_id(), self.address.tx_payload_prefix + data) + return self._make_tx_msg(self.address.get_tx_arbitration_id(), self.address.get_tx_payload_prefix() + data) def stop_sending(self) -> None: """ diff --git a/isotp/tpsock/__init__.py b/isotp/tpsock/__init__.py index e551fea..7a67e38 100755 --- a/isotp/tpsock/__init__.py +++ b/isotp/tpsock/__init__.py @@ -2,7 +2,7 @@ import os import isotp.address -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Union if TYPE_CHECKING: @@ -79,7 +79,7 @@ class socket: LinkLayerProtocol = LinkLayerProtocol interface: Optional[str] - address: Optional[isotp.address.Address] + address: Optional[isotp.address.AbstractAddress] bound: bool closed: bool _socket: socket_module.socket @@ -219,7 +219,7 @@ def get_opts(self) -> "opts.GeneralOpts": def get_fc_opts(self) -> "opts.FlowControlOpts": return opts.FlowControlOpts.read(self._socket) - def bind(self, interface: str, address: isotp.Address) -> None: + def bind(self, interface: str, address: Union[isotp.Address, isotp.AsymmetricAddress]) -> None: """ Binds the socket to an address. @@ -233,31 +233,40 @@ def bind(self, interface: str, address: isotp.Address) -> None: if not isinstance(interface, str): raise ValueError("interface must be a string") - if not isinstance(address, isotp.Address): - raise ValueError("address and instance of isotp.Address") + if not isinstance(address, (isotp.Address, isotp.AsymmetricAddress)): + raise ValueError("address and instance of isotp.Address or isotp.AsymmetricAddress") + + if isinstance(address, isotp.AsymmetricAddress): + if address.requires_rx_extension_byte() != address.requires_tx_extension_byte(): + # See https://github.com/hartkopp/can-isotp/issues/62 + raise ValueError("The IsoTP socket module does not support asymmetric addresses with inconsistent address_extension byte") self.interface = interface self.address = address - # IsoTP sockets doesn't provide an interface to modify the target address type. We asusme physical. + # IsoTP sockets doesn't provide an interface to modify the target address type. We assume physical. # If functional is required, it Ids can be manually crafted in Normal / extended mode rxid = self.address.get_rx_arbitration_id(isotp.TargetAddressType.Physical) txid = self.address.get_tx_arbitration_id(isotp.TargetAddressType.Physical) - if self.address.is_29bits == True: + if self.address.is_rx_29bits(): rxid = (rxid & socket_module.CAN_EFF_MASK) | socket_module.CAN_EFF_FLAG else: rxid = rxid & socket_module.CAN_SFF_MASK - if self.address.is_29bits == True: + if self.address.is_tx_29bits(): txid = (txid & socket_module.CAN_EFF_MASK) | socket_module.CAN_EFF_FLAG else: txid = txid & socket_module.CAN_SFF_MASK - if self.address.requires_extension_byte(): + if self.address.requires_tx_extension_byte() or self.address.requires_rx_extension_byte(): o = self.get_opts() assert o.optflag is not None - o.optflag |= self.flags.EXTEND_ADDR | self.flags.RX_EXT_ADDR + if self.address.requires_tx_extension_byte(): + o.optflag |= self.flags.EXTEND_ADDR + if self.address.requires_rx_extension_byte(): + o.optflag |= self.flags.RX_EXT_ADDR + self.set_opts(optflag=o.optflag, ext_address=self.address.get_tx_extension_byte(), rx_ext_address=self.address.get_rx_extension_byte()) self._socket.bind((interface, rxid, txid)) diff --git a/test/test_addressing_modes.py b/test/test_addressing_modes.py index 9299852..a9c5aeb 100755 --- a/test/test_addressing_modes.py +++ b/test/test_addressing_modes.py @@ -16,11 +16,190 @@ def test_create_address(self): isotp.Address(isotp.AddressingMode.Normal_11bits, txid=1, rxid=2) isotp.Address(isotp.AddressingMode.Normal_29bits, txid=1, rxid=2) isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=1, target_address=2) - isotp.Address(isotp.AddressingMode.Extended_11bits, txid=1, rxid=2, target_address=3) - isotp.Address(isotp.AddressingMode.Extended_29bits, txid=1, rxid=2, target_address=3) + isotp.Address(isotp.AddressingMode.Extended_11bits, txid=1, rxid=2, target_address=3, source_address=5) + isotp.Address(isotp.AddressingMode.Extended_29bits, txid=1, rxid=2, target_address=3, source_address=5) isotp.Address(isotp.AddressingMode.Mixed_11bits, txid=1, rxid=2, address_extension=3) isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=1, target_address=2, address_extension=3) + def test_create_address_bad_params(self): + # Make sure that any missing param is catched + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Normal_11bits, txid=1) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=1) + + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Normal_29bits, rxid=2) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Normal_29bits, txid=1) + + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.NormalFixed_29bits, target_address=2) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=1) + + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_11bits, rxid=2, target_address=3, source_address=5) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_11bits, txid=1, target_address=3, source_address=5) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_11bits, txid=1, rxid=2, source_address=5) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_11bits, txid=1, rxid=2, target_address=3) + + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_29bits, rxid=2, target_address=3, source_address=5) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_29bits, txid=1, target_address=3, source_address=5) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_29bits, txid=1, rxid=2, source_address=5) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_29bits, txid=1, rxid=2, target_address=3) + + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_11bits, rxid=2, address_extension=3) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_11bits, txid=1, address_extension=3) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_11bits, txid=1, rxid=2) + + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_29bits, target_address=2, address_extension=3) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=1, address_extension=3) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=1, target_address=2) + + def test_create_partial_address(self): + # Valid partial addresses + isotp.Address(isotp.AddressingMode.Normal_11bits, txid=1, tx_only=True) + isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=1, rx_only=True) + isotp.Address(isotp.AddressingMode.Normal_29bits, txid=1, tx_only=True) + isotp.Address(isotp.AddressingMode.Normal_29bits, rxid=2, rx_only=True) + + isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=1, target_address=2, tx_only=True) + isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=1, target_address=2, rx_only=True) + + isotp.Address(isotp.AddressingMode.Extended_11bits, txid=1, target_address=3, tx_only=True) + isotp.Address(isotp.AddressingMode.Extended_11bits, rxid=2, source_address=5, rx_only=True) + + isotp.Address(isotp.AddressingMode.Extended_29bits, txid=1, target_address=3, tx_only=True) + isotp.Address(isotp.AddressingMode.Extended_29bits, rxid=2, source_address=5, rx_only=True) + + isotp.Address(isotp.AddressingMode.Mixed_11bits, txid=1, address_extension=3, tx_only=True) + isotp.Address(isotp.AddressingMode.Mixed_11bits, rxid=1, address_extension=3, rx_only=True) + + isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=1, target_address=2, address_extension=3, tx_only=True) + isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=1, target_address=2, address_extension=3, rx_only=True) + + def test_create_partial_address_bad_params(self): + # Create partial addresses with missing parameters + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=1, tx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Normal_11bits, txid=1, rx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Normal_29bits, rxid=1, tx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Normal_29bits, txid=2, rx_only=True) + + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=1, tx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.NormalFixed_29bits, target_address=2, tx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=1, rx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.NormalFixed_29bits, target_address=2, rx_only=True) + + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_11bits, target_address=3, tx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_11bits, txid=1, tx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_11bits, source_address=5, rx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_11bits, rxid=2, rx_only=True) + + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_29bits, target_address=3, tx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_29bits, txid=1, tx_only=True) + + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_29bits, source_address=5, rx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Extended_29bits, rxid=2, rx_only=True) + + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_11bits, address_extension=3, tx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_11bits, txid=1, tx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_11bits, address_extension=3, rx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_11bits, rxid=1, rx_only=True) + + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_29bits, target_address=2, address_extension=3, tx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=1, address_extension=3, tx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=1, target_address=2, tx_only=True) + + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_29bits, target_address=2, address_extension=3, rx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=1, address_extension=3, rx_only=True) + with self.assertRaises(Exception): + isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=1, target_address=2, rx_only=True) + + def test_create_address_asymmetric(self): + required_params_per_mode_and_dir = { + isotp.AddressingMode.Normal_11bits: {'tx': ['txid'], 'rx': ['rxid']}, + isotp.AddressingMode.Normal_29bits: {'tx': ['txid'], 'rx': ['rxid']}, + isotp.AddressingMode.NormalFixed_29bits: {'tx': ['source_address', 'target_address'], 'rx': ['source_address', 'target_address']}, + isotp.AddressingMode.Extended_11bits: {'tx': ['txid', 'target_address'], 'rx': ['rxid', 'source_address']}, + isotp.AddressingMode.Extended_29bits: {'tx': ['txid', 'target_address'], 'rx': ['rxid', 'source_address']}, + isotp.AddressingMode.Mixed_11bits: {'tx': ['txid', 'address_extension'], 'rx': ['rxid', 'address_extension']}, + isotp.AddressingMode.Mixed_29bits: { + 'tx': ['source_address', 'target_address', 'address_extension'], + 'rx': ['source_address', 'target_address', 'address_extension'] + } + } + + for tx_mode in required_params_per_mode_and_dir: + tx_params_list = required_params_per_mode_and_dir[tx_mode]['tx'] + tx_params = dict(zip(tx_params_list, range(len(tx_params_list)))) # Make a dummy value + for rx_mode in required_params_per_mode_and_dir: + rx_params_list = required_params_per_mode_and_dir[rx_mode]['rx'] + rx_params = dict(zip(rx_params_list, range(len(rx_params_list)))) # Make a dummy value + unittest_logging.logger.debug(f"tx_mode={tx_mode}, rx_mode={rx_mode}") + txaddr = isotp.Address(tx_mode, tx_only=True, **tx_params) + rxaddr = isotp.Address(rx_mode, rx_only=True, **rx_params) + addr = isotp.AsymmetricAddress(tx_addr=txaddr, rx_addr=rxaddr) + + self.assertTrue(txaddr.is_partial_address()) + self.assertTrue(rxaddr.is_partial_address()) + self.assertFalse(addr.is_partial_address()) + + # Let's make sure that the interface is not broken. All methods can be called without raising an exception + addr.get_rx_arbitration_id(isotp.TargetAddressType.Functional) + addr.get_rx_arbitration_id(isotp.TargetAddressType.Physical) + addr.get_tx_arbitration_id(isotp.TargetAddressType.Functional) + addr.get_tx_arbitration_id(isotp.TargetAddressType.Physical) + + addr.requires_tx_extension_byte() + addr.requires_rx_extension_byte() + addr.get_tx_extension_byte() + addr.get_rx_extension_byte() + addr.is_tx_29bits() + addr.is_rx_29bits() + addr.is_for_me(isotp.CanMessage()) + addr.get_rx_prefix_size() + addr.get_tx_payload_prefix() + addr.is_partial_address() + def test_single_frame_only_function_tatype(self): tatype = isotp.TargetAddressType.Functional @@ -42,13 +221,13 @@ def test_single_frame_only_function_tatype(self): with self.assertRaises(ValueError): layer.send(self.make_payload(8), tatype) - address = isotp.Address(isotp.AddressingMode.Extended_11bits, txid=1, rxid=2, target_address=3) + address = isotp.Address(isotp.AddressingMode.Extended_11bits, txid=1, rxid=2, target_address=3, source_address=4) layer = isotp.TransportLayer(txfn=self.stack_txfn, rxfn=self.stack_rxfn, address=address) layer.send(self.make_payload(6), tatype) with self.assertRaises(ValueError): layer.send(self.make_payload(7), tatype) - address = isotp.Address(isotp.AddressingMode.Extended_29bits, txid=1, rxid=2, target_address=3) + address = isotp.Address(isotp.AddressingMode.Extended_29bits, txid=1, rxid=2, target_address=3, source_address=4) layer = isotp.TransportLayer(txfn=self.stack_txfn, rxfn=self.stack_rxfn, address=address) layer.send(self.make_payload(6), tatype) with self.assertRaises(ValueError): @@ -236,6 +415,25 @@ def test_29bits_normal_through_layer(self): self.assertEqual(msg.data, bytearray([0x21, 0x0A, 0x0B])) self.assertTrue(msg.is_extended_id) + def test_asymmetric_address_normal_11_29(self): + rxid = 0x123 + txid = 0x789ABC + address = isotp.AsymmetricAddress( + tx_addr=isotp.Address(isotp.AddressingMode.Normal_29bits, txid=txid, tx_only=True), + rx_addr=isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=rxid, rx_only=True) + ) + self.assertTrue(address.is_tx_29bits()) + self.assertFalse(address.is_rx_29bits()) + + self.assertTrue(address.is_for_me(Message(arbitration_id=rxid))) + self.assertFalse(address.is_for_me(Message(arbitration_id=rxid, extended_id=True))) + self.assertFalse(address.is_for_me(Message(arbitration_id=rxid + 1))) + + self.assertEqual(address.get_tx_arbitration_id(isotp.TargetAddressType.Physical), txid) + self.assertEqual(address.get_tx_arbitration_id(isotp.TargetAddressType.Functional), txid) + self.assertEqual(address.get_rx_arbitration_id(isotp.TargetAddressType.Physical), rxid) + self.assertEqual(address.get_rx_arbitration_id(isotp.TargetAddressType.Functional), rxid) + def test_29bits_normal_fixed(self): ta = 0x55 sa = 0xAA @@ -963,3 +1161,77 @@ def test_29bits_mixed_custom_id_through_layer(self): self.assertEqual(msg.arbitration_id, txid_physical) self.assertEqual(msg.data, bytearray([ae, 0x21, 0x09, 0x0A, 0x0B])) self.assertTrue(msg.is_extended_id) + + def test_asymmetric_address_normalfixed_29_Mixed_11_through_layer(self): + functional = isotp.TargetAddressType.Functional + physical = isotp.TargetAddressType.Physical + ta = 0x55 + sa = 0xAA + rxid = 0x111 + rx_address_extension = 0x88 + txid_physical = 0x18DA55AA + txid_functional = 0x18DB55AA + + address = isotp.AsymmetricAddress( + tx_addr=isotp.Address(isotp.AddressingMode.NormalFixed_29bits, target_address=ta, source_address=sa, tx_only=True), + rx_addr=isotp.Address(isotp.AddressingMode.Mixed_11bits, rxid=rxid, address_extension=rx_address_extension, rx_only=True) + ) + + layer = isotp.TransportLayer(txfn=self.stack_txfn, rxfn=self.stack_rxfn, address=address, params={'stmin': 0, 'blocksize': 0}) + + # Receive Single frame - Physical + self.simulate_rx_msg(Message(arbitration_id=rxid, data=bytearray([rx_address_extension, 0x03, 0x01, 0x02, 0x03]), extended_id=False)) + layer.process() + frame = layer.recv() + self.assertIsNotNone(frame) + self.assertEqual(frame, b'\x01\x02\x03') + + # Receive multiframe - Physical + layer.reset() + self.simulate_rx_msg(Message(arbitration_id=rxid, data=bytearray( + [rx_address_extension, 0x10, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05]), extended_id=False)) + layer.process() + self.assert_sent_flow_control(stmin=0, blocksize=0) + self.simulate_rx_msg(Message(arbitration_id=rxid, data=bytearray([rx_address_extension, 0x21, 0x06, 0x07, 0x08]), extended_id=False)) + layer.process() + frame = layer.recv() + self.assertIsNotNone(frame) + self.assertEqual(frame, b'\x01\x02\x03\x04\x05\x06\x07\x08') + + # Transmit single frame - Physical + layer.reset() + layer.send(b'\x04\x05\x06', physical) + layer.process() + msg = self.get_tx_can_msg() + self.assertIsNotNone(msg) + self.assertEqual(msg.arbitration_id, txid_physical) + self.assertEqual(msg.data, bytearray([0x03, 0x04, 0x05, 0x06])) + self.assertTrue(msg.is_extended_id) + + # Transmit single frame - Functional + layer.reset() + layer.send(b'\x04\x05\x06', functional) + layer.process() + msg = self.get_tx_can_msg() + self.assertIsNotNone(msg) + self.assertEqual(msg.arbitration_id, txid_functional) + self.assertEqual(msg.data, bytearray([0x03, 0x04, 0x05, 0x06])) + self.assertTrue(msg.is_extended_id) + + # Transmit multiframe - Physical + layer.reset() + layer.send(b'\x04\x05\x06\x07\x08\x09\x0A\x0B', physical) + layer.process() + msg = self.get_tx_can_msg() + self.assertIsNotNone(msg) + self.assertEqual(msg.arbitration_id, txid_physical) + self.assertEqual(msg.data, bytearray([0x10, 0x08, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09])) + self.assertTrue(msg.is_extended_id) + flow_control_payload = bytearray([rx_address_extension]) + self.make_flow_control_data(flow_status=0, stmin=0, blocksize=0) + self.simulate_rx_msg(Message(arbitration_id=rxid, data=flow_control_payload, extended_id=False)) + layer.process() + msg = self.get_tx_can_msg() + self.assertIsNotNone(msg) + self.assertEqual(msg.arbitration_id, txid_physical) + self.assertEqual(msg.data, bytearray([0x21, 0x0A, 0x0B])) + self.assertTrue(msg.is_extended_id) diff --git a/test/test_layer_vs_socket.py b/test/test_layer_vs_socket.py index 7023bbf..7053cad 100755 --- a/test/test_layer_vs_socket.py +++ b/test/test_layer_vs_socket.py @@ -182,7 +182,7 @@ def _test_receive_tx_data_length_64_8(self): return self.do_test_tx_dl_receive_client(tx_data_length=64) def do_test_tx_dl_transmit_server(self, remote_tx_data_length=8): - s = self.make_socket(tx_data_length=remote_tx_data_length, can_fd=True) + s = self.make_socket(tx_data_length=remote_tx_data_length, can_fd=True, timeout=5) s.bind(tools.get_test_interface_config("channel"), isotp.Address(txid=self.stack_rxid, rxid=self.stack_txid)) self.socket_ready.set() self.wait_stack_ready() # Creating the stack may take some time as we delete the previous and create a new one @@ -230,11 +230,10 @@ def _test_transmit_tx_data_length_64_8(self): return self.do_test_tx_dl_transmit_client(tx_data_length=64) def test_transmit_long_stmin(self): - s = self.make_socket() + s = self.make_socket(timeout=5) s.set_fc_opts(stmin=100) s.bind(tools.get_test_interface_config("channel"), isotp.Address(txid=self.stack_rxid, rxid=self.stack_txid)) self.socket_ready.set() - self.wait_transmission_complete(5) frame = s.recv() self.assertEqual(frame, self.make_payload(150)) self.reception_complete.set() @@ -288,7 +287,7 @@ def _test_receive_extended_29bits(self): self.assertEqual(frame, self.make_payload(100)) def test_transmit_extended_29bits(self): - s = self.make_socket() + s = self.make_socket(timeout=3) addr = isotp.Address(isotp.AddressingMode.Extended_29bits, txid=self.stack_rxid, rxid=self.stack_txid, source_address=0x88, target_address=0x99) s.bind(tools.get_test_interface_config("channel"), addr) @@ -325,10 +324,9 @@ def _test_receive_mixed_29bits(self): self.assertEqual(frame, self.make_payload(100, 2)) def test_transmit_mixed_29bits(self): - s = self.make_socket() + s = self.make_socket(timeout=3) addr = isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=0x88, target_address=0x99, address_extension=0xEE) s.bind(tools.get_test_interface_config("channel"), addr) - time.sleep(0.2) self.socket_ready.set() self.wait_stack_ready() frame = s.recv() @@ -343,3 +341,28 @@ def _test_transmit_mixed_29bits(self): self.stack.send(self.make_payload(100, 5)) self.process_stack_send(2) self.wait_reception_complete() + + def test_transmit_asymmetric_address(self): + s = self.make_socket(timeout=3) + addr = isotp.AsymmetricAddress( + tx_addr=isotp.Address(isotp.AddressingMode.Mixed_11bits, txid=0x123, address_extension=0xAA, tx_only=True), + rx_addr=isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=0x88, target_address=0x99, address_extension=0xBB, rx_only=True) + ) + s.bind(tools.get_test_interface_config("channel"), addr) + self.socket_ready.set() + self.wait_stack_ready() + frame = s.recv() + self.assertEqual(frame, self.make_payload(100, 5)) + self.reception_complete.set() + + def _test_transmit_asymmetric_address(self): + self.stack_ready.set() + self.wait_socket_ready() + addr = isotp.AsymmetricAddress( + tx_addr=isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=0x99, target_address=0x88, address_extension=0xBB, tx_only=True), + rx_addr=isotp.Address(isotp.AddressingMode.Mixed_11bits, rxid=0x123, address_extension=0xAA, rx_only=True) + ) + self.stack.set_address(addr) + self.stack.send(self.make_payload(100, 5)) + self.process_stack_send(2) + self.wait_reception_complete() diff --git a/test/test_socket.py b/test/test_socket.py index 00573f9..60232c4 100755 --- a/test/test_socket.py +++ b/test/test_socket.py @@ -163,3 +163,13 @@ def test_addressing_mixed_29bits(self): self.assertNotEqual(opts.optflag & isotp.socket.flags.RX_EXT_ADDR, 0) self.assertEqual(opts.ext_address, 0x99) self.assertEqual(opts.rx_ext_address, 0x99) + + def test_asymmetric_address_no_addrext(self): + # This is not supported by the isotp socket module. flag to the user. + s = self.make_socket(timeout=3) + addr = isotp.AsymmetricAddress( + tx_addr=isotp.Address(isotp.AddressingMode.Normal_11bits, txid=0x123, tx_only=True), + rx_addr=isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=0x88, target_address=0x99, address_extension=0xBB, rx_only=True) + ) + with self.assertRaises(ValueError): + s.bind(tools.get_test_interface_config("channel"), addr) diff --git a/test/test_transport_layer.py b/test/test_transport_layer.py index 583c8f9..7a044f4 100755 --- a/test/test_transport_layer.py +++ b/test/test_transport_layer.py @@ -213,3 +213,101 @@ def test_no_call_to_process_after_start(self): self.layer1.stop() self.layer1.process() # OK to call backwrd compatible process() when not running + + +class TestTransportLayerStackAgainstStackAsymetricAddress(unittest.TestCase): + STACK_PARAMS = { + 'stmin': 2, + 'blocksize': 8, + 'override_receiver_stmin': None, + 'rx_flowcontrol_timeout': 1000, + 'rx_consecutive_frame_timeout': 1000, + 'wftmax': 0, + 'tx_data_length': 8, + 'tx_padding': None, + 'rx_flowcontrol_timeout': 1000, + 'rx_consecutive_frame_timeout': 1000, + 'can_fd': False, + 'max_frame_size': 65536, + 'bitrate_switch': False, + 'rate_limit_enable': False, + 'listen_mode': False, + 'blocking_send': False + } + + def setUp(self): + self.error_triggered = {} + self.queue1to2 = SpliceableQueue() + self.queue2to1 = SpliceableQueue() + + params1 = self.STACK_PARAMS.copy() + params1.update(dict(logger_name='layer1')) + + params2 = self.STACK_PARAMS.copy() + params2.update(dict(logger_name='layer2')) + + self.address1 = isotp.AsymmetricAddress( + tx_addr=isotp.Address(isotp.AddressingMode.Mixed_11bits, txid=0x123, address_extension=0xAA, tx_only=True), + rx_addr=isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=0x88, target_address=0x99, address_extension=0xBB, rx_only=True) + ) + self.address2 = isotp.AsymmetricAddress( + tx_addr=isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=0x99, target_address=0x88, address_extension=0xBB, tx_only=True), + rx_addr=isotp.Address(isotp.AddressingMode.Mixed_11bits, rxid=0x123, address_extension=0xAA, rx_only=True) + ) + self.layer1 = isotp.TransportLayer( + txfn=partial(self.send_queue, self.queue1to2), + rxfn=partial(self.read_queue_blocking, self.queue2to1), + address=self.address1, + error_handler=self.error_handler, + params=params1 + ) + + self.layer2 = isotp.TransportLayer( + txfn=partial(self.send_queue, self.queue2to1), + rxfn=partial(self.read_queue_blocking, self.queue1to2), + address=self.address2, + error_handler=self.error_handler, + params=params2 + ) + + unittest_logging.configure_transport_layer(self.layer1) + unittest_logging.configure_transport_layer(self.layer2) + + self.layer1.start() + self.layer2.start() + + def tearDown(self) -> None: + self.layer1.stop() + self.layer2.stop() + + def error_handler(self, error): + if error.__class__ not in self.error_triggered: + self.error_triggered[error.__class__] = [] + unittest_logging.logger.debug("Error reported:%s" % error) + self.error_triggered[error.__class__].append(error) + + def assert_no_error_reported(self): + self.assertEqual(len(self.error_triggered), 0, "At least 1 error was reported") + + def read_queue_blocking(self, q: queue.Queue, timeout: float): + try: + return q.get(block=True, timeout=timeout) + except queue.Empty: + return None + + def send_queue(self, q: queue.Queue, val: isotp.CanMessage, timeout: float = 1): + q.put(val, block=False, timeout=timeout) + + def test_layer1_2_layer2(self): + payload = bytearray([x & 0xFF for x in range(100)]) + self.layer1.send(payload) + data = self.layer2.recv(block=True, timeout=3) + self.assertEqual(data, payload) + self.assert_no_error_reported() + + def test_layer2_2_layer1(self): + payload = bytearray([x & 0xFF for x in range(100)]) + self.layer2.send(payload) + data = self.layer1.recv(block=True, timeout=3) + self.assertEqual(data, payload) + self.assert_no_error_reported()