forked from mxcube/HardwareObjects
-
Notifications
You must be signed in to change notification settings - Fork 0
/
PhotonFlux.py
91 lines (80 loc) · 3.28 KB
/
PhotonFlux.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from HardwareRepository.BaseHardwareObjects import Equipment
import numpy
import logging
class PhotonFlux(Equipment):
def __init__(self, *args, **kwargs):
Equipment.__init__(self, *args, **kwargs)
def init(self):
self.read_counts_chan = self.getChannelObject("counts")
self.gain = 1e6
self.calibration_chan = self.getChannelObject("calibration")
try:
self.aperture = self.getObjectByRole("aperture")
except:
pass
self.energy_motor = self.getDeviceByRole("energy")
self.shutter = self.getDeviceByRole("shutter")
self.counter = self.getProperty("cnt")
if self.counter == "i0":
self.index = self.getProperty("index")
if self.index is None:
self.index = 1
else:
self.index = 1
self.read_counts_chan.connectSignal("update", self.countsUpdated)
self.getChannelObject("flag").connectSignal("update", self.updateFlux)
self.shutter.connect("shutterStateChanged", self.shutterStateChanged)
def connectNotify(self, signal):
if signal == "valueChanged":
self.emitValueChanged()
def shutterStateChanged(self, _):
self.countsUpdated(self.read_counts_chan.getValue())
def updateFlux(self, _):
self.countsUpdated(self.read_counts_chan.getValue(), ignore_shutter_state=True)
def countsUpdated(self, counts, ignore_shutter_state=False):
if not ignore_shutter_state and self.shutter.getShutterState()!="opened":
self.emitValueChanged(0)
return
try:
counts = counts[self.index]*self.gain
except TypeError:
logging.getLogger("HWR").error("%s: counts is None", self.name())
return
flux = None
try:
egy = self.energy_motor.getPosition()*1000.0
except:
logging.getLogger("HWR").exception("%s: could not get energy", self.name())
else:
try:
calib_dict = self.calibration_chan.getValue()
if calib_dict is None:
logging.getLogger("HWR").error("%s: calibration is None", self.name())
else:
calibs = [(float(c["energy"]), float(c[self.counter])) for c in calib_dict.values()]
calibs.sort()
print(c)
E = [c[0] for c in calibs]
C = [c[1] for c in calibs]
except:
logging.getLogger("HWR").exception("%s: could not get calibration", self.name())
else:
try:
aperture_coef = self.aperture.getApertureCoef()
except:
aperture_coef = 1
if aperture_coef <= 0:
aperture_coef = 1
calib = numpy.interp([egy], E, C)
flux = counts * calib * aperture_coef
#logging.getLogger("HWR").debug("%s: flux-> %f * %f=%f , calib_dict=%r", self.name(), counts, calib, counts*calib, calib_dict)
self.emitValueChanged("%1.3g" % flux)
def getCurrentFlux(self):
return self.current_flux
def emitValueChanged(self, counts=None):
if counts is None:
self.current_flux = None
self.emit("valueChanged", ("?", ))
else:
self.current_flux = float(counts)
self.emit("valueChanged", (counts, ))