Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Discover controllers by serial number #19

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#Thorpy
# Thorpy

Python library implementing Thorlabs APT communication protocol

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
setup(name="thorpy",
version="0.0.0",
description="",
packages = ['thorpy', 'thorpy.comm', 'thorpy.message', 'thorpy.stages'],
packages = ['thorpy', 'thorpy.comm', 'thorpy.message', 'thorpy.stages', 'thorpy.flipmount'],
package_data = {'thorpy.stages': ['*.ini']},
include_package_data = True,
zip_safe = True,
Expand Down
33 changes: 19 additions & 14 deletions thorpy/comm/discovery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
def discover_stages():
def discover_stages(serial_number):
import usb
import os
from .port import Port
Expand All @@ -9,28 +9,33 @@ def discover_stages():

for dev in usb.core.find(find_all=True, custom_match= lambda x: x.bDeviceClass != 9):
try:

#FIXME: this avoids an error related to https://github.com/walac/pyusb/issues/139
#FIXME: this could maybe be solved in a better way?
dev._langids = (1033, )
# KDC101 3-port is recognized as FTDI in newer kernels
if not (dev.manufacturer == 'Thorlabs' or dev.manufacturer == 'FTDI'):
#check if device is a thorlabs product excluding the thorlabs powermeter
if not (dev.manufacturer == 'Thorlabs' or dev.manufacturer == 'FTDI') or ('pm' in dev.product.lower()):
continue
except usb.core.USBError:
continue
if str(dev.serial_number) == serial_number:
if platform.system() == 'Linux':
port_candidates = [x[0] for x in serial_ports if x[2].get('SER', None) == dev.serial_number]
else:
raise NotImplementedError("Implement for platform.system()=={0}".format(platform.system()))

if platform.system() == 'Linux':
port_candidates = [x[0] for x in serial_ports if x[2].get('SER', None) == dev.serial_number]
else:
raise NotImplementedError("Implement for platform.system()=={0}".format(platform.system()))

assert len(port_candidates) == 1

port = port_candidates[0]
assert len(port_candidates) == 1

port = port_candidates[0]

p = Port.create(port, dev.serial_number)
for stage in p.get_stages().values():
return stage

raise NotImplementedError("Device not found with serial number: " + serial_number)
return 0

p = Port.create(port, dev.serial_number)
for stage in p.get_stages().values():
yield stage

if __name__ == '__main__':
print(list(discover_stages()))

Expand Down
6 changes: 4 additions & 2 deletions thorpy/comm/port.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ def __init__(self, port, sn):
except: # TODO: Be more specific on what we catch here
self._buffer = b''
self._serial.flushInput()

# print(self._info_message['serial_number'])
# self._info_message.__serial_number = sn
# print(self._info_message)
self._serial_number = int(sn)
if self._serial_number is None:
self._serial_number = self._info_message['serial_number']
Expand Down Expand Up @@ -244,7 +246,7 @@ def get_stages(self, only_chan_idents = None):
ret = dict([(k, self._stages.get(k, None)) for k in only_chan_idents])
for k in only_chan_idents:
if ret[k] is None:
ret[k] = GenericStage(self, 0x01, stage_name_from_get_hw_info(self._info_message))
ret[k] = GenericStage(self, 0x01, stage_name_from_get_hw_info(self._info_message, self._serial_number))
self._stages[k] = ret[k]

return ret
30 changes: 30 additions & 0 deletions thorpy/flipmount/flipmount.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import thorpy.comm.discovery as thc

class flipMount:

def __init__(self, sn):
self.__stage = thc.discover_stages(sn)

def identify(self):
self.__stage.identify()

def open(self):
self.__stage.move_jog_forward()

def close(self):
self.__stage.move_jog_backward()

def is_open(self):
return self.__stage.status_forward_hardware_limit_switch_active

def is_close(self):
return self.__stage.status_reverse_hardware_limit_switch_active

def is_moving(self):
return self.__stage.status_in_motion_forward

def flip(self):
if self.is_open():
self.close()
else:
self.open()
23 changes: 19 additions & 4 deletions thorpy/stages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ def _print_stage_detection_improve_message(m):
'- stage type\n' + \
'- this data: {0}'.format(m), file = sys.stderr)

def stage_name_from_get_hw_info(m):
controller_type = m['serial_number'] // 1000000 #v7
def stage_name_from_get_hw_info(m, sn=None):
if sn is None:
controller_type = m['serial_number'] // 1000000 #v7
else:
controller_type = int(sn) // 1000000 #v7

stage_type = m['empty_space'][-2] #Reverse engineered
hw_version = m['hw_version']
model_number = m['model_number'].decode('ascii').strip('\x00')
Expand Down Expand Up @@ -164,7 +168,7 @@ def __init__(self, port, chan_ident, ini_section):

self._port.send_message(MGMSG_MOD_SET_CHANENABLESTATE(chan_ident = self._chan_ident, chan_enable_state = 0x01))

print("Constructed: {0!r}".format(self))
print("Test: "+"Constructed: {0!r}".format(self))

#STATUSUPDATE
self._state_position = None
Expand Down Expand Up @@ -474,7 +478,18 @@ def _wait_for_properties(self, properties, timeout = None, message = None, messa

def __repr__(self):
return '<{0} on {1!r} channel {2}>'.format(self._name, self._port, self._chan_ident)



def identify(self):
self._port.send_message(MGMSG_MOD_IDENTIFY())

def move_jog_forward(self):
self._port.send_message(MGMSG_MOT_MOVE_JOG(chan_ident = self._chan_ident, direction = 0x01))

def move_jog_backward(self):
self._port.send_message(MGMSG_MOT_MOVE_JOG(chan_ident = self._chan_ident, direction = 0x02))



#Message which should maybe be implemented?
#Should be in port: MGMSG_HUB_REQ_BAYUSED, MGMSG_HUB_GET_BAYUSED,
Expand Down