-
Notifications
You must be signed in to change notification settings - Fork 0
/
device.py
318 lines (242 loc) · 9.2 KB
/
device.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# -*- coding: utf-8-unix -*-
import sys, os
import platform
from cffi import FFI
from ucdev.common import GPIO, SPI, I2C
from header import src as cdef_src
import logging
log = logging.getLogger(__name__)
class CyUSBSerial(object):
    """Process-wide singleton wrapping the Cypress USB-Serial shared library.

    The first instantiation builds the cffi binding (exposed as the ``ffi``
    and ``api`` attributes) and, when the library exports ``CyLibraryInit``,
    calls it.  Every later instantiation returns that same object and
    ignores its arguments.
    """

    __self = None

    def __new__(cls, lib=None, ffi=None):
        """Return the shared instance, creating it on first use.

        :param lib: library name/path handed to ``ffi.dlopen``; defaults to
            ``"cyusbserial"``.
        :param ffi: ``FFI`` object to use; a fresh one is created if omitted.
        :raises Exception: if ``CyLibraryInit`` exists and does not return
            ``CY_SUCCESS``.
        """
        if not cls.__self:
            if not ffi:
                ffi = FFI()
            obj = super(CyUSBSerial, cls).__new__(cls)
            obj.ffi = ffi
            obj.ffi.cdef(cdef_src)
            obj.api = ffi.dlopen(lib if lib else "cyusbserial")

            # initialize if API exists
            if hasattr(obj.api, 'CyLibraryInit'):
                rc = obj.api.CyLibraryInit()
                if rc != obj.api.CY_SUCCESS:
                    raise Exception("ERROR: CyLibraryInit=%d" % rc)

            # remember the instance only once setup went through
            cls.__self = obj
        return cls.__self

    def __del__(self):
        """Call ``CyLibraryExit`` if the loaded library provides it."""
        # ``api`` is absent when __new__ raised before dlopen() returned;
        # a finalizer must not fail with AttributeError in that case.
        api = getattr(self, 'api', None)
        # finalize if API exists
        if api and hasattr(api, 'CyLibraryExit'):
            api.CyLibraryExit()

    def find(self, finder=None, vid=None, pid=None):
        """Yield a ``CyUSBSerialDevice`` for every matching attached device.

        :param finder: optional callable receiving the ``CY_DEVICE_INFO``
            struct; the device is selected when it returns a true value.
            When given, ``vid``/``pid`` are not consulted.
        :param vid: vendor id to match (used only without ``finder``).
        :param pid: product id to match (used only without ``finder``).

        With neither ``finder`` nor ``vid``/``pid`` every device is yielded.
        Devices are always yielded with interface number 0.
        """
        ffi, api = self.ffi, self.api
        system = platform.system()

        count = ffi.new("UINT8 *")
        # The return code is not examined; iteration is driven solely by
        # the count the call stored.
        api.CyGetListofDevices(count)

        info = ffi.new("CY_DEVICE_INFO *")
        for devno in range(count[0]):
            rc = api.CyGetDeviceInfo(devno, info)
            if rc != api.CY_SUCCESS:
                # ``info`` is reused across iterations, so after a failed
                # query its contents cannot be trusted for matching.
                continue

            if system == 'Windows' and info.deviceBlock != api.SerialBlock_SCB0:
                continue

            if finder:
                found = finder(info)
            elif vid or pid:
                iv = info.vidPid.vid
                ip = info.vidPid.pid
                # exact pair, vid-only, or pid-only request
                found = (vid, pid) in ((iv, ip), (iv, None), (None, ip))
            else:
                found = True

            if found:
                yield CyUSBSerialDevice(self, devno, 0)
