-
Notifications
You must be signed in to change notification settings - Fork 1
/
MyoManager.py
150 lines (124 loc) · 5.91 KB
/
MyoManager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import myo
from myo import *
import time
from PyQt5.QtCore import QThread, pyqtSignal, QTimer
import threading
import LogWindow as lw
class Listener(myo.DeviceListener):
    """Forward Myo device events to a manager object as plain dictionaries.

    Each handler emits a dict of the form
    ``{"type": <event type>, "data": {...}, "timestamp": <float>}`` through
    ``manager.signals``. EMG and orientation events are rate-limited using
    ``manager.refresh_rate``; all other events are forwarded immediately.

    Timestamps come from ``time.perf_counter()``; ``time.clock()``, which the
    code used previously, was removed in Python 3.8.
    """

    # Object that owns the ``signals`` signal and the connection/request flags.
    manager = None
    # Time of the last forwarded EMG / orientation event (0 means "none yet").
    clock_emg = 0
    clock_orientation = 0

    def __init__(self, m):
        """Store the manager *m* that receives the forwarded events."""
        super().__init__()
        self.manager = m

    # ------------------------------------------------------------------ helpers

    def _emit(self, event, data):
        """Emit *data* for *event* through the manager, stamped with the current time."""
        self.manager.signals.emit(
            {"type": event.type, "data": data, "timestamp": time.perf_counter()}
        )

    @staticmethod
    def _device_info(event):
        """Return the identification payload shared by the (dis)connect/pair events."""
        return {
            "name": event.device_name,
            "mac_address": event.mac_address,
            "firmware_version": event.firmware_version,
        }

    def _should_emit(self, clock_attr):
        """Return True if the stream tracked by ``self.<clock_attr>`` may emit now.

        With a falsy ``manager.refresh_rate`` every event passes. Otherwise an
        event passes when none was forwarded yet or enough time has elapsed
        since the last one; in that case the stored time is updated.
        """
        rate = self.manager.refresh_rate
        if not rate:
            return True
        now = time.perf_counter()
        last = getattr(self, clock_attr)
        # NOTE(review): the divisor is 10000 as in the original code; if
        # refresh_rate is in milliseconds this looks like it should be 1000
        # -- confirm before changing.
        if (not last) or (now - last) >= (rate / 10000):
            setattr(self, clock_attr, now)
            return True
        return False

    # ----------------------------------------------------------- event handlers

    def on_connected(self, event: Event):
        """Mark the manager connected, start EMG streaming and announce the device."""
        self.manager.connecting = False
        self.manager.connected = True
        event.device.stream_emg(True)
        event.device.request_battery_level()
        event.device.request_rssi()
        lw.LogWindow.addToLog(
            "Connected to {!r} with mac address: {!r}".format(
                event.device_name, event.mac_address
            )
        )
        event.device.vibrate(myo.VibrationType.short)
        self._emit(event, self._device_info(event))

    def on_disconnected(self, event: Event):
        """Forward the event, then mark the manager as no longer connected."""
        self._emit(event, self._device_info(event))
        self.manager.connected = False

    def on_paired(self, event: Event):
        """Forward the pairing event with the device identification."""
        self._emit(event, self._device_info(event))

    def on_unpaired(self, event: Event):
        """Forward the unpairing event and return False."""
        self._emit(event, self._device_info(event))
        return False

    def on_orientation(self, event: Event):
        """Forward gyroscope, acceleration and orientation data (rate-limited)."""
        if self._should_emit("clock_orientation"):
            self._emit(
                event,
                {
                    "gyroscope": event.gyroscope,
                    "acceleration": event.acceleration,
                    "orientation": event.orientation,
                },
            )

    def on_emg(self, event: Event):
        """Forward EMG samples (rate-limited)."""
        if self._should_emit("clock_emg"):
            self._emit(event, {"emg": event.emg})

    def on_event(self, event):
        """Dispatch *event* via the base class, then service pending manager requests.

        The manager's ``battery``, ``rssi``, ``lock`` and ``unlock`` flags are
        one-shot requests: each is cleared here and turned into the
        corresponding call on ``event.device``.
        """
        # NOTE(review): the return value of the base-class dispatch is
        # discarded (as in the original), so the False returned by
        # on_unpaired never reaches the caller -- confirm this is intended.
        super().on_event(event)
        manager = self.manager
        device = event.device
        if manager.battery:
            manager.battery = False
            device.request_battery_level()
        if manager.rssi:
            manager.rssi = False
            device.request_rssi()
        if manager.lock:
            manager.lock = False
            device.lock()
        if manager.unlock:
            manager.unlock = False
            device.unlock()

    def on_battery_level(self, event: Event):
        """Forward the reported battery level."""
        self._emit(event, {"battery": event.battery_level})

    def on_pose(self, event: Event):
        """Forward the detected pose."""
        self._emit(event, {"pose": event.pose})

    def on_rssi(self, event: Event):
        """Forward the reported RSSI value."""
        self._emit(event, {"rssi": event.rssi})

    def on_locked(self, event: Event):
        """Forward the locked notification (no payload)."""
        self._emit(event, {})

    def on_unlocked(self, event):
        """Forward the unlocked notification (no payload)."""
        self._emit(event, {})
