From 4b14235e9837571fa8d7d6152cebc28c26b58150 Mon Sep 17 00:00:00 2001 From: Yves Richard Date: Fri, 27 Sep 2024 17:19:06 +0200 Subject: [PATCH 1/6] add --nfc flag to use NFC apdu media --- speculos/main.py | 3 +- speculos/mcu/nfc.py | 75 ++++++++++++++++++++++++++++++++++++++ speculos/mcu/seproxyhal.py | 22 ++++++++++- 3 files changed, 97 insertions(+), 3 deletions(-) create mode 100644 speculos/mcu/nfc.py diff --git a/speculos/main.py b/speculos/main.py index f7ba54db..95fb5204 100644 --- a/speculos/main.py +++ b/speculos/main.py @@ -269,6 +269,7 @@ def main(prog=None) -> int: parser.add_argument('-t', '--trace', action='store_true', help='Trace syscalls') parser.add_argument('-u', '--usb', default='hid', help='Configure the USB transport protocol, ' 'either HID (default) or U2F') + parser.add_argument('--nfc', action='store_true', help='Use NFC transport instead of USB') group = parser.add_argument_group('network arguments') group.add_argument('--apdu-port', default=9999, type=int, help='ApduServer TCP port') @@ -473,7 +474,7 @@ def main(prog=None) -> int: use_bagl=use_bagl, automation=automation_path, automation_server=automation_server, - transport=args.usb) + transport='nfc' if args.nfc else args.usb) button = None if args.button_port: diff --git a/speculos/mcu/nfc.py b/speculos/mcu/nfc.py new file mode 100644 index 00000000..28c1bd14 --- /dev/null +++ b/speculos/mcu/nfc.py @@ -0,0 +1,75 @@ +""" +Forward NFC packets between the MCU and the SE +""" + +from abc import ABC, abstractmethod +from construct import Int8ub, Int16ub, Int16ul, Struct +import binascii +import enum +import logging + + +class SephNfcTag(enum.IntEnum): + NFC_APDU_EVENT = 0x1C + NFC_EVENT = 0x1E + + +class NFC: + def __init__(self, _queue_event_packet): + self._queue_event_packet = _queue_event_packet + self.packets_to_send = [] + self.MTU=140 + self.rx_sequence = 0 + self.rx_size = 0 + self.rx_data = [] + + self.logger = logging.getLogger("nfc") + + + def handle_rapdu_chunk(self, data): + """concatenate apdu chunks into full apdu""" + + # example of data + # 0000050000002b3330000409312e302e302d72633104e600000008362e312e302d646508352e312e302d6465010001009000 + + # only APDU packets are suported + if data[2] != 0x05: + return None + + sequence = int.from_bytes(data[3:5], 'big') + if self.rx_sequence != sequence: + print(f"Unexpected sequence number:{sequence}") + return None + + if sequence == 0: + self.rx_size = int.from_bytes(data[5:7], "big") + self.rx_data = data[7:] + else: + self.rx_data.append(data[5:]) + + if len(self.rx_data) == self.rx_size: + #prepare for next call + self.rx_sequence = 0 + return self.rx_data + + return None + + + def apdu(self, data): + chunks: List[bytes] = [] + data_len = len(data) + + while len(data) > 0: + size = self.MTU-5 + chunks.append(data[:size]) + data = data[size:] + + for i, chunk in enumerate(chunks): + # ledger protocol header + header = bytes([0x00, 0x00, 0x05]) # APDU + header += i.to_bytes(2, "big") + # first packet contains the size of full buffer + if i == 0: + header += data_len.to_bytes(2, "big") + + self._queue_event_packet(SephNfcTag.NFC_APDU_EVENT, header+chunk) diff --git a/speculos/mcu/seproxyhal.py b/speculos/mcu/seproxyhal.py index 8eed5ca3..37f1834e 100644 --- a/speculos/mcu/seproxyhal.py +++ b/speculos/mcu/seproxyhal.py @@ -9,6 +9,7 @@ from speculos.observer import BroadcastInterface, TextEvent from . import usb +from . import nfc from .automation import Automation from .display import DisplayNotifier, IODevice from .nbgl import NBGL @@ -30,6 +31,9 @@ class SephTag(IntEnum): USB_CONFIG = 0x4f USB_EP_PREPARE = 0x50 + NFC_RAPDU = 0x4A + NFC_POWER = 0x34 + REQUEST_STATUS = 0x52 RAPDU = 0x53 PLAY_TUNE = 0x56 @@ -261,6 +265,7 @@ def __init__(self, self.automation_server = automation_server self.events: List[TextEvent] = [] self.refreshed = False + self.transport = transport self.status_event = threading.Event() self.socket_helper = SocketHelper(self._socket, self.status_event) @@ -270,7 +275,10 @@ def __init__(self, self.socket_helper.wait_until_tick_is_processed) self.time_ticker_thread.start() - self.usb = usb.USB(self.socket_helper.queue_packet, transport=transport) + usb_transport = transport if transport in ['hid', 'u2f'] else 'hid' + self.usb = usb.USB(self.socket_helper.queue_packet, transport=usb_transport) + + self.nfc = nfc.NFC(self.socket_helper.queue_packet) self.ocr = OCR(model, use_bagl) @@ -449,6 +457,13 @@ def can_read(self, screen: DisplayNotifier): assert isinstance(screen.display.gl, NBGL) screen.display.gl.hal_draw_image_file(data) + elif tag == SephTag.NFC_RAPDU: + data = self.nfc.handle_rapdu_chunk(data) + if data: + for c in self.apdu_callbacks: + c(data) + screen.display.forward_to_apdu_client(data) + else: self.logger.error(f"unknown tag: {tag:#x}") sys.exit(0) @@ -506,7 +521,10 @@ def to_app(self, packet: bytes): tag, packet = packet[4], packet[5:] self.socket_helper.queue_packet(SephTag(tag), packet) else: - self.usb.xfer(packet) + if self.transport == 'nfc': + self.nfc.apdu(packet) + else: + self.usb.xfer(packet) def get_tick_count(self): return self.socket_helper.get_tick_count() From a673c1cf6899b6a7c681e537a481500eedee0c86 Mon Sep 17 00:00:00 2001 From: Yves Richard Date: Fri, 27 Sep 2024 17:52:24 +0200 Subject: [PATCH 2/6] Fix regression on --usb parameter --- speculos/mcu/seproxyhal.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/speculos/mcu/seproxyhal.py b/speculos/mcu/seproxyhal.py index 37f1834e..8d3b6147 100644 --- a/speculos/mcu/seproxyhal.py +++ b/speculos/mcu/seproxyhal.py @@ -275,7 +275,7 @@ def __init__(self, self.socket_helper.wait_until_tick_is_processed) self.time_ticker_thread.start() - usb_transport = transport if transport in ['hid', 'u2f'] else 'hid' + usb_transport = transport if transport.lower() in ['hid', 'u2f'] else 'hid' self.usb = usb.USB(self.socket_helper.queue_packet, transport=usb_transport) self.nfc = nfc.NFC(self.socket_helper.queue_packet) From 71160c996986a1ccf3d4b32bbb0abde77bfb6f13 Mon Sep 17 00:00:00 2001 From: Lucas PASCAL Date: Tue, 12 Nov 2024 15:40:54 +0100 Subject: [PATCH 3/6] [add] Enable NFC for Flex & Stax + minor fixes --- CHANGELOG.md | 8 ++++++++ speculos/mcu/nfc.py | 32 ++++++++++++-------------------- speculos/mcu/seproxyhal.py | 2 +- src/bolos/os.c | 3 ++- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1c3337e..b2eec930 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.12.0] 2024-??-?? + +### Added + +- NFC always enabled on Stax & Flex OSes +- Starting Speculos with the `--nfc` argument will funel all communications as NFC + ## [0.11.0] 2024-11-12 ### Added diff --git a/speculos/mcu/nfc.py b/speculos/mcu/nfc.py index 28c1bd14..15278316 100644 --- a/speculos/mcu/nfc.py +++ b/speculos/mcu/nfc.py @@ -2,11 +2,9 @@ Forward NFC packets between the MCU and the SE """ -from abc import ABC, abstractmethod -from construct import Int8ub, Int16ub, Int16ul, Struct -import binascii import enum import logging +from typing import List, Optional class SephNfcTag(enum.IntEnum): @@ -18,42 +16,36 @@ class NFC: def __init__(self, _queue_event_packet): self._queue_event_packet = _queue_event_packet self.packets_to_send = [] - self.MTU=140 + self.MTU = 140 self.rx_sequence = 0 self.rx_size = 0 - self.rx_data = [] - + self.rx_data: bytes = b'' self.logger = logging.getLogger("nfc") - - def handle_rapdu_chunk(self, data): + def handle_rapdu_chunk(self, data: bytes) -> Optional[List[bytes]]: """concatenate apdu chunks into full apdu""" - # example of data # 0000050000002b3330000409312e302e302d72633104e600000008362e312e302d646508352e312e302d6465010001009000 # only APDU packets are suported if data[2] != 0x05: - return None + return sequence = int.from_bytes(data[3:5], 'big') - if self.rx_sequence != sequence: - print(f"Unexpected sequence number:{sequence}") - return None + assert self.rx_sequence == sequence, f"Unexpected sequence number:{sequence}" if sequence == 0: self.rx_size = int.from_bytes(data[5:7], "big") self.rx_data = data[7:] else: - self.rx_data.append(data[5:]) + self.rx_data += data[5:] if len(self.rx_data) == self.rx_size: - #prepare for next call + # prepare for next call self.rx_sequence = 0 return self.rx_data - - return None - + else: + self.rx_sequence += 1 def apdu(self, data): chunks: List[bytes] = [] @@ -65,11 +57,11 @@ def apdu(self, data): data = data[size:] for i, chunk in enumerate(chunks): - # ledger protocol header + # Ledger protocol header header = bytes([0x00, 0x00, 0x05]) # APDU header += i.to_bytes(2, "big") # first packet contains the size of full buffer if i == 0: header += data_len.to_bytes(2, "big") - self._queue_event_packet(SephNfcTag.NFC_APDU_EVENT, header+chunk) + self._queue_event_packet(SephNfcTag.NFC_APDU_EVENT, header + chunk) diff --git a/speculos/mcu/seproxyhal.py b/speculos/mcu/seproxyhal.py index 8d3b6147..b3532ced 100644 --- a/speculos/mcu/seproxyhal.py +++ b/speculos/mcu/seproxyhal.py @@ -459,7 +459,7 @@ def can_read(self, screen: DisplayNotifier): elif tag == SephTag.NFC_RAPDU: data = self.nfc.handle_rapdu_chunk(data) - if data: + if data is not None: for c in self.apdu_callbacks: c(data) screen.display.forward_to_apdu_client(data) diff --git a/src/bolos/os.c b/src/bolos/os.c index 596a13dd..ccbe8a67 100644 --- a/src/bolos/os.c +++ b/src/bolos/os.c @@ -9,6 +9,7 @@ #define OS_SETTING_PLANEMODE_OLD 5 #define OS_SETTING_PLANEMODE_NEW 6 #define OS_SETTING_SOUND 9 +#define OS_SETTING_FEATURES 14 #undef PATH_MAX #define PATH_MAX 1024 @@ -54,7 +55,7 @@ unsigned long sys_os_setting_get(unsigned int setting_id, return 1; } if (((hw_model == MODEL_STAX) || (hw_model == MODEL_FLEX)) && - setting_id == OS_SETTING_SOUND) { + (setting_id == OS_SETTING_SOUND || setting_id == OS_SETTING_FEATURES)) { return 0xff; } } From 40ae8a0aa0f1a893be764d8ea66a7164ae27a4cf Mon Sep 17 00:00:00 2001 From: Lucas PASCAL Date: Thu, 21 Nov 2024 11:58:11 +0100 Subject: [PATCH 4/6] [clean] Common interface between USB and NFC transport, +only one instance as they are exclusive --- speculos/main.py | 3 +- speculos/mcu/seproxyhal.py | 22 ++--- speculos/mcu/transport/__init__.py | 15 +++ speculos/mcu/transport/interface.py | 36 +++++++ speculos/mcu/{ => transport}/nfc.py | 30 +++--- speculos/mcu/{ => transport}/usb.py | 142 +++++++++++++++------------- 6 files changed, 156 insertions(+), 92 deletions(-) create mode 100644 speculos/mcu/transport/__init__.py create mode 100644 speculos/mcu/transport/interface.py rename speculos/mcu/{ => transport}/nfc.py (64%) rename speculos/mcu/{ => transport}/usb.py (64%) diff --git a/speculos/main.py b/speculos/main.py index 95fb5204..d070eefa 100644 --- a/speculos/main.py +++ b/speculos/main.py @@ -29,6 +29,7 @@ from .mcu.finger_tcp import FakeFinger from .mcu.struct import DisplayArgs, ServerArgs from .mcu.vnc import VNC +from .mcu.transport import TransportType from .observer import BroadcastInterface from .resources_importer import resources @@ -474,7 +475,7 @@ def main(prog=None) -> int: use_bagl=use_bagl, automation=automation_path, automation_server=automation_server, - transport='nfc' if args.nfc else args.usb) + transport=TransportType.NFC if args.nfc else TransportType[args.usb.upper()]) button = None if args.button_port: diff --git a/speculos/mcu/seproxyhal.py b/speculos/mcu/seproxyhal.py index b3532ced..09810a7f 100644 --- a/speculos/mcu/seproxyhal.py +++ b/speculos/mcu/seproxyhal.py @@ -8,8 +8,7 @@ from typing import Callable, List, Optional, Tuple from speculos.observer import BroadcastInterface, TextEvent -from . import usb -from . import nfc +from .transport import build_transport, TransportType from .automation import Automation from .display import DisplayNotifier, IODevice from .nbgl import NBGL @@ -257,7 +256,7 @@ def __init__(self, use_bagl: bool, automation: Optional[Automation] = None, automation_server: Optional[BroadcastInterface] = None, - transport: str = 'hid'): + transport: TransportType = TransportType.HID): self._socket = sock self.logger = logging.getLogger("seproxyhal") self.printf_queue = '' @@ -265,7 +264,6 @@ def __init__(self, self.automation_server = automation_server self.events: List[TextEvent] = [] self.refreshed = False - self.transport = transport self.status_event = threading.Event() self.socket_helper = SocketHelper(self._socket, self.status_event) @@ -275,10 +273,7 @@ def __init__(self, self.socket_helper.wait_until_tick_is_processed) self.time_ticker_thread.start() - usb_transport = transport if transport.lower() in ['hid', 'u2f'] else 'hid' - self.usb = usb.USB(self.socket_helper.queue_packet, transport=usb_transport) - - self.nfc = nfc.NFC(self.socket_helper.queue_packet) + self.transport = build_transport(self.socket_helper.queue_packet, transport) self.ocr = OCR(model, use_bagl) @@ -397,10 +392,10 @@ def can_read(self, screen: DisplayNotifier): c(data) elif tag == SephTag.USB_CONFIG: - self.usb.config(data) + self.transport.config(data) elif tag == SephTag.USB_EP_PREPARE: - data = self.usb.prepare(data) + data = self.transport.prepare(data) if data: for c in self.apdu_callbacks: c(data) @@ -458,7 +453,7 @@ def can_read(self, screen: DisplayNotifier): screen.display.gl.hal_draw_image_file(data) elif tag == SephTag.NFC_RAPDU: - data = self.nfc.handle_rapdu_chunk(data) + data = self.transport.handle_rapdu(data) if data is not None: for c in self.apdu_callbacks: c(data) @@ -521,10 +516,7 @@ def to_app(self, packet: bytes): tag, packet = packet[4], packet[5:] self.socket_helper.queue_packet(SephTag(tag), packet) else: - if self.transport == 'nfc': - self.nfc.apdu(packet) - else: - self.usb.xfer(packet) + self.transport.send(packet) def get_tick_count(self): return self.socket_helper.get_tick_count() diff --git a/speculos/mcu/transport/__init__.py b/speculos/mcu/transport/__init__.py new file mode 100644 index 00000000..aa59cf53 --- /dev/null +++ b/speculos/mcu/transport/__init__.py @@ -0,0 +1,15 @@ +from typing import Callable + +from .interface import TransportLayer, TransportType +from .nfc import NFC +from .usb import USB + + +def build_transport(cb: Callable, transport: TransportType) -> TransportLayer: + if transport is TransportType.NFC: + return NFC(cb, transport) + else: + return USB(cb, transport) + + +__all__ = ["build_transport", "TransportType"] diff --git a/speculos/mcu/transport/interface.py b/speculos/mcu/transport/interface.py new file mode 100644 index 00000000..a59e430b --- /dev/null +++ b/speculos/mcu/transport/interface.py @@ -0,0 +1,36 @@ +from abc import ABC, abstractmethod +from enum import auto, IntEnum +from typing import Callable, Optional + + +class TransportType(IntEnum): + HID = auto() + NFC = auto() + U2F = auto() + + +class TransportLayer(ABC): + + def __init__(self, send_cb: Callable, transport: TransportType): + self._transport = transport + self._send_cb = send_cb + + @property + def type(self) -> TransportType: + return self._transport + + @abstractmethod + def config(self, data: bytes) -> None: + raise NotImplementedError + + @abstractmethod + def prepare(self, data: bytes) -> None: + raise NotImplementedError + + @abstractmethod + def send(self, data: bytes) -> None: + raise NotImplementedError + + @abstractmethod + def handle_rapdu(self, data: bytes) -> Optional[bytes]: + raise NotImplementedError diff --git a/speculos/mcu/nfc.py b/speculos/mcu/transport/nfc.py similarity index 64% rename from speculos/mcu/nfc.py rename to speculos/mcu/transport/nfc.py index 15278316..30a82a42 100644 --- a/speculos/mcu/nfc.py +++ b/speculos/mcu/transport/nfc.py @@ -6,30 +6,37 @@ import logging from typing import List, Optional +from .interface import TransportLayer, TransportType + class SephNfcTag(enum.IntEnum): NFC_APDU_EVENT = 0x1C NFC_EVENT = 0x1E -class NFC: - def __init__(self, _queue_event_packet): - self._queue_event_packet = _queue_event_packet - self.packets_to_send = [] +class NFC(TransportLayer): + def __init__(self, send_cb, transport: TransportType): + super().__init__(send_cb, transport) self.MTU = 140 self.rx_sequence = 0 self.rx_size = 0 self.rx_data: bytes = b'' - self.logger = logging.getLogger("nfc") + self.logger = logging.getLogger("NFC") + + def config(self, data: bytes) -> None: + self.logger.warning("USB-specific 'config' method called on NFC transport. Ignored.") + + def prepare(self, data: bytes) -> None: + self.logger.warning("USB-specific 'prepare' method called on NFC transport. Ignored.") - def handle_rapdu_chunk(self, data: bytes) -> Optional[List[bytes]]: + def handle_rapdu(self, data: bytes) -> Optional[bytes]: """concatenate apdu chunks into full apdu""" # example of data # 0000050000002b3330000409312e302e302d72633104e600000008362e312e302d646508352e312e302d6465010001009000 # only APDU packets are suported if data[2] != 0x05: - return + return None sequence = int.from_bytes(data[3:5], 'big') assert self.rx_sequence == sequence, f"Unexpected sequence number:{sequence}" @@ -46,22 +53,23 @@ def handle_rapdu_chunk(self, data: bytes) -> Optional[List[bytes]]: return self.rx_data else: self.rx_sequence += 1 + return None - def apdu(self, data): + def send(self, data: bytes) -> None: chunks: List[bytes] = [] data_len = len(data) while len(data) > 0: - size = self.MTU-5 + size = self.MTU - 5 chunks.append(data[:size]) data = data[size:] for i, chunk in enumerate(chunks): # Ledger protocol header - header = bytes([0x00, 0x00, 0x05]) # APDU + header = bytes([0x00, 0x00, 0x05]) # APDU header += i.to_bytes(2, "big") # first packet contains the size of full buffer if i == 0: header += data_len.to_bytes(2, "big") - self._queue_event_packet(SephNfcTag.NFC_APDU_EVENT, header + chunk) + self._send_cb(SephNfcTag.NFC_APDU_EVENT, header + chunk) diff --git a/speculos/mcu/usb.py b/speculos/mcu/transport/usb.py similarity index 64% rename from speculos/mcu/usb.py rename to speculos/mcu/transport/usb.py index d45e85ab..0f5ed23c 100644 --- a/speculos/mcu/usb.py +++ b/speculos/mcu/transport/usb.py @@ -3,20 +3,23 @@ protocol. """ -from abc import ABC, abstractmethod -from construct import Int8ub, Int16ub, Int16ul, Struct import binascii import enum import logging +from abc import ABC, abstractmethod +from construct import Int8ub, Int16ub, Int16ul, Struct +from typing import Callable, List, Optional +from .interface import TransportLayer, TransportType -class UsbReq(enum.IntEnum): + +class USBReq(enum.IntEnum): RECIPIENT_DEVICE = 0x00 SET_ADDRESS = 0x05 SET_CONFIGURATION = 0x09 -class SephUsbTag(enum.IntEnum): +class SephUSBTag(enum.IntEnum): XFER_SETUP = 0x01 XFER_IN = 0x02 XFER_OUT = 0x04 @@ -24,14 +27,14 @@ class SephUsbTag(enum.IntEnum): PREPARE_DIR_IN = 0x20 -class SephUsbConfig(enum.IntEnum): +class SephUSBConfig(enum.IntEnum): CONNECT = 0x01 DISCONNECT = 0x02 ADDR = 0x03 ENDPOINTS = 0x04 -class SephUsbPrepare(enum.IntEnum): +class SephUSBPrepare(enum.IntEnum): SETUP = 0x10 IN = 0x20 OUT = 0x30 @@ -39,19 +42,19 @@ class SephUsbPrepare(enum.IntEnum): UNSTALL = 0x80 -class HidEndpoint(enum.IntEnum): +class HIDEndpoint(enum.IntEnum): OUT_ADDR = 0x00 IN_ADDR = 0x80 -class UsbDevState(enum.IntEnum): +class USBDevState(enum.IntEnum): DISCONNECTED = 0 DEFAULT = 1 ADDRESSED = 2 CONFIGURED = 3 -class UsbInterface(enum.IntEnum): +class USBInterface(enum.IntEnum): GENERIC = 0 U2F = 1 HID = 2 @@ -81,7 +84,7 @@ class UsbInterface(enum.IntEnum): ) -class HidPacket: +class HIDPacket: def __init__(self): self.reset(0) @@ -103,9 +106,10 @@ def complete(self): return self.remaining_size == 0 -class Transport(ABC): - def __init__(self, interface, send_xfer): - self.interface = interface +class USBTransport(ABC): + INTERFACE = USBInterface.GENERIC + + def __init__(self, send_xfer: Callable): self.send_xfer = send_xfer @abstractmethod @@ -125,16 +129,18 @@ def config(self, tag): @property def endpoint_in(self): - return HidEndpoint.IN_ADDR | self.interface + return HIDEndpoint.IN_ADDR | self.INTERFACE @property def endpoint_out(self): - return HidEndpoint.OUT_ADDR | self.interface + return HIDEndpoint.OUT_ADDR | self.INTERFACE -class U2f(Transport): - def __init__(self, send_xfer): - super().__init__(UsbInterface.U2F, send_xfer) +class U2F(USBTransport): + INTERFACE = USBInterface.U2F + + def __init__(self, send_xfer: Callable): + super().__init__(send_xfer) def build_xfer(self, tag, data): packet = usb_header.build(dict(endpoint=self.endpoint_out, tag=tag, length=len(data))) @@ -143,23 +149,25 @@ def build_xfer(self, tag, data): def xfer(self, data): assert len(data) == USB_SIZE - packet = self.build_xfer(SephUsbTag.XFER_OUT, data) + packet = self.build_xfer(SephUSBTag.XFER_OUT, data) self.send_xfer(packet) def prepare(self, data): assert len(data) == USB_SIZE - packet = self.build_xfer(SephUsbTag.XFER_IN, b'') + packet = self.build_xfer(SephUSBTag.XFER_IN, b'') self.send_xfer(packet) return data -class Hid(Transport): +class HID(USBTransport): + INTERFACE = USBInterface.HID + USB_CHANNEL = 0x0101 USB_COMMAND = 0x05 def __init__(self, send_xfer): - super().__init__(UsbInterface.HID, send_xfer) - self.hid_packet = HidPacket() + super().__init__(send_xfer) + self.hid_packet = HIDPacket() def _build_header(self, data, length, seq): header = hid_header.build(dict(channel=self.USB_CHANNEL, command=self.USB_COMMAND, seq=seq, length=length)) @@ -192,17 +200,17 @@ def xfer(self, data): else: length = len(chunk) - packet = self.build_xfer(SephUsbTag.XFER_OUT, chunk, seq, length) + packet = self.build_xfer(SephUSBTag.XFER_OUT, chunk, seq, length) self.send_xfer(packet) offset += len(chunk) seq += 1 def config(self, tag): - if tag == UsbDevState.DISCONNECTED: + if tag == USBDevState.DISCONNECTED: self.hid_packet.reset(0) - def prepare(self, data): + def prepare(self, data: bytes) -> Optional[bytes]: hid = hid_header.parse(data) assert hid.channel == self.USB_CHANNEL assert hid.command == self.USB_COMMAND @@ -215,7 +223,7 @@ def prepare(self, data): chunk = data[hid_header.sizeof() - 2:] self.hid_packet.append_data(chunk) - packet = self.build_xfer(SephUsbTag.XFER_IN, b'', self.hid_packet.seq) + packet = self.build_xfer(SephUSBTag.XFER_IN, b'', self.hid_packet.seq) self.send_xfer(packet) if self.hid_packet.complete(): @@ -227,35 +235,36 @@ def prepare(self, data): return answer -class USB: - def __init__(self, _queue_event_packet, transport='hid'): - self._queue_event_packet = _queue_event_packet - self.packets_to_send = [] - self.state = UsbDevState.DISCONNECTED +class USB(TransportLayer): + def __init__(self, send_cb, transport: TransportType = TransportType.HID): + super().__init__(send_cb, transport) + self.packets_to_send: List[bytes] = [] + self.state = USBDevState.DISCONNECTED - if transport.lower() == 'hid': - self.transport = Hid(self.send_xfer) - elif transport.lower() == 'u2f': - self.transport = U2f(self.send_xfer) + self.transport_impl: USBTransport + if transport is TransportType.HID: + self.transport_impl = HID(self._send_xfer) + elif transport is TransportType.U2F: + self.transport_impl = U2F(self._send_xfer) else: - raise ValueError(f"Unsupported USB transport {transport!r}") + raise ValueError(f"Unsupported USB transport {transport.name!r}") - self.logger = logging.getLogger("usb") + self.logger = logging.getLogger("USB") - def send_xfer(self, packet): + def _send_xfer(self, packet: bytes): # don't send packets until the endpoint is configured - if self.state != UsbDevState.CONFIGURED or len(self.packets_to_send) > 0: + if self.state != USBDevState.CONFIGURED or len(self.packets_to_send) > 0: self.packets_to_send.append(packet) return - self.logger.debug("[SEND_XFER] {}".format(binascii.hexlify(packet))) - self._queue_event_packet(SephUsbTag.XFER_EVENT, packet) + self.logger.debug("[SEND_XFER] %s", binascii.hexlify(packet)) + self._send_cb(SephUSBTag.XFER_EVENT, packet) def _send_setup(self, breq, wValue): - data = usb_header.build(dict(endpoint=self.transport.endpoint_out, tag=SephUsbTag.XFER_SETUP, length=0)) - data += usb_setup.build(dict(bmreq=UsbReq.RECIPIENT_DEVICE, breq=breq, wValue=wValue, wIndex=0, wLength=0)) + data = usb_header.build(dict(endpoint=self.transport_impl.endpoint_out, tag=SephUSBTag.XFER_SETUP, length=0)) + data += usb_setup.build(dict(bmreq=USBReq.RECIPIENT_DEVICE, breq=breq, wValue=wValue, wIndex=0, wLength=0)) self.logger.debug("[SEND_SETUP] {}".format(binascii.hexlify(data))) - self._queue_event_packet(SephUsbTag.XFER_EVENT, data) + self._send_cb(SephUSBTag.XFER_EVENT, data) def _flush_packets(self): packets_to_send = self.packets_to_send @@ -263,35 +272,38 @@ def _flush_packets(self): for packet in packets_to_send: self.send_xfer(packet) + def handle_rapdu(self, data: bytes) -> Optional[bytes]: + raise RuntimeError("This method is only implemented on NFC transport") + def config(self, data): """Parse a config packet. If the endpoint address is set, configure it.""" - tag = SephUsbConfig(data[0]) + tag = SephUSBConfig(data[0]) self.logger.debug("[CONFIG] {} {}".format(repr(tag), binascii.hexlify(data))) # The USB stack is shut down with USB_power(0) before being powered on. # Wait for the first CONNECT config message to ensure that USBD_Start() # has been called. - if tag == SephUsbConfig.CONNECT: - if self.state == UsbDevState.DISCONNECTED: - self.state = UsbDevState.ADDRESSED + if tag == SephUSBConfig.CONNECT: + if self.state == USBDevState.DISCONNECTED: + self.state = USBDevState.ADDRESSED self.logger.debug("set_address sent") - self._send_setup(UsbReq.SET_ADDRESS, 1) + self._send_setup(USBReq.SET_ADDRESS, 1) - elif tag == SephUsbConfig.DISCONNECT: - self.state = UsbDevState.DISCONNECTED - self.transport.config(tag) + elif tag == SephUSBConfig.DISCONNECT: + self.state = USBDevState.DISCONNECTED + self.transport_impl.config(tag) - elif tag == SephUsbConfig.ADDR: - if self.state == UsbDevState.ADDRESSED: - self.state = UsbDevState.CONFIGURED - self._send_setup(UsbReq.SET_CONFIGURATION, 1) + elif tag == SephUSBConfig.ADDR: + if self.state == USBDevState.ADDRESSED: + self.state = USBDevState.CONFIGURED + self._send_setup(USBReq.SET_CONFIGURATION, 1) self.logger.debug("configured") - elif tag == SephUsbConfig.ENDPOINTS: + elif tag == SephUSBConfig.ENDPOINTS: # once the endpoint is configured, queued packets can be sent endpoint = data[2] - if endpoint == self.transport.endpoint_out: + if endpoint == self.transport_impl.endpoint_out: self._flush_packets() def prepare(self, data): @@ -299,16 +311,16 @@ def prepare(self, data): header = usb_header.parse(data[:3]) answer = None - tag = SephUsbPrepare(header.tag) + tag = SephUSBPrepare(header.tag) self.logger.debug("[PREPARE] {} {} {}".format(repr(self.state), repr(tag), binascii.hexlify(data))) - if tag == SephUsbPrepare.IN: - if header.endpoint == self.transport.endpoint_in: + if tag == SephUSBPrepare.IN: + if header.endpoint == self.transport_impl.endpoint_in: assert header.length == USB_SIZE data = data[usb_header.sizeof():] - answer = self.transport.prepare(data) + answer = self.transport_impl.prepare(data) return answer - def xfer(self, data): - self.transport.xfer(data) + def send(self, data): + self.transport_impl.xfer(data) From 85e066dda10153f789e788dfaa0d4deb4683d537 Mon Sep 17 00:00:00 2001 From: Lucas PASCAL Date: Thu, 21 Nov 2024 16:02:06 +0100 Subject: [PATCH 5/6] [clean] Using inheritance rather than composition to manage U2F/HID USB transport classes --- speculos/mcu/transport/__init__.py | 6 +- speculos/mcu/transport/interface.py | 2 +- speculos/mcu/transport/nfc.py | 2 +- speculos/mcu/transport/usb.py | 241 +++++++++++++--------------- 4 files changed, 116 insertions(+), 135 deletions(-) diff --git a/speculos/mcu/transport/__init__.py b/speculos/mcu/transport/__init__.py index aa59cf53..2def3fc8 100644 --- a/speculos/mcu/transport/__init__.py +++ b/speculos/mcu/transport/__init__.py @@ -2,14 +2,16 @@ from .interface import TransportLayer, TransportType from .nfc import NFC -from .usb import USB +from .usb import HID, U2F def build_transport(cb: Callable, transport: TransportType) -> TransportLayer: if transport is TransportType.NFC: return NFC(cb, transport) + elif transport is TransportType.U2F: + return U2F(cb, transport) else: - return USB(cb, transport) + return HID(cb, transport) __all__ = ["build_transport", "TransportType"] diff --git a/speculos/mcu/transport/interface.py b/speculos/mcu/transport/interface.py index a59e430b..2eff09a2 100644 --- a/speculos/mcu/transport/interface.py +++ b/speculos/mcu/transport/interface.py @@ -24,7 +24,7 @@ def config(self, data: bytes) -> None: raise NotImplementedError @abstractmethod - def prepare(self, data: bytes) -> None: + def prepare(self, data: bytes) -> Optional[bytes]: raise NotImplementedError @abstractmethod diff --git a/speculos/mcu/transport/nfc.py b/speculos/mcu/transport/nfc.py index 30a82a42..3d308885 100644 --- a/speculos/mcu/transport/nfc.py +++ b/speculos/mcu/transport/nfc.py @@ -34,7 +34,7 @@ def handle_rapdu(self, data: bytes) -> Optional[bytes]: # example of data # 0000050000002b3330000409312e302e302d72633104e600000008362e312e302d646508352e312e302d6465010001009000 - # only APDU packets are suported + # only APDU packets are supported if data[2] != 0x05: return None diff --git a/speculos/mcu/transport/usb.py b/speculos/mcu/transport/usb.py index 0f5ed23c..2c96fed7 100644 --- a/speculos/mcu/transport/usb.py +++ b/speculos/mcu/transport/usb.py @@ -3,10 +3,9 @@ protocol. """ -import binascii import enum import logging -from abc import ABC, abstractmethod +from abc import abstractmethod from construct import Int8ub, Int16ub, Int16ul, Struct from typing import Callable, List, Optional @@ -106,26 +105,14 @@ def complete(self): return self.remaining_size == 0 -class USBTransport(ABC): +class USBTransport(TransportLayer): INTERFACE = USBInterface.GENERIC - def __init__(self, send_xfer: Callable): - self.send_xfer = send_xfer - - @abstractmethod - def xfer(self, data): - pass - - @abstractmethod - def build_xfer(self, data): - pass - - @abstractmethod - def prepare(self, data): - pass - - def config(self, tag): - pass + def __init__(self, send_cb: Callable, transport: TransportType = TransportType.HID): + super().__init__(send_cb, transport) + self.packets_to_send: List[bytes] = [] + self.state = USBDevState.DISCONNECTED + self.logger = logging.getLogger("USB") @property def endpoint_in(self): @@ -135,27 +122,110 @@ def endpoint_in(self): def endpoint_out(self): return HIDEndpoint.OUT_ADDR | self.INTERFACE + def _send_xfer(self, packet: bytes) -> None: + # don't send packets until the endpoint is configured + if self.state != USBDevState.CONFIGURED or len(self.packets_to_send) > 0: + self.packets_to_send.append(packet) + return + + self.logger.debug("[SEND_XFER] %s", packet.hex()) + self._send_cb(SephUSBTag.XFER_EVENT, packet) + + def _send_setup(self, breq: USBReq, wValue: int): + data = usb_header.build(dict(endpoint=self.endpoint_out, tag=SephUSBTag.XFER_SETUP, length=0)) + data += usb_setup.build(dict(bmreq=USBReq.RECIPIENT_DEVICE, breq=breq, wValue=wValue, wIndex=0, wLength=0)) + self.logger.debug("[SEND_SETUP] %s", data.hex()) + self._send_cb(SephUSBTag.XFER_EVENT, data) + + def _flush_packets(self) -> None: + packets_to_send = self.packets_to_send + self.packets_to_send = [] + for packet in packets_to_send: + self._send_xfer(packet) + + def handle_rapdu(self, data: bytes) -> Optional[bytes]: + self.logger.warning("NFC-specific 'handle_apdu' method called on USB transport. Ignored.") + return None + + @abstractmethod + def _config(self, data: SephUSBConfig) -> None: + raise NotImplementedError + + def config(self, data: bytes) -> None: + """Parse a config packet. If the endpoint address is set, configure it.""" + + tag = SephUSBConfig(data[0]) + self.logger.debug("[CONFIG] %s %s", repr(tag), data.hex()) + + # The USB stack is shut down with USB_power(0) before being powered on. + # Wait for the first CONNECT config message to ensure that USBD_Start() + # has been called. + if tag == SephUSBConfig.CONNECT: + if self.state == USBDevState.DISCONNECTED: + self.state = USBDevState.ADDRESSED + self.logger.debug("set_address sent") + self._send_setup(USBReq.SET_ADDRESS, 1) + + elif tag == SephUSBConfig.DISCONNECT: + self.state = USBDevState.DISCONNECTED + self._config(tag) + + elif tag == SephUSBConfig.ADDR: + if self.state == USBDevState.ADDRESSED: + self.state = USBDevState.CONFIGURED + self._send_setup(USBReq.SET_CONFIGURATION, 1) + self.logger.debug("USB configured") + + elif tag == SephUSBConfig.ENDPOINTS: + # once the endpoint is configured, queued packets can be sent + endpoint = data[2] + if endpoint == self.endpoint_out: + self._flush_packets() + + @abstractmethod + def _prepare(self, data: bytes) -> Optional[bytes]: + raise NotImplementedError + + def prepare(self, data: bytes) -> Optional[bytes]: + """Send or receive a packet chunk.""" + + header = usb_header.parse(data[:3]) + answer = None + tag = SephUSBPrepare(header.tag) + self.logger.debug("[PREPARE] %s %s %s", repr(self.state), repr(tag), data.hex()) + + if tag == SephUSBPrepare.IN: + if header.endpoint == self.endpoint_in: + assert header.length == USB_SIZE + data = data[usb_header.sizeof():] + answer = self._prepare(data) + + return answer + class U2F(USBTransport): INTERFACE = USBInterface.U2F - def __init__(self, send_xfer: Callable): - super().__init__(send_xfer) + def __init__(self, send_cb: Callable, transport: TransportType): + super().__init__(send_cb, transport) + + def _config(self, data: SephUSBConfig) -> None: + pass - def build_xfer(self, tag, data): + def _build_xfer(self, tag: SephUSBTag, data: bytes) -> bytes: packet = usb_header.build(dict(endpoint=self.endpoint_out, tag=tag, length=len(data))) packet += data return packet - def xfer(self, data): + def send(self, data: bytes) -> None: assert len(data) == USB_SIZE - packet = self.build_xfer(SephUSBTag.XFER_OUT, data) - self.send_xfer(packet) + packet = self._build_xfer(SephUSBTag.XFER_OUT, data) + self._send_xfer(packet) - def prepare(self, data): + def _prepare(self, data: bytes) -> bytes: assert len(data) == USB_SIZE - packet = self.build_xfer(SephUSBTag.XFER_IN, b'') - self.send_xfer(packet) + packet = self._build_xfer(SephUSBTag.XFER_IN, b'') + self._send_xfer(packet) return data @@ -165,18 +235,18 @@ class HID(USBTransport): USB_CHANNEL = 0x0101 USB_COMMAND = 0x05 - def __init__(self, send_xfer): - super().__init__(send_xfer) + def __init__(self, send_cb: Callable, transport: TransportType): + super().__init__(send_cb, transport) self.hid_packet = HIDPacket() - def _build_header(self, data, length, seq): + def _build_header(self, data: bytes, length: int, seq: int) -> bytes: header = hid_header.build(dict(channel=self.USB_CHANNEL, command=self.USB_COMMAND, seq=seq, length=length)) if seq != 0: # strip hid_header.length header = header[:-2] return header - def build_xfer(self, tag, data, seq=0, length=USB_SIZE): + def _build_xfer(self, tag: SephUSBTag, data: bytes, seq: int = 0, length: int = USB_SIZE): header = self._build_header(data, length, seq) size = len(header) + len(data) @@ -186,7 +256,7 @@ def build_xfer(self, tag, data, seq=0, length=USB_SIZE): return packet - def xfer(self, data): + def send(self, data: bytes) -> None: seq = 0 offset = 0 while offset < len(data): @@ -200,17 +270,17 @@ def xfer(self, data): else: length = len(chunk) - packet = self.build_xfer(SephUSBTag.XFER_OUT, chunk, seq, length) - self.send_xfer(packet) + packet = self._build_xfer(SephUSBTag.XFER_OUT, chunk, seq, length) + self._send_xfer(packet) offset += len(chunk) seq += 1 - def config(self, tag): + def _config(self, tag: SephUSBConfig) -> None: if tag == USBDevState.DISCONNECTED: self.hid_packet.reset(0) - def prepare(self, data: bytes) -> Optional[bytes]: + def _prepare(self, data: bytes) -> Optional[bytes]: hid = hid_header.parse(data) assert hid.channel == self.USB_CHANNEL assert hid.command == self.USB_COMMAND @@ -223,8 +293,8 @@ def prepare(self, data: bytes) -> Optional[bytes]: chunk = data[hid_header.sizeof() - 2:] self.hid_packet.append_data(chunk) - packet = self.build_xfer(SephUSBTag.XFER_IN, b'', self.hid_packet.seq) - self.send_xfer(packet) + packet = self._build_xfer(SephUSBTag.XFER_IN, b'', self.hid_packet.seq) + self._send_xfer(packet) if self.hid_packet.complete(): answer = self.hid_packet.data @@ -233,94 +303,3 @@ def prepare(self, data: bytes) -> Optional[bytes]: answer = None return answer - - -class USB(TransportLayer): - def __init__(self, send_cb, transport: TransportType = TransportType.HID): - super().__init__(send_cb, transport) - self.packets_to_send: List[bytes] = [] - self.state = USBDevState.DISCONNECTED - - self.transport_impl: USBTransport - if transport is TransportType.HID: - self.transport_impl = HID(self._send_xfer) - elif transport is TransportType.U2F: - self.transport_impl = U2F(self._send_xfer) - else: - raise ValueError(f"Unsupported USB transport {transport.name!r}") - - self.logger = logging.getLogger("USB") - - def _send_xfer(self, packet: bytes): - # don't send packets until the endpoint is configured - if self.state != USBDevState.CONFIGURED or len(self.packets_to_send) > 0: - self.packets_to_send.append(packet) - return - - self.logger.debug("[SEND_XFER] %s", binascii.hexlify(packet)) - self._send_cb(SephUSBTag.XFER_EVENT, packet) - - def _send_setup(self, breq, wValue): - data = usb_header.build(dict(endpoint=self.transport_impl.endpoint_out, tag=SephUSBTag.XFER_SETUP, length=0)) - data += usb_setup.build(dict(bmreq=USBReq.RECIPIENT_DEVICE, breq=breq, wValue=wValue, wIndex=0, wLength=0)) - self.logger.debug("[SEND_SETUP] {}".format(binascii.hexlify(data))) - self._send_cb(SephUSBTag.XFER_EVENT, data) - - def _flush_packets(self): - packets_to_send = self.packets_to_send - self.packets_to_send = [] - for packet in packets_to_send: - self.send_xfer(packet) - - def handle_rapdu(self, data: bytes) -> Optional[bytes]: - raise RuntimeError("This method is only implemented on NFC transport") - - def config(self, data): - """Parse a config packet. If the endpoint address is set, configure it.""" - - tag = SephUSBConfig(data[0]) - self.logger.debug("[CONFIG] {} {}".format(repr(tag), binascii.hexlify(data))) - - # The USB stack is shut down with USB_power(0) before being powered on. - # Wait for the first CONNECT config message to ensure that USBD_Start() - # has been called. - if tag == SephUSBConfig.CONNECT: - if self.state == USBDevState.DISCONNECTED: - self.state = USBDevState.ADDRESSED - self.logger.debug("set_address sent") - self._send_setup(USBReq.SET_ADDRESS, 1) - - elif tag == SephUSBConfig.DISCONNECT: - self.state = USBDevState.DISCONNECTED - self.transport_impl.config(tag) - - elif tag == SephUSBConfig.ADDR: - if self.state == USBDevState.ADDRESSED: - self.state = USBDevState.CONFIGURED - self._send_setup(USBReq.SET_CONFIGURATION, 1) - self.logger.debug("configured") - - elif tag == SephUSBConfig.ENDPOINTS: - # once the endpoint is configured, queued packets can be sent - endpoint = data[2] - if endpoint == self.transport_impl.endpoint_out: - self._flush_packets() - - def prepare(self, data): - """Send or receive a packet chunk.""" - - header = usb_header.parse(data[:3]) - answer = None - tag = SephUSBPrepare(header.tag) - self.logger.debug("[PREPARE] {} {} {}".format(repr(self.state), repr(tag), binascii.hexlify(data))) - - if tag == SephUSBPrepare.IN: - if header.endpoint == self.transport_impl.endpoint_in: - assert header.length == USB_SIZE - data = data[usb_header.sizeof():] - answer = self.transport_impl.prepare(data) - - return answer - - def send(self, data): - self.transport_impl.xfer(data) From 6518a6d7aa1f784ffb3c45015942cd603432de5e Mon Sep 17 00:00:00 2001 From: Lucas PASCAL Date: Thu, 21 Nov 2024 16:25:42 +0100 Subject: [PATCH 6/6] [add] Unique '--transport' parameter to configure HID, U2F or NFC transport --- CHANGELOG.md | 5 +++-- speculos/main.py | 13 ++++++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b2eec930..76f64a02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,8 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- NFC always enabled on Stax & Flex OSes -- Starting Speculos with the `--nfc` argument will funel all communications as NFC +- NFC communication available +- Starting Speculos with the `--transport` argument allows to choose U2F, HID or NFC transport +- Flex and Stax OSes emulation always consider NFC to be up (it can't be deactivated for now) ## [0.11.0] 2024-11-12 diff --git a/speculos/main.py b/speculos/main.py index d070eefa..96f9aa23 100644 --- a/speculos/main.py +++ b/speculos/main.py @@ -269,8 +269,9 @@ def main(prog=None) -> int: 'to use a hex seed, prefix it with "hex:"') parser.add_argument('-t', '--trace', action='store_true', help='Trace syscalls') parser.add_argument('-u', '--usb', default='hid', help='Configure the USB transport protocol, ' - 'either HID (default) or U2F') - parser.add_argument('--nfc', action='store_true', help='Use NFC transport instead of USB') + 'either HID (default) or U2F (DEPRECATED, use `--transport` instead)') + parser.add_argument('-T', '--transport', default=None, choices=('HID', 'U2F', 'NFC'), + help='Configure the transport protocol: HID (default), U2F or NFC.') group = parser.add_argument_group('network arguments') group.add_argument('--apdu-port', default=9999, type=int, help='ApduServer TCP port') @@ -468,6 +469,12 @@ def main(prog=None) -> int: qemu_pid = run_qemu(s1, s2, args, use_bagl) s1.close() + # The `--transport` argument takes precedence over `--usb` + if args.transport is not None: + transport_type = TransportType[args.transport] + else: + transport_type = TransportType[args.usb.upper()] + apdu = apdu_server.ApduServer(host="0.0.0.0", port=args.apdu_port) seph = seproxyhal.SeProxyHal( s2, @@ -475,7 +482,7 @@ def main(prog=None) -> int: use_bagl=use_bagl, automation=automation_path, automation_server=automation_server, - transport=TransportType.NFC if args.nfc else TransportType[args.usb.upper()]) + transport=transport_type) button = None if args.button_port: