Skip to content

Commit

Permalink
Mypy passes strict check (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
pylessard authored Jan 23, 2024
1 parent bd512b1 commit 8f871ba
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 44 deletions.
33 changes: 33 additions & 0 deletions isotp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,38 @@
_major_version_ = 2

__all__ = [
'CanMessage',
'AddressingMode',
'TargetAddressType',
'Address',
'AsymmetricAddress',
'TransportLayerLogic',
'TransportLayer',
'CanStack',
'NotifierBasedCanStack',
'socket',

'IsoTpError',
'BlockingSendFailure',
'BadGeneratorError',
'BlockingSendTimeout',
'FlowControlTimeoutError',
'ConsecutiveFrameTimeoutError',
'InvalidCanDataError',
'UnexpectedFlowControlError',
'UnexpectedConsecutiveFrameError',
'ReceptionInterruptedWithSingleFrameError',
'ReceptionInterruptedWithFirstFrameError',
'WrongSequenceNumberError',
'UnsupportedWaitFrameError',
'MaximumWaitFrameReachedError',
'FrameTooLongError',
'ChangingInvalidRXDLError',
'MissingEscapeSequenceError',
'InvalidCanFdFirstFrameRXDL',
'OverflowError'
]

from isotp.errors import *
from isotp.can_message import CanMessage
from isotp.address import AddressingMode, TargetAddressType, Address, AsymmetricAddress
Expand Down
17 changes: 12 additions & 5 deletions isotp/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class AddressingMode(Enum):
Mixed_29bits = 6

@classmethod
def get_name(cls, num):
def get_name(cls, num: Union[int, "AddressingMode"]) -> str:
return cls(num).name


Expand Down Expand Up @@ -76,6 +76,10 @@ def get_tx_payload_prefix(self) -> bytes:
def is_partial_address(self) -> bool:
raise NotImplementedError("Abstract method")

@abc.abstractmethod
def get_content_str(self) -> str:
raise NotImplementedError("Abstract method")


class Address(AbstractAddress):
"""
Expand Down Expand Up @@ -203,7 +207,7 @@ def __init__(self,
else:
raise RuntimeError('This exception should never be raised.')

def not_implemented_func_with_partial(*args, **kwargs):
def not_implemented_func_with_partial(*args: Any, **kwargs: Any) -> None:
raise NotImplementedError("Not possible with partial address")

# Remove unavailable functions to be strict
Expand All @@ -222,7 +226,7 @@ def not_implemented_func_with_partial(*args, **kwargs):
setattr(self, 'is_tx_29bits', not_implemented_func_with_partial)
setattr(self, 'get_tx_payload_prefix', not_implemented_func_with_partial)

def validate(self):
def validate(self) -> None:
if self._rx_only and self._tx_only:
raise ValueError("Address cannot be tx only and rx only")

Expand Down Expand Up @@ -434,7 +438,7 @@ def get_content_str(self) -> str:
vals_str = ', '.join(['%s:0x%02x' % (k, val_dict[k]) for k in val_dict])
return '[%s - %s]' % (AddressingMode.get_name(self._addressing_mode), vals_str)

def __repr__(self):
def __repr__(self) -> str:
return '<IsoTP Address %s at 0x%08x>' % (self.get_content_str(), id(self))


Expand Down Expand Up @@ -504,5 +508,8 @@ def get_tx_payload_prefix(self) -> bytes:
def is_partial_address(self) -> bool:
return False

def get_content_str(self) -> str:
return f"RxAddr: {self.rx_addr.__repr__()} - TxAddr: {self.tx_addr.__repr__()}"

def __repr__(self) -> str:
return f'<{self.__class__.__name__} - RxAddr: {self.rx_addr.__repr__()} - TxAddr: {self.tx_addr.__repr__()}>'
return f'<{self.__class__.__name__} - {self.get_content_str()}>'
25 changes: 24 additions & 1 deletion isotp/errors.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,30 @@
__all__ = [
'IsoTpError',
'BlockingSendFailure',
'BadGeneratorError',
'BlockingSendTimeout',
'FlowControlTimeoutError',
'ConsecutiveFrameTimeoutError',
'InvalidCanDataError',
'UnexpectedFlowControlError',
'UnexpectedConsecutiveFrameError',
'ReceptionInterruptedWithSingleFrameError',
'ReceptionInterruptedWithFirstFrameError',
'WrongSequenceNumberError',
'UnsupportedWaitFrameError',
'MaximumWaitFrameReachedError',
'FrameTooLongError',
'ChangingInvalidRXDLError',
'MissingEscapeSequenceError',
'InvalidCanFdFirstFrameRXDL',
'OverflowError'
]

from typing import Any


class IsoTpError(Exception):
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
Exception.__init__(self, *args, **kwargs)


Expand Down
54 changes: 28 additions & 26 deletions isotp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
_can_available = False


def is_documented_by(original):
def wrapper(target):
def is_documented_by(original: Callable[[Any], Any]) -> Callable[[Any], Any]:
def wrapper(target: Callable[[Any], Any]) -> Callable[[Any], Any]:
target.__doc__ = original.__doc__
return target
return wrapper
Expand Down Expand Up @@ -75,7 +75,7 @@ class FlowStatus:
escape_sequence: bool
can_dl: int

def __init__(self, msg: CanMessage, start_of_data: int = 0):
def __init__(self, msg: CanMessage, start_of_data: int = 0) -> None:

self.data = bytes()
self.length = None
Expand Down Expand Up @@ -172,7 +172,7 @@ def __init__(self, msg: CanMessage, start_of_data: int = 0):
def craft_flow_control_data(cls, flow_status: int, blocksize: int, stmin: int) -> bytes:
return bytes([(0x30 | (flow_status) & 0xF), blocksize & 0xFF, stmin & 0xFF])

def name(self):
def name(self) -> str:
if self.type is None:
return "[None]"

Expand Down Expand Up @@ -200,7 +200,7 @@ class RateLimiter:
bit_total: int
window_bit_max: float

def __init__(self, mean_bitrate=None, window_size_sec=0.1):
def __init__(self, mean_bitrate: float = 10000000, window_size_sec: float = 0.1) -> None:
self.enabled = False
self.mean_bitrate = mean_bitrate
self.window_size_sec = window_size_sec
Expand Down Expand Up @@ -346,7 +346,7 @@ class Params:
logger_name: str
wait_func: Callable[[float], None]

def __init__(self):
def __init__(self) -> None:
self.stmin = 0
self.blocksize = 8
self.override_receiver_stmin = None
Expand Down Expand Up @@ -552,7 +552,7 @@ class ProcessStats:
sent: int
frame_received: int

def __repr__(self):
def __repr__(self) -> str:
return f'<{self.__class__.__name__} received:{self.received} (processed: {self.received_processed}, sent: {self.sent})>'

@dataclass(frozen=True)
Expand Down Expand Up @@ -689,7 +689,8 @@ def load_params(self) -> None:
def send(self,
data: Union[bytes, bytearray, SendGenerator],
target_address_type: Optional[Union[isotp.address.TargetAddressType, int]] = None,
send_timeout: Optional[float] = None):
send_timeout: Optional[float] = None
) -> None:
"""
Enqueue an IsoTP frame to be sent over CAN network.
When performing a blocking send, this method returns only when the transmission is complete or raise an exception when a failure or a timeout occurs.
Expand Down Expand Up @@ -874,7 +875,7 @@ def process(self, rx_timeout: float = 0.0, do_rx: bool = True, do_tx: bool = Tru
frame_received=nb_frame_received
)

def _set_rxfn(self, rxfn: "TransportLayerLogic.RxFn"):
def _set_rxfn(self, rxfn: "TransportLayerLogic.RxFn") -> None:
"""
Allow post init change of rxfn. This is a trick to implement the threaded Transport Layer
and keeping the ability to run the TransportLayerLogic without threads for backward compatibility
Expand Down Expand Up @@ -1185,7 +1186,7 @@ def set_sleep_timing(self, idle: float, wait_fc: float) -> None:
(self.RxState.IDLE, self.TxState.WAIT_FC): wait_fc,
}

def set_address(self, address: Union[isotp.address.Address, isotp.address.AsymmetricAddress]):
def set_address(self, address: Union[isotp.address.Address, isotp.address.AsymmetricAddress]) -> None:
"""
Sets the layer address. Can be set after initialization if needed. May cause a timeout if called while a transmission is active.
Expand Down Expand Up @@ -1249,10 +1250,10 @@ def _start_rx_cf_timer(self) -> None:
self.timer_rx_cf = Timer(timeout=float(self.params.rx_consecutive_frame_timeout) / 1000)
self.timer_rx_cf.start()

def _append_rx_data(self, data) -> None:
def _append_rx_data(self, data: Union[bytes, bytearray]) -> None:
self.rx_buffer.extend(data)

def _request_tx_flowcontrol(self, status=PDU.FlowStatus.ContinueToSend) -> None:
def _request_tx_flowcontrol(self, status: int = PDU.FlowStatus.ContinueToSend) -> None:
self.pending_flow_control_tx = True
self.pending_flowcontrol_status = status

Expand Down Expand Up @@ -1302,7 +1303,7 @@ def _get_nearest_can_fd_size(self, size: int) -> int:
if size <= 64: return 64
raise ValueError("Impossible data size for CAN FD : %d " % (size))

def _make_flow_control(self, flow_status: int = PDU.FlowStatus.ContinueToSend, blocksize: Optional[int] = None, stmin: Optional[int] = None):
def _make_flow_control(self, flow_status: int = PDU.FlowStatus.ContinueToSend, blocksize: Optional[int] = None, stmin: Optional[int] = None) -> CanMessage:
if blocksize is None:
blocksize = self.params.blocksize

Expand Down Expand Up @@ -1347,11 +1348,11 @@ def _stop_receiving(self) -> None:
self._stop_sending_flow_control()
self.timer_rx_cf.stop()

def clear_rx_queue(self):
def clear_rx_queue(self) -> None:
while not self.rx_queue.empty():
self.rx_queue.get_nowait()

def clear_tx_queue(self):
def clear_tx_queue(self) -> None:
while not self.tx_queue.empty():
self.tx_queue.get_nowait()

Expand Down Expand Up @@ -1476,7 +1477,7 @@ class Events:
reset_tx: threading.Event
reset_rx: threading.Event

def __init__(self):
def __init__(self) -> None:
self.main_thread_ready = threading.Event()
self.relay_thread_ready = threading.Event()
self.stop_requested = threading.Event()
Expand All @@ -1497,7 +1498,7 @@ def __init__(self,
address: isotp.Address,
error_handler: Optional[TransportLayerLogic.ErrorHandler] = None,
params: Optional[Dict[str, Any]] = None,
read_timeout=0.05):
read_timeout: float = 0.05) -> None:

