-
Notifications
You must be signed in to change notification settings - Fork 0
/
run_fifo.py
144 lines (114 loc) · 4.18 KB
/
run_fifo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Save function calls in a blocking queue.
# The called function is responsible for unblocking the queue when it is done.
import time
from datetime import datetime
import time
# Wall-clock time (ms since the epoch) at which this module was imported.
start_time = round(time.time() * 1000)

# Origin for elapsed-time measurements.  A monotonic clock is used so that
# blocking deadlines are not disturbed when the system clock is adjusted.
_start_monotonic = time.monotonic()


def milliseconds():
    """Return the whole number of milliseconds elapsed since module import."""
    return round((time.monotonic() - _start_monotonic) * 1000)
class RunFifo:
    """FIFO of deferred function calls guarded by a ``'run'``/``'block'`` flag.

    Calls are queued with :meth:`append`.  While the flag is ``'run'`` the
    oldest queued call is popped and executed, and the flag switches to
    ``'block'``.  The flag returns to ``'run'`` only when :meth:`run` or
    :meth:`complete` is called, so the executed function (or its caller) is
    responsible for releasing the queue.

    A single optional completion routine can be registered with
    :meth:`set_completion_routine`; :meth:`complete` runs it once.
    """

    # Maximum number of calls allowed to wait in the queue; appends beyond
    # this limit are dropped.
    MAX_PENDING = 3

    # Blocking window (ms) granted to a call when it is started.
    DEFAULT_BLOCK_MS = 500

    def __init__(self):
        # The most recently created instance becomes the module-level
        # ``app_fifo``.
        global app_fifo
        app_fifo = self
        self.fifo = []          # pending (function, args, kwargs) entries
        self.flag = 'run'       # 'run' or 'block'
        self.final = None       # completion routine entry, or None
        self.verbose = False    # print a trace of queue activity when true
        self.blocking_time = milliseconds()  # deadline used by is_blocked()

    def __str__(self):
        final = RunFifo.unpack(self.final)[0] if self.final else "'none'"
        return (
            f"RunFifo(flag:'{self.flag}', length:{len(self.fifo)}, "
            f"final:{final})"
        )

    @staticmethod
    def _name(foo):
        """Return a printable name for *foo*, even if it has no ``__name__``."""
        return getattr(foo, '__name__', repr(foo))

    @staticmethod
    def unpack(foo):
        """Normalise *foo* into a ``(function, args, kwargs)`` tuple.

        *foo* may be a bare callable or a tuple/list of the form
        ``(function[, args[, kwargs]])``.  Missing parts default to ``()``
        and ``{}``.

        Raises:
            ValueError: if a tuple/list has fewer than 1 or more than 3 items.
        """
        entry = tuple(foo) if isinstance(foo, (tuple, list)) else (foo,)
        if not 1 <= len(entry) <= 3:
            raise ValueError(
                f'RunFifo.unpack: expected 1 to 3 items, got {len(entry)}'
            )
        # Built on every call so each result gets its own empty kwargs dict.
        defaults = (None, (), {})
        return entry + defaults[len(entry):]

    def append(self, foo, *p, **kw):
        """Queue ``foo(*p, **kw)`` and start it at once if the queue is free.

        The call is dropped when ``MAX_PENDING`` calls are already waiting.
        """
        if len(self.fifo) >= self.MAX_PENDING:
            if self.verbose:
                print('fifo.append: queue full, dropped', self._name(foo), p, kw)
            return
        self.fifo.append((foo, p, kw))
        if self.verbose:
            print(f'fifo.append [{len(self.fifo)}]:', self._name(foo), p, kw)
        self._run_if_free()

    def _run_if_free(self):
        """Pop and execute the oldest queued call if the flag is ``'run'``."""
        if self.flag != 'run' or not self.fifo:
            return
        # Block before calling, so calls appended by the function itself wait.
        self.flag = 'block'
        foo, p, kw = RunFifo.unpack(self.fifo.pop(0))
        if self.verbose:
            print(f'fifo.run [{len(self.fifo)}]:', self._name(foo), p, kw)
        self.blocking_time = milliseconds() + self.DEFAULT_BLOCK_MS
        foo(*p, **kw)

    def run(self):
        """Release the queue and execute the next pending call, if any."""
        self.flag = 'run'
        self.blocking_time = milliseconds()
        self._run_if_free()

    def block(self, ms=None):
        """Block the queue; a truthy *ms* also sets the deadline *ms* ahead."""
        self.flag = 'block'
        if ms:
            self.keep_blocking(ms)

    def keep_blocking(self, ms):
        """Move the blocking deadline to *ms* milliseconds from now."""
        if self.verbose:
            print('+', end='')
        self.blocking_time = milliseconds() + ms

    def is_blocked(self):
        """Return True while blocked and the deadline has not been reached."""
        return self.flag == 'block' and milliseconds() < self.blocking_time

    def set_completion_routine(self, final, *p, **kw):
        """Register ``final(*p, **kw)`` to be run by :meth:`complete`.

        Intended to be called from within a queued function while it runs.
        A previously registered routine is replaced.
        """
        self.final = (final, p, kw)

    def complete_if_timed_out(self, poll_delay=0.5):
        """Call :meth:`complete` once the blocking deadline has passed.

        Otherwise sleep for *poll_delay* seconds (skipped when falsy), so the
        method can be used inside a polling loop.
        """
        if milliseconds() > self.blocking_time:
            if self.verbose:
                print('fifo is past blocking_time')
            self.complete()
        elif poll_delay:
            # NOTE(review): the sleep is assumed to belong to this
            # not-timed-out branch -- confirm against the original layout.
            time.sleep(poll_delay)

    def complete(self, **kwc):
        """Run the registered completion routine, if any, and unblock.

        Args:
            **kwc: extra keyword arguments that are merged over the keyword
                arguments stored with the completion routine.

        The queue is returned to the ``'run'`` state even when the routine
        raises; the exception still propagates to the caller.  Pending calls
        are not started here; use :meth:`run` or :meth:`append` for that.
        """
        pending = self.final
        # Cleared before the call so the routine is never run twice.
        self.final = None
        try:
            if pending:
                final, p, kw = RunFifo.unpack(pending)
                if self.verbose:
                    print('fifo.complete: run', self._name(final), p, kw)
                # Merge into a new dict rather than mutating the stored one.
                final(*p, **{**kw, **kwc})
        finally:
            # Cleared again: a routine registered during the call is dropped.
            self.final = None
            if self.verbose:
                print('fifo.complete')
            self.blocking_time = milliseconds()
            self.flag = 'run'
