diff --git a/pyproject.toml b/pyproject.toml index b2bf2c794..427551a08 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,4 +14,7 @@ [build-system] requires = ["setuptools"] -build-backend = "setuptools.build_meta" \ No newline at end of file +build-backend = "setuptools.build_meta" + +[[tool.mypy]] +exclude = ["doc", "setup.py"] diff --git a/spinnman/extended/bmp_set_led.py b/spinnman/extended/bmp_set_led.py index c76f2d220..1b8f9f693 100644 --- a/spinnman/extended/bmp_set_led.py +++ b/spinnman/extended/bmp_set_led.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Iterable, Union from spinn_utilities.overrides import overrides from spinnman.messages.scp import SCPRequestHeader from spinnman.messages.scp.abstract_messages import ( AbstractSCPRequest, BMPRequest) from spinnman.messages.scp.enums import SCPCommand from spinnman.messages.scp.impl.check_ok_response import CheckOKResponse +from spinnman.messages.scp.enums.led_action import LEDAction class BMPSetLed(BMPRequest): @@ -27,9 +29,10 @@ class BMPSetLed(BMPRequest): This class is currently deprecated and untested as there is no known use except for Transceiver.set_led which is itself deprecated. """ - __slots__ = [] + __slots__ = () - def __init__(self, led, action, boards): + def __init__(self, led: Union[int, Iterable[int]], action: LEDAction, + boards: Union[int, Iterable[int]]): """ :param led: Number of the LED or an iterable of LEDs to set the state of (0-7) @@ -42,12 +45,9 @@ def __init__(self, led, action, boards): """ # set up the led entry for arg1 if isinstance(led, int): - leds = [led] + arg1 = action.value << (led * 2) else: - leds = led - - # LED setting actions - arg1 = sum(action.value << (led * 2) for led in leds) + arg1 = sum(action.value << (a_led * 2) for a_led in led) # Bitmask of boards to control arg2 = self.get_board_mask(boards) @@ -59,5 +59,5 @@ def __init__(self, led, action, boards): argument_1=arg1, argument_2=arg2) @overrides(AbstractSCPRequest.get_scp_response) - def get_scp_response(self): + def get_scp_response(self) -> CheckOKResponse: return CheckOKResponse("Set the LEDs of a board", "CMD_LED") diff --git a/spinnman/extended/de_alloc_sdram_process.py b/spinnman/extended/de_alloc_sdram_process.py index 916a2500d..b70cca50d 100644 --- a/spinnman/extended/de_alloc_sdram_process.py +++ b/spinnman/extended/de_alloc_sdram_process.py @@ -12,10 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +from typing import Optional from spinnman.messages.scp.impl import SDRAMDeAlloc -from spinnman.processes.abstract_multi_connection_process import ( - AbstractMultiConnectionProcess) +from spinnman.processes import AbstractMultiConnectionProcess +from spinnman.processes import ConnectionSelector class DeAllocSDRAMProcess(AbstractMultiConnectionProcess): @@ -25,19 +25,16 @@ class DeAllocSDRAMProcess(AbstractMultiConnectionProcess): known use except for Transceiver.free_sdram and free_sdram_by_app_id which are both themselves deprecated. """ - __slots__ = [ - "_no_blocks_freed"] + __slots__ = ("_no_blocks_freed", ) - def __init__(self, connection_selector): + def __init__(self, connection_selector: ConnectionSelector): """ - :param connection_selector: - :type connection_selector: - AbstractMultiConnectionProcessConnectionSelector + :param ConnectionSelector connection_selector: """ super().__init__(connection_selector) - self._no_blocks_freed = None + self._no_blocks_freed: Optional[int] = None - def de_alloc_sdram(self, x, y, app_id, base_address=None): + def de_alloc_all_app_sdram(self, x: int, y: int, app_id: int): """ :param int x: :param int y: @@ -45,20 +42,26 @@ def de_alloc_sdram(self, x, y, app_id, base_address=None): :param base_address: :type base_address: int or None """ - callback = None # deallocate space in the SDRAM - if base_address is None: - callback = self._handle_sdram_alloc_response - self._send_request(SDRAMDeAlloc(x, y, app_id, base_address), - callback=callback) - self._finish() - self.check_for_error() + with self._collect_responses(): + self._send_request(SDRAMDeAlloc(x, y, app_id=app_id), + callback=self.__handle_sdram_alloc_response) + + def de_alloc_sdram(self, x: int, y: int, base_address: int): + """ + :param int x: + :param int y: + :param int base_address: + """ + with self._collect_responses(): + self._send_request(SDRAMDeAlloc(x, y, base_address=base_address), + callback=None) - def _handle_sdram_alloc_response(self, response): + def __handle_sdram_alloc_response(self, response): self._no_blocks_freed = response.number_of_blocks_freed @property - def no_blocks_freed(self): + def no_blocks_freed(self) -> Optional[int]: """ :rtype: int """ diff --git a/spinnman/extended/extended_transceiver.py b/spinnman/extended/extended_transceiver.py index 5ae7038f5..40689a44e 100644 --- a/spinnman/extended/extended_transceiver.py +++ b/spinnman/extended/extended_transceiver.py @@ -12,46 +12,53 @@ # See the License for the specific language governing permissions and # limitations under the License. -# pylint: disable=too-many-arguments from contextlib import contextmanager -import io import os import logging from threading import Condition, RLock import time +from typing import ( + BinaryIO, Generator, Iterable, List, Mapping, Optional, Sequence, TypeVar, + Union) from spinn_utilities.log import FormatAdapter from spinn_utilities.logger_utils import warn_once from spinn_machine import CoreSubsets -from spinnman.constants import SYSTEM_VARIABLE_BASE_ADDRESS -from spinnman.transceiver import Transceiver from spinnman.constants import ( ROUTER_REGISTER_BASE_ADDRESS, ROUTER_FILTER_CONTROLS_OFFSET, - ROUTER_DIAGNOSTIC_FILTER_SIZE) + ROUTER_DIAGNOSTIC_FILTER_SIZE, SYSTEM_VARIABLE_BASE_ADDRESS) +from spinnman.transceiver import ( + Transceiver, _EXECUTABLE_ADDRESS, _ONE_BYTE, _ONE_WORD) from spinnman.data import SpiNNManDataView from spinnman.exceptions import SpinnmanException from spinnman.extended import ( BMPSetLed, DeAllocSDRAMProcess, ReadADC, SetLED, WriteMemoryFloodProcess) -from spinnman.model import DiagnosticFilter +from spinnman.model import ( + ADCInfo, BMPConnectionData, DiagnosticFilter, ExecutableTargets, + HeapElement, IOBuffer) from spinnman.model.enums import CPUState -from spinnman.messages.scp.enums import Signal -from spinnman.messages.scp.impl import ( - ReadMemory, ApplicationRun) +from spinnman.messages.scp.enums import LEDAction, Signal +from spinnman.messages.scp.impl import ReadMemory, ApplicationRun from spinnman.messages.spinnaker_boot import SystemVariableDefinition +from spinnman.messages.scp.abstract_messages import ( + AbstractSCPRequest, AbstractSCPResponse) +from spinnman.connections.abstract_classes import Connection from spinnman.connections.udp_packet_connections import ( BMPConnection, BootConnection, SCAMPConnection) from spinnman.processes import ( - GetHeapProcess, ReadMemoryProcess, SendSingleCommandProcess, - WriteMemoryProcess) -from spinnman.transceiver import _EXECUTABLE_ADDRESS, _ONE_BYTE, _ONE_WORD + GetHeapProcess, ReadMemoryProcess, WriteMemoryProcess) from spinnman.utilities.utility_functions import ( work_out_bmp_from_machine_details) logger = FormatAdapter(logging.getLogger(__name__)) +#: :meta private: +R = TypeVar("R", bound=AbstractSCPResponse) def create_transceiver_from_hostname( - hostname, version, bmp_connection_data=None, number_of_boards=None, - auto_detect_bmp=False): + hostname: Optional[str], version: int, *, + bmp_connection_data: Optional[List[BMPConnectionData]] = None, + number_of_boards: Optional[int] = None, + auto_detect_bmp: bool = False) -> 'ExtendedTransceiver': """ Create a Transceiver by creating a :py:class:`~.UDPConnection` to the given hostname on port 17893 (the default SCAMP port), and a @@ -89,13 +96,17 @@ def create_transceiver_from_hostname( """ if hostname is not None: logger.info("Creating transceiver for {}", hostname) - connections = list() + connections: List[Connection] = list() # if no BMP has been supplied, but the board is a spinn4 or a spinn5 # machine, then an assumption can be made that the BMP is at -1 on the # final value of the IP address if (version >= 4 and auto_detect_bmp is True and (bmp_connection_data is None or not bmp_connection_data)): + assert hostname is not None, \ + "hostname must be supplied to work out BMP connections" + assert number_of_boards is not None, \ + "number_of_boards must be supplied to work out BMP connections" bmp_connection_data = [ work_out_bmp_from_machine_details(hostname, number_of_boards)] @@ -135,7 +146,8 @@ class ExtendedTransceiver(Transceiver): __slots__ = ["_flood_write_lock", "_nearest_neighbour_id", "_nearest_neighbour_lock"] - def __init__(self, version, connections=None): + def __init__(self, version: int, + connections: Optional[Sequence[Connection]] = None): """ :param int version: The version of the board being connected to :param list(Connection) connections: @@ -151,7 +163,8 @@ def __init__(self, version, connections=None): :raise SpinnmanUnexpectedResponseCodeException: If a response indicates an error during the exchange """ - super().__init__(version, connections) + conns = list(connections) if connections else [] + super().__init__(version, conns) # A lock against multiple flood fill writes - needed as SCAMP cannot # cope with this @@ -161,7 +174,9 @@ def __init__(self, version, connections=None): self._nearest_neighbour_id = 1 self._nearest_neighbour_lock = RLock() - def send_scp_message(self, message, connection=None): + def send_scp_message( + self, message: AbstractSCPRequest, + connection: Optional[SCAMPConnection] = None): """ Sends an SCP message, without expecting a response. @@ -186,9 +201,10 @@ def send_scp_message(self, message, connection=None): """ if connection is None: connection = self._get_random_connection(self._scamp_connections) + assert connection is not None connection.send_scp_request(message) - def get_connections(self): + def get_connections(self) -> Iterable[Connection]: """ Get the currently known connections to the board, made up of those passed in to the transceiver and those that are discovered during @@ -199,7 +215,7 @@ def get_connections(self): """ return self._all_connections - def is_connected(self, connection=None): + def is_connected(self, connection: Optional[Connection] = None) -> bool: """ Determines if the board can be contacted. @@ -214,7 +230,8 @@ def is_connected(self, connection=None): return connection.is_connected() return any(c.is_connected() for c in self._scamp_connections) - def __set_watch_dog_on_chip(self, x, y, watch_dog): + def __set_watch_dog_on_chip( + self, x: int, y: int, watch_dog: int): """ Enable, disable or set the value of the watch dog timer on a specific chip. @@ -226,28 +243,21 @@ def __set_watch_dog_on_chip(self, x, y, watch_dog): :param int x: chip X coordinate to write new watchdog parameter to :param int y: chip Y coordinate to write new watchdog parameter to - :param watch_dog: - Either a boolean indicating whether to enable (True) or - disable (False) the watchdog timer, or an int value to set the - timer count to - :type watch_dog: bool or int + :param int watch_dog: The watchdog timeout to set """ # build what we expect it to be warn_once(logger, "The set_watch_dog_on_chip method is deprecated " "and untested due to no known use.") - value_to_set = watch_dog watchdog = SystemVariableDefinition.software_watchdog_count - if isinstance(watch_dog, bool): - value_to_set = watchdog.default if watch_dog else 0 # build data holder - data = _ONE_BYTE.pack(value_to_set) + data = _ONE_BYTE.pack(watch_dog) # write data address = SYSTEM_VARIABLE_BASE_ADDRESS + watchdog.offset - self.write_memory(x=x, y=y, base_address=address, data=data) + self.write_memory(x, y, address, data) - def set_watch_dog(self, watch_dog): + def set_watch_dog(self, watch_dog: Union[bool, int]): """ Enable, disable or set the value of the watch dog timer. @@ -264,10 +274,15 @@ def set_watch_dog(self, watch_dog): """ warn_once(logger, "The set_watch_dog method is deprecated and " "untested due to no known use.") + if isinstance(watch_dog, bool): + watchdog = SystemVariableDefinition.software_watchdog_count + value_to_set = int(watchdog.default) if watch_dog else 0 + else: + value_to_set = watch_dog for x, y in SpiNNManDataView.get_machine().chip_coordinates: - self.__set_watch_dog_on_chip(x, y, watch_dog) + self.__set_watch_dog_on_chip(x, y, value_to_set) - def get_iobuf_from_core(self, x, y, p): + def get_iobuf_from_core(self, x: int, y: int, p: int) -> IOBuffer: """ Get the contents of IOBUF for a given core. @@ -293,10 +308,11 @@ def get_iobuf_from_core(self, x, y, p): "likely to be removed.") core_subsets = CoreSubsets() core_subsets.add_processor(x, y, p) - return next(self.get_iobuf(core_subsets)) + return next(iter(self.get_iobuf(core_subsets))) @contextmanager - def _chip_execute_lock(self, x, y): + def _chip_execute_lock( + self, x: int, y: int) -> Generator[Condition, None, None]: """ Get a lock for executing an executable on a chip. @@ -328,8 +344,10 @@ def _chip_execute_lock(self, x, y): self._chip_execute_lock_condition.notify_all() def execute( - self, x, y, processors, executable, app_id, n_bytes=None, - wait=False, is_filename=False): + self, x: int, y: int, processors: Sequence[int], + executable: Union[bytes, str, BinaryIO], app_id: int, + n_bytes: Optional[int] = None, + wait: bool = False): """ Start an executable running on a single chip. @@ -359,7 +377,6 @@ def execute( * If executable is an RawIOBase, an error is raised * If executable is a bytearray, the length of the bytearray will be used - * If executable is an int, 4 will be used * If executable is a str, the length of the file will be used :param bool wait: True if the binary should enter a "wait" state on loading @@ -382,14 +399,13 @@ def execute( with self._chip_execute_lock(x, y): # Write the executable self.write_memory( - x, y, _EXECUTABLE_ADDRESS, executable, n_bytes, - is_filename=is_filename) + x, y, _EXECUTABLE_ADDRESS, executable, n_bytes=n_bytes) # Request the start of the executable - process = SendSingleCommandProcess(self._scamp_connection_selector) - process.execute(ApplicationRun(app_id, x, y, processors, wait)) + self._call(ApplicationRun(app_id, x, y, processors, wait)) - def execute_application(self, executable_targets, app_id): + def execute_application( + self, executable_targets: ExecutableTargets, app_id: int): """ Execute a set of binaries that make up a complete application on specified cores, wait for them to be ready and then start all of the @@ -406,8 +422,7 @@ def execute_application(self, executable_targets, app_id): # Execute each of the binaries and get them in to a "wait" state for binary in executable_targets.binaries: core_subsets = executable_targets.get_cores_for_binary(binary) - self.execute_flood( - core_subsets, binary, app_id, wait=True, is_filename=True) + self.execute_flood(core_subsets, binary, app_id, wait=True) # Sleep to allow cores to get going time.sleep(0.5) @@ -427,7 +442,8 @@ def execute_application(self, executable_targets, app_id): # Send a signal telling the application to start self.send_signal(app_id, Signal.START) - def set_led(self, led, action, board): + def set_led(self, led: Union[int, Iterable[int]], action: LEDAction, + board: Union[int, Iterable[int]]): """ Set the LED state of a board in the machine. @@ -448,10 +464,9 @@ def set_led(self, led, action, board): """ warn_once(logger, "The set_led method is deprecated and " "untested due to no known use.") - process = SendSingleCommandProcess(self._bmp_selector) - process.execute(BMPSetLed(led, action, board)) + self._bmp_call(BMPSetLed(led, action, board)) - def read_adc_data(self, board): + def read_adc_data(self, board: int) -> ADCInfo: """ Read the BMP ADC data. @@ -466,12 +481,13 @@ def read_adc_data(self, board): """ warn_once(logger, "The read_adc_data method is deprecated and " "untested due to no known use.") - process = SendSingleCommandProcess(self._bmp_selector) - response = process.execute(ReadADC(board)) - return response.adc_info # pylint: disable=no-member + response = self._bmp_call(ReadADC(board)) + return response.adc_info - def write_neighbour_memory(self, x, y, link, base_address, data, - n_bytes=None, offset=0, cpu=0): + def write_neighbour_memory( + self, x: int, y: int, link: int, base_address: int, + data: Union[BinaryIO, bytes, int], *, + n_bytes: Optional[int] = None, offset: int = 0, cpu: int = 0): """ Write to the memory of a neighbouring chip using a LINK_READ SCP command. If sent to a BMP, this command can be used to communicate @@ -525,20 +541,25 @@ def write_neighbour_memory(self, x, y, link, base_address, data, warn_once(logger, "The write_neighbour_memory method is deprecated " "and untested due to no known use.") process = WriteMemoryProcess(self._scamp_connection_selector) - if isinstance(data, io.RawIOBase): - process.write_link_memory_from_reader( - x, y, cpu, link, base_address, data, n_bytes) - elif isinstance(data, int): + if isinstance(data, int): data_to_write = _ONE_WORD.pack(data) process.write_link_memory_from_bytearray( x, y, cpu, link, base_address, data_to_write, 0, 4) - else: + elif isinstance(data, (bytes, bytearray)): if n_bytes is None: n_bytes = len(data) process.write_link_memory_from_bytearray( x, y, cpu, link, base_address, data, offset, n_bytes) + else: + if n_bytes is None: + raise ValueError( + "n_bytes must be provided when using a reader") + process.write_link_memory_from_reader( + x, y, cpu, link, base_address, data, n_bytes) - def read_neighbour_memory(self, x, y, link, base_address, length, cpu=0): + def read_neighbour_memory( + self, x: int, y: int, link: int, base_address: int, length: int, *, + cpu: int = 0) -> bytes: """ Read some areas of memory on a neighbouring chip using a LINK_READ SCP command. If sent to a BMP, this command can be used to @@ -582,15 +603,15 @@ def read_neighbour_memory(self, x, y, link, base_address, length, cpu=0): logger.info(self._where_is_xy(x, y)) raise - def _get_next_nearest_neighbour_id(self): + def _get_next_nearest_neighbour_id(self) -> int: with self._nearest_neighbour_lock: next_nearest_neighbour_id = (self._nearest_neighbour_id + 1) % 127 self._nearest_neighbour_id = next_nearest_neighbour_id return next_nearest_neighbour_id def write_memory_flood( - self, base_address, data, n_bytes=None, offset=0, - is_filename=False): + self, base_address: int, data: Union[BinaryIO, bytes, str, int], *, + n_bytes: Optional[int] = None, offset: int = 0): """ Write to the SDRAM of all chips. @@ -612,7 +633,6 @@ def write_memory_flood( * If `data` is an RawIOBase, an error is raised * If `data` is a bytearray or bytes, the length of the bytearray will be used - * If `data` is an int, 4 will be used * If `data` is a str, the size of the file will be used :param int offset: The offset where the valid data starts; if `data` is @@ -636,38 +656,42 @@ def write_memory_flood( with self._flood_write_lock: # Start the flood fill nearest_neighbour_id = self._get_next_nearest_neighbour_id() - if isinstance(data, io.RawIOBase): - process.write_memory_from_reader( - nearest_neighbour_id, base_address, data, n_bytes) - elif isinstance(data, str) and is_filename: + if isinstance(data, int): + data_to_write = _ONE_WORD.pack(data) + process.write_memory_from_bytearray( + nearest_neighbour_id, base_address, data_to_write, 0) + elif isinstance(data, str): if n_bytes is None: n_bytes = os.stat(data).st_size with open(data, "rb") as reader: process.write_memory_from_reader( nearest_neighbour_id, base_address, reader, n_bytes) - elif isinstance(data, int): - data_to_write = _ONE_WORD.pack(data) - process.write_memory_from_bytearray( - nearest_neighbour_id, base_address, data_to_write, 0) - else: + elif isinstance(data, (bytes, bytearray)): if n_bytes is None: n_bytes = len(data) process.write_memory_from_bytearray( nearest_neighbour_id, base_address, data, offset, n_bytes) + else: + if n_bytes is None: + raise ValueError( + "n_bytes must be provided when using a reader") + process.write_memory_from_reader( + nearest_neighbour_id, base_address, data, n_bytes) - def set_leds(self, x, y, cpu, led_states): + def set_leds(self, x: int, y: int, led_states: Mapping[int, int], *, + cpu: int = 0): """ - Set LED states. + Set chip LED states. .. warning:: The set_leds is deprecated and untested due to no known use. :param int x: The x-coordinate of the chip on which to set the LEDs :param int y: The x-coordinate of the chip on which to set the LEDs - :param int cpu: The CPU of the chip on which to set the LEDs :param dict(int,int) led_states: - A dictionary mapping SetLED index to state with + A dictionary mapping LED index to state with 0 being off, 1 on and 2 inverted. + :param int cpu: The CPU of the chip on which to set the LEDs :raise SpinnmanIOException: If there is an error communicating with the board :raise SpinnmanInvalidPacketException: @@ -680,13 +704,12 @@ def set_leds(self, x, y, cpu, led_states): try: warn_once(logger, "The set_leds is deprecated and " "untested due to no known use.") - process = SendSingleCommandProcess(self._scamp_connection_selector) - process.execute(SetLED(x, y, cpu, led_states)) + self._call(SetLED(x, y, cpu, led_states)) except Exception: logger.info(self._where_is_xy(x, y)) raise - def free_sdram(self, x, y, base_address, app_id): + def free_sdram(self, x: int, y: int, base_address: int): """ Free allocated SDRAM. @@ -696,18 +719,17 @@ def free_sdram(self, x, y, base_address, app_id): :param int x: The x-coordinate of the chip onto which to ask for memory :param int y: The y-coordinate of the chip onto which to ask for memory :param int base_address: The base address of the allocated memory - :param int app_id: The app ID of the allocated memory """ try: warn_once(logger, "The free_sdram method is deprecated and " "likely to be removed.") process = DeAllocSDRAMProcess(self._scamp_connection_selector) - process.de_alloc_sdram(x, y, app_id, base_address) + process.de_alloc_sdram(x, y, base_address) except Exception: logger.info(self._where_is_xy(x, y)) raise - def free_sdram_by_app_id(self, x, y, app_id): + def free_sdram_by_app_id(self, x: int, y: int, app_id: int) -> int: """ Free all SDRAM allocated to a given app ID. @@ -726,13 +748,16 @@ def free_sdram_by_app_id(self, x, y, app_id): warn_once(logger, "The free_sdram_by_app_id method is deprecated " "and untested due to no known use.") process = DeAllocSDRAMProcess(self._scamp_connection_selector) - process.de_alloc_sdram(x, y, app_id) - return process.no_blocks_freed + process.de_alloc_all_app_sdram(x, y, app_id) + freed = process.no_blocks_freed + assert freed is not None + return freed except Exception: logger.info(self._where_is_xy(x, y)) raise - def get_router_diagnostic_filter(self, x, y, position): + def get_router_diagnostic_filter( + self, x: int, y: int, position: int) -> DiagnosticFilter: """ Gets a router diagnostic filter from a router. @@ -763,18 +788,15 @@ def get_router_diagnostic_filter(self, x, y, position): ROUTER_REGISTER_BASE_ADDRESS + ROUTER_FILTER_CONTROLS_OFFSET + position * ROUTER_DIAGNOSTIC_FILTER_SIZE) - process = SendSingleCommandProcess( - self._scamp_connection_selector) - response = process.execute(ReadMemory(x, y, memory_position, 4)) + response = self._call(ReadMemory(x, y, memory_position, 4)) return DiagnosticFilter.read_from_int(_ONE_WORD.unpack_from( response.data, response.offset)[0]) - # pylint: disable=no-member except Exception: logger.info(self._where_is_xy(x, y)) raise @property - def number_of_boards_located(self): + def number_of_boards_located(self) -> int: """ The number of boards currently configured. @@ -791,7 +813,10 @@ def number_of_boards_located(self): # if no BMPs are available, then there's still at least one board return 1 - def get_heap(self, x, y, heap=SystemVariableDefinition.sdram_heap_address): + def get_heap( + self, x: int, y: int, + heap=SystemVariableDefinition.sdram_heap_address + ) -> Sequence[HeapElement]: """ Get the contents of the given heap on a given chip. diff --git a/spinnman/extended/read_adc.py b/spinnman/extended/read_adc.py index 6fedf28e3..2c82978e5 100644 --- a/spinnman/extended/read_adc.py +++ b/spinnman/extended/read_adc.py @@ -15,13 +15,12 @@ from spinn_utilities.overrides import overrides from spinnman.messages.scp import SCPRequestHeader from spinnman.messages.scp.abstract_messages import ( - AbstractSCPRequest, AbstractSCPResponse, BMPRequest, BMPResponse) -from spinnman.messages.scp.enums import BMPInfo, SCPCommand, SCPResult -from spinnman.exceptions import SpinnmanUnexpectedResponseCodeException + AbstractSCPRequest, BMPRequest, BMPResponse) +from spinnman.messages.scp.enums import BMPInfo, SCPCommand from spinnman.model import ADCInfo -class ReadADC(BMPRequest): +class ReadADC(BMPRequest['_SCPReadADCResponse']): """ SCP Request for the data from the BMP including voltages and temperature. @@ -33,44 +32,38 @@ class ReadADC(BMPRequest): .. note:: The equivalent code in Java is *not* deprecated. """ - __slots__ = [] + __slots__ = () - def __init__(self, board): + def __init__(self, board: int): """ - :param int board: which board to request the ADC register from + :param int board: which board to request the ADC data from """ super().__init__( board, SCPRequestHeader(command=SCPCommand.CMD_BMP_INFO), - argument_1=BMPInfo.ADC) + argument_1=BMPInfo.ADC.value) @overrides(AbstractSCPRequest.get_scp_response) - def get_scp_response(self): + def get_scp_response(self) -> '_SCPReadADCResponse': return _SCPReadADCResponse() -class _SCPReadADCResponse(BMPResponse): +class _SCPReadADCResponse(BMPResponse[ADCInfo]): """ An SCP response to a request for ADC information. """ - __slots__ = [ - "_adc_info"] + __slots__ = ("_adc_info", ) - def __init__(self): - super().__init__() + def __init__(self) -> None: + super().__init__("Read ADC", SCPCommand.CMD_BMP_INFO) self._adc_info = None - @overrides(AbstractSCPResponse.read_data_bytestring) - def read_data_bytestring(self, data, offset): - result = self.scp_response_header.result - if result != SCPResult.RC_OK: - raise SpinnmanUnexpectedResponseCodeException( - "ADC", "CMD_ADC", result.name) - self._adc_info = ADCInfo(data, offset) + def _parse_payload(self, data: bytes, offset: int) -> ADCInfo: + return ADCInfo(data, offset) @property def adc_info(self): """ The ADC information. """ - return self._adc_info + return self._value diff --git a/spinnman/extended/set_led.py b/spinnman/extended/set_led.py index 220ae5695..e485b1818 100644 --- a/spinnman/extended/set_led.py +++ b/spinnman/extended/set_led.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Mapping from spinn_utilities.overrides import overrides from spinnman.messages.scp import SCPRequestHeader from spinnman.messages.scp.abstract_messages import AbstractSCPRequest @@ -20,22 +21,24 @@ from spinnman.messages.scp.impl.check_ok_response import CheckOKResponse -class SetLED(AbstractSCPRequest): +class SetLED(AbstractSCPRequest[CheckOKResponse]): """ - A request to change the state of an SetLED. + A request to change the state of a chip's LED. This class is currently deprecated and untested as there is no known use except for Transceiver.set_led which is itself deprecated. """ - __slots__ = [] + __slots__ = () - def __init__(self, x, y, cpu, led_states): + def __init__( + self, x: int, y: int, cpu: int, led_states: Mapping[int, int]): """ :param int x: The x-coordinate of the chip, between 0 and 255 :param int y: The y-coordinate of the chip, between 0 and 255 - :param int cpu: The CPU-number to use to set the SetLED. + :param int cpu: + The CPU-number to use to set the LED. Normally should be 0 :param dict(int,int) led_states: - A dictionary mapping SetLED index to state with + A dictionary mapping LED index to state with 0 being off, 1 on and 2 inverted. """ encoded_led_states = 0 @@ -51,5 +54,5 @@ def __init__(self, x, y, cpu, led_states): argument_1=encoded_led_states) @overrides(AbstractSCPRequest.get_scp_response) - def get_scp_response(self): - return CheckOKResponse("Set SetLED", "CMD_LED") + def get_scp_response(self) -> CheckOKResponse: + return CheckOKResponse("Set chip LED", "CMD_LED") diff --git a/spinnman/extended/write_memory_flood_process.py b/spinnman/extended/write_memory_flood_process.py index 4ef491bf7..6cecb6297 100644 --- a/spinnman/extended/write_memory_flood_process.py +++ b/spinnman/extended/write_memory_flood_process.py @@ -13,10 +13,11 @@ # limitations under the License. import math +from typing import BinaryIO, Optional from spinnman.messages.scp.impl import ( FloodFillEnd, FloodFillStart, FloodFillData) -from spinnman.processes.abstract_multi_connection_process import ( - AbstractMultiConnectionProcess) +from spinnman.processes import ( + AbstractMultiConnectionProcess, ConnectionSelector) from spinnman.constants import UDP_MESSAGE_MAX_SIZE @@ -24,28 +25,27 @@ class WriteMemoryFloodProcess(AbstractMultiConnectionProcess): """ A process for writing memory on multiple SpiNNaker chips at once. """ - __slots__ = [] + __slots__ = () - def __init__(self, next_connection_selector): + def __init__(self, next_connection_selector: ConnectionSelector): AbstractMultiConnectionProcess.__init__( self, next_connection_selector, n_channels=3, intermediate_channel_waits=2) - def _start_flood_fill(self, n_bytes, nearest_neighbour_id): + def _start_flood_fill(self, n_bytes: int, nearest_neighbour_id: int): n_blocks = int(math.ceil(math.ceil(n_bytes / 4.0) / UDP_MESSAGE_MAX_SIZE)) - self._send_request( - FloodFillStart(nearest_neighbour_id, n_blocks)) - self._finish() - self.check_for_error() - - def _end_flood_fill(self, nearest_neighbour_id): - self._send_request(FloodFillEnd(nearest_neighbour_id)) - self._finish() - self.check_for_error() - - def write_memory_from_bytearray(self, nearest_neighbour_id, base_address, - data, offset, n_bytes=None): + with self._collect_responses(): + self._send_request( + FloodFillStart(nearest_neighbour_id, n_blocks)) + + def _end_flood_fill(self, nearest_neighbour_id: int): + with self._collect_responses(): + self._send_request(FloodFillEnd(nearest_neighbour_id)) + + def write_memory_from_bytearray( + self, nearest_neighbour_id: int, base_address: int, + data: bytes, offset: int, n_bytes: Optional[int] = None): """ :param int nearest_neighbour_id: :param int base_address: @@ -59,27 +59,27 @@ def write_memory_from_bytearray(self, nearest_neighbour_id, base_address, n_bytes = len(data) self._start_flood_fill(n_bytes, nearest_neighbour_id) - data_offset = offset - offset = base_address - block_no = 0 - while n_bytes > 0: - bytes_to_send = min((int(n_bytes), UDP_MESSAGE_MAX_SIZE)) + with self._collect_responses(): + data_offset = offset + offset = base_address + block_no = 0 + while n_bytes > 0: + bytes_to_send = min((int(n_bytes), UDP_MESSAGE_MAX_SIZE)) - self._send_request(FloodFillData( - nearest_neighbour_id, block_no, offset, - data, data_offset, bytes_to_send)) + self._send_request(FloodFillData( + nearest_neighbour_id, block_no, offset, + data, data_offset, bytes_to_send)) - block_no += 1 - n_bytes -= bytes_to_send - offset += bytes_to_send - data_offset += bytes_to_send - self._finish() - self.check_for_error() + block_no += 1 + n_bytes -= bytes_to_send + offset += bytes_to_send + data_offset += bytes_to_send self._end_flood_fill(nearest_neighbour_id) - def write_memory_from_reader(self, nearest_neighbour_id, base_address, - reader, n_bytes): + def write_memory_from_reader( + self, nearest_neighbour_id: int, base_address: int, + reader: BinaryIO, n_bytes: int): """ :param int nearest_neighbour_id: :param int base_address: @@ -89,20 +89,19 @@ def write_memory_from_reader(self, nearest_neighbour_id, base_address, """ self._start_flood_fill(n_bytes, nearest_neighbour_id) - offset = base_address - block_no = 0 - while n_bytes > 0: - bytes_to_send = min((int(n_bytes), UDP_MESSAGE_MAX_SIZE)) - data_array = reader.read(bytes_to_send) - - self._send_request(FloodFillData( - nearest_neighbour_id, block_no, offset, - data_array, 0, len(data_array))) - - block_no += 1 - n_bytes -= bytes_to_send - offset += bytes_to_send - self._finish() - self.check_for_error() + with self._collect_responses(): + offset = base_address + block_no = 0 + while n_bytes > 0: + bytes_to_send = min((int(n_bytes), UDP_MESSAGE_MAX_SIZE)) + data_array = reader.read(bytes_to_send) + + self._send_request(FloodFillData( + nearest_neighbour_id, block_no, offset, + data_array, 0, len(data_array))) + + block_no += 1 + n_bytes -= bytes_to_send + offset += bytes_to_send self._end_flood_fill(nearest_neighbour_id) diff --git a/spinnman/messages/scp/impl/sdram_de_alloc.py b/spinnman/messages/scp/impl/sdram_de_alloc.py index d77bb7925..ef8300d37 100644 --- a/spinnman/messages/scp/impl/sdram_de_alloc.py +++ b/spinnman/messages/scp/impl/sdram_de_alloc.py @@ -13,7 +13,7 @@ # limitations under the License. import struct -from typing import Optional +from typing import Optional, overload from spinn_utilities.overrides import overrides from spinnman.messages.scp import SCPRequestHeader from spinnman.messages.scp.abstract_messages import ( @@ -74,7 +74,17 @@ class SDRAMDeAlloc(AbstractSCPRequest[_SCPSDRAMDeAllocResponse]): """ __slots__ = "_read_n_blocks_freed", - def __init__(self, x: int, y: int, *, app_id: Optional[int], + @overload + def __init__(self, x: int, y: int, *, app_id: int, + base_address: None = None): + ... + + @overload + def __init__(self, x: int, y: int, *, app_id: None = None, + base_address: int): + ... + + def __init__(self, x: int, y: int, *, app_id: Optional[int] = None, base_address: Optional[int] = None): """ :param int x: @@ -88,6 +98,7 @@ def __init__(self, x: int, y: int, *, app_id: Optional[int], """ # pylint: disable=unsupported-binary-operation if base_address is not None: + assert app_id is None super().__init__( SDPHeader( flags=SDPFlag.REPLY_EXPECTED, destination_port=0, diff --git a/spinnman/transceiver.py b/spinnman/transceiver.py index 754e93b6f..31aa72adc 100644 --- a/spinnman/transceiver.py +++ b/spinnman/transceiver.py @@ -287,7 +287,7 @@ def __init__(self, version: int, self._machine_off = False - def __where_is_xy(self, x: int, y: int) -> str: + def _where_is_xy(self, x: int, y: int) -> str: """ Attempts to get where_is_x_y info from the machine @@ -664,7 +664,7 @@ def _boot_board(self, extra_boot_values: Optional[Dict[ self._boot_send_connection.send_boot_message(boot_message) time.sleep(2.0) - def __call(self, req: AbstractSCPRequest[R], **kwargs) -> R: + def _call(self, req: AbstractSCPRequest[R], **kwargs) -> R: """ Wrapper that makes doing simple SCP calls easier, especially with types. @@ -754,7 +754,7 @@ def ensure_board_is_ready( # Change the default SCP timeout on the machine, keeping the old one to # revert at close for scamp_connection in self._scamp_connections: - self.__call(IPTagSetTTO( + self._call(IPTagSetTTO( scamp_connection.chip_x, scamp_connection.chip_y, IPTAG_TIME_OUT_WAIT_TIMES.TIMEOUT_2560_ms)) @@ -1038,7 +1038,7 @@ def get_core_state_count(self, app_id: int, state: CPUState) -> int: :raise SpinnmanUnexpectedResponseCodeException: If a response indicates an error during the exchange """ - return self.__call(CountState(app_id, state)).count + return self._call(CountState(app_id, state)).count def execute_flood( self, core_subsets: CoreSubsets, @@ -1153,7 +1153,7 @@ def power_off(self, boards: Union[int, Iterable[int]] = 0): """ self._power(PowerCommand.POWER_OFF, boards) - def __bmp_call(self, req: AbstractSCPRequest[R], **kwargs) -> R: + def _bmp_call(self, req: AbstractSCPRequest[R], **kwargs) -> R: """ Wrapper that makes doing simple BMP calls easier, especially with types. @@ -1179,8 +1179,8 @@ def _power( BMP_POWER_ON_TIMEOUT if power_command == PowerCommand.POWER_ON else BMP_TIMEOUT) - self.__bmp_call(SetPower(power_command, boards), - timeout=timeout, n_retries=0) + self._bmp_call(SetPower(power_command, boards), + timeout=timeout, n_retries=0) self._machine_off = power_command == PowerCommand.POWER_OFF # Sleep for 5 seconds if the machine has just been powered on @@ -1201,7 +1201,7 @@ def read_fpga_register( :return: the register data :rtype: int """ - response = self.__bmp_call( + response = self._bmp_call( ReadFPGARegister(fpga_num, register, board), timeout=1.0) return response.fpga_register @@ -1219,7 +1219,7 @@ def write_fpga_register( :param int value: the value to write into the FPGA register :param int board: which board to write the FPGA register to """ - self.__bmp_call( + self._bmp_call( WriteFPGARegister(fpga_num, register, value, board)) def read_bmp_version(self, board: int) -> VersionInfo: @@ -1229,7 +1229,7 @@ def read_bmp_version(self, board: int) -> VersionInfo: :param int board: which board to request the data from :return: the sver from the BMP """ - response = self.__bmp_call(BMPGetVersion(board)) + response = self._bmp_call(BMPGetVersion(board)) return response.version_info def write_memory( @@ -1365,7 +1365,7 @@ def read_memory( process = ReadMemoryProcess(self._scamp_connection_selector) return process.read_memory(x, y, cpu, base_address, length) except Exception: - logger.info(self.__where_is_xy(x, y)) + logger.info(self._where_is_xy(x, y)) raise def read_word( @@ -1400,7 +1400,7 @@ def read_word( (value, ) = _ONE_WORD.unpack(data) return value except Exception: - logger.info(self.__where_is_xy(x, y)) + logger.info(self._where_is_xy(x, y)) raise def stop_application(self, app_id: int): @@ -1420,7 +1420,7 @@ def stop_application(self, app_id: int): """ if not self._machine_off: - self.__call(AppStop(app_id)) + self._call(AppStop(app_id)) else: logger.warning( "You are calling a app stop on a turned off machine. " @@ -1440,7 +1440,7 @@ def __log_where_is_info(self, cpu_infos: Iterable[ else: xys.add((cpu_info[0], cpu_info[1])) for (x, y) in xys: - logger.info(self.__where_is_xy(x, y)) + logger.info(self._where_is_xy(x, y)) @staticmethod def __state_set(cpu_states: _States) -> FrozenSet[CPUState]: @@ -1570,7 +1570,7 @@ def send_signal(self, app_id: int, signal: Signal): :raise SpinnmanUnexpectedResponseCodeException: If a response indicates an error during the exchange """ - self.__call(SendSignal(app_id, signal)) + self._call(SendSignal(app_id, signal)) def _locate_spinnaker_connection_for_board_address( self, board_address: str) -> Optional[SCAMPConnection]: @@ -1629,7 +1629,7 @@ def set_ip_tag(self, ip_tag: IPTag, use_sender: bool = False): ip_string = socket.gethostbyname(host_string) ip_address = bytearray(socket.inet_aton(ip_string)) - self.__call(IPTagSet( + self._call(IPTagSet( connection.chip_x, connection.chip_y, ip_address, ip_tag.port, ip_tag.tag, strip=ip_tag.strip_sdp, use_sender=use_sender)) @@ -1703,7 +1703,7 @@ def set_reverse_ip_tag(self, reverse_ip_tag: ReverseIPTag): "The given board address is not recognised") for connection in connections: - self.__call(ReverseIPTagSet( + self._call(ReverseIPTagSet( connection.chip_x, connection.chip_y, reverse_ip_tag.destination_x, reverse_ip_tag.destination_y, reverse_ip_tag.destination_p, @@ -1731,7 +1731,7 @@ def clear_ip_tag(self, tag: int, board_address: Optional[str] = None): If a response indicates an error during the exchange """ for conn in self.__get_connection_list(board_address=board_address): - self.__call(IPTagClear(conn.chip_x, conn.chip_y, tag)) + self._call(IPTagClear(conn.chip_x, conn.chip_y, tag)) def get_tags(self, connection: Optional[SCAMPConnection] = None ) -> Iterable[AbstractTag]: @@ -1781,7 +1781,7 @@ def malloc_sdram( process.malloc_sdram(x, y, size, app_id, tag) return process.base_address except Exception: - logger.info(self.__where_is_xy(x, y)) + logger.info(self._where_is_xy(x, y)) raise def load_multicast_routes( @@ -1813,7 +1813,7 @@ def load_multicast_routes( self._scamp_connection_selector) process.load_routes(x, y, routes, app_id) except Exception: - logger.info(self.__where_is_xy(x, y)) + logger.info(self._where_is_xy(x, y)) raise def load_fixed_route( @@ -1844,7 +1844,7 @@ def load_fixed_route( self._scamp_connection_selector) process.load_fixed_route(x, y, fixed_route, app_id) except Exception: - logger.info(self.__where_is_xy(x, y)) + logger.info(self._where_is_xy(x, y)) raise def read_fixed_route(self, x: int, y: int, app_id: int) -> FixedRouteEntry: @@ -1865,7 +1865,7 @@ def read_fixed_route(self, x: int, y: int, app_id: int) -> FixedRouteEntry: self._scamp_connection_selector) return process.read_fixed_route(x, y, app_id) except Exception: - logger.info(self.__where_is_xy(x, y)) + logger.info(self._where_is_xy(x, y)) raise def get_multicast_routes( @@ -1899,7 +1899,7 @@ def get_multicast_routes( self._scamp_connection_selector, app_id) return process.get_routes(x, y, base_address) except Exception: - logger.info(self.__where_is_xy(x, y)) + logger.info(self._where_is_xy(x, y)) raise def clear_multicast_routes(self, x: int, y: int): @@ -1918,9 +1918,9 @@ def clear_multicast_routes(self, x: int, y: int): If a response indicates an error during the exchange """ try: - self.__call(RouterClear(x, y)) + self._call(RouterClear(x, y)) except Exception: - logger.info(self.__where_is_xy(x, y)) + logger.info(self._where_is_xy(x, y)) raise def get_router_diagnostics(self, x: int, y: int) -> RouterDiagnostics: @@ -1947,7 +1947,7 @@ def get_router_diagnostics(self, x: int, y: int) -> RouterDiagnostics: self._scamp_connection_selector) return process.get_router_diagnostics(x, y) except Exception: - logger.info(self.__where_is_xy(x, y)) + logger.info(self._where_is_xy(x, y)) raise def set_router_diagnostic_filter( @@ -1984,7 +1984,7 @@ def set_router_diagnostic_filter( self.__set_router_diagnostic_filter( x, y, position, diagnostic_filter) except Exception: - logger.info(self.__where_is_xy(x, y)) + logger.info(self._where_is_xy(x, y)) raise def __set_router_diagnostic_filter( @@ -2007,7 +2007,7 @@ def __set_router_diagnostic_filter( ROUTER_REGISTER_BASE_ADDRESS + ROUTER_FILTER_CONTROLS_OFFSET + position * ROUTER_DIAGNOSTIC_FILTER_SIZE) - self.__call(WriteMemory( + self._call(WriteMemory( x, y, memory_position, _ONE_WORD.pack(data_to_send))) def clear_router_diagnostic_counters(self, x: int, y: int): @@ -2028,10 +2028,10 @@ def clear_router_diagnostic_counters(self, x: int, y: int): """ try: # Clear all - self.__call(WriteMemory( + self._call(WriteMemory( x, y, 0xf100002c, _ONE_WORD.pack(0xFFFFFFFF))) except Exception: - logger.info(self.__where_is_xy(x, y)) + logger.info(self._where_is_xy(x, y)) raise def close(self) -> None: @@ -2051,7 +2051,7 @@ def control_sync(self, do_sync: bool): :param bool do_sync: Whether to synchronise or not """ - self.__call(DoSync(do_sync)) + self._call(DoSync(do_sync)) def update_provenance_and_exit(self, x: int, y: int, p: int): """