######################################################################
class CyUSBSerialDevice(object):
    """One interface of an attached device, with its handle opened lazily.

    Attributes not found on the object are looked up on the library's
    ``api`` object.  Callable results are wrapped so the device handle is
    opened on demand and passed as the first argument; non-callable results
    are returned as they are.  Either way the result is cached on the
    instance.

    When ``raise_on_error`` is true (the default) a wrapped call raises
    ``Exception`` for any return code other than ``CY_SUCCESS``.
    """

    def __init__(self, lib, devno, ifnum):
        """
        :param lib: the ``CyUSBSerial`` object providing ``ffi`` and ``api``.
        :param devno: device number passed to ``CyOpen``.
        :param ifnum: interface number passed to ``CyOpen``.
        """
        self.lib = lib
        self.devno = devno
        self.ifnum = ifnum
        self.dev = None
        self.raise_on_error = True

        # import API symbols from the library
        # The lookup result is unused; it is performed before the dict copy.
        # NOTE(review): presumably this makes the cffi library fill its
        # __dict__ first, and any callables already cached there are copied
        # unwrapped -- confirm both against cffi behaviour.
        dummy = self.CY_SUCCESS
        self.__dict__.update(lib.api.__dict__)

    # delegate API calls to the library
    def __getattr__(self, key):
        """Resolve ``key`` on the library API, wrapping it if callable."""
        # Without this guard a missing ``lib`` (e.g. an instance created via
        # __new__ by copy/pickle) would re-enter __getattr__ forever.
        if key == 'lib':
            raise AttributeError(key)

        val = getattr(self.lib.api, key)

        # wrap API so device handle is handled automatically
        if callable(val):
            val = self._wrap_api_call(key, val)

        # save as local attribute to help ipython completion
        setattr(self, key, val)
        return val

    def _wrap_api_call(self, name, func):
        """Return ``func`` bound to this device's lazily opened handle."""
        api = self.lib.api

        def fail(call, rc):
            sym = self.err_to_sym(rc)
            raise Exception("ERROR: {0}={1}, {2}".format(call, rc, sym))

        def wrapper(*args, **kwargs):
            # automatically open handle on first call
            if not self.dev:
                rc = self.open()
                if rc != api.CY_SUCCESS:
                    # There is no usable handle: report the open failure
                    # instead of calling the API with an invalid one.
                    if self.raise_on_error:
                        fail("CyOpen", rc)
                    return rc

            # delegate API call
            rc = func(self.dev, *args, **kwargs)

            if self.raise_on_error and rc != api.CY_SUCCESS:
                fail(name, rc)
            # invalidate handle to force reopen on next call
            elif name in ('CyCyclePort', 'CyResetDevice'):
                self.dev = None
            return rc

        return wrapper

    def __del__(self, *args):
        # A partially constructed object has no handle to release.
        if self.__dict__.get('dev'):
            self.close()

    def err_to_sym(self, rc):
        """Return the ``CY_ERROR*`` symbol whose value equals ``rc``.

        Falls back to ``"UNKNOWN"`` when no such symbol is found among the
        attributes currently present on the API object.
        """
        for k, v in vars(self.lib.api).items():
            if k.startswith("CY_ERROR") and v == rc:
                return k
        return "UNKNOWN"

    def open(self):
        """Open the device handle unless one is already held.

        :returns: the ``CyOpen`` return code, or ``CY_SUCCESS`` when a handle
            was already open.
        """
        ffi, api = self.lib.ffi, self.lib.api
        rc = api.CY_SUCCESS
        if not self.dev:
            handle = ffi.new("CY_HANDLE *")
            rc = api.CyOpen(self.devno, self.ifnum, handle)
            # keep the handle only when the library reported success
            if rc == api.CY_SUCCESS:
                self.dev = handle[0]
        return rc

    def close(self):
        """Close the handle if one is open and forget it."""
        if self.dev:
            self.lib.api.CyClose(self.dev)
        self.dev = None
