Skip to content

Commit

Permalink
Type hinting for pump_protocol.py
Browse files Browse the repository at this point in the history
  • Loading branch information
Dario Cambié committed May 11, 2020
1 parent 1772db6 commit 5c837b1
Showing 1 changed file with 45 additions and 43 deletions.
88 changes: 45 additions & 43 deletions pycont/pump_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""
# -*- coding: utf-8 -*-
from typing import List, Union
from typing import List, Union, Optional, Tuple

from ._logger import create_logger

Expand Down Expand Up @@ -115,15 +115,15 @@
STATUS_BUSY_VALVE_OVERLOAD, STATUS_BUSY_PLUNGER_STUCK)


class C3000Protocol(object):
class C3000Protocol:
"""
This class is used to represent the protocol which the pumps will follow when controlled.
Args:
address (str): Address of the pump.
address: Address of the pump.
"""
def __init__(self, address):
def __init__(self, address: str):
self.logger = create_logger(self.__class__.__name__)

self.address = address
Expand All @@ -150,12 +150,12 @@ def forge_packet(self, dtcommands: Union[List[dtprotocol.DTCommand], dtprotocol.
return dtprotocol.DTInstructionPacket(self.address, dtcommands)

# handling answers
def decode_packet(self, dtresponse):
def decode_packet(self, dtresponse: bytes) -> Optional[Tuple[str, str, str]]:
"""
Decodes the response packet form the device.
Args:
dtresponse (str): The response from the device.
dtresponse: The response from the device.
Returns:
DTStatus: The decoded status of the device.
Expand All @@ -173,12 +173,12 @@ def decode_packet(self, dtresponse):

# the functions below should be generated automatically but not really needed for now

def forge_initialize_valve_right_packet(self, operand_value=0):
def forge_initialize_valve_right_packet(self, operand_value: int = 0) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for initialising the right valve.
Args:
operand_value (int): The value of the supplied operand, 0 by default.
operand_value: The value of the supplied operand, 0 by default.
Returns:
DTInstructionPacket: The packet created for initialising the right valve.
Expand All @@ -187,12 +187,12 @@ def forge_initialize_valve_right_packet(self, operand_value=0):
dtcommand = dtprotocol.DTCommand(CMD_INITIALIZE_VALVE_RIGHT, str(operand_value))
return self.forge_packet(dtcommand)

def forge_initialize_valve_left_packet(self, operand_value=0):
def forge_initialize_valve_left_packet(self, operand_value: int = 0) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for initialising the left valve.
Args:
operand_value (int): The value of the supplied operand, 0 by default.
operand_value: The value of the supplied operand, 0 by default.
Returns:
DTInstructionPacket: The packet created for initialising the left valve.
Expand All @@ -201,12 +201,12 @@ def forge_initialize_valve_left_packet(self, operand_value=0):
dtcommand = dtprotocol.DTCommand(CMD_INITIALIZE_VALVE_LEFT, str(operand_value))
return self.forge_packet(dtcommand)

def forge_initialize_no_valve_packet(self, operand_value=0):
def forge_initialize_no_valve_packet(self, operand_value: int = 0) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for initialising with no valves.
Args:
operand_value (int): The value of the supplied operand, 0 by default.
operand_value: The value of the supplied operand, 0 by default.
Returns:
DTInstructionPacket: The packet created for initialising with no valves.
Expand All @@ -215,12 +215,13 @@ def forge_initialize_no_valve_packet(self, operand_value=0):
dtcommand = dtprotocol.DTCommand(CMD_INITIALIZE_NO_VALVE, str(operand_value))
return self.forge_packet(dtcommand)

def forge_initialize_valve_only_packet(self, operand_string=None):
def forge_initialize_valve_only_packet(self, operand_string: Optional[str] = None)\
-> dtprotocol.DTInstructionPacket:
"""
Creates a packet for initialising with valves only.
Args:
operand_string (str): String representing the operand, None by default
operand_string: String representing the operand, None by default
Returns:
DTInstructionPacket: The packet created for initialising with valves only
Expand All @@ -229,12 +230,12 @@ def forge_initialize_valve_only_packet(self, operand_string=None):
dtcommand = dtprotocol.DTCommand(CMD_INITIALIZE_VALVE_ONLY, operand_string)
return self.forge_packet(dtcommand)

def forge_microstep_mode_packet(self, operand_value):
def forge_microstep_mode_packet(self, operand_value: int) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for initialising microstep mode.
Args:
operand_value (int): The value of the supplied operand.
operand_value: The value of the supplied operand.
Returns:
DTInstructionPacket: The packet created for initialising microstep mode.
Expand All @@ -245,12 +246,12 @@ def forge_microstep_mode_packet(self, operand_value):
dtcommand = dtprotocol.DTCommand(CMD_MICROSTEPMODE, str(operand_value))
return self.forge_packet(dtcommand)

def forge_move_to_packet(self, operand_value):
def forge_move_to_packet(self, operand_value: int) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for moving the device to a location.
Args:
operand_value (int): The value of the supplied operand.
operand_value: The value of the supplied operand.
Returns:
DTInstructionPacket: The packet created for moving the device to a location.
Expand All @@ -259,12 +260,12 @@ def forge_move_to_packet(self, operand_value):
dtcommand = dtprotocol.DTCommand(CMD_MOVE_TO, str(operand_value))
return self.forge_packet(dtcommand)

def forge_pump_packet(self, operand_value):
def forge_pump_packet(self, operand_value: int) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for the pump action of the device.
Args:
operand_value (int): The value of the supplied operand
operand_value: The value of the supplied operand
Returns:
DTInstructionPacket: The packet created for the pump action of the device.
Expand All @@ -273,12 +274,12 @@ def forge_pump_packet(self, operand_value):
dtcommand = dtprotocol.DTCommand(CMD_PUMP, str(operand_value))
return self.forge_packet(dtcommand)

def forge_deliver_packet(self, operand_value):
def forge_deliver_packet(self, operand_value: int) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for delivering the payload.
Args:
operand_value (int): The value of the supplied operand.
operand_value: The value of the supplied operand.
Returns:
DTInstructionPacket: The packet created for delivering the payload.
Expand All @@ -287,12 +288,12 @@ def forge_deliver_packet(self, operand_value):
dtcommand = dtprotocol.DTCommand(CMD_DELIVER, str(operand_value))
return self.forge_packet(dtcommand)

def forge_top_velocity_packet(self, operand_value):
def forge_top_velocity_packet(self, operand_value: int) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for the top velocity of the device.
Args:
operand_value (int): The value of the supplied operand.
operand_value: The value of the supplied operand.
Returns:
DTInstructionPacket: The packet created for the top velocity of the device.
Expand All @@ -301,12 +302,12 @@ def forge_top_velocity_packet(self, operand_value):
dtcommand = dtprotocol.DTCommand(CMD_TOPVELOCITY, str(int(operand_value)))
return self.forge_packet(dtcommand)

def forge_eeprom_config_packet(self, operand_value):
def forge_eeprom_config_packet(self, operand_value: int) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for accessing the EEPROM configuration of the device.
Args:
operand_value (int): The value of the supplied operand.
operand_value: The value of the supplied operand.
Returns:
DTInstructionPacket: The packet created for accessing the EEPROM configuration of the device.
Expand All @@ -315,13 +316,14 @@ def forge_eeprom_config_packet(self, operand_value):
dtcommand = dtprotocol.DTCommand(CMD_EEPROM_CONFIG, str(operand_value))
return self.forge_packet(dtcommand, execute=False)

def forge_eeprom_lowlevel_config_packet(self, sub_command=20, operand_value="pycont1"):
def forge_eeprom_lowlevel_config_packet(self, sub_command: int = 20, operand_value: str = "pycont1")\
-> dtprotocol.DTInstructionPacket:
"""
Creates a packet for accessing the EEPROM configuration of the device.
Args:
sub_command (int): Sub-command value (0-20)
operand_value (str): The value of the supplied operand.
sub_command: Sub-command value (0-20)
operand_value: The value of the supplied operand.
Returns:
DTInstructionPacket: The packet created for accessing the EEPROM configuration of the device.
Expand All @@ -331,7 +333,7 @@ def forge_eeprom_lowlevel_config_packet(self, sub_command=20, operand_value="pyc
dtcommand = dtprotocol.DTCommand(CMD_EEPROM_LOWLEVEL_CONFIG, str(sub_command) + "_" + str(operand_value))
return self.forge_packet(dtcommand, execute=False)

def forge_valve_input_packet(self):
def forge_valve_input_packet(self) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for the input into a valve on the device.
Expand All @@ -341,7 +343,7 @@ def forge_valve_input_packet(self):
"""
return self.forge_packet(dtprotocol.DTCommand(CMD_VALVE_INPUT))

def forge_valve_output_packet(self):
def forge_valve_output_packet(self) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for the output from a valve on the device.
Expand All @@ -351,7 +353,7 @@ def forge_valve_output_packet(self):
"""
return self.forge_packet(dtprotocol.DTCommand(CMD_VALVE_OUTPUT))

def forge_valve_bypass_packet(self):
def forge_valve_bypass_packet(self) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for bypassing a valve on the device.
Expand All @@ -361,7 +363,7 @@ def forge_valve_bypass_packet(self):
"""
return self.forge_packet(dtprotocol.DTCommand(CMD_VALVE_BYPASS))

def forge_valve_extra_packet(self):
def forge_valve_extra_packet(self) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for an extra valve.
Expand All @@ -371,7 +373,7 @@ def forge_valve_extra_packet(self):
"""
return self.forge_packet(dtprotocol.DTCommand(CMD_VALVE_EXTRA))

def forge_valve_6way_packet(self, valve_position):
def forge_valve_6way_packet(self, valve_position: str) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for the 6way valve on the device.
Expand All @@ -381,7 +383,7 @@ def forge_valve_6way_packet(self, valve_position):
"""
return self.forge_packet(dtprotocol.DTCommand('{}{}'.format(CMD_VALVE_INPUT, valve_position)))

def forge_report_status_packet(self):
def forge_report_status_packet(self) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for reporting the device status.
Expand All @@ -391,7 +393,7 @@ def forge_report_status_packet(self):
"""
return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_STATUS))

def forge_report_plunger_position_packet(self):
def forge_report_plunger_position_packet(self) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for reporting the device's plunger position.
Expand All @@ -401,7 +403,7 @@ def forge_report_plunger_position_packet(self):
"""
return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_PLUNGER_POSITION))

