-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserial_interface.py
123 lines (103 loc) · 4.29 KB
/
serial_interface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import argparse
import re
import serial
import time
class SerialInterface():
def __init__(self):
self.parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
# self.parser.add_argument("--cmd", help="command", type=str, default=None)
self.parser.add_argument("-port", help="Serial Port", type=str, default=None)
self.args = self.parser.parse_args()
self.serialClient = None
self.isLogin = False
self.serialClient = serial.Serial()
# ser.port = "/dev/ttyUSB0"
self.serialClient.port = self.args.port
# ser.port = "/dev/ttyS2"
self.serialClient.baudrate = 115201
self.serialClient.bytesize = serial.EIGHTBITS # number of bits per bytes
self.serialClient.parity = serial.PARITY_NONE # set parity check: no parity
self.serialClient.stopbits = serial.STOPBITS_ONE # number of stop bits
# ser.timeout = None #block read
# self.serialClient.timeout = None # non-block read
# ser.timeout = 2 #timeout block read
self.serialClient.writeTimeout = 2 # timeout for write
self.connect()
# self.cmd = 'ifconfig'
def connect(self):
try:
self.serialClient.open()
# serialClient = serial.serial_for_url("loop://")
except Exception as exp:
print("Not able to open PORT: {}".format(exp))
print("PORT Open: {}".format(self.serialClient.is_open))
serialClient = None
return False
print("connected to {}".format(self.serialClient.port))
return True
def execute(self, cmd):
out_write = self.writeCommandSerial(cmd)
print("out_write: {}".format(out_write))
bufStr = self.readSerial(timeout=3.0)
output = [item.rstrip() for item in bufStr.split('\n')][1:-1]
output = '\n'.join(output)
print("bufstr: {}".format(output))
ip_list = re.findall('(inet\saddr:\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', output)
print ("IP List {}".format(ip_list))
str, def_ip = ip_list[0].split(':')
print(def_ip)
print ("Actual IP {}".format(def_ip))
self.disconnect()
return def_ip
def writeSerial(self, cmdStr, isDoNotFlush=False):
print("Enter writeSerial")
print("PORT Open: {}".format(self.serialClient.is_open))
if self.serialClient is None or not self.serialClient.is_open:
return None
if cmdStr is None or len(cmdStr) <= 0:
return None
numBytesWritten = self.serialClient.write(self.s2b(cmdStr))
print("Check isDoNotFlush {}".format(isDoNotFlush))
if not isDoNotFlush:
self.serialClient.flush()
return numBytesWritten
def readSerial(self, numBytes=1024, timeout=None):
print("Enter readSerial")
print("PORT Open: {}".format(self.serialClient.is_open))
if self.serialClient is None or not self.serialClient.is_open:
return None
time.sleep(3)
if timeout is not None:
self.serialClient.timeout = timeout
buf = self.serialClient.read(numBytes).decode('utf-8', 'ignore')
strBuf = self.b2s(buf)
self.serialClient.timeout = None
return strBuf
def b2s(self, buf):
return str(buf)
def s2b(self, textStr):
print("String to byte")
return bytes(textStr)
def writeCommandSerial(self, cmdStr):
if cmdStr is None:
return None
return self.writeSerial(str(cmdStr) + "\n")
def disconnect(self):
if not self.serialClient is None:
if self.isLogin:
self.writeCommandSerial("exit")
print("logged out")
# hang up
self.serialClient.close()
print("disconnected from remote")
# print("PORT Open: {}".format(self.serialClient.is_open))
return
if __name__ == '__main__':
try:
obj = SerialInterface()
ip = obj.execute('ifconfig')
print (ip)
except EOFError:
print("End of File Error")
except Exception as e:
print("Exception occurred {}".format(e))