Skip to content

Commit

Permalink
Update HSFZ with more details (#4544)
Browse files Browse the repository at this point in the history
  • Loading branch information
polybassa authored Oct 2, 2024
1 parent 87b6e26 commit c38a5de
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 43 deletions.
73 changes: 49 additions & 24 deletions scapy/contrib/automotive/bmw/hsfz.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,9 @@
# scapy.contrib.description = HSFZ - BMW High-Speed-Fahrzeug-Zugang
# scapy.contrib.status = loads
import logging
import struct
import socket
import struct
import time

from scapy.contrib.automotive import log_automotive
from scapy.packet import Packet, bind_layers, bind_bottom_up
from scapy.fields import IntField, ShortEnumField, XByteField
from scapy.layers.inet import TCP
from scapy.supersocket import StreamSocket
from scapy.contrib.automotive.uds import UDS, UDS_TP
from scapy.data import MTU

from typing import (
Any,
Optional,
Expand All @@ -28,29 +19,58 @@
Union,
)

from scapy.contrib.automotive import log_automotive
from scapy.contrib.automotive.uds import UDS, UDS_TP
from scapy.data import MTU
from scapy.fields import (IntField, ShortEnumField, XByteField,
ConditionalField, StrFixedLenField)
from scapy.layers.inet import TCP, UDP
from scapy.packet import Packet, bind_layers, bind_bottom_up
from scapy.supersocket import StreamSocket

"""
BMW HSFZ (High-Speed-Fahrzeug-Zugang / High-Speed-Car-Access).
BMW specific diagnostic over IP protocol implementation.
The physical interface for this connection is called ENET.
"""


# #########################HSFZ###################################


class HSFZ(Packet):
control_words = {
0x01: "diagnostic_req_res",
0x02: "acknowledge_transfer",
0x10: "terminal15",
0x11: "vehicle_ident_data",
0x12: "alive_check",
0x13: "status_data_inquiry",
0x40: "incorrect_tester_address",
0x41: "incorrect_control_word",
0x42: "incorrect_format",
0x43: "incorrect_dest_address",
0x44: "message_too_large",
0x45: "diag_app_not_ready",
0xFF: "out_of_memory"
}
name = 'HSFZ'
fields_desc = [
IntField('length', None),
ShortEnumField('type', 1, {0x01: "message",
0x02: "echo"}),
XByteField('src', 0),
XByteField('dst', 0),
ShortEnumField('control', 1, control_words),
ConditionalField(
XByteField('source', 0), lambda p: p.control == 1),
ConditionalField(
XByteField('target', 0), lambda p: p.control == 1),
ConditionalField(
StrFixedLenField("identification_string",
None, None, lambda p: p.length),
lambda p: p.control == 0x11)
]

def hashret(self):
# type: () -> bytes
hdr_hash = struct.pack("B", self.src ^ self.dst)
hdr_hash = struct.pack("B", self.source ^ self.target)
pay_hash = self.payload.hashret()
return hdr_hash + pay_hash

Expand All @@ -71,6 +91,11 @@ def post_build(self, pkt, pay):
bind_bottom_up(TCP, HSFZ, sport=6801)
bind_bottom_up(TCP, HSFZ, dport=6801)
bind_layers(TCP, HSFZ, sport=6801, dport=6801)

bind_bottom_up(UDP, HSFZ, sport=6811)
bind_bottom_up(UDP, HSFZ, dport=6811)
bind_layers(UDP, HSFZ, sport=6811, dport=6811)

bind_layers(HSFZ, UDS)


Expand Down Expand Up @@ -111,11 +136,11 @@ def recv(self, x=MTU, **kwargs):


class UDS_HSFZSocket(HSFZSocket):
def __init__(self, src, dst, ip='127.0.0.1', port=6801, basecls=UDS):
def __init__(self, source, target, ip='127.0.0.1', port=6801, basecls=UDS):
# type: (int, int, str, int, Type[Packet]) -> None
super(UDS_HSFZSocket, self).__init__(ip, port)
self.src = src
self.dst = dst
self.source = source
self.target = target
self.basecls = HSFZ
self.outputcls = basecls

Expand All @@ -128,7 +153,7 @@ def send(self, x):

try:
return super(UDS_HSFZSocket, self).send(
HSFZ(src=self.src, dst=self.dst) / x)
HSFZ(source=self.source, target=self.target) / x)
except Exception as e:
# Workaround:
# This catch block is currently necessary to detect errors
Expand All @@ -153,7 +178,7 @@ def recv(self, x=MTU, **kwargs):