# Module-wide queue used by fifo_run_queue.  RunFifo.__init__ rebinds it to
# each newly created instance, so the app has to set it explicitly when
# multiple fifos are in use.
app_fifo = None  # reference to app's fifo


def fifo_run_queue(foo):
    """Decorator that queues calls to *foo* on ``app_fifo``.

    Calling the decorated function does not execute *foo* directly; the call
    and its arguments are handed to ``app_fifo.append``.  The wrapper returns
    ``None``.

    Raises:
        AttributeError: when the wrapper is called while ``app_fifo`` is
            still ``None``.
    """
    from functools import wraps

    @wraps(foo)
    def inner(*p, **kw):
        # Looked up at call time, so a fifo created after decoration is used.
        if app_fifo is None:
            raise AttributeError(
                'fifo_run_queue: app_fifo is None; create a RunFifo first'
            )
        app_fifo.append(foo, *p, **kw)

    return inner
if __name__ == "__main__":
    # Small demonstration: queue two calls while blocked, release the queue,
    # wait for the blocking window to elapse, then run the completion routine.
    fifo = RunFifo()
    fifo.verbose = True

    def complete_this(msg):
        print('complete_this:', msg)

    @fifo_run_queue
    def do_this(*p, **kw):
        fifo.set_completion_routine(complete_this, 'Hello, this is done')
        print('do_this:', p, kw)

    @fifo_run_queue
    def do_other(*p, **kw):
        print('do_other:', p, kw)

    # NOTE(review): block() is assumed to be a top-level statement of the
    # demo rather than part of do_other -- confirm against the original.
    fifo.block()
    do_this(1, 2, 3)
    do_other('that', 'other', height=800)
    print('before call to fifo.complete')
    fifo.run()
    fifo.block(2000)
    while fifo.is_blocked():
        # Sleep briefly instead of spinning a CPU core while waiting.
        time.sleep(0.01)
    fifo.complete()
    print(fifo)
    print('done')