-
Notifications
You must be signed in to change notification settings - Fork 0
/
SerialCommunication.py
240 lines (204 loc) · 8.54 KB
/
SerialCommunication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
# modified from https://blog.csdn.net/u013541325/article/details/113062191
import binascii
import serial # need to install pyserial first
import serial.tools.list_ports
import time
# Module-level state shared with the Communication class below.
# Ret: True once Communication has an open serial port; set in
# Communication.__init__ / Open_Engine and reset to False by Close_Engine.
Ret = False
# Port identifiers (first field of each entry returned by
# serial.tools.list_ports.comports()); refilled by Communication.Print_Used_Com.
port_list_number = []
# Port names/descriptions (second field of each comports() entry); kept
# index-aligned with port_list_number by Communication.Print_Used_Com.
port_list_name = []
class Communication(object):
    """
    Thin convenience wrapper around a pyserial ``serial.Serial`` port.

    Creating an instance tries to open the port right away. Success is
    reported through the module-level flag ``Ret`` (True when the port is
    open) instead of an exception, so callers should check ``Ret`` after
    construction.

    Attributes:
        port: port identifier handed to ``serial.Serial``.
        bps: baud rate.
        timeout: read timeout handed to ``serial.Serial``.
        main_engine: the ``serial.Serial`` object, or None when it could not
            be created.
        data: text most recently received with ``way == 1``.
        b_c_text: initialised to None and not written by this class
            afterwards (placeholder for a hex dump of received bytes).
    """

    def __init__(self, com, bps, timeout):
        """
        Store the connection settings and try to open the port.

        :param com: serial port identifier
        :param bps: baud rate
        :param timeout: read timeout passed to ``serial.Serial``
        """
        global Ret
        self.port = com
        self.bps = bps
        self.timeout = timeout
        self.main_engine = None
        self.data = None
        self.b_c_text = None
        Ret = False
        try:
            # Constructing Serial with a port opens it immediately.
            self.main_engine = serial.Serial(self.port, self.bps, timeout=self.timeout)
            if self.main_engine.is_open:
                Ret = True
        except Exception as e:
            # Best effort by design: the failure is reported on stdout and
            # through Ret, and main_engine stays None.
            print("---Exception---:", e)

    def Print_Name(self):
        """
        Print the basic settings of the serial device, one value per line.

        Prints a short notice instead when no port object exists.
        """
        engine = self.main_engine
        if engine is None:
            print("Serial port is not available")
            return
        settings = (
            "name",                # device name
            "port",                # read or write port
            "baudrate",            # baud rate
            "bytesize",            # byte size
            "parity",              # parity
            "stopbits",            # stop bits
            "timeout",             # read timeout setting
            "write_timeout",       # write timeout (replaces deprecated writeTimeout)
            "xonxoff",             # software flow control setting
            "rtscts",              # hardware (RTS/CTS) flow control setting
            "dsrdtr",              # hardware (DSR/DTR) flow control setting
            "inter_byte_timeout",  # character interval timeout (replaces interCharTimeout)
        )
        for attribute in settings:
            print(getattr(engine, attribute))

    def Open_Engine(self):
        """
        Open the serial port if it is not open yet and update ``Ret``.

        When the port object could not be created in ``__init__`` a new one
        is created from the stored settings. Errors raised by pyserial while
        opening propagate to the caller.
        """
        global Ret
        if self.main_engine is None:
            self.main_engine = serial.Serial(self.port, self.bps, timeout=self.timeout)
        elif not self.main_engine.is_open:
            self.main_engine.open()
        Ret = bool(self.main_engine.is_open)

    def Close_Engine(self):
        """
        Close the serial port if it is open and reset ``Ret`` to False.

        Safe to call when the port was never opened.
        """
        global Ret
        if self.main_engine is not None and self.main_engine.is_open:
            self.main_engine.close()
        Ret = False

    @staticmethod
    def Print_Used_Com():
        """
        Refresh the module-level port lists and return the port identifiers.

        Prints a notice when no serial port is found.

        :return: ``port_list_number`` (empty when no port is present)
        """
        port_list_name.clear()
        port_list_number.clear()
        found_ports = list(serial.tools.list_ports.comports())
        if not found_ports:
            print("The Serial port can't find!")
        for each_port in found_ports:
            port_list_number.append(each_port[0])
            port_list_name.append(each_port[1])
        return port_list_number

    def Read_Size(self, size):
        """
        Receive data of specified size.

        With a timeout configured fewer bytes than requested may be
        returned; without a timeout the call blocks until ``size`` bytes
        have been read.

        :param size: number of bytes to read
        :return: the bytes read from the port
        """
        return self.main_engine.read(size)

    def Read_Line(self):
        """
        Receive a line of data.

        Open the port with a timeout when using this method; otherwise the
        call waits until a newline is received.

        :return: the bytes read, including the line terminator if one arrived
        """
        return self.main_engine.readline()

    def Send_data(self, data):
        """
        Send data.

        :param data: bytes-like payload to transmit
        :return: whatever the port's ``write`` returns (pyserial: number of
            bytes written)
        """
        return self.main_engine.write(data)

    # more examples
    # self.main_engine.write(bytes(listData))  # send list data, listData = [0x01, 0x02, 0xFD] or [1, 2, 253]
    # self.main_engine.write(chr(0x06).encode("utf-8"))  # send one value in hexadecimal
    # print(self.main_engine.read().hex())  # read a byte in hexadecimal
    # print(self.main_engine.read())  # read a byte
    # print(self.main_engine.read(10).decode("gbk"))  # read 10 bytes
    # print(self.main_engine.readline().decode("gbk"))  # read a line
    # print(self.main_engine.readlines())  # read multiple lines as a list; requires a timeout
    # print(self.main_engine.in_waiting)  # bytes waiting in the input buffer
    # print(self.main_engine.out_waiting)  # bytes waiting in the output buffer
    # print(self.main_engine.readall())  # read all characters

    def _read_pending(self, way, echo):
        """
        Read whatever is currently waiting in the input buffer.

        :param way: 0 prints the pending bytes one at a time; 1 reads them in
            one call, decodes them as UTF-8 and stores the text in ``self.data``
        :param echo: when True the text received with ``way == 1`` is printed
        :return: the decoded text for ``way == 1`` when bytes were pending,
            otherwise None
        """
        pending = self.main_engine.in_waiting
        if not pending:
            return None
        if way == 0:
            # byte-by-byte reception
            for _ in range(pending):
                print("receive ascii data:" + str(self.Read_Size(1)))
            return None
        if way == 1:
            # Overall reception. Undecodable bytes become U+FFFD so that a
            # chunk already taken out of the buffer is not lost.
            # To get a hex dump instead, e.g. b'\x01\x06' -> b'0106', apply
            # binascii.hexlify to the raw bytes before decoding.
            self.data = self.main_engine.read(pending).decode("utf-8", errors="replace")
            if echo:
                print("receive ascii data:", self.data)
            return self.data
        return None

    def Receive_data_once(self, way):
        """
        Receive the currently pending data once, without looping.

        :param way: 0 for byte-by-byte reception (printed), 1 for overall
            reception (stored in ``self.data``)
        """
        try:
            self._read_pending(way, echo=False)
        except Exception:
            # Not a bare except: Ctrl-C / SystemExit must still get through.
            print("Exception report")

    def Receive_data(self, way):
        """
        Receive data cyclically until a blank (whitespace-only) chunk arrives.

        This call blocks and polls without sleeping, so run it in a thread
        when the caller must stay responsive. Only ``way == 1`` can end the
        loop; with ``way == 0`` it runs until interrupted.

        :param way: 0 for byte-by-byte reception (printed), 1 for overall
            reception (stored in ``self.data`` and printed)
        """
        print("Start receiving data:")
        if self.main_engine is None:
            # Without a port every pass would fail; report once and stop.
            print("Exception report")
            return
        waiting_for_data = True
        while waiting_for_data:
            try:
                received = self._read_pending(way, echo=True)
            except Exception:
                # Report and keep polling, but let KeyboardInterrupt through
                # so the loop can be stopped.
                print("Exception report")
                continue
            # if data is empty, exit the receiving loop
            if way == 1 and received is not None and received.strip() == '':
                waiting_for_data = False
        print("Data received!")
# if __name__ == '__main__':
# Communication.Print_Used_Com()
# port = port_list_number
# myCom = Communication(port[0], 115200, 0.5)
# print("Ret = ", Ret)
# myCom.Open_Engine()
# myCom.Receive_data(1)
# myCom.Close_Engine()