Skip to content

Commit

Permalink
Added NotifierBasedCanStack + protect against non-thread-safe calls (#…
Browse files Browse the repository at this point in the history
…105)

* Added NotifierBasedCanStack
* Added protection against non-thread-safe calls while threads are running
  • Loading branch information
pylessard authored Dec 29, 2023
1 parent caeede8 commit 54fac74
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 102 deletions.
15 changes: 10 additions & 5 deletions doc/source/isotp/implementation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ In such case, the :class:`isotp.TransportLayer<isotp.TransportLayer>` will be th

.. autoclass:: isotp.TransportLayer

If python-can must be used as CAN layer, one can use the :class:`isotp.CanStack<isotp.CanStack>` which extends the TransportLayer object with predefined functions that calls python-can.
If python-can must be used as CAN layer, one can use the :class:`isotp.CanStack<isotp.CanStack>` and :class:`isotp.NotifierBasedCanStack<isotp.NotifierBasedCanStack>` which extends the TransportLayer object with predefined functions that calls python-can.

.. autoclass:: isotp.CanStack
.. autoclass:: isotp.NotifierBasedCanStack

The CAN messages going in and out from the transport layer are defined with :class:`isotp.CanMessage<isotp.CanMessage>`.

Expand Down Expand Up @@ -309,7 +310,9 @@ Legacy methods (v1.x)
---------------------

With isotp v2.x, the processing of the transport layer is done from an internal thread. For backward compatibility, the following methods are still accessible to the
users, but **should not** be called from the user thread if ``start()`` has been called. It is safe to call them if no call to ``start()`` is done.
users, but **should not** be called from the user thread if ``start()`` has been called. It is safe to call them if no call to ``start()`` is done.

Calls to non-thread-safe method (``reset()``, ``process()``) while the internal thread is running will cause an exception to raise.

