From b78efc7d462fac235810a672da69c7024c323c20 Mon Sep 17 00:00:00 2001 From: zhengpuas47 Date: Wed, 6 Mar 2024 18:24:35 -0500 Subject: [PATCH] new hamilton tests --- storm_control/fluidics/default_config.xml | 62 +++ storm_control/fluidics/kilroy.py | 56 +-- storm_control/fluidics/kilroyProtocols.py | 12 +- storm_control/fluidics/pumps/hamilton_psd4.py | 297 ++++++++++++ storm_control/fluidics/pumps/pumpCommands.py | 82 ++-- .../scope_settings/default_config.xml | 94 ++-- storm_control/fluidics/valves/Note.txt | 0 storm_control/fluidics/valves/hamilton.py | 1 + storm_control/fluidics/valves/hamilton_mvp.py | 459 ++++++++++++++++++ .../fluidics/valves/hamilton_mvp4.py | 386 +++++++++++++++ storm_control/fluidics/valves/valveChain.py | 46 +- 11 files changed, 1340 insertions(+), 155 deletions(-) create mode 100644 storm_control/fluidics/default_config.xml create mode 100644 storm_control/fluidics/pumps/hamilton_psd4.py create mode 100644 storm_control/fluidics/valves/Note.txt create mode 100644 storm_control/fluidics/valves/hamilton_mvp.py create mode 100644 storm_control/fluidics/valves/hamilton_mvp4.py diff --git a/storm_control/fluidics/default_config.xml b/storm_control/fluidics/default_config.xml new file mode 100644 index 0000000..1f63514 --- /dev/null +++ b/storm_control/fluidics/default_config.xml @@ -0,0 +1,62 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Flow Wash + Normal Flow + Stop Flow + + + Normal Flow + Set Hyb 1 + Flow Hybridization + Flow Wash + Flow STORM Buffer + Stop Flow + + + Normal Flow + Set Hyb 2 + Flow Hybridization + Flow Wash + Flow STORM Buffer + Stop Flow + + + \ No newline at end of file diff --git a/storm_control/fluidics/kilroy.py b/storm_control/fluidics/kilroy.py index 2c56279..10250ca 100644 --- a/storm_control/fluidics/kilroy.py +++ b/storm_control/fluidics/kilroy.py @@ -1,11 +1,11 @@ #!/usr/bin/python # ---------------------------------------------------------------------------------------- # A master control class to implemented a series of automated flow protocols -# using a daisy chained valve system (and eventually syringe pumps) +# using a daisy chained valve system and peristaltic or syringe pumps # ---------------------------------------------------------------------------------------- # Jeff Moffitt # 12/28/13 -# jeffmoffitt@gmail.com +# jeffrey.moffitt@childrens.harvard.edu # ---------------------------------------------------------------------------------------- # ---------------------------------------------------------------------------------------- @@ -14,12 +14,9 @@ import sys import os import time -import sys -sys.path.append(r"..\..") from PyQt5 import QtCore, QtGui, QtWidgets from storm_control.fluidics.valves.valveChain import ValveChain -from storm_control.fluidics.pumps.pumpControl import PeristalticPumpControl -from storm_control.fluidics.pumps.pumpControl import SyringePumpControl +import storm_control.fluidics.pumps.pumpControl as pumpControl from storm_control.fluidics.kilroyProtocols import KilroyProtocols from storm_control.sc_library.tcpServer import TCPServer import storm_control.sc_library.parameters as params @@ -33,21 +30,8 @@ def __init__(self, parameters): # Parse parameters into internal attributes self.verbose = parameters.get("verbose") - self.valve_com_port = parameters.get("valves_com_port") self.tcp_port = parameters.get("tcp_port") - self.pump_com_port = parameters.get("pump_com_port") - self.pump_ID = parameters.get("pump_ID") - # pump type - if not parameters.has("pump_type"): - self.pump_type = 'peristaltic' - else: - self.pump_type = parameters.get('pump_type') - - if not parameters.has("num_simulated_valves"): - self.num_simulated_valves = 0 - else: - self.num_simulated_valves = parameters.get("num_simulated_valves") - + if not parameters.has("valve_type"): self.valve_type = 'Hamilton' else: @@ -72,22 +56,23 @@ def __init__(self, parameters): self.received_message = None # Create ValveChain instance - self.valveChain = ValveChain(com_port = self.valve_com_port, - num_simulated_valves = self.num_simulated_valves, - valve_type=self.valve_type, - verbose = self.verbose) + self.valveChain = ValveChain(parameters = parameters) # Create PumpControl instance - if self.pump_type == 'peristaltic': - self.pumpControl = PeristalticPumpControl(parameters=parameters) - elif self.pump_type == 'syringe': - self.pumpControl = SyringePumpControl(parameters=parameters) - + pump_type = parameters.get("pump_type", "peristaltic") + + if pump_type == "peristaltic": + self.pumpControl = pumpControl.PeristalticPumpControl(parameters = parameters) + elif pump_type == "syringe": + self.pumpControl = pumpControl.SyringePumpControl(parameters = parameters) + else: + print("Unrecognized pump_type requested") + assert False + # Create KilroyProtocols instance and connect signals self.kilroyProtocols = KilroyProtocols(protocol_xml_path = self.protocols_file, command_xml_path = self.commands_file, - verbose = self.verbose, - pumpType = self.pump_type) + verbose = self.verbose) self.kilroyProtocols.command_ready_signal.connect(self.sendCommand) self.kilroyProtocols.status_change_signal.connect(self.handleProtocolStatusChange) @@ -102,6 +87,8 @@ def __init__(self, parameters): # Create GUI self.createGUI() + + # Update protocols # ---------------------------------------------------------------------------------------- # Close @@ -172,8 +159,6 @@ def handleTCPData(self, message): # ---------------------------------------------------------------------------------------- def sendCommand(self): command_data = self.kilroyProtocols.getCurrentCommand() - print("**", command_data) - if command_data[0] == "valve": self.valveChain.receiveCommand(command_data[1]) elif command_data[0] == "pump": @@ -224,6 +209,11 @@ def __init__(self, parameters, parent = None): valve_menu = menubar.addMenu("&Valves") for menu_item in self.kilroy.valveChain.menu_items[0]: valve_menu.addAction(menu_item) + + if not self.kilroy.pumpControl.menu_items is None: + pump_menu = menubar.addMenu("&Pump") + for menu_item in self.kilroy.pumpControl.menu_items[0]: + pump_menu.addAction(menu_item) # ---------------------------------------------------------------------------------------- # Handle dragEnterEvent diff --git a/storm_control/fluidics/kilroyProtocols.py b/storm_control/fluidics/kilroyProtocols.py index 3b64943..b6973bc 100644 --- a/storm_control/fluidics/kilroyProtocols.py +++ b/storm_control/fluidics/kilroyProtocols.py @@ -8,8 +8,6 @@ # Jeff Moffitt # 2/15/14 # jeffmoffitt@gmail.com -# -# Updated 7/2019 by George Emanuel # ---------------------------------------------------------------------------------------- # ---------------------------------------------------------------------------------------- @@ -35,8 +33,7 @@ class KilroyProtocols(QtWidgets.QMainWindow): def __init__(self, protocol_xml_path = "default_config.xml", command_xml_path = "default_config.xml", - verbose = False, - pumpType='peristaltic'): + verbose = False): super(KilroyProtocols, self).__init__() # Initialize internal attributes @@ -50,7 +47,6 @@ def __init__(self, self.status = [-1, -1] # Protocol ID, command ID within protocol self.issued_command = [] self.received_message = None - self.pumpType = pumpType print("----------------------------------------------------------------------") @@ -63,8 +59,7 @@ def __init__(self, # Create instance of PumpCommands class self.pumpCommands = PumpCommands(xml_file_path = self.command_xml_path, - verbose = self.verbose, - pumpType = pumpType) + verbose = self.verbose) # Connect pump commands issue signal self.pumpCommands.change_command_signal.connect(self.issuePumpCommand) @@ -479,7 +474,8 @@ def stopProtocol(self): # Unselect all self.protocolDetailsList.setCurrentRow(0) - self.protocolDetailsList.item(0).setSelected(False) + if self.protocolDetailsList.item(0) is not None: + self.protocolDetailsList.item(0).setSelected(False) # Stop timers self.poll_elapsed_time_timer.stop() diff --git a/storm_control/fluidics/pumps/hamilton_psd4.py b/storm_control/fluidics/pumps/hamilton_psd4.py new file mode 100644 index 0000000..2a5b686 --- /dev/null +++ b/storm_control/fluidics/pumps/hamilton_psd4.py @@ -0,0 +1,297 @@ +#!/usr/bin/python +# ---------------------------------------------------------------------------------------- +# A basic class for the control of the PSD4 syringe pump from Hamilton +# ---------------------------------------------------------------------------------------- +# Jeff Moffitt +# 11/20/21 +# jeffrey.moffitt@childrens.harvard.edu +# +# ---------------------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------------------- +# Import +# ---------------------------------------------------------------------------------------- +import sys +import time +import serial + +class APump(): + def __init__(self, + parameters = False): + + # Define attributes + self.com_port = parameters.get("pump_com_port", "COM3") + self.pump_ID = parameters.get("pump_ID", "PSD4") + self.verbose = parameters.get("verbose", True) + self.simulate = parameters.get("simulate_pump", False) + self.serial_verbose = parameters.get("serial_verbose", False) + self.high_res_mode = parameters.get("high_res", True) + self.syringe_volume = parameters.get("syringe_volume", 12.5) + self.syringe_type = parameters.get("syringe_type", "standard") + + self.min_velocity_in_steps_s = 2 + self.max_velocity_in_steps_s = 10000 + self.min_stroke_in_steps = 0 + self.max_stroke_in_steps = None + + # Check syringe type + if self.syringe_type == "standard": + self.high_res_step = 24000.0 + self.low_res_step = 3000.0 + elif self.syringe_type == "smooth_flow": + self.high_res_step = 192000.0 + self.low_res_step = 24000.0 + else: + print("The provided syringe type for the PSD4 is not valid") + assert False + + # Define the resolution mode + if self.high_res_mode: + self.max_stroke_in_steps = self.high_res_step + else: + self.max_stroke_in_steps = self.low_res_step + + self.steps_to_volume = self.syringe_volume/self.max_stroke_in_steps + + # Create serial port + self.serial = serial.Serial(port = self.com_port, + timeout=0.1) + + # Define initial pump status + self.flow_status = "Stopped" + self.speed = 0.0 + self.num_ports = 0 + + # Configure pump + self.configurePump() + self.identification = "PSD4" + + # Report configuration + print("--------------------------------") + print("Configured PSD4 Syringe Pump") + print(" PSD4 Type: " + str(self.syringe_type)) + print(" Syringe Volume: " + str(self.syringe_volume) + " mL") + print(" High Res Mode: " + str(self.high_res_mode)) + print(" Steps for Full Fill: " + str(self.max_stroke_in_steps)) + print(" Minimum Speed: " + str(self.min_velocity_in_steps_s * self.syringe_volume * (4/self.high_res_step) * 60 ) + " mL/min") + + def initializePump(self): + message = "/1ZR\r" + self.write(message) + response = self.read() + + if len(response) < 2: + print("PSD4 not found") + assert False + + def configurePump(self): + + # Determine the port configuration + message = "/1?21000R\r" + self.write(message) + response = self.read() + + if len(response) < 2: + assert False + + num_ports_dict = {'0': 3, + '1': 4, + '2': 3, + '3': 8, + '4': 4, + '6': 6} + + self.num_ports = num_ports_dict[chr(response[3])] + + # Set the resolution + if self.high_res_mode: + message = '/1N1R\r' + else: + message = '/1N0R\r' + + self.write(message) + response = self.read() + if len(response) < 2: + assert False + + + def getStatus(self): + # Determine if is moving + message = "/1Q\r" + + self.write(message) + response = self.read() + + if len(response)<2: + print("Unknown response from PSD4") + assert False + + response_char = chr(response[2]) + if response_char == '@': + is_moving = True + elif response_char == "`": + is_moving = False + else: + print("Unknown response from PSD4") + print(response) + assert False + + # Determine the syringe position and return in mL + message = '/1?R\r' + + self.write(message) + response = self.read() + + if len(response)<2: + print("Unknown response from PSD4") + assert False + + start_pos = 3 + end_pos = response.find('\x03'.encode()) + pos_in_units = float(response[start_pos:end_pos].decode()) + pos_in_mL = pos_in_units*self.steps_to_volume + + # Determine current speed + message = '/1?2R\r' + + self.write(message) + response = self.read() + + if len(response)<2: + print("Unknown response from PSD4") + assert False + + start_pos = 3 + end_pos = response.find('\x03'.encode()) + vel_in_units = float(response[start_pos:end_pos].decode()) + vel_in_mLmin = vel_in_units*self.syringe_volume*60/(self.high_res_step/4) + + # Determine valve numerical position + message = '/1?24000R\r' + + self.write(message) + response = self.read() + + if len(response)<2: + print("Unknown response from PSD4") + assert False + + start_pos = 3 + end_pos = response.find('\x03'.encode()) + valve_pos = int(response[start_pos:end_pos].decode()) + + return (is_moving, pos_in_mL, vel_in_mLmin, valve_pos) + + def setPort(self, port_id): + # Check to see if it is within the number of ports + if port_id >= self.num_ports: + print("An invalid port was requested for the PSD4") + assert False + + # Set the port + message = "/1h2500" + str(port_id+1) + "R\r" + self.write(message) + response = self.read() + + if len(response)<2: + print("Unknown response from PSD4") + assert False + + def close(self): + self.serial.close() + + def setSpeed(self, fill_speed_in_mLmin): + + # Convert the requested speed to steps per s + fill_speed_in_mLs = fill_speed_in_mLmin/60 + new_speed_value = int((fill_speed_in_mLs/self.syringe_volume) * (self.high_res_step/4)) + + # Coerce to the hardware limits + if new_speed_value < self.min_velocity_in_steps_s: + new_speed_value = self.min_velocity_in_steps_s + print("Coerced pump speed to lowest value") + + if new_speed_value > self.max_velocity_in_steps_s: + new_speed_value = self.max_velocity_in_steps_s + print("Coerced pump speed to highest value") + + message = '/1V' + str(new_speed_value) + 'R\r' + + self.write(message) + response = self.read() + + if len(response)<2: + print("Unknown response from PSD4") + assert False + + def startFill(self, new_volume): + # Define the volume + new_step_pos = int(new_volume/self.steps_to_volume) + + # Coerce to the hardware limits + if new_step_pos < self.min_stroke_in_steps: + new_step_pos = self.min_stroke_in_steps + print("Coerced pump fill to lowest value") + + if new_step_pos > self.max_stroke_in_steps: + new_step_pos = self.max_stroke_in_steps + print("Coerced pump fill to highest value") + + # Define and write the message + message = '/1A' + str(new_step_pos) + 'R\r' + + self.write(message) + response = self.read() + + if len(response)<2: + print("Unknown response from PSD4") + assert False + + def stopFill(self): + message = '/1TR\r' + + self.write(message) + response = self.read() + + if len(response)<2: + print("Unknown response from PSD4") + assert False + + def read(self): + # response = self.serial.readline().decode() + response = self.serial.readline() + + if self.verbose: + print("Received: " + str((response, ""))) + return response + + def write(self, message): + self.serial.write(message.encode()) + if self.verbose: + print("Wrote: " + message[:-1]) # Display all but final carriage return + + +# +# The MIT License +# +# Copyright (c) 2021 Moffitt Laboratory, Boston Children's Hospital +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + diff --git a/storm_control/fluidics/pumps/pumpCommands.py b/storm_control/fluidics/pumps/pumpCommands.py index 2c92adb..e43c4d8 100644 --- a/storm_control/fluidics/pumps/pumpCommands.py +++ b/storm_control/fluidics/pumps/pumpCommands.py @@ -5,9 +5,6 @@ # Jeff Moffitt # 2/16/14 # jeffmoffitt@gmail.com -# -# -# Updated 7/3/2019 by George Emanuel for syringe pump # ---------------------------------------------------------------------------------------- # ---------------------------------------------------------------------------------------- @@ -28,8 +25,7 @@ class PumpCommands(QtWidgets.QMainWindow): def __init__(self, xml_file_path="default_config.xml", - verbose = False, - pumpType = 'syringe'): + verbose = False): super(PumpCommands, self).__init__() # Initialize internal attributes @@ -39,7 +35,6 @@ def __init__(self, self.commands = [] self.num_commands = 0 self.num_pumps = 0 - self.pumpType = pumpType # Create GUI self.createGUI() @@ -172,12 +167,23 @@ def parseCommandXML(self): if not (self.num_pumps>0): print("Number of pumps not specified") + # Determine the pump_type + pump_type = None + # Load commands for pump_command in self.kilroy_configuration.findall("pump_commands"): + # Find type if provided + found_pump_type = pump_command.get("type", "peristaltic") + if pump_type is None: + pump_type = found_pump_type + if not pump_type == found_pump_type: + print("Found conflicting pump types in pump commands") + assert False + command_list = pump_command.findall("pump_cmd") for command in command_list: - if self.pumpType == 'peristaltic': - for pump_config in command.findall("pump_config"): + for pump_config in command.findall("pump_config"): + if pump_type == "peristaltic": speed = float(pump_config.get("speed")) direction = pump_config.get("direction") if speed < 0.00 or speed > 48.0: @@ -185,20 +191,23 @@ def parseCommandXML(self): direction = "Stopped" # Flag for stopped flow direction = {"Forward": "Forward", "Reverse": "Reverse"}.get(direction, "Stopped") - # Add command - self.commands.append([direction, speed]) - self.command_names.append(command.get("name")) - - elif self.pumpType == 'syringe': - for pump_config in command.findall("pump_config"): - position = int(pump_config.get('position')) - speed = int(pump_config.get('speed')) - # add direction - direction = pump_config.get("direction", "Input") - direction = {"Input": "Input", "Output": "Output", "Bypass":"Bypass"}.get(direction, "Input") - self.commands.append([position, speed, direction]) - self.command_names.append(command.get('name')) + # Add command + self.commands.append([direction, speed]) + self.command_names.append(command.get("name")) + elif pump_type == "syringe": + speed = float(pump_config.get("speed")) + volume = float(pump_config.get("volume")) + port_name = pump_config.get("port_name") + + # Add command + self.commands.append([port_name, speed, volume]) + self.command_names.append(command.get("name")) + + else: + print("Found an unsupported pump type in pump commands") + assert False + # Record number of configs self.num_commands = len(self.command_names) @@ -209,19 +218,20 @@ def printCommands(self): print("Current commands:") for command_ID in range(self.num_commands): print(self.command_names[command_ID]) - if self.pumpType == 'peristaltic': - direction = self.commands[command_ID][0] - speed = self.commands[command_ID][1] + local_command = self.commands[command_ID] + if len(local_command) == 2: + direction = local_command[0] + speed = local_command[1] text_string = " " + "Flow Direction: " + direction + "\n" text_string += " " + "Speed: " + str(speed) +"\n" print(text_string) - elif self.pumpType == 'syringe': - position = self.commands[command_ID][0] - speed = self.commands[command_ID][1] - direction = self.commands[command_ID][2] - text_string = " " + "Position: " + str(position) + "\n" + elif len(local_command) == 3: + port_name = local_command[0] + speed = local_command[1] + volume = local_command[2] + text_string = " " + "Port: " + port_name + "\n" text_string += " " + "Speed: " + str(speed) +"\n" - text_string += f" Direction: {direction}\n" + text_string += " " + "Volume: " + str(volume) +"\n" print(text_string) # ------------------------------------------------------------------------------------ @@ -254,13 +264,15 @@ def updateCommandDisplay(self): current_command = self.commands[current_ID] text_string = current_command_name + "\n" - if self.pumpType == 'peristaltic': + + if len(current_command) == 2: text_string += "Flow Direction: " + current_command[0] + "\n" text_string += "Flow Speed: " + str(current_command[1]) + "\n" - elif self.pumpType == 'syringe': - text_string += "Targe position: " + str(current_command[0]) + "\n" - text_string += "Speed: " + str(current_command[1]) + "\n" - text_string += "Direction: " + str(current_command[2]) + "\n" + elif len(current_command) == 3: + text_string = "Port: " + current_command[0] + "\n" + text_string += "Speed: " + str(current_command[1]) +"\n" + text_string += "Fill Volume: " + str(current_command[2]) +"\n" + self.currentCommandLabel.setText(text_string) # ------------------------------------------------------------------------------------ diff --git a/storm_control/fluidics/scope_settings/default_config.xml b/storm_control/fluidics/scope_settings/default_config.xml index 6a86d40..3f84ee0 100644 --- a/storm_control/fluidics/scope_settings/default_config.xml +++ b/storm_control/fluidics/scope_settings/default_config.xml @@ -1,62 +1,38 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - - - Flow Wash - Normal Flow - Stop Flow - - - Normal Flow - Set Hyb 1 - Flow Hybridization - Flow Wash - Flow STORM Buffer - Stop Flow - - - Normal Flow - Set Hyb 2 - Flow Hybridization - Flow Wash - Flow STORM Buffer - Stop Flow - - + + + + + + + + \ No newline at end of file diff --git a/storm_control/fluidics/valves/Note.txt b/storm_control/fluidics/valves/Note.txt new file mode 100644 index 0000000..e69de29 diff --git a/storm_control/fluidics/valves/hamilton.py b/storm_control/fluidics/valves/hamilton.py index 1eadb38..30e7302 100644 --- a/storm_control/fluidics/valves/hamilton.py +++ b/storm_control/fluidics/valves/hamilton.py @@ -14,6 +14,7 @@ # ---------------------------------------------------------------------------------------- import sys import time +import importlib from storm_control.fluidics.valves.valve import AbstractValve diff --git a/storm_control/fluidics/valves/hamilton_mvp.py b/storm_control/fluidics/valves/hamilton_mvp.py new file mode 100644 index 0000000..eec74cf --- /dev/null +++ b/storm_control/fluidics/valves/hamilton_mvp.py @@ -0,0 +1,459 @@ +#!/usr/bin/python +# ---------------------------------------------------------------------------------------- +# A basic class for the control of the new MVP valves from Hamilton: MVP4 +# ---------------------------------------------------------------------------------------- +# Jeff Moffitt +# 11/20/21 +# jeffrey.moffitt@childrens.harvard.edu +# +# ---------------------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------------------- +# Import +# ---------------------------------------------------------------------------------------- +import sys +import time +import importlib + +from storm_control.fluidics.valves.valve import AbstractValve + +# ---------------------------------------------------------------------------------------- +# HamiltonMVP Class Definition +# ---------------------------------------------------------------------------------------- +class AValveChain(AbstractValve): + def __init__(self, + parameters = parameters): + + # Define attributes + self.com_port = parameters + self.verbose = verbose + self.num_simulated_valves = num_simulated_valves + + # Determine simulation mode + self.simulate = (self.num_simulated_valves > 0) + + # Create serial port (if not in simulation mode) + if not self.simulate: + import serial + self.serial = serial.Serial(port = self.com_port, + baudrate = 9600, + bytesize = serial.SEVENBITS, + parity = serial.PARITY_ODD, + stopbits = serial.STOPBITS_ONE, + timeout = 0.1) + + # Define important serial characters + self.acknowledge = "\x06" + self.carriage_return = "\x13" + self.negative_acknowledge = "\x21" + self.read_length = 64 + self.char_offset = 97 # offset to convert int current_device + # to ascii addresses (0=a, 1=b, ...) + + # Define valve and port properties + self.max_valves = 16 # Maximum number of daisy chains + self.valve_names = [] + self.num_valves = 0 + self.valve_configs = [] + self.max_ports_per_valve = [] + self.current_port = [] + + # Configure device + self.autoAddress() + self.autoDetectValves() + + # ------------------------------------------------------------------------------------ + # Define Device Addresses: Must be First Command Issued + # ------------------------------------------------------------------------------------ + def autoAddress(self): + if not self.simulate: + auto_address_cmd = "1a\r" + if self.verbose: + print("Addressing Hamilton Valves") + x = self.write(auto_address_cmd) + response = self.read() # Clear buffer + else: + print("Simulating Hamilton MVP") + + # ------------------------------------------------------------------------------------ + # Auto Detect and Configure Valves: Devices are detected by acknowledgement of + # initialization command + # ------------------------------------------------------------------------------------ + def autoDetectValves(self): + if not self.simulate: + print("----------------------------------------------------------------------") + print("Opening the Hamilton MVP Valve Daisy Chain") + print(" " + "COM Port: " + str(self.com_port)) + for valve_ID in range(self.max_valves): # Loop over all possible valves + + # Generate address character (0=a, 1=b, ...) + device_address_character = chr(valve_ID + self.char_offset) + + if self.verbose: + print("Looking for device with address: " + str(valve_ID) + "=" + device_address_character) + + self.valve_names.append(device_address_character) # Save device characters + + # Send initialization command to valve: if it acknowledges, then it exists + found_valve = self.initializeValve(valve_ID) + if found_valve: + # Determine valve configuration + valve_config = self.howIsValveConfigured(valve_ID) + + if valve_config[1]: # Indicates successful response + self.valve_configs.append(valve_config) + self.max_ports_per_valve.append(self.numPortsPerConfiguration(valve_config)) + self.current_port.append(0) + + if self.verbose: + print("Found " + valve_config + " device at address " + str(valve_ID)) + else: + break + + self.num_valves = len(self.valve_configs) + + if self.num_valves == 0: + self.valve_names = "0" + print("Error: no valves discovered") + return False # Return failure + + # Display found valves + print("Found " + str(self.num_valves) + " Hamilton MVP Valves") + for valve_ID in range(self.num_valves): + print(" " + "Device " + self.valve_names[valve_ID] + " is configured with " + self.valve_configs[valve_ID]) + + print("Initializing valves...") + + # Wait for final device to stop moving + self.waitUntilNotMoving(self.num_valves-1) + + return True + + else: # Simulation code + for valve_ID in range(self.num_simulated_valves): + self.valve_configs.append(self.howIsValveConfigured(valve_ID)) + self.max_ports_per_valve.append(self.numPortsPerConfiguration(self.howIsValveConfigured(valve_ID))) + self.current_port.append(0) + self.num_valves = self.num_simulated_valves + print("Created " + str(self.num_simulated_valves) + " simulated Hamilton MVP valves") + return True + + # ------------------------------------------------------------------------------------ + # Change Port Position + # ------------------------------------------------------------------------------------ + def changePort(self, valve_ID, port_ID, direction = 0, wait_until_done = False): + # Check validity if valve and port IDs + if not self.isValidValve(valve_ID): + return False + if not self.isValidPort(valve_ID, port_ID): + return False + + if not self.simulate: + # Compose message and increment port_ID (starts at 1) + message = "LP" + str(direction) + str(port_ID+1) + "R\r" + + response = self.inquireAndRespond(valve_ID, message) + if response[0] == "Negative Acknowledge": + print("Move failed: " + str(response)) + + if response[1]: #Acknowledged move + self.current_port[valve_ID] = port_ID + + if wait_until_done: + self.waitUntilNotMoving() + + return response[1] + else: ## simulation code + self.current_port[valve_ID] = port_ID + return True + + # ------------------------------------------------------------------------------------ + # Close Serial Port + # ------------------------------------------------------------------------------------ + def close(self): + if not self.simulate: + self.serial.close() + if self.verbose: print("Closed hamilton valves") + else: ## simulation code + if self.verbose: print("Closed simulated hamilton valves") + + # ------------------------------------------------------------------------------------ + # Initialize Port Position of Given Valve + # ------------------------------------------------------------------------------------ + def initializeValve(self, valve_ID): + if not self.simulate: + response = self.inquireAndRespond(valve_ID, + message ="LXR\r", + dictionary = {}, + default = "") + if self.verbose: + if response[1]: print("Initialized Valve: " + str(valve_ID+1)) + else: print("Did not find valve: " + str(valve_ID+1)) + return response[1] + else: + return True + + # ------------------------------------------------------------------------------------ + # Basic I/O with Serial Port + # This function returns a response tuple used by this class + # (dictionary entry, affirmative response?, raw response string) + # ------------------------------------------------------------------------------------ + def inquireAndRespond(self, valve_ID, message, dictionary = {}, default = "Unknown"): + + # Check if the valve_ID valve is initialized + if not self.isValidValve(valve_ID): + return ("", False, "") + + # Prepend address of provided valve (0=a, 1=b, ...) + message = self.valve_names[valve_ID] + message + + # Write message and read response + self.write(message) + response = self.read() + + # Parse response into sent message and response + repeated_message = response[:(response.find(self.carriage_return)-1)] + actual_response = response[(response.find(self.carriage_return)-1): + (response.rfind(self.carriage_return))] + #actual_response = actual_response # remove carriage returns + + # Check for negative acknowledge + if actual_response == self.negative_acknowledge: + return ("Negative Acknowledge", False, response) + + # Check for acknowledge + if actual_response == self.acknowledge: + return ("Acknowledge", True, response) + + # Parse provided dictionary with response + return_value = dictionary.get(actual_response, default) + if return_value == default: + return (default, False, response) + else: + return (return_value, True, response) + + # ------------------------------------------------------------------------------------ + # Generate Default Port Names + # ------------------------------------------------------------------------------------ + def getDefaultPortNames(self, valve_ID): + if not self.isValidValve(valve_ID): + return ("") + default_names = [] + for port_ID in range(self.max_ports_per_valve[valve_ID]): + default_names.append("Port " + str(port_ID+1)) + return default_names + + # ------------------------------------------------------------------------------------ + # Generate Rotation Direction Labels + # ------------------------------------------------------------------------------------ + def getRotationDirections(self, valve_ID): + if not self.isValidValve(valve_ID): + return ("") + return ("Clockwise", "Counter Clockwise") + + # ------------------------------------------------------------------------------------ + # Get Valve Status + # ------------------------------------------------------------------------------------ + def getStatus(self, valve_ID): + return (self.whereIsValve(valve_ID), not self.isMovementFinished(valve_ID)) + + # ------------------------------------------------------------------------------------ + # Poll Valve Configuration + # ------------------------------------------------------------------------------------ + def howIsValveConfigured(self, valve_ID): + if not self.simulate: + response = self.inquireAndRespond(valve_ID, + message ="LQT\r", + dictionary = {"2": "8 ports", + "3": "6 ports", + "4": "3 ports", + "5": "2 ports @180", + "6": "2 ports @90", + "7": "4 ports"}, + default = "Unknown response") + return response[0] + else: ## simulation code + return "8 ports" + + # ------------------------------------------------------------------------------------ + # Determine number of active valves + # ------------------------------------------------------------------------------------ + def howManyValves(self): + return self.num_valves + + # ------------------------------------------------------------------------------------ + # Poll Movement of Valve + # ------------------------------------------------------------------------------------ + def isMovementFinished(self, valve_ID): + if not self.simulate: + response = self.inquireAndRespond(valve_ID, + message ="F\r", + dictionary = {"*": False, + "N": False, + "Y": True}, + default = "Unknown response") + return response[0] + else: ## simulation code + return ("Y", True, "Simulation") + + # ------------------------------------------------------------------------------------ + # Poll Overload Status of Valve + # ------------------------------------------------------------------------------------ + def isValveOverloaded(self, valve_ID): + if not self.simulate: + return self.inquireAndRespond(valve_ID, + message ="G\r", + dictionary = {"*": False, + "N": False, + "Y": True}, + default = "Unknown response") + else: ## simulation code + return ("N", False, "Simulation") + + # ------------------------------------------------------------------------------------ + # Check if Port is Valid + # ------------------------------------------------------------------------------------ + def isValidPort(self, valve_ID, port_ID): + if not self.isValidValve(valve_ID): + return False + elif not (port_ID < self.max_ports_per_valve[valve_ID]): + if self.verbose: + print(str(port_ID) + " is not a valid port on valve " + str(valve_ID)) + return False + else: + return True + + # ------------------------------------------------------------------------------------ + # Check if Valve is Valid + # ------------------------------------------------------------------------------------ + def isValidValve(self, valve_ID): + if not (valve_ID < self.max_valves): + if self.verbose: + print(str(valve_ID) + " is not a valid valve") + return False + else: + return True + + # ------------------------------------------------------------------------------------ + # Convert Port Configuration String to Number of Ports + # ------------------------------------------------------------------------------------ + def numPortsPerConfiguration(self, configuration_string): + return {"8 ports": 8, + "6 ports": 6, + "3 ports": 3, + "2 ports @180": 2, + "2 ports @90": 2, + "4 ports": 4}.get(configuration_string, 0) + + # ------------------------------------------------------------------------------------ + # Read from Serial Port + # ------------------------------------------------------------------------------------ + def read(self): + response = self.serial.read(self.read_length).decode() + if self.verbose: + print("Received: " + str((response, ""))) + return response + + # ------------------------------------------------------------------------------------ + # Reset Chain: Readdress and redetect valves + # ------------------------------------------------------------------------------------ + def resetChain(self): + # Reset device configuration + self.valve_names = [] + self.num_valves = 0 + self.valve_configs = [] + self.max_ports_per_valve = [] + + # Configure Device + self.autoAddress() + self.autoDetectValves() + + # ------------------------------------------------------------------------------------ + # Halt Hamilton Class Until Movement is Finished + # ------------------------------------------------------------------------------------ + def waitUntilNotMoving(self, valve_ID, pause_time = 1): + doneMoving = False + while not doneMoving: + doneMoving = self.isMovementFinished(valve_ID) + time.sleep(pause_time) + + # ------------------------------------------------------------------------------------ + # Poll Valve Configuration + # ------------------------------------------------------------------------------------ + def whatIsValveConfiguration(self, valve_ID): + if not self.isValidValve(valve_ID): + return "" + else: + return self.valve_configs[valve_ID] + + # ------------------------------------------------------------------------------------ + # Poll Valve Location + # ------------------------------------------------------------------------------------ + def whereIsValve(self, valve_ID): + if not self.simulate: + response = self.inquireAndRespond(valve_ID, + message ="LQP\r", + dictionary = {"1": "Port 1", + "2": "Port 2", + "3": "Port 3", + "4": "Port 4", + "5": "Port 5", + "6": "Port 6", + "7": "Port 7", + "8": "Port 8"}, + default = "Unknown Port") + return response[0] + else: ## simulation code + return {"1": "Port 1", + "2": "Port 2", + "3": "Port 3", + "4": "Port 4", + "5": "Port 5", + "6": "Port 6", + "7": "Port 7", + "8": "Port 8"}.get(str(self.current_port[valve_ID]+1)) + + # ------------------------------------------------------------------------------------ + # Write to Serial Port + # ------------------------------------------------------------------------------------ + def write(self, message): + self.serial.write(message.encode()) + if self.verbose: + print("Wrote: " + message[:-1]) # Display all but final carriage return + +# ---------------------------------------------------------------------------------------- +# Test/Demo of Classs +# ---------------------------------------------------------------------------------------- +if (__name__ == '__main__'): + hamilton = AValveChain(verbose = True) + + for valve_ID in range(hamilton.howManyValves()): + text = "Valve " + str(valve_ID+1) + text = " is configured with " + hamilton.howIsValveConfigured(valve_ID) + + hamilton.close() + +# +# The MIT License +# +# Copyright (c) 2013 Zhuang Lab, Harvard University +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + diff --git a/storm_control/fluidics/valves/hamilton_mvp4.py b/storm_control/fluidics/valves/hamilton_mvp4.py new file mode 100644 index 0000000..5df2ad8 --- /dev/null +++ b/storm_control/fluidics/valves/hamilton_mvp4.py @@ -0,0 +1,386 @@ +#!/usr/bin/python +# ---------------------------------------------------------------------------------------- +# A basic class for the control of the new MVP valves from Hamilton: MVP4 +# ---------------------------------------------------------------------------------------- +# Jeff Moffitt +# 11/20/21 +# jeffrey.moffitt@childrens.harvard.edu +# +# ---------------------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------------------- +# Import +# ---------------------------------------------------------------------------------------- +import sys +import time +import importlib +import serial + +from storm_control.fluidics.valves.valve import AbstractValve + +# ---------------------------------------------------------------------------------------- +# HamiltonMVP Class Definition +# ---------------------------------------------------------------------------------------- +class AValveChain(AbstractValve): + def __init__(self, + parameters = None): + + # Define attributes + self.com_port = parameters.get("valves_com_port") + self.verbose = parameters.get("verbose", True) + + + # Create serial port + self.serial = serial.Serial(port = self.com_port, + baudrate = 9600, + timeout = 0.25) + + # Define valve and port properties + self.max_valves = 16 # Maximum number of daisy chains + self.valve_names = [] + self.num_valves = 0 + self.valve_configs = [] + self.max_ports_per_valve = [] + self.current_port = [] + + # Configure device + self.autoDetectValves() + + # ------------------------------------------------------------------------------------ + # Auto Detect and Configure Valves: Devices are detected by acknowledgement of + # initialization command + # ------------------------------------------------------------------------------------ + def autoDetectValves(self): + print("----------------------------------------------------------------------") + print("Opening the Hamilton MVP4 Valve Daisy Chain") + print(" " + "COM Port: " + str(self.com_port)) + for valve_ID in range(self.max_valves): # Loop over all possible valves + + # Generate valve ID + valve_name = str(valve_ID + 1) + + if self.verbose: + print("Looking for device " + valve_name) + + self.valve_names.append(valve_name) # Save device characters + + # Send initialization command to valve: if it acknowledges, then it exists + found_valve = self.initializeValve(valve_ID) + if found_valve: + # Determine valve configuration + valve_config = self.howIsValveConfigured(valve_ID) + + if valve_config[1]: # Indicates successful response + self.valve_configs.append(valve_config) + self.max_ports_per_valve.append(self.numPortsPerConfiguration(valve_config)) + self.current_port.append(0) + + if self.verbose: + print("Found " + valve_config + " device at address " + str(valve_ID)) + else: + break + + self.num_valves = len(self.valve_configs) + + if self.num_valves == 0: + self.valve_names = "0" + print("Error: no valves discovered") + return False # Return failure + + # Display found valves + print("Found " + str(self.num_valves) + " Hamilton MVP4 Valves") + for valve_ID in range(self.num_valves): + print(" " + "Device " + self.valve_names[valve_ID] + " is configured with " + self.valve_configs[valve_ID]) + + print("Initializing valves...") + + # Wait for final device to stop moving + self.waitUntilNotMoving(self.num_valves-1) + + return True + + # ------------------------------------------------------------------------------------ + # Change Port Position + # ------------------------------------------------------------------------------------ + def changePort(self, valve_ID, port_ID, direction = 0, wait_until_done = False): + # Check validity if valve and port IDs + if not self.isValidValve(valve_ID): + return False + if not self.isValidPort(valve_ID, port_ID): + return False + + if direction == 0: + message = "h2400" + str(port_ID+1) + "R\r" + else: + message = "h2500" + str(port_ID+1) + "R\r" + + response = self.inquireAndRespond(valve_ID, message) + + self.current_port[valve_ID] = port_ID + + if wait_until_done: + self.waitUntilNotMoving() + + return True + + # ------------------------------------------------------------------------------------ + # Close Serial Port + # ------------------------------------------------------------------------------------ + def close(self): + self.serial.close() + if self.verbose: print("Closed hamilton valves") + + # ------------------------------------------------------------------------------------ + # Initialize Port Position of Given Valve + # ------------------------------------------------------------------------------------ + def initializeValve(self, valve_ID): + response = self.inquireAndRespond(valve_ID, + message ="ZR\r", + dictionary = {}, + default = None) + if self.verbose: + if response[1]: print("Initialized Valve: " + str(valve_ID+1)) + else: print("Did not find valve: " + str(valve_ID+1)) + return response[1] + + # ------------------------------------------------------------------------------------ + # Basic I/O with Serial Port + # This function returns a response tuple used by this class + # (dictionary entry, affirmative response?, raw response string) + # ------------------------------------------------------------------------------------ + def inquireAndRespond(self, valve_ID, message, dictionary = {}, default = "Unknown"): + + # Check if the valve_ID valve is initialized + if not self.isValidValve(valve_ID): + return ("", False, "") + + # Prepend address of provided valve (0=a, 1=b, ...) + message = "/" + self.valve_names[valve_ID] + message + + # Write message and read response + self.write(message) + actual_response = self.read() + + # Handle no response + if len(actual_response) < 2: + return (default, False, actual_response) + + # Handle status value + status = chr(actual_response[2]) + + # Extract data values, if provided + data_start = 3 + data_end = actual_response.find('\x03'.encode()) + + if data_start == data_end: + return (default, True, actual_response) + else: + data = actual_response[data_start:data_end].decode() + # Parse provided dictionary with data + return_value = dictionary.get(data, default) + + if return_value == default: + return (default, False, actual_response) + else: + return (return_value, True, actual_response) + + # ------------------------------------------------------------------------------------ + # Generate Default Port Names + # ------------------------------------------------------------------------------------ + def getDefaultPortNames(self, valve_ID): + if not self.isValidValve(valve_ID): + return ("") + default_names = [] + for port_ID in range(self.max_ports_per_valve[valve_ID]): + default_names.append("Port " + str(port_ID+1)) + return default_names + + # ------------------------------------------------------------------------------------ + # Generate Rotation Direction Labels + # ------------------------------------------------------------------------------------ + def getRotationDirections(self, valve_ID): + if not self.isValidValve(valve_ID): + return ("") + return ("Clockwise", "Counter Clockwise") + + # ------------------------------------------------------------------------------------ + # Get Valve Status + # ------------------------------------------------------------------------------------ + def getStatus(self, valve_ID): + return (self.whereIsValve(valve_ID), not self.isMovementFinished(valve_ID)) + + # ------------------------------------------------------------------------------------ + # Poll Valve Configuration + # ------------------------------------------------------------------------------------ + def howIsValveConfigured(self, valve_ID): + response = self.inquireAndRespond(valve_ID, + message ="?21000\r", + dictionary = {"3": "8 ports", + }, + default = "Unknown response") + return response[0] + + # ------------------------------------------------------------------------------------ + # Determine number of active valves + # ------------------------------------------------------------------------------------ + def howManyValves(self): + return self.num_valves + + # ------------------------------------------------------------------------------------ + # Poll Movement of Valve + # ------------------------------------------------------------------------------------ + def isMovementFinished(self, valve_ID): + # Create moving message + message = "/" + self.valve_names[valve_ID] + "Q\r" + + # Write message and read response + self.write(message) + actual_response = self.read() + + # Handle status value + status = chr(actual_response[2]) + if status == "@": + return False + else: + return True + + # ------------------------------------------------------------------------------------ + # Check if Port is Valid + # ------------------------------------------------------------------------------------ + def isValidPort(self, valve_ID, port_ID): + if not self.isValidValve(valve_ID): + return False + elif not (port_ID < self.max_ports_per_valve[valve_ID]): + if self.verbose: + print(str(port_ID) + " is not a valid port on valve " + str(valve_ID)) + return False + else: + return True + + # ------------------------------------------------------------------------------------ + # Check if Valve is Valid + # ------------------------------------------------------------------------------------ + def isValidValve(self, valve_ID): + if not (valve_ID < self.max_valves): + if self.verbose: + print(str(valve_ID) + " is not a valid valve") + return False + else: + return True + + # ------------------------------------------------------------------------------------ + # Convert Port Configuration String to Number of Ports + # ------------------------------------------------------------------------------------ + def numPortsPerConfiguration(self, configuration_string): + return {"8 ports": 8, + "6 ports": 6, + "3 ports": 3, + "2 ports @180": 2, + "2 ports @90": 2, + "4 ports": 4}.get(configuration_string, 0) + + # ------------------------------------------------------------------------------------ + # Read from Serial Port + # ------------------------------------------------------------------------------------ + def read(self): + # response = self.serial.readline().decode() + response = self.serial.readline() + + if self.verbose: + print("Received: " + str((response, ""))) + return response + + # ------------------------------------------------------------------------------------ + # Reset Chain: Readdress and redetect valves + # ------------------------------------------------------------------------------------ + def resetChain(self): + # Reset device configuration + self.valve_names = [] + self.num_valves = 0 + self.valve_configs = [] + self.max_ports_per_valve = [] + + # Configure Device + self.autoDetectValves() + + # ------------------------------------------------------------------------------------ + # Halt Hamilton Class Until Movement is Finished + # ------------------------------------------------------------------------------------ + def waitUntilNotMoving(self, valve_ID, pause_time = 1): + doneMoving = False + while not doneMoving: + if self.isMovementFinished(valve_ID): + doneMoving = True + else: + time.sleep(pause_time) + + # ------------------------------------------------------------------------------------ + # Poll Valve Configuration + # ------------------------------------------------------------------------------------ + def whatIsValveConfiguration(self, valve_ID): + if not self.isValidValve(valve_ID): + return "" + else: + return self.valve_configs[valve_ID] + + # ------------------------------------------------------------------------------------ + # Poll Valve Location + # ------------------------------------------------------------------------------------ + def whereIsValve(self, valve_ID): + response = self.inquireAndRespond(valve_ID, + message ="?24000\r", + dictionary = {"1": "Port 1", + "2": "Port 2", + "3": "Port 3", + "4": "Port 4", + "5": "Port 5", + "6": "Port 6", + "7": "Port 7", + "8": "Port 8"}, + default = "Unknown Port") + return response[0] + + # ------------------------------------------------------------------------------------ + # Write to Serial Port + # ------------------------------------------------------------------------------------ + def write(self, message): + self.serial.write(message.encode()) + if self.verbose: + print("Wrote: " + message[:-1]) # Display all but final carriage return + +# ---------------------------------------------------------------------------------------- +# Test/Demo of Classs +# ---------------------------------------------------------------------------------------- +if (__name__ == '__main__'): + hamilton = APump(verbose = True) + + for valve_ID in range(hamilton.howManyValves()): + text = "Valve " + str(valve_ID+1) + text = " is configured with " + hamilton.howIsValveConfigured(valve_ID) + + hamilton.close() + +# +# The MIT License +# +# Copyright (c) 2013 Zhuang Lab, Harvard University +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + diff --git a/storm_control/fluidics/valves/valveChain.py b/storm_control/fluidics/valves/valveChain.py index 1594de7..b426328 100644 --- a/storm_control/fluidics/valves/valveChain.py +++ b/storm_control/fluidics/valves/valveChain.py @@ -13,6 +13,7 @@ # Import # ---------------------------------------------------------------------------------------- import sys +import importlib from PyQt5 import QtCore, QtGui, QtWidgets from storm_control.fluidics.valves.qtValveControl import QtValveControl from storm_control.fluidics.valves.hamilton import HamiltonMVP @@ -24,35 +25,40 @@ class ValveChain(QtWidgets.QWidget): def __init__(self, parent = None, - com_port = "COM2", - num_simulated_valves = 0, - valve_type = 'Hamilton', - verbose = False + parameters = False, ): # Initialize parent class QtWidgets.QWidget.__init__(self, parent) - + # Define local attributes - self.com_port = com_port - self.verbose = verbose + self.com_port = parameters.get("valves_com_port", None) + self.verbose = parameters.get("verbose", False) self.poll_time = 2000 - - # Create instance of Hamilton class - print(valve_type) - if valve_type == 'Simulated': + + # Simulate the valves? + num_simulated_valves = parameters.get("num_simulated_valves", 0) + + if num_simulated_valves > 0: self.valve_chain = HamiltonMVP(com_port = 0, num_simulated_valves = num_simulated_valves, verbose = self.verbose) - - elif valve_type == 'Hamilton': - self.valve_chain = HamiltonMVP(com_port = self.com_port, - verbose = self.verbose) - - elif valve_type == 'Titan': - self.valve_chain = TitanValve(com_port = self.com_port, - verbose = self.verbose) - + else: # Create the valve + # Backwards compatibility in valve_type + valve_type = parameters.get("valve_type", "None") + + if valve_type == 'Hamilton': + self.valve_chain = HamiltonMVP(com_port = self.com_port, + verbose = self.verbose) + elif valve_type == 'Titan': + self.valve_chain = TitanValve(com_port = self.com_port, + verbose = self.verbose) + + # Create the valve chain using newer loading protocols + else: + valve_module = importlib.import_module(parameters.get("valve_class", "storm_control.fluidics.valves.hamilton_mvp")) + self.valve_chain = valve_module.AValveChain(parameters = parameters) + # Create QtValveControl widgets for each valve in the chain self.num_valves = self.valve_chain.howManyValves() self.valve_names = []