Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SocketPanda python library #2073

Merged
merged 4 commits into from
Nov 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions python/socketpanda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import socket
import struct

# /**
# * struct canfd_frame - CAN flexible data rate frame structure
# * @can_id: CAN ID of the frame and CAN_*_FLAG flags, see canid_t definition
# * @len: frame payload length in byte (0 .. CANFD_MAX_DLEN)
# * @flags: additional flags for CAN FD
# * @__res0: reserved / padding
# * @__res1: reserved / padding
# * @data: CAN FD frame payload (up to CANFD_MAX_DLEN byte)
# */
# struct canfd_frame {
# canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */
# __u8 len; /* frame payload length in byte */
# __u8 flags; /* additional flags for CAN FD */
# __u8 __res0; /* reserved / padding */
# __u8 __res1; /* reserved / padding */
# __u8 data[CANFD_MAX_DLEN] __attribute__((aligned(8)));
# };
CAN_HEADER_FMT = "=IBB2x"
CAN_HEADER_LEN = struct.calcsize(CAN_HEADER_FMT)
CAN_MAX_DLEN = 8
CANFD_MAX_DLEN = 64

CANFD_BRS = 0x01 # bit rate switch (second bitrate for payload data)
CANFD_FDF = 0x04 # mark CAN FD for dual use of struct canfd_frame

# socket.SO_RXQ_OVFL is missing
# https://github.com/torvalds/linux/blob/47ac09b91befbb6a235ab620c32af719f8208399/include/uapi/asm-generic/socket.h#L61
SO_RXQ_OVFL = 40

def create_socketcan(interface:str, recv_buffer_size:int, fd:bool) -> socket.socket:
# settings mostly from https://github.com/linux-can/can-utils/blob/master/candump.c
socketcan = socket.socket(socket.AF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
if fd:
socketcan.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FD_FRAMES, 1)
socketcan.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buffer_size)
# TODO: why is it always 2x the requested size?
assert socketcan.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) == recv_buffer_size * 2
# TODO: how to dectect and alert on buffer overflow?
socketcan.setsockopt(socket.SOL_SOCKET, SO_RXQ_OVFL, 1)
socketcan.bind((interface,))
return socketcan

# Panda class substitute for socketcan device (to support using the uds/iso-tp/xcp/ccp library)
class SocketPanda():
def __init__(self, interface:str="can0", bus:int=0, fd:bool=False, recv_buffer_size:int=212992) -> None:
self.interface = interface
self.bus = bus
self.fd = fd
self.flags = CANFD_BRS | CANFD_FDF if fd else 0
self.data_len = CANFD_MAX_DLEN if fd else CAN_MAX_DLEN
self.recv_buffer_size = recv_buffer_size
self.socket = create_socketcan(interface, recv_buffer_size, fd)

def __del__(self):
self.socket.close()

def get_serial(self) -> tuple[int, int]:
return (0, 0) # TODO: implemented in panda socketcan driver

def get_version(self) -> int:
return 0 # TODO: implemented in panda socketcan driver

def can_clear(self, bus:int) -> None:
# TODO: implemented in panda socketcan driver
self.socket.close()
self.socket = create_socketcan(self.interface, self.recv_buffer_size, self.fd)

def set_safety_mode(self, mode:int, param=0) -> None:
pass # TODO: implemented in panda socketcan driver

def has_obd(self) -> bool:
return False # TODO: implemented in panda socketcan driver

def can_send(self, addr, dat, bus=0, timeout=0) -> None:
msg_len = len(dat)
msg_dat = dat.ljust(self.data_len, b'\x00')
can_frame = struct.pack(CAN_HEADER_FMT, addr, msg_len, self.flags) + msg_dat
self.socket.sendto(can_frame, (self.interface,))

def can_recv(self) -> list[tuple[int, bytes, int]]:
msgs = list()
while True:
try:
dat, _ = self.socket.recvfrom(self.recv_buffer_size, socket.MSG_DONTWAIT)
assert len(dat) == CAN_HEADER_LEN + self.data_len, f"ERROR: received {len(dat)} bytes"
can_id, msg_len, _ = struct.unpack(CAN_HEADER_FMT, dat[:CAN_HEADER_LEN])
msg_dat = dat[CAN_HEADER_LEN:CAN_HEADER_LEN+msg_len]
msgs.append((can_id, msg_dat, self.bus))
except BlockingIOError:
break # buffered data exhausted
return msgs
Loading