def forge_report_start_velocity_packet(self):
def forge_report_start_velocity_packet(self) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for reporting the device's start velocity.
Expand All @@ -411,7 +413,7 @@ def forge_report_start_velocity_packet(self):
"""
return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_START_VELOCITY))

def forge_report_peak_velocity_packet(self):
def forge_report_peak_velocity_packet(self) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for reporting the device's peak velocity.
Expand All @@ -421,7 +423,7 @@ def forge_report_peak_velocity_packet(self):
"""
return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_PEAK_VELOCITY))

def forge_report_cutoff_velocity_packet(self):
def forge_report_cutoff_velocity_packet(self) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for reporting the device's cutoff velocity.
Expand All @@ -431,7 +433,7 @@ def forge_report_cutoff_velocity_packet(self):
"""
return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_CUTOFF_VELOCITY))

def forge_report_valve_position_packet(self):
def forge_report_valve_position_packet(self) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for reporting the device's valve position.
Expand All @@ -441,7 +443,7 @@ def forge_report_valve_position_packet(self):
"""
return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_VALVE_POSITION))

def forge_report_initialized_packet(self):
def forge_report_initialized_packet(self) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for reporting the initialisation of the device.
Expand All @@ -451,7 +453,7 @@ def forge_report_initialized_packet(self):
"""
return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_INTIALIZED))

def forge_report_eeprom_packet(self):
def forge_report_eeprom_packet(self) -> dtprotocol.DTInstructionPacket:
"""
Creates a packet for reporting the EEPROM.
Expand All @@ -461,7 +463,7 @@ def forge_report_eeprom_packet(self):
"""
return self.forge_packet(dtprotocol.DTCommand(CMD_REPORT_EEPROM))

def forge_terminate_packet(self):
def forge_terminate_packet(self) -> dtprotocol.DTInstructionPacket:
"""
Creates the data packet for terminating the current command
Expand Down

0 comments on commit 5c837b1

Please sign in to comment.