-
Notifications
You must be signed in to change notification settings - Fork 31
/
ecr.py
415 lines (374 loc) · 13.5 KB
/
ecr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
# -*- coding: utf-8 -*-
#!/usr/bin/env python
"""
Maybe create a small console program which allows us to:
- send packets directly
- receive them directly
- see the binary data of the packet
- see the representation of the packet
- ability for incoming and outgoing
"""
from ecrterm.utils import is_stringlike
from ecrterm.packets import *
from ecrterm import transmission
from ecrterm.transmission.signals import *
import time, logging
from ecrterm.common import TERMINAL_STATUS_CODES
class A(object):
def write(self, *args, **kwargs):
pass
_logfile = A()
def dismantle_serial_packet(data):
apdu = []
crc = None
i = 2
header = data[:i]
#header = conv.bs2hl(header)
# test if there was a transmission:
if header == []:
raise common.TransportLayerException('No Header')
# test our header to be valid
if header != [DLE, STX]:
raise common.TransportLayerException("Header Error: %s" % header)
# read until DLE, ETX is reached.
dle = False
while not crc and i < len(data):
b = data[i] # read a byte.
if b == ETX and dle:
# dle was set, and this is ETX, so we are at the end.
# we read the CRC now.
crc = [data[i + 1], data[i + 2]]
# and break
continue
elif b == DLE:
if not dle:
# this is a dle
dle = True
continue
else:
# this is the second dle. we take it.
dle = False
elif dle:
# dle was set, but we got no etx here.
# this seems to be an error.
raise Exception("DLE without sense detected.")
# we add this byte to our apdu.
apdu += [b]
i += 1
return crc, apdu
def parse_represented_data(data):
# represented data
if is_stringlike(data):
# we assume a bytelist like 10 02 03....
data = conv.toBytes(data)
# first of all, serial data starts with 10 02, so everything
# starting with 10 will be assumed as "serial packet" and first "demantled"
if data[0] == DLE:
try:
crc, data = dismantle_serial_packet(data)
except common.TransportLayerException:
pass
elif data[0] == ACK:
if len(data) == 1:
return 'ACK'
elif data[0] == NAK:
if len(data) == 1:
return 'NAK'
# then we create the packet and return that.
p = Packet.parse(data)
return p
def ecr_log(data, incoming=False):
try:
if incoming:
incoming = '<'
else:
incoming = '>'
if is_stringlike(data):
data = conv.bs2hl(data)
# logit to the logfile
try:
_logfile.write('%s %s\n' % (incoming, conv.toHexString(data)))
except:
pass
try:
data = repr(parse_represented_data(data))
_logfile.write('= %s\n' % data)
except Exception as e:
print("DEBUG: Cannot be represented: %s" % data)
print(e)
_logfile.write('? did not understand ?\n')
data = conv.toHexString(data)
print("%s %s" % (incoming, data))
except:
import traceback
traceback.print_exc()
print("| error in log")
class ECR(object):
transmitter = None
transport = None
version = None
terminal_id = None
MAX_TEXT_LINES = 4
_state_registered = None
_state_connected = None
def __get_last(self):
if self.transmitter is not None:
return self.transmitter.last
#!: Last is a short access for transmitter.last if possible.
last = property(__get_last)
def __init__(self, device='/dev/ttyUSB0', password='123456'):
"""
Initializes an ECR object and connects to the serial device given
Fails if Serial Device is not found.
You can access the Device on low level as the `transport`
You can access the Protocol Handler on low level as `transmission`
"""
self.transport = transmission.SerialTransport(device)
#self.transport.slog = ecr_log
self.daylog = []
self.daylog_template = ''
self.history = []
self.terminal_id = None
# we save some states here.
self._state_registered = False
self._state_connected = False
self.password = password
if self.transport.connect():
self.transmitter = transmission.Transmission(self.transport)
self._state_connected = True
else:
raise Exception("ECR could not connect.")
def register(self, config_byte):
"""
registers this ECR at the PT, locking menus
for real world conditions.
"""
kwargs = {}
if self.password:
kwargs['password'] = self.password
if config_byte is not None:
kwargs['config_byte'] = config_byte
ret = self.transmit(Registration(**kwargs))
if ret == TRANSMIT_OK:
# get the terminal-id if its there.
for inc, packet in self.transmitter.last_history:
if inc and isinstance(packet, Completion):
if 'tid' in packet.bitmaps_as_dict().keys():
self.terminal_id = packet.bitmaps_as_dict()\
.get('tid', BCD(0)).value()
# remember this.
self._state_registered = True
return ret
def register_unlocked(self):
"""
registers to the PT, not locking the master menu on it.
do not use in production environment.
"""
ret = self.transmit(
Registration(password=self.password,
config_byte=Registration.generate_config(
ecr_controls_admin=False),))
if ret == TRANSMIT_OK:
self._state_registered = True
return ret
def _end_of_day_info_packet(self, history=None):
'''
search for an end of day packet status information in the last packets
can also search in any history list.
'''
# helper function to scan for end of day information via packets.
status_info = None
plist = history or self.transmitter.last_history
for inc, packet in plist:
if inc: # incoming
if isinstance(packet, StatusInformation):
status_info = packet
if status_info:
eod_info = status_info.get_end_of_day_information()
# we add terminal id to it.
eod_info['terminal-id'] = self.terminal_id
return eod_info
def end_of_day(self):
"""
- sends an end of day packet.
- saves the log in `daylog`
@returns: 0 if there were no protocol errors.
"""
#old_histoire = self.transmitter.history
#self.transmitter.history = []
# we send the packet
result = self.transmit(EndOfDay(self.password))
# now save the log
self.daylog = self.last_printout()
if not self.daylog:
# there seems to be no printout. we search in statusinformation.
eod_info = self._end_of_day_info_packet()
try:
self.daylog = (self.daylog_template % eod_info).split('\n')
except:
import traceback
traceback.print_exc()
logging.error('Error in Daylog Template')
return result
def last_printout(self):
"""
returns all printlines from the last history.
@todo: TextBlock support - if some printer decides to do it that way.
"""
printout = []
for entry in self.transmitter.last_history:
inc, packet = entry
#old_histoire += [(inc, packet)]
if inc and isinstance(packet, PrintLine):
printout += [ packet.fixed_values['text'] ]
return printout
def payment(self, amount_cent=50):
"""
executes a payment in amount of cents.
@returns: True, if payment went through, or False if it was
canceled.
throws exceptions.
"""
pkg = Authorisation(
amount=amount_cent, # in cents.
currency_code=978, #euro, only one that works, can be skipped.
)
code = self.transmit(pkg)
if code == 0:
# now check if the packet actually got what it wanted.
if self.transmitter.last.completion:
if isinstance(self.transmitter.last.completion, Completion):
return True
else:
return False
else:
# @todo: remove this.
print("transmit error?")
return False
def restart(self):
"""
restarts/resets the PT.
"""
self._state_registered = False
return self.transmit(ResetTerminal())
def reset(self):
"""
- resets transport: @see ecrterm.transmission.Transport.reset()
- restarts pt: @see self.restart()
"""
self.transport.reset()
time.sleep(1)
ret = self.restart()
time.sleep(1)
return ret
def show_text(self,
lines=None,
duration=5,
beeps=0):
"""
displays a text on the PT screen for duration of seconds.
@param lines: a list of strings.
@param duration: 0 for forever.
@param beeps: make some noise.
@note: any error due to wrong strings given are not checked.
"""
lines = lines or ['Hello world!', ]
kw = {'display_duration': duration}
if beeps:
kw['beeps'] = int(beeps)
i = 1
for line in lines[:self.MAX_TEXT_LINES]:
kw['line%s' % i] = line
i += 1
return self.transmit(ShowText(**kw))
def status(self):
"""
executes a status enquiry. also sets self.version if not set.
success:
returns 0 if successful, and status is unchanged.
returns an int status code if status has changed.
errors:
returns None if no status was transmitted.
returns False on transmit errors.
to check for the status code:
common.TERMINAL_STATUS_CODES.get( status, 'Unknown' )
"""
errors = self.transmit(StatusEnquiry(self.password))
if not errors:
if isinstance(self.last.completion, Completion):
# try to get version
if not self.version:
self.version = self.last.completion.fixed_values.get('sw-version', None)
return self.last.completion.fixed_values.get('terminal-status', None)
# no completion means some error.
return False
def transmit(self, packet):
"""
transmits a packet, therefore introducing the protocol cascade.
rewrite this function if you want packets be routed anywhere
since the whole ECR Object uses this function to transmit.
use `last` property to access last packet transmitted.
"""
# we actually make a small sleep, allowing better flow.
time.sleep(0.2)
transmission = self.transmitter.transmit(packet)
return transmission
# dev functions.
#########################################################################
def wait_for_status(self):
"""
waits until self.status() returns 0 (or False/None)
polls the PT in 2 second intervals.
this function prints out the status string.
use it as code example.
"""
status = self.status()
while status:
print(TERMINAL_STATUS_CODES.get(status, 'Unknown Status'))
time.sleep(2)
status = self.status()
def listen(self, timeout=15):
"""
dev function to simply listen.
"""
ok, message = None, None
while True:
try:
ok, message = self.transport.receive(timeout)
if ok and message:
return message
except Exception as e:
print(e)
continue
print("-mark-")
def devprint_packets(self):
"""
dev function to execute the script located in base_packets
useful to get a list of all parsed packets.
"""
from pprint import pprint
pprint(Packets.packets)
def devprint_bitmaps(self):
"""
dev function to execute the script located in bitmaps
useful to get a list of all valid bitmaps.
"""
from pprint import pprint
from ecrterm.packets.bitmaps import BITMAPS_ARGS
pprint(BITMAPS_ARGS)
def detect_pt(self):
# note: this only executes utils.detect_pt with the local ecrterm.
from ecrterm.utils import detect_pt
result = detect_pt(silent=False, ecr=self, timeout=2)
return result
def parse_str(self, s):
return parse_represented_data(s)
if __name__ == '__main__':
_logfile = open('./terminallog.txt', 'aw')
_logfile.write('-MARK-\n')
e = ECR()
#e.end_of_day()
e.show_text(['Hello world!', 'Testing', 'myself.'], 5, 0)
print("preparing for payment.")
e.get_ready()
print(e.payment(50))