######################################################################
class CyI2C(I2C):
    """I2C access through a ``CyUSBSerialDevice``.

    Derives from the ``I2C`` base imported at the top of this module (the
    class previously derived from ``SPI``, leaving ``I2C`` unused).
    """

    def __init__(self, dev):
        """
        :param dev: the ``CyUSBSerialDevice`` to talk through.
        :raises Exception: if ``dev`` is not a ``CyUSBSerialDevice``.
        """
        if not isinstance(dev, CyUSBSerialDevice):
            msg = "ERROR: Not a CyUSBSerialDevice object: %s" % str(dev)
            raise Exception(msg)
        self.dev = dev

    @staticmethod
    def _trace(tag, buf):
        """Log ``buf`` as space-separated 8-bit binary strings at DEBUG."""
        # Skip building the dump entirely unless it will be emitted.
        if log.isEnabledFor(logging.DEBUG):
            log.debug(tag + " ".join(["{:08b}".format(i) for i in buf]))

    def set_config(self, config):
        """Apply ``config`` (an initializer for ``CY_I2C_CONFIG``).

        :returns: the return code of ``CySetI2cConfig``.
        """
        cfg = self.dev.lib.ffi.new("CY_I2C_CONFIG *", config)
        return self.dev.CySetI2cConfig(cfg)

    def get_config(self):
        """Read the current ``CY_I2C_CONFIG`` as a ``{field: value}`` dict.

        :raises Exception: if ``CyGetI2cConfig`` does not return
            ``CY_SUCCESS``.
        """
        dev = self.dev
        ffi, api = dev.lib.ffi, dev.lib.api
        cfg = ffi.new("CY_I2C_CONFIG *")
        rc = dev.CyGetI2cConfig(cfg)
        if rc != api.CY_SUCCESS:
            raise Exception("ERROR: CyGetI2cConfig=%d" % rc)
        return {name: getattr(cfg, name)
                for name, _ in ffi.typeof(cfg[0]).fields}

    def prepare(self, slaveAddress, isStopBit=1, isNakBit=0):
        """Build a ``CY_I2C_DATA_CONFIG`` for use with ``read``/``write``.

        The three arguments initialize the struct in that order.
        """
        return self.dev.lib.ffi.new("CY_I2C_DATA_CONFIG *",
                                    (slaveAddress, isStopBit, isNakBit))

    def reset(self, resetMode=0):
        """Call ``CyI2cReset(resetMode)``, log and return its return code."""
        rc = self.dev.CyI2cReset(resetMode)
        log.debug("rc=%d", rc)
        return rc

    def read(self, cfg, data, timeout=1000):
        """Read up to ``len(data)`` bytes.

        :param cfg: data config as returned by ``prepare``.
        :param data: only its length is used, as the receive buffer size.
        :param timeout: passed through to ``CyI2cRead``.
        :returns: ``bytearray`` holding the first ``transferCount`` bytes of
            the receive buffer.
        """
        dev = self.dev
        ffi = dev.lib.ffi
        rlen = len(data)
        rbuf = ffi.new("UCHAR[%d]" % rlen)
        rcdb = ffi.new("CY_DATA_BUFFER *", (rbuf, rlen, 0))
        dev.CyI2cRead(cfg, rcdb, timeout)
        self._trace("r:", rbuf)
        return bytearray(ffi.buffer(rcdb.buffer, rcdb.transferCount)[0:])

    def write(self, cfg, data, timeout=1000):
        """Write ``data``.

        :param cfg: data config as returned by ``prepare``.
        :param data: payload; converted with ``bytes(data)``.
        :param timeout: passed through to ``CyI2cWrite``.
        :returns: the return code of ``CyI2cWrite``.
        """
        dev = self.dev
        ffi = dev.lib.ffi
        wlen = len(data)
        wbuf = ffi.new("UCHAR[%d]" % wlen, bytes(data))
        wcdb = ffi.new("CY_DATA_BUFFER *", (wbuf, wlen, 0))
        self._trace("w:", wbuf)
        return dev.CyI2cWrite(cfg, wcdb, timeout)
