-
Notifications
You must be signed in to change notification settings - Fork 2
/
microtest.py
175 lines (135 loc) · 4.73 KB
/
microtest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import uasyncio
from sys import modules
class Expect:
    """Fluent assertion helper wrapping the value under test.

    Every ``to_*`` matcher returns ``None`` when the expectation holds and
    raises a plain ``Exception`` carrying a descriptive message when it does
    not. Reading ``it_not`` before a matcher inverts that matcher.
    """

    def __init__(self, value):
        self._not = False
        self.value = value

    @property
    def it_not(self):
        """Switch this expectation to negated mode and return it for chaining."""
        self._not = True
        return self

    def to_be(self, expected):
        """Require the wrapped value to compare equal to ``expected``."""
        mismatch = self.value != expected
        if not self._fails(mismatch):
            return
        raise Exception(self._format('Expected: {n}{}\nReceived: {}', expected, self.value))

    def to_have_been_called(self):
        """Require the wrapped spy to have recorded at least one call."""
        never_called = len(self.value.calls) == 0
        if not self._fails(never_called):
            return
        raise Exception(self._format('Expected spy to {n}be called.'))

    def to_have_been_called_with(self, *expected_args, **expected_kwargs):
        """Require one recorded call to match the given arguments exactly.

        A recorded call matches when its ``(args, kwargs)`` pair equals the
        positional and keyword arguments passed to this matcher.
        """
        wanted = (expected_args, expected_kwargs)
        recorded = self.value.calls
        found = any(entry == wanted for entry in recorded)
        if not self._fails(not found):
            return
        # Build the report line by line: the expectation first, then what the
        # spy actually received.
        report = [
            self._format('Expected spy to {n}have been called with: {}', self._format_call(wanted))
        ]
        if len(recorded) == 0:
            report.append('Received 0 calls')
        else:
            report.append('Received calls:')
            report.extend(self._format_calls(recorded))
        raise Exception('\n'.join(report))

    def to_have_been_called_times(self, expected):
        """Require the wrapped spy to have recorded exactly ``expected`` calls."""
        actual = len(self.value.calls)
        if not self._fails(actual != expected):
            return
        raise Exception(
            self._format('Expected spy to {n}be called {} times. It was called {} times.', expected, actual)
        )

    def to_have_been_triggered(self):
        """Require the wrapped observer's ``triggered`` flag to be truthy."""
        untouched = not self.value.triggered
        if not self._fails(untouched):
            return
        raise Exception(self._format('Expected event to {n}be triggered.'))

    async def to_throw(self, exception = Exception):
        """Await the wrapped callable and require it to raise ``exception``.

        The raised error is matched by comparing class *names*, so a subclass
        of ``exception`` does not count as a match. Only errors deriving from
        ``Exception`` are intercepted.
        """
        matched = False
        try:
            await self.value()
        except Exception as raised:
            matched = type(raised).__name__ == exception.__name__
        if not self._fails(not matched):
            return
        raise Exception(self._format('Expected method to {n}throw a {}.', exception.__name__))

    def _fails(self, value):
        """Return the failure flag, inverted when in negated mode."""
        if self._not:
            return not value
        return value

    def _format(self, template, *args):
        """Fill ``template``; ``{n}`` becomes ``'not '`` in negated mode."""
        negation = 'not ' if self._not else ''
        return template.format(*args, n = negation)

    def _format_call(self, call):
        """Render one ``(args, kwargs)`` pair as ``a, b, key=value``."""
        pieces = [
            ', '.join('{}'.format(item) for item in call[0]),
            ', '.join('{}={}'.format(key, item) for key, item in call[1].items()),
        ]
        # Drop whichever half is empty so no dangling separator is produced.
        return ', '.join(piece for piece in pieces if piece != '')

    def _format_calls(self, calls):
        """Render every recorded call as ``index: arguments``."""
        rendered = []
        for position, entry in enumerate(calls):
            rendered.append('{}: {}'.format(position, self._format_call(entry)))
        return rendered
class Spy:
    """Callable test double that records how it was invoked.

    Each invocation is appended to ``calls`` as an ``(args, kwargs)`` pair.
    The result handed back is the next queued value from ``define_returns``
    if any are left, otherwise the fixed value configured via ``returns``
    (``None`` by default).
    """

    def __init__(self):
        self.calls = []
        self._returns = []
        self._return_value = None

    def returns(self, value):
        """Set the fallback return value; returns the spy for chaining."""
        self._return_value = value
        return self

    def define_returns(self, *args):
        """Queue one-shot return values, consumed in order; returns the spy."""
        self._returns.extend(args)
        return self

    def __call__(self, *args, **kwargs):
        """Record the call and produce the configured return value."""
        self.calls.append((args, kwargs))
        # Queued values take priority over the fixed fallback value.
        if len(self._returns) != 0:
            return self._returns.pop(0)
        return self._return_value