self.rx_relay_queue = queue.Queue()
self.started = False
Expand Down Expand Up @@ -1624,7 +1625,7 @@ def _main_thread_fn(self) -> None:
self.logger.debug("Main thread is exiting")

@is_documented_by(TransportLayerLogic.stop_sending)
def stop_sending(self):
def stop_sending(self) -> None:
if self.started:
if not self.events.stop_requested.is_set():
if self.main_thread is not None and self.main_thread.is_alive():
Expand All @@ -1635,7 +1636,7 @@ def stop_sending(self):
self._stop_sending(success=False)

@is_documented_by(TransportLayerLogic.stop_receiving)
def stop_receiving(self):
def stop_receiving(self) -> None:
if self.started:
if not self.events.stop_requested.is_set():
if self.main_thread is not None and self.main_thread.is_alive():
Expand All @@ -1653,7 +1654,7 @@ def process(self, rx_timeout: float = 0.0, do_rx: bool = True, do_tx: bool = Tru
return super().process(rx_timeout=rx_timeout, do_rx=do_rx, do_tx=do_tx)

@is_documented_by(TransportLayerLogic.reset)
def reset(self):
def reset(self) -> None:
if self.started:
raise RuntimeError("Cannot call reset() after a start(). See documentation and notes about backward compatibility.")
super().reset()
Expand Down Expand Up @@ -1714,7 +1715,7 @@ def _rx_canbus(self, timeout: float) -> Optional[CanMessage]:
msg = self.bus.recv(timeout)
return _python_can_to_isotp_message(msg)

def __init__(self, bus: "can.BusABC", *args, **kwargs):
def __init__(self, bus: "can.BusABC", *args: Any, **kwargs: Any):
if not _can_available:
raise RuntimeError(f"python-can is not installed in this environment and is required for the {self.__class__.__name__} object.")

Expand Down Expand Up @@ -1755,14 +1756,14 @@ class NotifierBasedCanStack(TransportLayer, BusOwner):
notifier: "can.Notifier"
bus: "can.BusABC"

def _rx_canbus(self, timeout: float):
def _rx_canbus(self, timeout: float) -> Optional[CanMessage]:
if self.buffered_reader is None:
return None

msg = self.buffered_reader.get_message(timeout=timeout)
return _python_can_to_isotp_message(msg)

def __init__(self, bus: "can.BusABC", notifier: "can.Notifier", *args, **kwargs):
def __init__(self, bus: "can.BusABC", notifier: "can.Notifier", *args: Any, **kwargs: Any) -> None:
if not _can_available:
raise RuntimeError(f"python-can is not installed in this environment and is required for the {self.__class__.__name__} object.")

Expand All @@ -1781,14 +1782,15 @@ def __init__(self, bus: "can.BusABC", notifier: "can.Notifier", *args, **kwargs)
))
super().__init__(*args, **kwargs)