######################################################################
class CySPI(SPI):
    """SPI access through a ``CyUSBSerialDevice``.

    An optional chip-select object (anything with ``get()`` and ``set(v)``)
    can be supplied; ``send`` then sets it to 1 before the transfer and
    back to 0 afterwards.
    """

    # Filled in from the library constants by __init__ (see FIXME there).
    MOTOROLA = TI = NS = None

    # Used for GPIO-based chip-select
    @property
    def CSN(self):
        """Current chip-select value, or ``None`` without a CSN object."""
        return self.__csn.get() if self.__csn else None

    @CSN.setter
    def CSN(self, value):
        if self.__csn:
            self.__csn.set(1 if value else 0)

    def __init__(self, dev, CSN=None):
        """
        :param dev: the ``CyUSBSerialDevice`` to talk through.
        :param CSN: optional chip-select object with ``get()``/``set(v)``.
        :raises Exception: if ``dev`` is not a ``CyUSBSerialDevice``.
        """
        if not isinstance(dev, CyUSBSerialDevice):
            msg = "ERROR: Not a CyUSBSerialDevice object: %s" % str(dev)
            raise Exception(msg)
        self.dev = dev
        self.__csn = CSN

        # FIXME:
        # - These should be set on load-time, not run-time.
        CySPI.MOTOROLA = dev.lib.api.CY_SPI_MOTOROLA
        CySPI.TI = dev.lib.api.CY_SPI_TI
        CySPI.NS = dev.lib.api.CY_SPI_NS

    @staticmethod
    def _trace(tag, buf):
        """Log ``buf`` as space-separated 8-bit binary strings at DEBUG."""
        # Skip building the dump entirely unless it will be emitted.
        if log.isEnabledFor(logging.DEBUG):
            log.debug(tag + " ".join(["{:08b}".format(i) for i in buf]))

    def set_config(self, config):
        """Apply ``config`` (an initializer for ``CY_SPI_CONFIG``).

        :returns: the return code of ``CySetSpiConfig``.
        """
        cfg = self.dev.lib.ffi.new("CY_SPI_CONFIG *", config)
        return self.dev.CySetSpiConfig(cfg)

    def get_config(self):
        """Read the current ``CY_SPI_CONFIG`` as a ``{field: value}`` dict.

        :raises Exception: if ``CyGetSpiConfig`` does not return
            ``CY_SUCCESS``.
        """
        dev = self.dev
        ffi, api = dev.lib.ffi, dev.lib.api
        cfg = ffi.new("CY_SPI_CONFIG *")
        rc = dev.CyGetSpiConfig(cfg)
        if rc != api.CY_SUCCESS:
            raise Exception("ERROR: CyGetSpiConfig=%d" % rc)
        return {name: getattr(cfg, name)
                for name, _ in ffi.typeof(cfg[0]).fields}

    def send(self, data, timeout=1000):
        """Exchange ``data`` over SPI and return what was clocked in.

        :param data: payload; converted with ``bytes(data)``.  The receive
            buffer has the same length.
        :param timeout: passed through to ``CySpiReadWrite``.
        :returns: ``bytearray`` holding the first ``transferCount`` bytes of
            the receive buffer.
        """
        dev = self.dev
        ffi = dev.lib.ffi

        wlen = len(data)
        wbuf = ffi.new("UCHAR[%d]" % wlen, bytes(data))
        wcdb = ffi.new("CY_DATA_BUFFER *", (wbuf, wlen, 0))

        rlen = len(data)
        rbuf = ffi.new("UCHAR[%d]" % rlen)
        rcdb = ffi.new("CY_DATA_BUFFER *", (rbuf, rlen, 0))

        self._trace("w:", wbuf)

        self.CSN = 1
        try:
            dev.CySpiReadWrite(rcdb, wcdb, timeout)
        finally:
            # Restore chip-select even when the transfer raises, so a
            # failed call does not leave it at the in-transfer level.
            self.CSN = 0

        self._trace("r:", rbuf)

        return bytearray(ffi.buffer(rcdb.buffer, rcdb.transferCount)[0:])
######################################################################
class CyGPIO(GPIO):
    """GPIO pin access through a ``CyUSBSerialDevice``."""

    def __init__(self, dev):
        """
        :param dev: the ``CyUSBSerialDevice`` to talk through.
        :raises Exception: if ``dev`` is not a ``CyUSBSerialDevice``.
        """
        if not isinstance(dev, CyUSBSerialDevice):
            msg = "ERROR: Not a CyUSBSerialDevice object: %s" % str(dev)
            raise Exception(msg)
        self.dev = dev

    def set(self, pin, val):
        """Set ``pin`` to ``val`` via ``CySetGpioValue``.

        :raises Exception: if the call does not return ``CY_SUCCESS``.
        """
        dev = self.dev
        api = dev.lib.api
        ret = dev.CySetGpioValue(pin, val)
        if ret != api.CY_SUCCESS:
            sym = dev.err_to_sym(ret)
            msg = "ERROR: CySetGpioValue={0}, {1}".format(ret, sym)
            raise Exception(msg)

    def get(self, pin):
        """Return the value of ``pin`` as read by ``CyGetGpioValue``.

        :raises Exception: if the call does not return ``CY_SUCCESS``.
        """
        dev = self.dev
        api = dev.lib.api
        ffi = dev.lib.ffi
        val = ffi.new("UINT8 *")
        ret = dev.CyGetGpioValue(pin, val)
        if ret != api.CY_SUCCESS:
            sym = dev.err_to_sym(ret)
            # The placeholder used to read "{1]", which made format() raise
            # ValueError instead of producing this message.
            msg = "ERROR: CyGetGpioValue={0}, {1}".format(ret, sym)
            raise Exception(msg)
        return val[0]
######################################################################
# Public API: every module-level name defined so far that starts with "Cy".
# The namespace keys are snapshotted with list() before filtering.
__all__ = [name for name in list(globals()) if name.startswith("Cy")]