class MyoManager(QThread):
    """Background thread that runs the Myo hub and republishes its events.

    Events are delivered to the ``callback`` given at construction through the
    ``signals`` signal as dictionaries with ``"type"``, ``"data"`` and
    ``"timestamp"`` keys. Changes to ``refresh_rate`` are announced through
    the ``interval`` signal.
    """

    signals = pyqtSignal(dict)
    interval = pyqtSignal(int)

    # Connection state, updated by this class and by the Listener.
    connecting = False
    connected = False
    timer = None
    # Set to True to make the loop in run() exit on its next iteration.
    stop = False
    # One-shot request flags; the Listener clears them once serviced.
    rssi = False
    battery = False
    lock = False
    unlock = False

    @property
    def refresh_rate(self):
        """Value passed to ``hub.run`` as its duration and used for throttling.

        Presumably milliseconds -- TODO confirm against the myo API.
        """
        return self._refresh_rate

    @refresh_rate.setter
    def refresh_rate(self, value):
        # Announce the new value before storing it, as the original code did.
        self.interval.emit(value)
        self._refresh_rate = value

    def __init__(self, callback):
        """Initialise the Myo SDK and connect *callback* to ``signals``."""
        super().__init__()
        self.refresh_rate = 300
        self.signals.connect(callback)
        # An explicit SDK location can be given, e.g. sdk_path="/Developer/sdk".
        myo.init()

    def timed_out(self):
        """Abort a connection attempt that is still pending when the timer fires."""
        if (not self.connected) and self.connecting:
            lw.LogWindow.addToLog("Connection timed out!")
            self.disconnect()

    def connect(self):
        """Start the hub thread unless already connected or connecting.

        A single-shot 5 second timer calls ``timed_out`` to cancel the attempt
        if no device has connected by then.
        """
        if self.connected or self.connecting:
            return
        self.connecting = True
        self.stop = False
        QTimer.singleShot(5000, self.timed_out)  # Let's wait for 5 seconds
        lw.LogWindow.addToLog(
            "Trying to connect to Myo (connection will timeout in 5 seconds)"
        )
        self.start()

    def run(self):
        """Thread body: pump hub events until the hub stops or ``stop`` is set."""
        try:
            listener = Listener(self)
            hub = myo.Hub("com.twins.emg")
            while hub.run(listener.on_event, self.refresh_rate):
                if self.stop:
                    self.stop = False
                    break
        except Exception as exc:
            # Catch Exception rather than using a bare except so SystemExit and
            # KeyboardInterrupt are not swallowed, and record the cause so the
            # failure can be diagnosed.
            self.connecting = False
            lw.LogWindow.addToLog("An error has occured!")
            lw.LogWindow.addToLog(repr(exc))
        lw.LogWindow.addToLog("********END********")

    def disconnect(self):
        """Reset the connection state and ask the hub loop in run() to exit.

        When a device was connected, a synthetic ``disconnected`` event is
        emitted first.

        NOTE(review): this name appears to shadow Qt's ``QObject.disconnect``
        -- confirm nothing relies on the Qt method for this object.
        """
        if self.connected:
            lw.LogWindow.addToLog("Disconnected!")
            # time.perf_counter() replaces time.clock(), removed in Python 3.8.
            self.signals.emit(
                {
                    "type": EventType.disconnected,
                    "data": {},
                    "timestamp": time.perf_counter(),
                }
            )
        self.connecting = False
        self.connected = False
        # The hub object is local to run(); it is stopped via this flag.
        self.stop = True