class AsyncSpy(Spy):
    """Variant of ``Spy`` whose invocation is a coroutine and must be awaited."""

    async def __call__(self, *args, **kwargs):
        """Record the call and produce the configured return value."""
        self.calls.append((args, kwargs))
        # Queued values take priority over the fixed fallback value.
        if len(self._returns) != 0:
            return self._returns.pop(0)
        return self._return_value
class EventObserver:
    """Watches an event in a background task and remembers whether it fired.

    ``event`` must provide an awaitable ``wait()`` and a ``clear()`` method.
    Once the event has been seen, ``triggered`` becomes ``True`` and the
    event is cleared.
    """

    def __init__(self, event):
        self._event = event
        # Initialise the flag before scheduling the task that reads it.
        self.triggered = False
        self._task = uasyncio.create_task(self._observer())

    async def _observer(self):
        """Block until the event is set, then clear it and record the trigger."""
        while not self.triggered:
            await self._event.wait()
            self._event.clear()
            self.triggered = True

    async def wait(self, timeout = 3):
        """Wait up to ``timeout`` seconds for the observer task to finish.

        A timeout is an expected outcome and is silently ignored; callers
        inspect ``triggered`` afterwards. Anything else (KeyboardInterrupt,
        cancellation, errors raised by the event) is allowed to propagate
        instead of being swallowed.
        """
        try:
            await uasyncio.wait_for(self._task, timeout)
        except uasyncio.TimeoutError:
            pass
def spy():
    """Create and return a new ``Spy`` instance."""
    fresh_spy = Spy()
    return fresh_spy
def async_spy():
    """Create and return a new ``AsyncSpy`` instance."""
    fresh_spy = AsyncSpy()
    return fresh_spy
def expect(value):
    """Wrap ``value`` in an ``Expect`` so matchers can be chained onto it."""
    expectation = Expect(value)
    return expectation
def observe(event):
    """Wrap ``event`` in an ``EventObserver`` and return the observer."""
    observer = EventObserver(event)
    return observer
# Originals of every module replaced through mock_module(), keyed by module
# name. A value of None means the module was not loaded when it was mocked.
original_modules = {}


def mock_module(module_name, mock):
    """Install ``mock`` in ``sys.modules`` under ``module_name``.

    The entry that was there before (or ``None`` if there was none) is
    remembered so that ``restore_modules()`` can undo the replacement.
    """
    # Remember only the first original: mocking the same module twice must
    # not record the earlier mock as the thing to restore.
    if module_name not in original_modules:
        original_modules[module_name] = modules.get(module_name)
    modules[module_name] = mock


def restore_modules():
    """Undo every replacement made through ``mock_module()``."""
    for name, original in original_modules.items():
        if original is None:
            # The module was not loaded before being mocked. Remove the mock
            # rather than storing None, which would block later imports.
            modules.pop(name, None)
        else:
            modules[name] = original
    # Forget the bookkeeping so a later mock records a fresh original.
    original_modules.clear()
async def test_runner(functions):
    """Await each test in ``functions`` in order and print a PASS/FAIL report.

    A test passes when awaiting it raises nothing; any ``Exception`` marks it
    as failed and its message is printed. The test's name is shown with
    underscores replaced by spaces. A summary with both counters is printed
    at the end.
    """
    print('\n------------------------------------------------')
    passed = failed = 0

    for case in functions:
        label = case.__name__.replace("_", " ")
        try:
            await case()
            print('> PASS', label)
            passed = passed + 1
        except Exception as error:
            print('> FAIL', label)
            print(error)
            failed = failed + 1

    summary = (
        '\nSummary',
        '-----------',
        'Passed: {}'.format(passed),
        'Failed: {}\n'.format(failed),
    )
    for line in summary:
        print(line)
# Tests registered through test(); run() executes these by default.
test_functions = list()
# Tests registered through only(); when non-empty, run() executes these instead.
only_function = list()
def test(fn):
    """Register ``fn`` as a test to be executed by ``run()``; use as ``@test``.

    Returns ``fn`` unchanged so the decorated name stays bound to the
    function instead of being rebound to ``None``.
    """
    # Appending mutates the module-level list in place; no ``global`` needed.
    test_functions.append(fn)
    return fn
# Convenience method during development to run only "this" test. Use in the same way as @test
def only(fn):
    """Register ``fn`` as an exclusive test; use as ``@only``.

    While at least one test is registered this way, ``run()`` executes the
    exclusive tests instead of the regular ones. Returns ``fn`` unchanged so
    the decorated name stays bound to the function instead of ``None``.
    """
    # Appending mutates the module-level list in place; no ``global`` needed.
    only_function.append(fn)
    return fn
def run():
    """Run the registered tests and then restore any mocked modules.

    Tests registered through ``only`` take precedence; if there are none, all
    tests registered through ``test`` are run.
    """
    selected = only_function if only_function else test_functions
    try:
        uasyncio.run(test_runner(selected))
    finally:
        # Undo mock_module() replacements even when the run is interrupted or
        # the runner itself raises, so mocks never leak past the test session.
        restore_modules()