-
Notifications
You must be signed in to change notification settings - Fork 0
/
TestingCenter.py
executable file
·146 lines (117 loc) · 3.77 KB
/
TestingCenter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/bin/python3
import socket
import time
import logging
"""
This class is responsible for connecting to BeeeOn gateway
running TestingCenter on specific host and port, with use
of class Commander, we are able to send commands to Gateway.
"""
class TestingCenter:
PROMPT = "> "
def __init__(self, host, port):
self.sockAddr = (host, port)
self.socket = None
def connect(self):
connectCount = 0
connectLimit = 5
while self.socket is None:
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect(self.sockAddr)
message = self.nextMessage()
except Exception as ex:
connectCount += 1
if connectCount < connectLimit:
print("connecting failed, retry")
time.sleep(1)
else:
raise ex
"""
Send the given message to testing center. Newline character is automatically
appended at the end of the message.
"""
def send(self, message):
if self.socket is None:
self.connect()
message = message + "\n"
self.socket.sendall(message.encode())
logging.debug("Sending message:\"" + message + "\"")
"""
Receive data from TestingCenter until next console prompt
is received. Return list of loaded lines from TestingCenter.
"""
def nextMessage(self): # TODO timeout
acc = ""
stop = False
while not stop:
data = self.socket.recv(4096)
acc += data.decode('utf-8')
logging.debug("Received: \"" + data.decode('utf-8') + "\"")
if (TestingCenter.PROMPT) in acc:
return acc.split("\n")[:-1]
def listDevices(self):
self.send("device list-new")
return self.nextMessage()
def disconnect(self):
if not self.socket is None:
self.socket.close()
def createCommander(self):
return Commander(self)
class Commander:
def __init__(self, testingCenter):
self.testingCenter = testingCenter
def waitForDone(self, lastResponse, timeout):
response = lastResponse
while not self.isDone(response[0]):
self.testingCenter.send("wait-queue " + timeout)
response = self.testingCenter.nextMessage()
if len(response) != 1:
logging.critical("COMMAND DID NOT FINISHED")
raise Exception("command did not finish in timeout")
return response
def sendCommand(self, command, timeout="15000"):
self.testingCenter.send(command)
response = self.testingCenter.nextMessage()
if "error" in response[0]:
return (0,0)
response = self.waitForDone(response, timeout)
return self.parseResult(response)
def isDone(self, message):
return "DONE" in message
"""
Parse the result for sent command or wait-queue expected in
format "COMMANDID DONE X/X [{S,F}]"
E.g.:
"0x7F0188004600 DONE 6/6 SSSSSS"
"0x7F0188004600 DONE 6/6 SSFSSF"
"0x7F0188004600 DONE 0/0"
Return tuple containing count of success and failures.
"""
def parseResult(self, result):
tokens = result[0].split(" ")
success = 0
failure = 0
if len(tokens) == 4:
for resultChar in tokens[-1]:
if resultChar == "S":
success += 1
elif resultChar == "F":
failure += 1
return (success, failure)
def listen(self, timeout = ""):
commandTimeout = (int(timeout) + 10) * 1000
return self.sendCommand("command listen %s" % timeout, str(commandTimeout))
def deviceAccept(self, deviceID):
return self.sendCommand("command device-accept %s" % deviceID)
def unpair(self, deviceID):
return self.sendCommand("command unpair %s" % deviceID)
def deviceSetValue(self, deviceID, moduleID, value, timeout = ""):
return self.sendCommand("command set-value %s %s %s %s" % (deviceID, moduleID, value, timeout))
"""
Send echo message and receive echo response. Given message and its response should be identical strings. This
comparison can be used to verify basic functionality of connected TestingCenter.
"""
def echo(self, message):
self.testingCenter.send("echo " + message)
return self.testingCenter.nextMessage()[0]