-
Notifications
You must be signed in to change notification settings - Fork 0
/
connection.py
120 lines (89 loc) · 3.39 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# Connection handles serial connection with the printer mainboard
import serial
import time
import threading
serial_path = 'COM4'
serial_list = ["COM1", "COM2", "COM3", "COM4", "COM5"]
baud_rate = 115200
global globalresponse
globalresponse = ""
global ready_for_row
ready_for_row = False
def connection_delay(seconds):
start_time = time.monotonic()
while time.monotonic() - start_time < seconds:
pass
class Connection:
def __init__(self):
try:
self.ser = serial.Serial(serial_path, baud_rate, timeout=1)
print("Connected")
except Exception as e:
print(f"Error encountered during connection: {e}")
def send(self, data):
formatted = data + "\n"
# Homing specific
if(data=="G28 X Y"):
homing_complete = False
time.sleep(1)
self.ser.write(formatted.encode())
print(f"Sent command {data}")
connection_delay(1)
while(homing_complete == False):
response = self.ser.readline()
# print(f"Response: {response.decode()}")
connection_delay(1)
if("busy" not in response.decode() and "processing" not in response.decode()):
homing_complete = True
print("Homing complete")
else:
print("Homing ongoing")
# Process all other moves
else:
try:
move_complete = False
self.ser.write(formatted.encode())
print(f"Sent command {data}")
connection_delay(1)
while move_complete==False:
response = self.ser.readline()
# print(f"Response: {response.decode()}")
time.sleep(0.6)
global globalresponse
global ready_for_row
globalresponse = response.decode()
if('Row Complete' in globalresponse):
ready_for_row = True
if ("ok" in response.decode() and "processing" not in response.decode()):
move_complete = True
print(f"move complete {data}, response: {response.decode()}")
if('Row Complete' in data):
self.override_ready()
print("Row Ready initiated from line 83")
else:
pass
print(f"move ongoing | response: {response.decode()}")
except Exception as e:
print(f"Error : {e}")
print(f"Error encountered during GCode send: {data}")
def get_state(self):
return globalresponse
def read_next(self):
global globalresponse
response = self.ser.readline()
globalresponse = response.decode()
global ready_for_row
if('Row Complete' in globalresponse):
ready_for_row = True
def is_ready_for_row(self):
global ready_for_row
print(f'Row Ready: {ready_for_row}')
return ready_for_row
def reset_row(self):
global ready_for_row
ready_for_row = False
print("row state reset")
def override_ready(self):
global ready_for_row
ready_for_row = True
print("Ready override triggered")