def start(self):
def start(self) -> None:
self.buffered_reader = can.BufferedReader()
self.notifier.add_listener(self.buffered_reader)
super().start()

def stop(self):
def stop(self) -> None:
try:
self.notifier.remove_listener(self.buffered_reader)
if self.buffered_reader is not None:
self.notifier.remove_listener(self.buffered_reader)
except Exception:
pass
self.buffered_reader = None
Expand Down
4 changes: 2 additions & 2 deletions isotp/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ class Timer:
start_time: Optional[int]
timeout: int

def __init__(self, timeout: float):
def __init__(self, timeout: float) -> None:
self.set_timeout(timeout)
self.start_time = None

def set_timeout(self, timeout: float) -> None:
self.timeout = int(timeout * 1e9)

def start(self, timeout=None) -> None:
def start(self, timeout: Optional[float] = None) -> None:
if timeout is not None:
self.set_timeout(timeout)
self.start_time = time.perf_counter_ns()
Expand Down
8 changes: 4 additions & 4 deletions isotp/tpsock/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class socket:
closed: bool
_socket: socket_module.socket

def __init__(self, timeout=None):
def __init__(self, timeout: Optional[float] = None) -> None:
check_support()
from . import opts # import only if required.
self.interface = None
Expand Down Expand Up @@ -287,13 +287,13 @@ def close(self) -> None:
self.closed = True
self.address = None

def __delete__(self):
def __delete__(self) -> None:
if isinstance(self._socket, socket_module.socket):
self._socket.close()
self._socket = None

def __repr__(self):
def __repr__(self) -> str:
if self.bound:
assert self.address is not None
return "<ISO-TP Socket: %s, %s>" % (self.interface, self.address.get_content_str())
else:
status = "Closed" if self.closed else "Unbound"
Expand Down
Loading

0 comments on commit 8f871ba

Please sign in to comment.