-
Notifications
You must be signed in to change notification settings - Fork 0
/
sensors.py
79 lines (55 loc) · 1.83 KB
/
sensors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from phidias.Types import *
import threading
import time
class TIMEOUT(Reactor): pass
class STT(Reactor): pass
class HotwordDetect(Sensor):
def on_start(self):
self.running = True
print("\nStarting Hotword detection...")
# put instantiation hotword code here
def on_stop(self):
print("\nStopping Hotword detection...")
self.running = False
def sense(self):
while self.running is True:
time.sleep(1)
# --------------> put hotword detection code here <---------------
# when right hotword is detected: self.assert_belief(HOTWORD_DETECTED("ON"))
class UtteranceDetect(Sensor):
def on_start(self):
self.running = True
print("\nStarting utterance detection...")
# instantiate hotword engine here
def on_stop(self):
print("\nStopping utterance detection...")
self.running = False
def sense(self):
while self.running:
time.sleep(1)
# --------------> put utterance detection code here <---------------
# when incoming new utterance detected: self.assert_belief(STT(utterance))
class Timer(Sensor):
def on_start(self, uTimeout):
evt = threading.Event()
self.event = evt
self.timeout = uTimeout()
self.do_restart = False
def on_restart(self, uTimeout):
self.do_restart = True
self.event.set()
def on_stop(self):
self.do_restart = False
self.event.set()
def sense(self):
while True:
self.event.wait(self.timeout)
self.event.clear()
if self.do_restart:
self.do_restart = False
continue
if self.stopped:
return
else:
self.assert_belief(TIMEOUT("ON"))
return