.. automethod:: isotp.TransportLayer.reset
.. automethod:: isotp.TransportLayer.process
Expand Down Expand Up @@ -385,7 +388,9 @@ Still, reaching good timing with the pure python :class:`TransportLayer<isotp.Tr

The fact #2 is useful for compatibility, allowing to couple the isotp layer with any kind of link layer. Unfortunately, it has the drawback of preventing cross-layer optimizations.
For that reason, this module employs a 3 thread strategy and rely on the python ``Queue`` object for synchronization. The python ``Queue`` module employ OS primitives for synchronization, such
as condition variables to pass control between threads. See the following figure
as condition variables to pass control between threads, which are as performant as they can be.

See the following figure

.. image:: assets/threads.png
:width: 800px
Expand All @@ -397,7 +402,7 @@ as condition variables to pass control between threads. See the following figure
- The worker thread does blocking reads to the relay queue and gets woken up right away by Python when a message arrives
- When the user calls ``send()``, a ``None`` is injected in the relay queue, forcing the worker thread to wake up and process the user provided payload right away.

Using the approach described above, a message can be read from the link-layer and processed after 2 context switches, which are achievable in about 20us on both Windows and Linux. This
Using the approach described above, a message can be read from the link-layer and processed after 2 context switches, which are achievable in about 20us each on both Windows and Linux. This
40us latency is far better than the latency caused by calls to ``time.sleep()`` required with v1.x. Considering that a CAN bus running at 500kbps has a message duration of about 230us,
the latency is in the acceptable range.

Expand All @@ -413,5 +418,5 @@ In v1.x, the user had to handle timing and repeatedly call the ``process()`` met
``TransportLayerLogic``, therefore the old interface is still accessible under the same name. Inheritance is used instead of composition for that purpose.

When calling ``start()`` and ``stop()``, which have been added in the 2.x extension, it is assumed that the user will uses the TransportLayer as documented by the v2.x documentation,
otherwise race conditions could occur. Put simply, ``process()`` should never be called after ``start()`` has been called.
otherwise race conditions could occur. Put simply, ``process()`` should never be called after ``start()`` has been called, otherwise, an exception will be raised.

2 changes: 1 addition & 1 deletion isotp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
from isotp.errors import *
from isotp.can_message import CanMessage
from isotp.address import AddressingMode, TargetAddressType, Address
from isotp.protocol import TransportLayerLogic, TransportLayer, CanStack
from isotp.protocol import TransportLayerLogic, TransportLayer, CanStack, NotifierBasedCanStack
from isotp.tpsock import socket
176 changes: 115 additions & 61 deletions isotp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
'TransportLayerLogic',
'TransportLayer',
'CanStack',
'NotifierBasedCanStack'
]

import queue
Expand Down Expand Up @@ -293,33 +294,6 @@ def inform_byte_sent(self, datalen: int) -> None:


class TransportLayerLogic:
"""
The IsoTP transport layer raw implementation. When using this class, the user is responsible of handling timings by calling the ``process()`` function
as fast as possible, like the legacy V1 library was requiring. For an easier solution with less degrees of freedom, use the :ref:`TransportLayer<TransportLayer>`
:param rxfn: Function to be called by the transport layer to read the CAN layer. Must return a :class:`isotp.CanMessage<isotp.CanMessage>` or None if no message has been received.
:type rxfn: Callable : expected signature: ``my_txfn(msg:isotp.CanMessage) -> None``
:param txfn: Function to be called by the transport layer to send a message on the CAN layer. This function should receive a :class:`isotp.CanMessage<isotp.CanMessage>`
:type txfn: Callable : expected signature: ``my_rxfn(timeout:float) -> Optional[isotp.CanMessage]``
:param address: The address information of CAN messages. Includes the addressing mode, txid/rxid, source/target address and address extension.
See :class:`isotp.Address<isotp.Address>` for more details.
:type address: :class:`isotp.Address<isotp.Address>`
:param error_handler: A function to be called when an error has been detected. An :class:`isotp.IsoTpError<isotp.IsoTpError>` (inheriting Exception class)
will be given as sole parameter. See the :ref:`Error section<Errors>`
:type error_handler: Callable : Expected signature: ``my_error_handler(error:isotp.IsoTpError) -> None``
:param params: List of parameters for the transport layer. See :ref:`the list of parameters<parameters>
:type params: dict
:param post_send_callback: An optional callback to be called right after a send request has been enqueued. The main purpose of this parameter is to allow
a facility to stop waiting for a message (if blocked in ``rxfn``) and start transmitting right away if possible.
the function must have this signature : ``my_func(send_request)`` Where ``send_request`` is an instance of :ref:`SendRequest<TransportLayer.SendRequest>`
:type post_send_callback: Callable
"""

LOGGER_NAME = 'isotp'

@dataclass(slots=True, init=False)
Expand Down Expand Up @@ -536,7 +510,7 @@ class ProcessTxReport:
msg: Optional[CanMessage]
immediate_rx_required: bool

RxFn = Callable[[Optional[float]], Optional[CanMessage]]
RxFn = Callable[[float], Optional[CanMessage]]
TxFn = Callable[[CanMessage], None]
PostSendCallback = Callable[[SendRequest], None]
ErrorHandler = Callable[[Exception], None]
Expand Down Expand Up @@ -1389,10 +1363,10 @@ class TransportLayer(TransportLayerLogic):
:param rxfn: Function to be called by the transport layer to read the CAN layer. Must return a :class:`isotp.CanMessage<isotp.CanMessage>` or None if no message has been received.
For optimal performance, this function should perform a blocking read that waits on IO
:type rxfn: Callable : expected signature: ``my_txfn(msg:isotp.CanMessage) -> None``
:type rxfn: Callable : expected signature: ``my_rxfn(timeout:float) -> Optional[isotp.CanMessage]``
:param txfn: Function to be called by the transport layer to send a message on the CAN layer. This function should receive a :class:`isotp.CanMessage<isotp.CanMessage>`
:type txfn: Callable : expected signature: ``my_rxfn(timeout:float) -> Optional[isotp.CanMessage]``
:type txfn: Callable : expected signature: ``my_txfn(msg:isotp.CanMessage) -> None``
:param address: The address information of CAN messages. Includes the addressing mode, txid/rxid, source/target address and address extension. See :class:`isotp.Address<isotp.Address>` for more details.
:type address: isotp.Address
Expand Down Expand Up @@ -1496,14 +1470,14 @@ def stop(self) -> None:
self.events.stop_requested.set()
self.rx_relay_queue.put(None)

# self.logger.debug("Joining main")
if self.main_thread is not None:
if self.main_thread.is_alive():
self.main_thread.join()
self.main_thread.join()
self.main_thread = None

# self.logger.debug("Joining relay_thread")
if self.relay_thread is not None:
if self.relay_thread.is_alive():
self.relay_thread.join()
self.relay_thread.join()
self.relay_thread = None

self.events.main_thread_ready.clear()
Expand All @@ -1512,7 +1486,9 @@ def stop(self) -> None:
self.events.reset_tx.clear()
self.events.reset_rx.clear()

self.reset()
super().reset()
while not self.rx_relay_queue.empty():
self.rx_relay_queue.get()
self._set_rxfn(self.user_rxfn) # Switch back to the given user rxfn. Backward compatibility with v1.x
self.started = False
self.logger.debug(f"{self.__class__.__name__} Stopped")
Expand Down Expand Up @@ -1547,10 +1523,10 @@ def _main_thread_fn(self) -> None:
if delay > 0:
time.sleep(delay) # If we are transmitting CFs, no need to call rxfn, we can stream those CF with short sleep
if not self.events.stop_requested.is_set():
self.process(do_rx=False, do_tx=True)
super().process(do_rx=False, do_tx=True)
else:
rx_timeout = 0.0 if self.is_tx_throttled() else self.default_read_timeout
self.process(rx_timeout)
super().process(rx_timeout)

if self.events.reset_tx.is_set():
self._stop_sending(success=False)
Expand All @@ -1561,7 +1537,7 @@ def _main_thread_fn(self) -> None:
self.events.reset_rx.clear()

finally:
self.reset()
super().reset()
self.logger.debug("Thread is exiting")

@is_documented_by(TransportLayerLogic.stop_sending)
Expand All @@ -1586,6 +1562,19 @@ def stop_receiving(self):
else:
self._stop_receiving()

# Protect against usage of non thread-safe methods while threads are running. We don't hide those method for backward compatibility
@is_documented_by(TransportLayerLogic.process)
def process(self, *args, **kwargs):
if self.started:
raise RuntimeError("Cannot call process() after a start(). See documentation and notes about backward compatibility.")
super().process(*args, **kwargs)

@is_documented_by(TransportLayerLogic.reset)
def reset(self, *args, **kwargs):
if self.started:
raise RuntimeError("Cannot call reset() after a start(). See documentation and notes about backward compatibility.")
super().reset(*args, **kwargs)


class BusOwner:
bus: "can.BusABC"
Expand All @@ -1601,58 +1590,123 @@ def python_can_tx_canbus_3minus(owner: BusOwner, msg: CanMessage) -> None:
extended_id=msg.is_extended_id, is_fd=msg.is_fd, bitrate_switch=msg.bitrate_switch)) # type:ignore


def make_python_can_tx_func(owner: BusOwner) -> Callable[[CanMessage], None]:
def _make_python_can_tx_func(owner: BusOwner) -> Callable[[CanMessage], None]:
message_input_args = inspect.signature(can.Message.__init__).parameters
if 'is_extended_id' in message_input_args:
return functools.partial(python_can_tx_canbus_3plus, owner)
else:
return functools.partial(python_can_tx_canbus_3minus, owner)


def _python_can_to_isotp_message(msg: Optional["can.Message"]) -> Optional[CanMessage]:
if msg is None:
return None

if msg.is_error_frame or msg.is_remote_frame:
return None

return CanMessage(arbitration_id=msg.arbitration_id, data=msg.data, extended_id=msg.is_extended_id, is_fd=msg.is_fd, bitrate_switch=msg.bitrate_switch)


class CanStack(TransportLayer, BusOwner):
"""
The IsoTP transport layer preconfigured to use `python-can <https://python-can.readthedocs.io>`__ as CAN layer. python-can must be installed in order to use this class.
All parameters except the ``bus`` parameter will be given to the :class:`TransportLayer<isotp.TransportLayer>` constructor
:param bus: A python-can bus object implementing ``recv`` and ``send``
:type bus: BusABC
This class directly calls ``bus.recv``, consuming the message from the receive queue, potentially starving other application. Consider using the :class:`NotifierBasedCanStack<isotp.NotifierBasedCanStack>`
to avoid starvation issues
:param bus: A python-can bus object implementing ``recv`` and ``send``
:type bus: BusABC
:param address: The address information of CAN messages. Includes the addressing mode, txid/rxid, source/target address and address extension. See :class:`isotp.Address<isotp.Address>` for more details.
:type address: isotp.Address
:type bus: can.BusABC
:param error_handler: A function to be called when an error has been detected. An :class:`isotp.protocol.IsoTpError<isotp.protocol.IsoTpError>` (inheriting Exception class) will be given as sole parameter
:type error_handler: Callable
:param params: List of parameters for the transport layer
:type params: dict
:param args: Passed down to :class:`TransportLayer<isotp.TransportLayer>`. ``rxfn`` and ``txfn`` are predefined
:type args: N/A
:param kwargs: Passed down to :class:`TransportLayer<isotp.TransportLayer>`. ``rxfn`` and ``txfn`` are predefined
:type kwargs: N/A
"""
default_read_timeout: float
bus: "can.BusABC"

def rx_canbus(self, timeout: Optional[float] = None) -> Optional[CanMessage]:
if timeout is None:
timeout = self.default_read_timeout
def _rx_canbus(self, timeout: float) -> Optional[CanMessage]:
msg = self.bus.recv(timeout)
if msg is not None:
return CanMessage(arbitration_id=msg.arbitration_id, data=msg.data, extended_id=msg.is_extended_id, is_fd=msg.is_fd, bitrate_switch=msg.bitrate_switch)
return None
return _python_can_to_isotp_message(msg)

def __init__(self, bus: "can.BusABC", read_timeout: float = 0.0, *args, **kwargs):
def __init__(self, bus: "can.BusABC", *args, **kwargs):
if not _can_available:
raise RuntimeError(f"python-can is not installed in this environment and is required for the {self.__class__.__name__} object.")

self.set_bus(bus)
self.default_read_timeout = read_timeout
kwargs.update(dict(
rxfn=self.rx_canbus,
txfn=make_python_can_tx_func(self),
rxfn=self._rx_canbus,
txfn=_make_python_can_tx_func(self),
))
super().__init__(*args, **kwargs)

def set_bus(self, bus: "can.BusABC") -> None:
if not isinstance(bus, can.BusABC):
raise ValueError('bus must be a python-can BusABC object')
self.bus = bus


class NotifierBasedCanStack(TransportLayer, BusOwner):
"""
The IsoTP transport layer preconfigured to use `python-can <https://python-can.readthedocs.io>`__ as CAN layer and reading through a ``can.Notifier``. python-can must be installed in order to use this class.
All parameters except the ``bus`` and the ``notifier`` parameter will be given to the :class:`TransportLayer<isotp.TransportLayer>` constructor
This class reads by registering a listener to the given notifier and sends by calling ``bus.recv``.
:param bus: A python-can Bus object implementing ``send`` used for transmission
:type bus: can.BusABC
:param notifier: A python-can Notifier object onto which a new listener will be added
:type notifier: can.Notifier
:param args: Passed down to :class:`TransportLayer<isotp.TransportLayer>`. ``rxfn`` and ``txfn`` are predefined
:type args: N/A
:param kwargs: Passed down to :class:`TransportLayer<isotp.TransportLayer>`. ``rxfn`` and ``txfn`` are predefined
:type kwargs: N/A
"""

buffered_reader: Optional["can.BufferedReader"]
notifier: "can.Notifier"
bus: "can.BusABC"

def _rx_canbus(self, timeout: float):
if self.buffered_reader is None:
return None

msg = self.buffered_reader.get_message(timeout=timeout)
return _python_can_to_isotp_message(msg)

def __init__(self, bus: "can.BusABC", notifier: "can.Notifier", *args, **kwargs):
if not _can_available:
raise RuntimeError(f"python-can is not installed in this environment and is required for the {self.__class__.__name__} object.")

if not isinstance(bus, can.BusABC):
raise ValueError('bus must be a python-can BusABC object')

if not isinstance(notifier, can.Notifier):
raise ValueError("notifier must be a valid can.Notifier object")

self.bus = bus
self.notifier = notifier
self.buffered_reader = None
kwargs.update(dict(
rxfn=self._rx_canbus,
txfn=_make_python_can_tx_func(self),
))
super().__init__(*args, **kwargs)

def start(self):
self.buffered_reader = can.BufferedReader()
self.notifier.add_listener(self.buffered_reader)
super().start()

def stop(self):
try:
self.notifier.remove_listener(self.buffered_reader)
except Exception:
pass
self.buffered_reader = None
super().stop()
Loading

0 comments on commit 54fac74

Please sign in to comment.