-
Notifications
You must be signed in to change notification settings - Fork 79
/
verification.py
154 lines (101 loc) · 2.67 KB
/
verification.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""
Verification script for RuuviTags. Requires at least one active RuuviTag.
"""
import time
from ruuvitag_sensor.decoder import Df3Decoder, UrlDecoder
from ruuvitag_sensor.ruuvi import RunFlag, RuuviTagSensor
from ruuvitag_sensor.ruuvi_rx import RuuviTagReactive
from ruuvitag_sensor.ruuvitag import RuuviTag
# Uncomment to turn on console print
# import ruuvitag_sensor
# from ruuvitag_sensor.log import log
# ruuvitag_sensor.log.enable_console()
#
# Helper Functions
#
def print_header(name):
    """Print ``name`` framed by a row of ``#`` characters above and below."""
    border = "############################################"
    for line in (border, name, border):
        print(line)
def wait_for_finish(run_flag, name, timeout=20, poll_interval=0.1):
    """Block until ``run_flag.running`` becomes falsy or the timeout expires.

    Args:
        run_flag: Object exposing a ``running`` attribute that is polled.
        name: Label used in the error message when the wait times out.
        timeout: Maximum number of seconds to wait before giving up.
        poll_interval: Number of seconds to sleep between two polls.

    Raises:
        Exception: With the message ``"<name> not finished"`` when
            ``run_flag.running`` is still truthy once the deadline is reached.
    """
    # Compare against a monotonic deadline instead of counting down by the
    # nominal sleep length: repeated float subtraction drifts, and sleep()
    # may return late, so a countdown can wait noticeably longer than asked.
    deadline = time.monotonic() + timeout
    while run_flag.running:
        if time.monotonic() >= deadline:
            raise Exception(f"{name} not finished")
        time.sleep(poll_interval)
#
# UrlDecoder.decode_data
#
print_header("UrlDecoder.decode_data")
# Decode a fixed, hard-coded payload and require a truthy temperature reading.
decoder = UrlDecoder()
data = decoder.decode_data("AjwYAMFc")
print(data)
if data["temperature"]:
    print("OK")
else:
    raise Exception("FAILED")
#
# Df3Decoder.decode_data
#
# The header names Df3Decoder because that is the decoder exercised here; the
# previous "UrlDecoder" label was copied from the preceding check.
print_header("Df3Decoder.decode_data")
decoder = Df3Decoder()
# Decode a fixed, hard-coded payload and require a truthy temperature reading.
data = decoder.decode_data("03291A1ECE1EFC18F94202CA0B5300000000BB")
print(data)
if not data["temperature"]:
    raise Exception("FAILED")
print("OK")
#
# RuuviTagSensor.get_data_for_sensors
#
print_header("RuuviTagSensor.get_data_for_sensors")
# Scan without a MAC filter. `data` stays bound at module level because the
# checks that follow read its first key to pick a sensor.
data = RuuviTagSensor.get_data_for_sensors(search_duration_sec=15)
print(data)
if data:
    print("OK")
else:
    raise Exception("FAILED")
#
# RuuviTagSensor.get_data_for_sensors with macs
#
print_header("RuuviTagSensor.get_data_for_sensors with macs")
# Restrict the scan to the first sensor found by the previous check.
# NOTE(review): the library documents `macs` as a list of MAC addresses, so
# the address is wrapped in a one-element list instead of being passed as a
# bare string - confirm against the installed ruuvitag_sensor version.
first_mac = next(iter(data))
data = RuuviTagSensor.get_data_for_sensors([first_mac], search_duration_sec=15)
print(data)
if not data:
    raise Exception("FAILED")
print("OK")
#
# RuuviTag.update
#
print_header("RuuviTag.update")
# Build a tag object for the first MAC in `data` and refresh its state.
sensor_mac = list(data)[0]
tag = RuuviTag(sensor_mac)
tag.update()
print(tag.state)
if tag.state:
    print("OK")
else:
    raise Exception("FAILED")
#
# RuuviTagSensor.get_data
#
print_header("RuuviTagSensor.get_data")
flag = RunFlag()


def handle_data(found_data):
    """Clear ``flag.running`` and fail if the callback received no data.

    ``flag`` is the same object passed to ``get_data`` as ``run_flag``.
    """
    flag.running = False
    if found_data:
        print("OK")
        return
    raise Exception("FAILED")


RuuviTagSensor.get_data(handle_data, run_flag=flag)
wait_for_finish(flag, "RuuviTagSensor.get_data")
#
# ruuvi_rx.subscribe
#
print_header("ruuvi_rx.subscribe")
ruuvi_rx = RuuviTagReactive()


def hadle_rx(found_data):
    """Print the received item, stop ``ruuvi_rx`` and fail on falsy data."""
    print(found_data)
    ruuvi_rx.stop()
    if found_data:
        print("OK")
        return
    raise Exception("FAILED")


subject = ruuvi_rx.get_subject()
subject.subscribe(hadle_rx)
# The run flag has no public accessor visible here, so the private attribute
# is polled directly.
# pylint: disable=protected-access
wait_for_finish(ruuvi_rx._run_flag, "ruuvi_rx.subscribe")
print("Verification OK")