def hsfz_scan(ip, # type: str
scan_range=range(0x100), # type: Iterable[int]
src=0xf4, # type: int
source=0xf4, # type: int
timeout=0.1, # type: Union[int, float]
verbose=True # type: bool
):
Expand All @@ -166,7 +191,7 @@ def hsfz_scan(ip, # type: str
:param ip: IPv4 address of target to scan
:param scan_range: Range for HSFZ destination address
:param src: HSFZ source address, used during the scan
:param source: HSFZ source address, used during the scan
:param timeout: Timeout for each request
:param verbose: Show information during scan, if True
:return: A list of open UDS_HSFZSockets
Expand All @@ -175,7 +200,7 @@ def hsfz_scan(ip, # type: str
log_automotive.setLevel(logging.DEBUG)
results = list()
for i in scan_range:
with UDS_HSFZSocket(src, i, ip) as sock:
with UDS_HSFZSocket(source, i, ip) as sock:
try:
resp = sock.sr1(UDS() / UDS_TP(),
timeout=timeout,
Expand All @@ -184,8 +209,8 @@ def hsfz_scan(ip, # type: str
results.append((i, resp))
if resp:
log_automotive.debug(
"Found endpoint %s, src=0x%x, dst=0x%x" % (ip, src, i))
"Found endpoint %s, source=0x%x, target=0x%x" % (ip, source, i))
except Exception as e:
log_automotive.exception(
"Error %s at destination address 0x%x" % (e, i))
return [UDS_HSFZSocket(0xf4, dst, ip) for dst, _ in results]
return [UDS_HSFZSocket(0xf4, target, ip) for target, _ in results]
52 changes: 33 additions & 19 deletions test/contrib/automotive/bmw/hsfz.uts
Original file line number Diff line number Diff line change
Expand Up @@ -13,73 +13,87 @@ load_contrib("automotive.bmw.hsfz", globals_dict=globals())

= Basic Test 1

pkt = HSFZ(type=1, src=0xf4, dst=0x10)/Raw(b'\x11\x22\x33')
pkt = HSFZ(control=1, source=0xf4, target=0x10)/Raw(b'\x11\x22\x33')
assert bytes(pkt) == b'\x00\x00\x00\x05\x00\x01\xf4\x10\x11"3'

= Basic Test 2

pkt = HSFZ(type=1, src=0xf4, dst=0x10)/Raw(b'\x11\x22\x33\x11\x11\x11\x11\x11')
pkt = HSFZ(control=1, source=0xf4, target=0x10)/Raw(b'\x11\x22\x33\x11\x11\x11\x11\x11')
assert bytes(pkt) == b'\x00\x00\x00\x0a\x00\x01\xf4\x10\x11"3\x11\x11\x11\x11\x11'

= Basic Dissect Test

pkt = HSFZ(b'\x00\x00\x00\x0a\x00\x01\xf4\x10\x11"3\x11\x11\x11\x11\x11')
assert pkt.length == 10
assert pkt.src == 0xf4
assert pkt.dst == 0x10
assert pkt.type == 1
assert pkt.source == 0xf4
assert pkt.target == 0x10
assert pkt.control == 1
assert pkt[1].service == 17
assert pkt[2].resetType == 34

= Build Test

pkt = HSFZ(src=0xf4, dst=0x10)/Raw(b"0" * 20)
pkt = HSFZ(source=0xf4, target=0x10)/Raw(b"0" * 20)
assert bytes(pkt) == b'\x00\x00\x00\x16\x00\x01\xf4\x10' + b"0" * 20

= Dissect Test

pkt = HSFZ(b'\x00\x00\x00\x18\x00\x01\xf4\x10\x67\x01' + b"0" * 20)
assert pkt.length == 24
assert pkt.src == 0xf4
assert pkt.dst == 0x10
assert pkt.type == 1
assert pkt.source == 0xf4
assert pkt.target == 0x10
assert pkt.control == 1
assert pkt.securitySeed == b"0" * 20
assert len(pkt[1]) == pkt.length - 2

= Dissect Test with padding

pkt = HSFZ(b'\x00\x00\x00\x18\x00\x01\xf4\x10\x67\x01' + b"0" * 20 + b"p" * 100)
assert pkt.length == 24
assert pkt.src == 0xf4
assert pkt.dst == 0x10
assert pkt.type == 1
assert pkt.source == 0xf4
assert pkt.target == 0x10
assert pkt.control == 1
assert pkt.securitySeed == b"0" * 20
assert pkt.load == b'p' * 100

= Dissect Test to short packet

pkt = HSFZ(b'\x00\x00\x00\x18\x00\x01\xf4\x10\x67\x01' + b"0" * 19)
assert pkt.length == 24
assert pkt.src == 0xf4
assert pkt.dst == 0x10
assert pkt.type == 1
assert pkt.source == 0xf4
assert pkt.target == 0x10
assert pkt.control == 1
assert pkt.securitySeed == b"0" * 19


= Dissect Test very long packet

pkt = HSFZ(b'\x00\x0f\xff\x04\x00\x01\xf4\x10\x67\x01' + b"0" * 0xfff00)
assert pkt.length == 0xfff04
assert pkt.src == 0xf4
assert pkt.dst == 0x10
assert pkt.type == 1
assert pkt.source == 0xf4
assert pkt.target == 0x10
assert pkt.control == 1
assert pkt.securitySeed == b"0" * 0xfff00


= Dissect identification

pkt = HSFZ(bytes.fromhex("000000320011444941474144523130424d574d4143374346436343463837393343424d5756494e5742413558373333333246483735373334"))
assert pkt.length == 50
assert pkt.control == 0x11
assert b"BMW" in pkt.identification_string

pkt = UDP(bytes.fromhex("1a9be2d90040d67d000000320011444941474144523130424d574d4143374346436343463837393343424d5756494e5742413558373333333246483735373334"))
assert pkt.length == 50
assert pkt.control == 0x11
assert b"BMW" in pkt.identification_string

= Test HSFZSocket


server_up = threading.Event()
def server():
buffer = bytes(HSFZ(type=1, src=0xf4, dst=0x10) / Raw(b'\x11\x22\x33' * 1024))
buffer = bytes(HSFZ(control=1, source=0xf4, target=0x10) / Raw(b'\x11\x22\x33' * 1024))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
Expand Down

0 comments on commit c38a5de

Please sign in to comment.