diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..5222b5d --- /dev/null +++ b/mypy.ini @@ -0,0 +1,5 @@ +[mypy] +show_absolute_path = True + +[mypy-serial] +ignore_missing_imports = True diff --git a/pycont/_logger.py b/pycont/_logger.py index 6002f96..e25e617 100644 --- a/pycont/_logger.py +++ b/pycont/_logger.py @@ -3,5 +3,5 @@ __logger_root_name__ = 'pycont' -def create_logger(name): +def create_logger(name: str) -> logging.Logger: return logging.getLogger(__logger_root_name__ + '.' + name) diff --git a/pycont/controller.py b/pycont/controller.py index 8f081f5..34c07fb 100644 --- a/pycont/controller.py +++ b/pycont/controller.py @@ -11,6 +11,8 @@ import time import json +from typing import Dict, Union, Optional, List, Any, Tuple + import serial import threading @@ -19,6 +21,8 @@ from . import pump_protocol #: Represents the Broadcast of the C3000 +from .dtprotocol import DTInstructionPacket + C3000Broadcast = '_' #: Switches the C3000 to a certain address @@ -80,19 +84,19 @@ MAX_REPEAT_OPERATION = 10 -class PumpIO(object): +class PumpIO: """ This class deals with the pump I/O instructions. Args: - port (int): The port number + port: The device name (depending on operating system. e.g. /dev/ttyUSB0 on GNU/Linux or COM3 on Windows.) - baudrate (int): Baudrate of the communication, default set to DEFAULT_IO_BAUDRATE(9600) + baudrate: Baudrate of the communication, default set to DEFAULT_IO_BAUDRATE(9600) - timeout (int): The timeout of communication, default set to DEFAULT_IO_TIMEOUT(1) + timeout: The timeout of communication, default set to DEFAULT_IO_TIMEOUT(1) """ - def __init__(self, port, baudrate=DEFAULT_IO_BAUDRATE, timeout=DEFAULT_IO_TIMEOUT): + def __init__(self, port: str, baudrate: int = DEFAULT_IO_BAUDRATE, timeout: float = DEFAULT_IO_TIMEOUT): self.logger = create_logger(self.__class__.__name__) self.lock = threading.Lock() @@ -100,19 +104,19 @@ def __init__(self, port, baudrate=DEFAULT_IO_BAUDRATE, timeout=DEFAULT_IO_TIMEOU self.port = port self.baudrate = baudrate self.timeout = timeout - self._serial = None + self._serial = None # type: Union[serial.serialposix.Serial, serial.serialwin32.Serial] self.open(port, baudrate, timeout) @classmethod - def from_config(cls, io_config): + def from_config(cls, io_config: Dict) -> 'PumpIO': """ Sets details laid out in the configuration .json file Args: - cls (Class): The initialising class. + cls: The initialising class. - io_config (Dict): Dictionary holding the configuration data. + io_config: Dictionary holding the configuration data. Returns: PumpIO: New PumpIO object with the variables set from the configuration file. @@ -133,14 +137,14 @@ def from_config(cls, io_config): return cls(port, baudrate, timeout) @classmethod - def from_configfile(cls, io_configfile): + def from_configfile(cls, io_configfile: str) -> 'PumpIO': """ Opens the configuration file and parses the data to be used in the from_config method. Args: - cls (Class): The initialising class. + cls: The initialising class. - io_configfile (File): File which contains the configuration data. + io_configfile: File which contains the configuration data. Returns: PumpIO: New PumpIO object with the variables set form the configuration file. @@ -169,16 +173,16 @@ def __exit__(self, exc_type, exc_value, traceback): """ self.close() - def open(self, port, baudrate=DEFAULT_IO_BAUDRATE, timeout=DEFAULT_IO_TIMEOUT): + def open(self, port: str, baudrate: int = DEFAULT_IO_BAUDRATE, timeout: float = DEFAULT_IO_TIMEOUT) -> None: """ Opens a communication with the hardware. Args: - port (int): The port number on which the communication will take place. + port: The port number on which the communication will take place. - baudrate (int): The baudrate of the communication, default set to DEFAULT_IO_BAUDRATE(9600). + baudrate: The baudrate of the communication, default set to DEFAULT_IO_BAUDRATE(9600). - timeout (int): The timeout of the communication, default set to DEFAULT_IO_TIMEOUT(1). + timeout: The timeout of the communication, default set to DEFAULT_IO_TIMEOUT(1). """ self._serial = serial.Serial(port, baudrate, timeout=timeout) @@ -187,7 +191,7 @@ def open(self, port, baudrate=DEFAULT_IO_BAUDRATE, timeout=DEFAULT_IO_TIMEOUT): 'baudrate': self.baudrate, 'timeout': self.timeout}) - def close(self): + def close(self) -> None: """ Closes the communication with the hardware. """ @@ -201,27 +205,27 @@ def close(self): 'baudrate': self.baudrate, 'timeout': self.timeout}) - def flush_input(self): + def flush_input(self) -> None: """ Flushes the input buffer of the serial communication. """ self._serial.reset_input_buffer() - def write(self, packet): + def write(self, packet: DTInstructionPacket) -> None: """ Writes a packet along the serial communication. Args: - packet (DTInstructionPacket): The packet to send along the serial communication. + packet: The packet to send along the serial communication. .. note:: Unsure if this is the correct packet type (GAK). """ str_to_send = packet.to_string() - self.logger.debug("Sending {}".format(str_to_send)) + self.logger.debug("Sending {!r}".format(str_to_send)) self._serial.write(str_to_send) - def readline(self): + def readline(self) -> bytes: """ Reads a line from the serial communication. @@ -237,8 +241,7 @@ def readline(self): self.logger.debug("Readline timeout!") raise PumpIOTimeOutError - ## - def write_and_readline(self, packet): + def write_and_readline(self, packet: DTInstructionPacket) -> bytes: """ Writes a packet along the serial communication and waits for a response. @@ -248,7 +251,7 @@ def write_and_readline(self, packet): .. note:: Unsure if this is the correct packet type (GAK). Returns: - response (str): The received response. + response: The received response. Raises: PumpIOTimeOutError: If the response time is greater than the timeout threshold. @@ -338,26 +341,27 @@ class C3000Controller(object): The controller is what controls the pumps. Args: - pump_io (PumpIO): PumpIO object for communication. + pump_io: PumpIO object for communication. - name (str): The name of the controller. + name: The name of the controller. - address (str): Address of the controller. + address: Address of the controller. - total_volume (float): Total volume of the pump. + total_volume: Total volume of the pump. - micro_step_mode (int): The mode which the microstep will use, default set to MICRO_STEP_MODE_2 (2) + micro_step_mode: The mode which the microstep will use, default set to MICRO_STEP_MODE_2 (2) - top_velocity (int): The top velocity of the pump, default set to 6000 + top_velocity: The top velocity of the pump, default set to 6000 - initialize_valve_position (chr): Sets the valve position, default set to VALVE_INPUT ('I') + initialize_valve_position: Sets the valve position, default set to VALVE_INPUT ('I') Raises: ValueError: Invalid microstep mode. """ - def __init__(self, pump_io, name, address, total_volume, micro_step_mode=MICRO_STEP_MODE_2, top_velocity=6000, - initialize_valve_position=VALVE_INPUT): + def __init__(self, pump_io: PumpIO, name: str, address: str, total_volume: float, + micro_step_mode: int = MICRO_STEP_MODE_2, top_velocity: int = 6000, + initialize_valve_position: str = VALVE_INPUT): self.logger = create_logger(self.__class__.__name__) self._io = pump_io @@ -383,18 +387,18 @@ def __init__(self, pump_io, name, address, total_volume, micro_step_mode=MICRO_S self.default_top_velocity = top_velocity @classmethod - def from_config(cls, pump_io, pump_name, pump_config): + def from_config(cls, pump_io: PumpIO, pump_name: str, pump_config: Dict) -> 'C3000Controller': """ Obtains the configuration data. Args: - cls (Class): The initialising class. + cls: The initialising class. - pump_io (PumpIO): PumpIO object. + pump_io: PumpIO object. - pump_name (str): Name of the pump. + pump_name: Name of the pump. - pump_config (Dict): Dictionary containing the pump configuration data. + pump_config: Dictionary containing the pump configuration data. Returns: C3000Controller: New C3000Controller object with the data set from the configuration. @@ -408,20 +412,21 @@ def from_config(cls, pump_io, pump_name, pump_config): return cls(pump_io, pump_name, **pump_config) - def write_and_read_from_pump(self, packet, max_repeat=MAX_REPEAT_WRITE_AND_READ): + def write_and_read_from_pump(self, packet: DTInstructionPacket, max_repeat: int = MAX_REPEAT_WRITE_AND_READ)\ + -> Tuple[str, str, str]: """ Writes packets to and reads the response from the pump. Args: - packet (DTInstructionPacket): The packet to be written. + packet: The packet to be written. - max_repeat (int): The maximum time to repeat the read/write operation. + max_repeat: The maximum time to repeat the read/write operation. Returns: - decoded_response (str): The decoded response. + decoded_response: The decoded response. Raises: - PumpIOTimeOutError: If the response itme is greater than the timeout threshold. + PumpIOTimeOutError: If the response time is greater than the timeout threshold. ControllerRepeatedError: Error in decoding. @@ -434,18 +439,18 @@ def write_and_read_from_pump(self, packet, max_repeat=MAX_REPEAT_WRITE_AND_READ) if decoded_response is not None: return decoded_response else: - self.logger.debug("Decode error for {}, trying again!".format(response)) + self.logger.debug("Decode error for {!r}, trying again!".format(response)) except PumpIOTimeOutError: self.logger.debug("Timeout, trying again!") self.logger.debug("Too many failed communication!") raise ControllerRepeatedError('Repeated Error from pump {}'.format(self.name)) - def volume_to_step(self, volume_in_ml): + def volume_to_step(self, volume_in_ml: float) -> int: """ Determines the number of steps for a given volume. Args: - volume_in_ml (float): Volume in millilitres. + volume_in_ml: Volume in millilitres. Returns: int(round(volume_in_ml * self.steps_per_ml)) @@ -453,27 +458,27 @@ def volume_to_step(self, volume_in_ml): """ return int(round(volume_in_ml * self.steps_per_ml)) - def step_to_volume(self, step): + def step_to_volume(self, step: int) -> float: """ Determines the volume in a specific step. Args: - step (int): Step number. + step: Step number. Returns: - step / float(self.steps_per_ml + step / float(self.steps_per_ml) """ return step / float(self.steps_per_ml) - def is_idle(self): + def is_idle(self) -> bool: """ Determines if the pump is idle or Busy Returns: - True (bool): The pump is idle. + True: The pump is idle. - False (bool): The pump is not idle. + False: The pump is not idle. Raises: ValueError: Value returned from the pump is not valid. @@ -492,14 +497,14 @@ def is_idle(self): else: raise ValueError('The pump replied status {}, Not handled'.format(status)) - def is_busy(self): + def is_busy(self) -> bool: """ Determines if the pump is busy. Returns: - True (bool): Pump is busy. + True: Pump is busy. - False (bool): Pump is idle. + False: Pump is idle. Raises: ValueError: Value returned from the pump is not valid. @@ -507,28 +512,28 @@ def is_busy(self): """ return not self.is_idle() - def wait_until_idle(self): + def wait_until_idle(self) -> None: """ Waits until the pump is not busy for WAIT_SLEEP_TIME, default set to 0.1 """ while self.is_busy(): time.sleep(WAIT_SLEEP_TIME) - def is_initialized(self): + def is_initialized(self) -> bool: """ Determines if the pump has been initialised. Returns: - True (bool): The pump is initialised. + True: The pump is initialised. - False (bool): The pump is not initialised. + False: The pump is not initialised. """ initialized_packet = self._protocol.forge_report_initialized_packet() (_, _, init_status) = self.write_and_read_from_pump(initialized_packet) return bool(int(init_status)) - def smart_initialize(self, valve_position=None, secure=True): + def smart_initialize(self, valve_position: str = None, secure: bool = True) -> None: """ Initialises the pump and sets all pump parameters. @@ -542,16 +547,16 @@ def smart_initialize(self, valve_position=None, secure=True): self.initialize(valve_position, secure=secure) self.init_all_pump_parameters(secure=secure) - def initialize(self, valve_position=None, max_repeat=MAX_REPEAT_OPERATION, secure=True): + def initialize(self, valve_position: str = None, max_repeat: int = MAX_REPEAT_OPERATION, secure: bool = True) -> bool: """ Initialises the pump. Args: - valve_position (int): Position of the valve, default set to None. + valve_position: Position of the valve, default set to None. - max_repeat (int): Maximum number of times to repeat the operation, default set to MAX_REPEAT_OPERATION (10). + max_repeat: Maximum number of times to repeat the operation, default set to MAX_REPEAT_OPERATION (10). - secure (bool): Ensures that everything is correct. + secure: Ensures that everything is correct. Raises: ControllerRepeatedError: Too many failed attempts to initialise. @@ -572,42 +577,42 @@ def initialize(self, valve_position=None, max_repeat=MAX_REPEAT_OPERATION, secur self.logger.debug("Too many failed attempts to initialize!") raise ControllerRepeatedError('Repeated Error from pump {}'.format(self.name)) - def initialize_valve_right(self, operand_value=0, wait=True): + def initialize_valve_right(self, operand_value: int = 0, wait: bool = True) -> None: """ Initialises the right valve. Args: - operand_value (int): Value of the supplied operand. + operand_value: Value of the supplied operand. - wait (bool): Whether or not to wait until the pump is idle, default set to True. + wait: Whether or not to wait until the pump is idle, default set to True. """ self.write_and_read_from_pump(self._protocol.forge_initialize_valve_right_packet(operand_value)) if wait: self.wait_until_idle() - def initialize_valve_left(self, operand_value=0, wait=True): + def initialize_valve_left(self, operand_value: int = 0, wait: bool = True) -> None: """ Initialises the left valve. Args: - operand_value (int): Value of the supplied operand, default set to 0. + operand_value: Value of the supplied operand, default set to 0. - wait (bool): Whether or not to wait until the pump is idle, default set to True. + wait: Whether or not to wait until the pump is idle, default set to True. """ self.write_and_read_from_pump(self._protocol.forge_initialize_valve_left_packet(operand_value)) if wait: self.wait_until_idle() - def initialize_no_valve(self, operand_value=None, wait=True): + def initialize_no_valve(self, operand_value: int = None, wait: bool = True) -> None: """ Initialise with no valves. Args: - operand_value (int): Value of the supplied operand. + operand_value: Value of the supplied operand. - wait (bool): Whether or not to wait until the pump is idle, default set to True. + wait: Whether or not to wait until the pump is idle, default set to True. """ @@ -621,21 +626,21 @@ def initialize_no_valve(self, operand_value=None, wait=True): if wait: self.wait_until_idle() - def initialize_valve_only(self, operand_string='0,0', wait=True): + def initialize_valve_only(self, operand_string: str = '0,0', wait: bool = True) -> None: """ Initialise with valves only. Args: - operand_string (str): Value of the supplied operand. + operand_string: Value of the supplied operand. - wait (bool): Whether or not to wait until the pump is idle, default set to True. + wait: Whether or not to wait until the pump is idle, default set to True. """ self.write_and_read_from_pump(self._protocol.forge_initialize_valve_only_packet(operand_string)) if wait: self.wait_until_idle() - def init_all_pump_parameters(self, secure=True): + def init_all_pump_parameters(self, secure: bool = True) -> None: """ Initialises the pump parameters, Microstep Mode, and Top Velocity. @@ -649,25 +654,25 @@ def init_all_pump_parameters(self, secure=True): self.set_top_velocity(self.default_top_velocity, secure=secure) self.wait_until_idle() # just in case, but should not be needed - def set_microstep_mode(self, micro_step_mode): + def set_microstep_mode(self, micro_step_mode: int) -> None: """ - Sets the Microstep Mode to use. + Sets the microstep mode to use. Args: - micro_step_mode (int): Mode to use. + micro_step_mode: Mode to use. """ self.write_and_read_from_pump(self._protocol.forge_microstep_mode_packet(micro_step_mode)) - def check_top_velocity_within_range(self, top_velocity): + def check_top_velocity_within_range(self, top_velocity: int) -> bool: """ Checks that the top velocity is within a maximum range. Args: - top_velocity (int): The top velocity for the pump. + top_velocity: The top velocity for the pump (in steps/second). Returns: - True (bool): Top velocity is within range. + True: Top velocity is within range. Raises: ValueError: Top velocity is out of range. @@ -683,56 +688,55 @@ def check_top_velocity_within_range(self, top_velocity): else: raise ValueError('Top velocity {} is not in range'.format(top_velocity)) - def set_default_top_velocity(self, top_velocity): + def set_default_top_velocity(self, top_velocity: int) -> None: """ Sets the default top velocity. Args: - top_velocity (int): The top velocity for the pump. + top_velocity (int): The top velocity for the pump (steps/second). """ self.check_top_velocity_within_range(top_velocity) self.default_top_velocity = top_velocity - def get_default_top_velocity(self): + def get_default_top_velocity(self) -> int: """ Gets the default top velocity. Returns: - self.default_top_velocity (int): The default top velocity. + self.default_top_velocity: The default top velocity. """ return self.default_top_velocity - def ensure_default_top_velocity(self, secure=True): + def ensure_default_top_velocity(self, secure: bool = True) -> None: """ Ensures that the top velocity is the default top velocity. Args: - secure (bool): Ensures that everything is correct, default set to True. + secure: Ensures that everything is correct, default set to True. """ if self.get_top_velocity() != self.default_top_velocity: self.set_top_velocity(self.default_top_velocity, secure=secure) - def set_top_velocity(self, top_velocity, max_repeat=MAX_REPEAT_OPERATION, secure=True): + def set_top_velocity(self, top_velocity: int, max_repeat: int = MAX_REPEAT_OPERATION, secure: bool = True) -> bool: """ Sets the top velocity for the pump. Args: - top_velocity (int): The top velocity. + top_velocity: The top velocity. - max_repeat (int): Maximum number of times to repeat an operation, default set to MAX_REPEAT_OPERATION (10). + max_repeat: Maximum number of times to repeat an operation, default set to MAX_REPEAT_OPERATION (10). - secure (bool): Ensures that everything is correct. + secure: Ensures that everything is correct. Returns: - True (bool): Top velocity has been set. + True: Top velocity has been set. Raises: ControllerRepeatedError: Too many failed attempts at setting the top velocity. - """ for i in range(max_repeat): if self.get_top_velocity() == top_velocity: @@ -748,24 +752,24 @@ def set_top_velocity(self, top_velocity, max_repeat=MAX_REPEAT_OPERATION, secure self.logger.debug(f"[PUMP {self.name}] Too many failed attempts in set_top_velocity!") raise ControllerRepeatedError(f'Repeated Error from pump {self.name}') - def get_top_velocity(self): + def get_top_velocity(self) -> int: """ Gets the current top velocity. Returns: - top_velocity (int): The current top velocity. + top_velocity: The current top velocity (steps/second). """ top_velocity_packet = self._protocol.forge_report_peak_velocity_packet() (_, _, top_velocity) = self.write_and_read_from_pump(top_velocity_packet) return int(top_velocity) - def get_plunger_position(self): + def get_plunger_position(self) -> int: """ Gets the current position of the plunger. Returns: - steps (int): The position of the plunger. + steps: The position of the plunger (in steps). """ plunger_position_packet = self._protocol.forge_report_plunger_position_packet() @@ -773,91 +777,92 @@ def get_plunger_position(self): return int(steps) @property - def current_steps(self): + def current_steps(self) -> int: """ See get_plunger_position() """ return self.get_plunger_position() @property - def remaining_steps(self): + def remaining_steps(self) -> int: """ Gets the remaining number of steps. Returns: - (int): self.number_of_steps - self.current_steps + self.number_of_steps - self.current_steps """ return self.number_of_steps - self.current_steps - def get_volume(self): + def get_volume(self) -> float: """ - See step_to_volume() + See step_to_volume(), in ml Returns: - (float): self.step_to_volume(self.get_plunger_position()) + self.step_to_volume(self.get_plunger_position()) """ return self.step_to_volume(self.get_plunger_position()) # in ml @property - def current_volume(self): + def current_volume(self) -> float: """ See get_volume() Returns: - (float): self.get_volume() + self.get_volume() """ return self.get_volume() @property - def remaining_volume(self): + def remaining_volume(self) -> float: """ Gets the remaining volume. Returns: - (float): self.total_volume - self.current_volume + self.total_volume - self.current_volume """ return self.total_volume - self.current_volume - def is_volume_pumpable(self, volume_in_ml): + def is_volume_pumpable(self, volume_in_ml: float) -> bool: """ Determines if the volume is pumpable. Args: - volume_in_ml (float): The supplied volume. + volume_in_ml: The supplied volume. Returns: - True (bool): If the number of steps is <= to the remaining steps. + True: If the number of steps is <= to the remaining steps. - False (bool): The number of steps is > than the remaining steps. + False: The number of steps is > than the remaining steps. """ steps = self.volume_to_step(volume_in_ml) return steps <= self.remaining_steps - def pump(self, volume_in_ml, from_valve=None, speed_in=None, wait=False, secure=True): + def pump(self, volume_in_ml: float, from_valve: str = None, speed_in: int = None, wait: bool = False, + secure: bool = True) -> bool: """ Sends the signal to initiate the pump sequence. .. warning:: Change of speed will last after the scope of this function but will be reset to default each time speed_in == None Args: - volume_in_ml (float): Volume to pump (in mL). + volume_in_ml: Volume to pump (in mL). - from_valve (chr): Pump using the valve, default set to None. + from_valve: Pump using the valve, default set to None. - speed_in (int): Speed to pump, default set to None. + speed_in: Speed to pump, default set to None. - wait (bool): Waits for the pump to be idle, default set to False. + wait: Waits for the pump to be idle, default set to False. - secure (bool): Ensures everything is correct, default set to True. + secure: Ensures everything is correct, default set to True. Returns: - True (bool): The supplied volume is pumpable. + True: The supplied volume is pumpable. - False (bool): Supplied volume is not pumpable. + False: Supplied volume is not pumpable. """ if self.is_volume_pumpable(volume_in_ml): @@ -881,38 +886,39 @@ def pump(self, volume_in_ml, from_valve=None, speed_in=None, wait=False, secure= else: return False - def is_volume_deliverable(self, volume_in_ml): + def is_volume_deliverable(self, volume_in_ml: float) -> bool: """ Determines if the supplied volume is deliverable. Args: - volume_in_ml (float): The supplied volume in mL. + volume_in_ml: The supplied volume in mL. Returns: - True (bool): The number of steps is <= the current steps. + True: The number of steps is <= the current steps. - False (bool): The number of steps is > the current steps. + False: The number of steps is > the current steps. """ steps = self.volume_to_step(volume_in_ml) return steps <= self.current_steps - def deliver(self, volume_in_ml, to_valve=None, speed_out=None, wait=False, secure=True): + def deliver(self, volume_in_ml: float, to_valve: str = None, speed_out: int = None, wait: bool = False, + secure: bool = True) -> bool: """ Delivers the volume payload. .. warning:: Change of speed will last after the scope of this function but will be reset to default each time speed_out == None Args: - volume_in_ml (float): The supplied volume to deliver. + volume_in_ml: The supplied volume to deliver. - to_valve (chr): The valve to deliver the payload to, default set to None. + to_valve: The valve to deliver the payload to, default set to None. - speed_out (int): The speed of delivery, default set to None. + speed_out: The speed of delivery, default set to None. - wait (bool): Waits for the pump to be idle, default set to False. + wait: Waits for the pump to be idle, default set to False. - secure (bool): Ensures that everything is correct, default set to False. + secure: Ensures that everything is correct, default set to False. """ if self.is_volume_deliverable(volume_in_ml): @@ -939,20 +945,21 @@ def deliver(self, volume_in_ml, to_valve=None, speed_out=None, wait=False, secur else: return False - def transfer(self, volume_in_ml, from_valve, to_valve, speed_in=None, speed_out=None): + def transfer(self, volume_in_ml: float, from_valve: str, to_valve: str, speed_in: int = None, + speed_out: int = None) -> None: """ Transfers the desired volume in mL. Args: - volume_in_ml (float): The volume to transfer. + volume_in_ml: The volume to transfer. - from_valve (chr): The valve to transfer from. + from_valve: The valve to transfer from. - to_valve (chr): The valve to transfer to. + to_valve: The valve to transfer to. - speed_in (int): The speed of transfer to valve, default set to None. + speed_in: The speed of transfer to valve, default set to None. - speed_out (int): The speed of transfer from the valve, default set to None. + speed_out: The speed of transfer from the valve, default set to None. """ volume_transferred = min(volume_in_ml, self.remaining_volume) @@ -963,40 +970,40 @@ def transfer(self, volume_in_ml, from_valve, to_valve, speed_in=None, speed_out= if remaining_volume_to_transfer > 0: self.transfer(remaining_volume_to_transfer, from_valve, to_valve, speed_in, speed_out) - def is_volume_valid(self, volume_in_ml): + def is_volume_valid(self, volume_in_ml: float) -> bool: """ Determines if the supplied volume is valid. Args: - volume_in_ml (float): The supplied volume. + volume_in_ml: The supplied volume. Returns: - True (bool): The supplied volume is <= the total volume and >= 0 + True: The supplied volume is <= the total volume and >= 0 - False (bool): The supplied volume is > total volume or < 0 + False: The supplied volume is > total volume or < 0 """ return 0 <= volume_in_ml <= self.total_volume - def go_to_volume(self, volume_in_ml, speed=None, wait=False, secure=True): + def go_to_volume(self, volume_in_ml: float, speed: int = None, wait: bool = False, secure: bool = True) -> bool: """ Moves the pump to the desired volume. .. warning:: Change of speed will last after the scope of this function but will be reset to default each time speed == None Args: - volume_in_ml (float): The supplied volume. + volume_in_ml: The supplied volume. - speed (int): The speed of movement, default set to None. + speed: The speed of movement, default set to None. - wait (bool): Waits for the pump to be idle, default set to False. + wait: Waits for the pump to be idle, default set to False. - secure (bool): Ensures that everything is correct, default set to True. + secure: Ensures that everything is correct, default set to True. Returns: - True (bool): The supplied volume is valid. + True: The supplied volume is valid. - False (bool): THe supplied volume is not valid. + False: THe supplied volume is not valid. """ if self.is_volume_valid(volume_in_ml): @@ -1017,44 +1024,44 @@ def go_to_volume(self, volume_in_ml, speed=None, wait=False, secure=True): else: return False - def go_to_max_volume(self, speed=None, wait=False): + def go_to_max_volume(self, speed: int = None, wait: bool = False) -> None: """ Moves the pump to the maximum volume. Args: - speed (int): The speed of movement, default set to None. + speed: The speed of movement, default set to None. - wait (bool): Waits until the pump is idle, default set to False. + wait: Waits until the pump is idle, default set to False. Returns: - True (bool): The maximum volume is valid. + True: The maximum volume is valid. - False (bool): The maximum volume is not valid. + False: The maximum volume is not valid. """ self.go_to_volume(self.total_volume, speed=speed, wait=wait) - def get_raw_valve_position(self): + def get_raw_valve_position(self) -> str: """ Gets the raw value of the valve's position. Returns: - raw_valve_position (str): The raw position of the valve. + raw_valve_position: The raw position of the valve. """ valve_position_packet = self._protocol.forge_report_valve_position_packet() (_, _, raw_valve_position) = self.write_and_read_from_pump(valve_position_packet) return raw_valve_position - def get_valve_position(self, max_repeat=MAX_REPEAT_OPERATION): + def get_valve_position(self, max_repeat: int = MAX_REPEAT_OPERATION) -> str: """ Gets the position of the valve. Args: - max_repeat (int): Maximum number of times to repeat an operation, default set to MAX_REPEAT_OPERATION (10). + max_repeat: Maximum number of times to repeat an operation, default set to MAX_REPEAT_OPERATION (10). Returns: - (chr): The position of the valve. + The position of the valve. Raises: ValueError: The valve position is not valid/unknown. @@ -1076,19 +1083,19 @@ def get_valve_position(self, max_repeat=MAX_REPEAT_OPERATION): self.logger.debug(f"Valve position request failed attempt {i+1}/{max_repeat}, {raw_valve_position} unknown") raise ValueError(f'Valve position received was {raw_valve_position}. It is unknown') - def set_valve_position(self, valve_position, max_repeat=MAX_REPEAT_OPERATION, secure=True): + def set_valve_position(self, valve_position: str, max_repeat: int = MAX_REPEAT_OPERATION, secure: bool = True) -> bool: """ Sets the position of the valve. Args: - valve_position (str): Position of the valve. + valve_position: Position of the valve. - max_repeat (int): maximum number of times to repeat an operation, default set to MAX_REPEAT_OPERATION (10). + max_repeat: maximum number of times to repeat an operation, default set to MAX_REPEAT_OPERATION (10). - secure (bool): Ensures that everything is correct, default set to True. + secure: Ensures that everything is correct, default set to True. Returns: - True (bool): The valve position has been set. + True: The valve position has been set. Raises: ValueError: The valve position is invalid/unknown. @@ -1127,12 +1134,12 @@ def set_valve_position(self, valve_position, max_repeat=MAX_REPEAT_OPERATION, se self.logger.debug("[PUMP {}] Too many failed attempts in set_valve_position!".format(self.name)) raise ControllerRepeatedError('Repeated Error from pump {}'.format(self.name)) - def set_eeprom_config(self, operand_value): + def set_eeprom_config(self, operand_value: int) -> None: """ Sets the configuration of the EEPROM on the pumps. Args: - operand_value (int): The value of the supplied operand. + operand_value: The value of the supplied operand. """ eeprom_config_packet = self._protocol.forge_eeprom_config_packet(operand_value) @@ -1151,32 +1158,32 @@ def set_eeprom_config(self, operand_value): print("Unpower and repower the pump to make changes active!") print("####################################################") - def set_eeprom_lowlevel_config(self, command, operand): + def set_eeprom_lowlevel_config(self, command: int, operand: str) -> None: """ Sets the configuration of the EEPROM on the pumps. Args: - command (int): The value of the command to be issued. - operand (int): The value of the supplied operand. + command: The value of the command to be issued. + operand: The value of the supplied operand. """ eeprom_packet = self._protocol.forge_eeprom_lowlevel_config_packet(sub_command=command, operand_value=operand) self.write_and_read_from_pump(eeprom_packet) - def flash_eeprom_3_way_y_valve(self): + def flash_eeprom_3_way_y_valve(self) -> None: """ Sets the EEPROM config of the pump to use a 3-way Y valve (I/O operations) Requires switching of the jumper pin on the back of the pump from the top set of pins to the bottom. """ self.set_eeprom_config(1) - def flash_eeprom_3_way_t_valve(self): + def flash_eeprom_3_way_t_valve(self) -> None: """ Sets the EEPROM config of the pump to use a 3-way T valve (I/O operations) """ self.set_eeprom_config(5) - def flash_eeprom_4_way_nondist_valve(self): + def flash_eeprom_4_way_nondist_valve(self) -> None: """ Sets the EEPROM config of the pump to use a 4-way Non-Dist valve (I/O/E operations) Note in this configuration it is not possible to pump to E! @@ -1184,24 +1191,24 @@ def flash_eeprom_4_way_nondist_valve(self): """ self.set_eeprom_config(2) - def flash_eeprom_4_way_dist_valve(self): + def flash_eeprom_4_way_dist_valve(self) -> None: """ Sets the EEPROM config of the pump to use a 4-way Dist Valve (I/O/E operations) """ self.set_eeprom_config(4) - def get_eeprom_config(self): + def get_eeprom_config(self) -> str: """ Gets the EEPROM configuration. Returns: - eeprom_config (int): The configuration of the EEPROM. + eeprom_config: The configuration of the EEPROM. """ (_, _, eeprom_config) = self.write_and_read_from_pump(self._protocol.forge_report_eeprom_packet()) return eeprom_config - def get_current_valve_config(self): + def get_current_valve_config(self) -> str: """ Infers the current valve configuration based on the EEPROM data. """ @@ -1228,7 +1235,7 @@ def get_current_valve_config(self): return current_valve_config - def terminate(self): + def terminate(self) -> None: """ Sends the command to terminate the current action. """ @@ -1313,13 +1320,13 @@ class MultiPumpController(object): This class deals with controlling multiple pumps on one or more hubs at a time. Args: - setup_config (Dict): The configuration of the setup. + setup_config: The configuration of the setup. """ - def __init__(self, setup_config): + def __init__(self, setup_config: Dict): self.logger = create_logger(self.__class__.__name__) - self.pumps = {} - self._io = [] + self.pumps: Dict[str, C3000Controller] = {} + self._io: Union[PumpIO, List[PumpIO]] = [] # Sets groups and default configs if provided in the config dictionary self.groups = setup_config['groups'] if 'groups' in setup_config else {} @@ -1342,14 +1349,14 @@ def __init__(self, setup_config): self.set_pumps_as_attributes() @classmethod - def from_configfile(cls, setup_configfile): + def from_configfile(cls, setup_configfile: str) -> 'MultiPumpController': """ Obtains the configuration data from the supplied configuration file. Args: - cls (Class): The initialising class. + cls: The initialising class. - setup_configfile (File): The configuration file. + setup_configfile: The configuration file. Returns: MultiPumpController: A new MultiPumpController object with the configuration set from the config file. @@ -1358,15 +1365,15 @@ def from_configfile(cls, setup_configfile): with open(setup_configfile) as f: return cls(json.load(f)) - def default_pump_config(self, pump_specific_config): + def default_pump_config(self, pump_specific_config: Dict) -> Dict: """ Creates a default pump configuration. Args: - pump_specific_config (Dict): Dictionary containing the pump configuration. + pump_specific_config: Dictionary containing the pump configuration. Returns: - combined_pump_config (Dict): A new default pump configuration mirroring that of pump_config. + combined_pump_config: A new default pump configuration mirroring that of pump_config. """ # Makes a copy of the default values (this is needed because we are going to merge default with pump settings) @@ -1379,7 +1386,7 @@ def default_pump_config(self, pump_specific_config): # Returns the combination of default settings and pump specific settings return combined_pump_config - def set_pumps_as_attributes(self): + def set_pumps_as_attributes(self) -> None: """ Sets the pumps as attributes. """ @@ -1390,15 +1397,15 @@ def set_pumps_as_attributes(self): else: setattr(self, pump_name, pump) - def get_pumps(self, pump_names): + def get_pumps(self, pump_names: List[str]) -> List[C3000Controller]: """ Obtains a list of all pumps with name in pump_names. Args: - pump_names (List): A list of the pump names + pump_names: A list of the pump names Returns: - pumps (List): A list of the pump objects. + pumps: A list of the pump objects. """ pumps = [] @@ -1409,15 +1416,15 @@ def get_pumps(self, pump_names): pass return pumps - def get_pumps_in_group(self, group_name): + def get_pumps_in_group(self, group_name: str) -> Optional[List[C3000Controller]]: """ Obtains a list of all pumps with group_name. Args: - group_name (List): The group name + group_name: The group name Returns: - pumps (List): A list of the pump objects in the group. None for non-existing groups. + pumps: A list of the pump objects in the group. None for non-existing groups. """ pumps = [] @@ -1430,18 +1437,18 @@ def get_pumps_in_group(self, group_name): pumps.append(self.pumps[pump_name]) return pumps - def get_all_pumps(self): + def get_all_pumps(self) -> Dict[str, C3000Controller]: """ Obtains a list of all pumps. Returns: - pumps (List): A list of the all the pump objects in the Controller. + pumps: A list of the all the pump objects in the Controller. """ return self.pumps - def apply_command_to_pumps(self, pump_names, command, *args, **kwargs): + def apply_command_to_pumps(self, pump_names: List[str], command: str, *args, **kwargs) -> Dict[str, Any]: """ Applies a given command to the pumps. @@ -1455,7 +1462,7 @@ def apply_command_to_pumps(self, pump_names, command, *args, **kwargs): **kwargs: Arbitrary keyword arguments. Returns: - returns (Dict): Dictionary of the functions. + returns (Dict): Dictionary of the functions return. """ returns = {} @@ -1465,7 +1472,7 @@ def apply_command_to_pumps(self, pump_names, command, *args, **kwargs): return returns - def apply_command_to_all_pumps(self, command, *args, **kwargs): + def apply_command_to_all_pumps(self, command: str, *args, **kwargs) -> Dict[str, Any]: """ Applies a given command to all of the pumps. @@ -1482,33 +1489,33 @@ def apply_command_to_all_pumps(self, command, *args, **kwargs): """ return self.apply_command_to_pumps(list(self.pumps.keys()), command, *args, **kwargs) - def apply_command_to_group(self, group_name, command, *args, **kwargs): + def apply_command_to_group(self, group_name: str, command: str, *args, **kwargs) -> Dict[str, Any]: """ Applies a given command to the group. Args: - group_name (str): Name of the group. + group_name: Name of the group. - command (str): The command to apply. + command: The command to apply. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. Returns: - returns (Dict) Dictionary of the functions. + returns Dictionary of the functions. """ return self.apply_command_to_pumps(self.groups[group_name], command, *args, **kwargs) - def are_pumps_initialized(self): + def are_pumps_initialized(self) -> bool: """ Determines if the pumps have been initialised. Returns: - True (bool): The pumps have been initialised. + True: The pumps have been initialised. - False (bool): The pumps have not been initialised. + False: The pumps have not been initialised. """ for pump in list(self.pumps.values()): @@ -1516,12 +1523,12 @@ def are_pumps_initialized(self): return False return True - def smart_initialize(self, secure=True): + def smart_initialize(self, secure: bool = True) -> None: """ Initialises the pumps, setting all parameters. Args: - secure (bool): Ensures everything is correct, default set to True. + secure: Ensures everything is correct, default set to True. """ for pump in list(self.pumps.values()): @@ -1542,32 +1549,32 @@ def smart_initialize(self, secure=True): self.apply_command_to_all_pumps('init_all_pump_parameters', secure=secure) self.wait_until_all_pumps_idle() - def wait_until_all_pumps_idle(self): + def wait_until_all_pumps_idle(self) -> None: """ Sends the command 'wait_until_idle' to the pumps. """ self.apply_command_to_all_pumps('wait_until_idle') - def wait_until_group_idle(self, group_name): + def wait_until_group_idle(self, group_name: str) -> None: """ Sends the command ' wait_until_idle' to all pumps of a group. """ self.apply_command_to_group(group_name=group_name, command='wait_until_idle') - def terminate_all_pumps(self): + def terminate_all_pumps(self) -> None: """ Sends the command 'terminate' to all the pumps. """ self.apply_command_to_all_pumps('terminate') - def are_pumps_idle(self): + def are_pumps_idle(self) -> bool: """ Determines if the pumps are idle. Returns: - True (bool): The pumps are idle. + True: The pumps are idle. - False (bool): The pumps are not idle. + False: The pumps are not idle. """ for pump in list(self.pumps.values()): @@ -1575,34 +1582,35 @@ def are_pumps_idle(self): return False return True - def are_pumps_busy(self): + def are_pumps_busy(self) -> bool: """ Determines if the pumps are busy. Returns: - True (bool): The pumps are busy. + True: The pumps are busy. - False (bool): The pumps are not busy. + False: The pumps are not busy. """ return not self.are_pumps_idle() - def pump(self, pump_names, volume_in_ml, from_valve=None, speed_in=None, wait=False, secure=True): + def pump(self, pump_names: List[str], volume_in_ml: float, from_valve: str = None, speed_in: float = None, + wait: bool = False, secure: bool = True) -> None: """ Pumps the desired volume. Args: - pump_names (List): The name of the pumps. + pump_names: The name of the pumps. - volume_in_ml (float): The volume to be pumped. + volume_in_ml: The volume to be pumped. - from_valve (chr): The valve to pump from. + from_valve: The valve to pump from. - speed_in (int): The speed at which to pump, default set to None. + speed_in: The speed at which to pump, default set to None. - wait (bool): Waits for the pumps to be idle, default set to False. + wait: Waits for the pumps to be idle, default set to False. - secure (bool): Ensures everything is correct, default set to False. + secure: Ensures everything is correct, default set to False. """ if speed_in is not None: @@ -1618,22 +1626,23 @@ def pump(self, pump_names, volume_in_ml, from_valve=None, speed_in=None, wait=Fa if wait: self.apply_command_to_pumps(pump_names, 'wait_until_idle') - def deliver(self, pump_names, volume_in_ml, to_valve=None, speed_out=None, wait=False, secure=True): + def deliver(self, pump_names: List[str], volume_in_ml: float, to_valve: str = None, speed_out: int = None, + wait: bool = False, secure: bool = True) -> None: """ Delivers the desired volume. Args: - pump_names (List): The name of the pumps. + pump_names: The name of the pumps. - volume_in_ml (float): The volume to be delivered. + volume_in_ml: The volume to be delivered. - to_valve (chr): The valve to deliver to. + to_valve: The valve to deliver to. - speed_out (int): The speed at which to deliver. + speed_out: The speed at which to deliver. - wait (bool): Wait for the pumps to be idle, default set to False. + wait: Wait for the pumps to be idle, default set to False. - secure (bool): Ensures everything is correct, default set to True. + secure: Ensures everything is correct, default set to True. """ if speed_out is not None: @@ -1649,24 +1658,25 @@ def deliver(self, pump_names, volume_in_ml, to_valve=None, speed_out=None, wait= if wait: self.apply_command_to_pumps(pump_names, 'wait_until_idle') - def transfer(self, pump_names, volume_in_ml, from_valve, to_valve, speed_in=None, speed_out=None, secure=True): + def transfer(self, pump_names: List[str], volume_in_ml: float, from_valve: str, to_valve: str, + speed_in: int = None, speed_out: int = None, secure: bool = True) -> None: """ Transfers the desired volume between pumps. Args: - pump_names (List): The name of the pumps. + pump_names: The name of the pumps. - volume_in_ml (float): The volume to be transferred. + volume_in_ml: The volume to be transferred. - from_valve (chr): The valve to transfer from. + from_valve: The valve to transfer from. - to_valve (chr): the valve to transfer to. + to_valve: the valve to transfer to. - speed_in (int): The speed at which to receive transfer, default set to None. + speed_in: The speed at which to receive transfer, default set to None. - speed_out (int): The speed at which to transfer, default set to None + speed_out: The speed at which to transfer, default set to None - secure (bool): Ensures that everything is correct, default set to False. + secure: Ensures that everything is correct, default set to False. """ volume_transferred = float('inf') # Temporary value for the first cycle only, see below @@ -1681,25 +1691,25 @@ def transfer(self, pump_names, volume_in_ml, from_valve, to_valve, speed_in=None if remaining_volume_to_transfer > 0: self.transfer(pump_names, remaining_volume_to_transfer, from_valve, to_valve, speed_in, speed_out) - def parallel_transfer(self, pumps_and_volumes_dict: dict, from_valve: str, to_valve: str, - speed_in=None, speed_out=None, secure=True, wait=False): + def parallel_transfer(self, pumps_and_volumes_dict: Dict, from_valve: str, to_valve: str, + speed_in: int = None, speed_out: int = None, secure: bool = True, wait: bool = False) -> bool: """ Transfers the desired volume between pumps. Args: - pumps_and_volumes_dict (dict): The names and volumes to be pumped for each pump. + pumps_and_volumes_dict: The names and volumes to be pumped for each pump. - from_valve (chr): The valve to transfer from. + from_valve: The valve to transfer from. - to_valve (chr): the valve to transfer to. + to_valve: the valve to transfer to. - speed_in (int): The speed at which to receive transfer, default set to None. + speed_in: The speed at which to receive transfer, default set to None. - speed_out (int): The speed at which to transfer, default set to None + speed_out: The speed at which to transfer, default set to None - secure (bool): Ensures that everything is correct, default set to False. + secure: Ensures that everything is correct, default set to False. - wait (bool): Wait for the pumps to be idle, default set to False. + wait: Wait for the pumps to be idle, default set to False. """ @@ -1738,6 +1748,7 @@ def parallel_transfer(self, pumps_and_volumes_dict: dict, from_valve: str, to_va self.parallel_transfer(left_to_pump, from_valve, to_valve, speed_in, speed_out, secure) elif wait is True: # If no more pumping is needed wait if needed self.apply_command_to_pumps(list(pumps_and_volumes_dict.keys()), "wait_until_idle") + return True class VirtualMultiPumpController(MultiPumpController): diff --git a/pycont/dtprotocol.py b/pycont/dtprotocol.py index c19db14..e1e2277 100644 --- a/pycont/dtprotocol.py +++ b/pycont/dtprotocol.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import itertools +from typing import List, Tuple, Optional from ._logger import create_logger @@ -8,86 +9,85 @@ DTStop = '\r' -class DTInstructionPacket(object): - """ This class is used to represent a DT instruction packet. - - Args: - address (str): The address to talk to - - dtcommands (list): List of DTCommand - - (for more details see http://www.tricontinent.com/products/cseries-syringe-pumps) - """ - - def __init__(self, address, dtcommands): - self.address = address.encode() - self.dtcommands = dtcommands - - def to_array(self): - commands = ''.encode() - for dtcommand in self.dtcommands: - commands += dtcommand.to_string() - return bytearray(itertools.chain(DTStart.encode(), - self.address, - commands, - DTStop.encode(), )) - - def to_string(self): - return bytes(self.to_array()) - - class DTCommand(object): """ This class is used to represent a DTcommand. Args: - command (str): The command to be sent + command: The command to be sent - operand (str): The parameter of the command, None by default + operand: The parameter of the command, None by default (for more details see http://www.tricontinent.com/products/cseries-syringe-pumps) """ - def __init__(self, command, operand=None): + def __init__(self, command: str, operand: str = None): self.command = command.encode() if operand is not None: self.operand = operand.encode() else: - self.operand = None + self.operand = None # type: ignore - def to_array(self): + def to_array(self) -> bytearray: if self.operand is None: chain = itertools.chain(self.command) else: chain = itertools.chain(self.command, self.operand) return bytearray(chain) - def to_string(self): + def to_string(self) -> bytes: return bytes(self.to_array()) def __str__(self): return "command: " + str(self.command.decode()) + " operand: " + str(self.operand) -class DTStatus(object): +class DTInstructionPacket: + """ This class is used to represent a DT instruction packet. + + Args: + address: The address to talk to + + dtcommands: List of DTCommand + + (for more details see http://www.tricontinent.com/products/cseries-syringe-pumps) + """ + + def __init__(self, address: str, dtcommands: List[DTCommand]): + self.address = address.encode() + self.dtcommands = dtcommands + + def to_array(self) -> bytearray: + commands = ''.encode() + for dtcommand in self.dtcommands: + commands += dtcommand.to_string() + return bytearray(itertools.chain(DTStart.encode(), + self.address, + commands, + DTStop.encode(), )) + + def to_string(self) -> bytes: + return bytes(self.to_array()) + +class DTStatus(object): """ This class is used to represent a DTstatus, the response of the device from a command. Args: - response (str): The response from the device + response: The response from the device (for more details see http://www.tricontinent.com/products/cseries-syringe-pumps) """ - def __init__(self, response): + def __init__(self, response: bytes): self.logger = create_logger(self.__class__.__name__) try: self.response = response.decode() except UnicodeDecodeError: - self.logger.debug('Could not decode {}'.format(response)) - self.response = None + self.logger.debug('Could not decode {!r}'.format(response)) + self.response = None # type: ignore - def decode(self): + def decode(self) -> Optional[Tuple[str, str, str]]: if self.response is not None: info = self.response.rstrip().rstrip('\x03').lstrip(DTStart) address = info[0] diff --git a/pycont/pump_protocol.py b/pycont/pump_protocol.py index 331a7a1..01b2885 100644 --- a/pycont/pump_protocol.py +++ b/pycont/pump_protocol.py @@ -7,6 +7,8 @@ """ # -*- coding: utf-8 -*- +from typing import List, Union, Optional, Tuple + from ._logger import create_logger from . import dtprotocol @@ -113,46 +115,47 @@ STATUS_BUSY_VALVE_OVERLOAD, STATUS_BUSY_PLUNGER_STUCK) -class C3000Protocol(object): +class C3000Protocol: """ This class is used to represent the protocol which the pumps will follow when controlled. Args: - address (str): Address of the pump. + address: Address of the pump. """ - def __init__(self, address): + def __init__(self, address: str): self.logger = create_logger(self.__class__.__name__) self.address = address - def forge_packet(self, dtcommands: dtprotocol.DTCommand, execute=True) -> dtprotocol.DTInstructionPacket: + def forge_packet(self, dtcommands: Union[List[dtprotocol.DTCommand], dtprotocol.DTCommand], + execute: bool = True) -> dtprotocol.DTInstructionPacket: """ Creates a packet which will be sent to the device. Args: - dtcommands (list): List of dtcommands. + dtcommands: DTCommand or list of DTCommands. - execute (bool): Sets the execute value, True by default. + execute: Sets the execute value, True by default. Returns: DTInstructionPacket: The packet created. """ self.logger.debug("Forging packet with {} and execute set to {}".format(dtcommands, execute)) - if type(dtcommands) == dtprotocol.DTCommand: + if isinstance(dtcommands, dtprotocol.DTCommand): dtcommands = [dtcommands] if execute: dtcommands.append(dtprotocol.DTCommand(CMD_EXECUTE)) return dtprotocol.DTInstructionPacket(self.address, dtcommands) # handling answers - def decode_packet(self, dtresponse): + def decode_packet(self, dtresponse: bytes) -> Optional[Tuple[str, str, str]]: """ Decodes the response packet form the device. Args: - dtresponse (str): The response from the device. + dtresponse: The response from the device. Returns: DTStatus: The decoded status of the device. @@ -170,12 +173,12 @@ def decode_packet(self, dtresponse): # the functions below should be generated automatically but not really needed for now - def forge_initialize_valve_right_packet(self, operand_value=0): + def forge_initialize_valve_right_packet(self, operand_value: int = 0) -> dtprotocol.DTInstructionPacket: """ Creates a packet for initialising the right valve. Args: - operand_value (int): The value of the supplied operand, 0 by default. + operand_value: The value of the supplied operand, 0 by default. Returns: DTInstructionPacket: The packet created for initialising the right valve. @@ -184,12 +187,12 @@ def forge_initialize_valve_right_packet(self, operand_value=0): dtcommand = dtprotocol.DTCommand(CMD_INITIALIZE_VALVE_RIGHT, str(operand_value)) return self.forge_packet(dtcommand) - def forge_initialize_valve_left_packet(self, operand_value=0): + def forge_initialize_valve_left_packet(self, operand_value: int = 0) -> dtprotocol.DTInstructionPacket: """ Creates a packet for initialising the left valve. Args: - operand_value (int): The value of the supplied operand, 0 by default. + operand_value: The value of the supplied operand, 0 by default. Returns: DTInstructionPacket: The packet created for initialising the left valve. @@ -198,12 +201,12 @@ def forge_initialize_valve_left_packet(self, operand_value=0): dtcommand = dtprotocol.DTCommand(CMD_INITIALIZE_VALVE_LEFT, str(operand_value)) return self.forge_packet(dtcommand) - def forge_initialize_no_valve_packet(self, operand_value=0): + def forge_initialize_no_valve_packet(self, operand_value: int = 0) -> dtprotocol.DTInstructionPacket: """ Creates a packet for initialising with no valves. Args: - operand_value (int): The value of the supplied operand, 0 by default. + operand_value: The value of the supplied operand, 0 by default. Returns: DTInstructionPacket: The packet created for initialising with no valves. @@ -212,12 +215,13 @@ def forge_initialize_no_valve_packet(self, operand_value=0): dtcommand = dtprotocol.DTCommand(CMD_INITIALIZE_NO_VALVE, str(operand_value)) return self.forge_packet(dtcommand) - def forge_initialize_valve_only_packet(self, operand_string=None): + def forge_initialize_valve_only_packet(self, operand_string: Optional[str] = None)\ + -> dtprotocol.DTInstructionPacket: """ Creates a packet for initialising with valves only. Args: - operand_string (str): String representing the operand, None by default + operand_string: String representing the operand, None by default Returns: DTInstructionPacket: The packet created for initialising with valves only @@ -226,12 +230,12 @@ def forge_initialize_valve_only_packet(self, operand_string=None): dtcommand = dtprotocol.DTCommand(CMD_INITIALIZE_VALVE_ONLY, operand_string) return self.forge_packet(dtcommand) - def forge_microstep_mode_packet(self, operand_value): + def forge_microstep_mode_packet(self, operand_value: int) -> dtprotocol.DTInstructionPacket: """ Creates a packet for initialising microstep mode. Args: - operand_value (int): The value of the supplied operand. + operand_value: The value of the supplied operand. Returns: DTInstructionPacket: The packet created for initialising microstep mode. @@ -242,12 +246,12 @@ def forge_microstep_mode_packet(self, operand_value): dtcommand = dtprotocol.DTCommand(CMD_MICROSTEPMODE, str(operand_value)) return self.forge_packet(dtcommand) - def forge_move_to_packet(self, operand_value): + def forge_move_to_packet(self, operand_value: int) -> dtprotocol.DTInstructionPacket: """ Creates a packet for moving the device to a location. Args: - operand_value (int): The value of the supplied operand. + operand_value: The value of the supplied operand. Returns: DTInstructionPacket: The packet created for moving the device to a location. @@ -256,12 +260,12 @@ def forge_move_to_packet(self, operand_value): dtcommand = dtprotocol.DTCommand(CMD_MOVE_TO, str(operand_value)) return self.forge_packet(dtcommand) - def forge_pump_packet(self, operand_value): + def forge_pump_packet(self, operand_value: int) -> dtprotocol.DTInstructionPacket: """ Creates a packet for the pump action of the device. Args: - operand_value (int): The value of the supplied operand + operand_value: The value of the supplied operand Returns: DTInstructionPacket: The packet created for the pump action of the device. @@ -270,12 +274,12 @@ def forge_pump_packet(self, operand_value): dtcommand = dtprotocol.DTCommand(CMD_PUMP, str(operand_value)) return self.forge_packet(dtcommand) - def forge_deliver_packet(self, operand_value): + def forge_deliver_packet(self, operand_value: int) -> dtprotocol.DTInstructionPacket: """ Creates a packet for delivering the payload. Args: - operand_value (int): The value of the supplied operand. + operand_value: The value of the supplied operand. Returns: DTInstructionPacket: The packet created for delivering the payload. @@ -284,12 +288,12 @@ def forge_deliver_packet(self, operand_value): dtcommand = dtprotocol.DTCommand(CMD_DELIVER, str(operand_value)) return self.forge_packet(dtcommand) - def forge_top_velocity_packet(self, operand_value): + def forge_top_velocity_packet(self, operand_value: int) -> dtprotocol.DTInstructionPacket: """ Creates a packet for the top velocity of the device. Args: - operand_value (int): The value of the supplied operand. + operand_value: The value of the supplied operand. Returns: DTInstructionPacket: The packet created for the top velocity of the device. @@ -298,12 +302,12 @@ def forge_top_velocity_packet(self, operand_value): dtcommand = dtprotocol.DTCommand(CMD_TOPVELOCITY, str(int(operand_value))) return self.forge_packet(dtcommand) - def forge_eeprom_config_packet(self, operand_value): + def forge_eeprom_config_packet(self, operand_value: int) -> dtprotocol.DTInstructionPacket: """ Creates a packet for accessing the EEPROM configuration of the device. Args: - operand_value (int): The value of the supplied operand. + operand_value: The value of the supplied operand. Returns: DTInstructionPacket: The packet created for accessing the EEPROM configuration of the device. @@ -312,13 +316,14 @@ def forge_eeprom_config_packet(self, operand_value): dtcommand = dtprotocol.DTCommand(CMD_EEPROM_CONFIG, str(operand_value)) return self.forge_packet(dtcommand, execute=False) - def forge_eeprom_lowlevel_config_packet(self, sub_command=20, operand_value="pycont1"): + def forge_eeprom_lowlevel_config_packet(self, sub_command: int = 20, operand_value: str = "pycont1")\ + -> dtprotocol.DTInstructionPacket: """ Creates a packet for accessing the EEPROM configuration of the device. Args: - sub_command (int): Sub-command value (0-20) - operand_value (str): The value of the supplied operand. + sub_command: Sub-command value (0-20) + operand_value: The value of the supplied operand. Returns: DTInstructionPacket: The packet created for accessing the EEPROM configuration of the device. @@ -328,7 +333,7 @@ def forge_eeprom_lowlevel_config_packet(self, sub_command=20, operand_value="pyc dtcommand = dtprotocol.DTCommand(CMD_EEPROM_LOWLEVEL_CONFIG, str(sub_command) + "_" + str(operand_value)) return self.forge_packet(dtcommand, execute=False) - def forge_valve_input_packet(self): + def forge_valve_input_packet(self) -> dtprotocol.DTInstructionPacket: """ Creates a packet for the input into a valve on the device. @@ -338,7 +343,7 @@ def forge_valve_input_packet(self): """ return self.forge_packet(dtprotocol.DTCommand(CMD_VALVE_INPUT)) - def forge_valve_output_packet(self): + def forge_valve_output_packet(self) -> dtprotocol.DTInstructionPacket: """ Creates a packet for the output from a valve on the device. @@ -348,7 +353,7 @@ def forge_valve_output_packet(self): """ return self.forge_packet(dtprotocol.DTCommand(CMD_VALVE_OUTPUT)) - def forge_valve_bypass_packet(self): + def forge_valve_bypass_packet(self) -> dtprotocol.DTInstructionPacket: """ Creates a packet for bypassing a valve on the device. @@ -358,7 +363,7 @@ def forge_valve_bypass_packet(self): """ return self.forge_packet(dtprotocol.DTCommand(CMD_VALVE_BYPASS)) - def forge_valve_extra_packet(self): + def forge_valve_extra_packet(self) -> dtprotocol.DTInstructionPacket: """ Creates a packet for an extra valve. @@ -368,7 +373,7 @@ def forge_valve_extra_packet(self): """ return self.forge_packet(dtprotocol.DTCommand(CMD_VALVE_EXTRA)) - def forge_valve_6way_packet(self, valve_position): + def forge_valve_6way_packet(self, valve_position: str) -> dtprotocol.DTInstructionPacket: """ Creates a packet for the 6way valve on the device. @@ -378,7 +383,7 @@ def forge_valve_6way_packet(self, valve_position): """ return self.forge_packet(dtprotocol.DTCommand('{}{}'.format(CMD_VALVE_INPUT, valve_position))) - def forge_report_status_packet(self): + def forge_report_status_packet(self) -> dtprotocol.DTInstructionPacket: """ Creates a packet for reporting the device status. @@ -388,7 +393,7 @@ def forge_report_status_packet(self): """ return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_STATUS)) - def forge_report_plunger_position_packet(self): + def forge_report_plunger_position_packet(self) -> dtprotocol.DTInstructionPacket: """ Creates a packet for reporting the device's plunger position. @@ -398,7 +403,7 @@ def forge_report_plunger_position_packet(self): """ return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_PLUNGER_POSITION)) - def forge_report_start_velocity_packet(self): + def forge_report_start_velocity_packet(self) -> dtprotocol.DTInstructionPacket: """ Creates a packet for reporting the device's start velocity. @@ -408,7 +413,7 @@ def forge_report_start_velocity_packet(self): """ return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_START_VELOCITY)) - def forge_report_peak_velocity_packet(self): + def forge_report_peak_velocity_packet(self) -> dtprotocol.DTInstructionPacket: """ Creates a packet for reporting the device's peak velocity. @@ -418,7 +423,7 @@ def forge_report_peak_velocity_packet(self): """ return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_PEAK_VELOCITY)) - def forge_report_cutoff_velocity_packet(self): + def forge_report_cutoff_velocity_packet(self) -> dtprotocol.DTInstructionPacket: """ Creates a packet for reporting the device's cutoff velocity. @@ -428,7 +433,7 @@ def forge_report_cutoff_velocity_packet(self): """ return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_CUTOFF_VELOCITY)) - def forge_report_valve_position_packet(self): + def forge_report_valve_position_packet(self) -> dtprotocol.DTInstructionPacket: """ Creates a packet for reporting the device's valve position. @@ -438,7 +443,7 @@ def forge_report_valve_position_packet(self): """ return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_VALVE_POSITION)) - def forge_report_initialized_packet(self): + def forge_report_initialized_packet(self) -> dtprotocol.DTInstructionPacket: """ Creates a packet for reporting the initialisation of the device. @@ -448,7 +453,7 @@ def forge_report_initialized_packet(self): """ return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_INTIALIZED)) - def forge_report_eeprom_packet(self): + def forge_report_eeprom_packet(self) -> dtprotocol.DTInstructionPacket: """ Creates a packet for reporting the EEPROM. @@ -458,7 +463,7 @@ def forge_report_eeprom_packet(self): """ return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_EEPROM)) - def forge_terminate_packet(self): + def forge_terminate_packet(self) -> dtprotocol.DTInstructionPacket: """ Creates the data packet for terminating the current command diff --git a/pycont/py.typed b/pycont/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/setup.py b/setup.py index b624c97..3ed8100 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ from setuptools import find_packages, setup -VERSION = '1.0.1' +VERSION = '1.0.2' setup(name="pycont", version=VERSION, @@ -8,4 +8,9 @@ author="Jonathan Grizou", author_email='jonathan.grizou@glasgow.ac.uk', packages=find_packages(), + package_data={ + "pycont": ["py.typed"] + }, + include_package_data=True, + install_requires=['pyserial'] )