-
-
Notifications
You must be signed in to change notification settings - Fork 9
/
fwoba.collector
executable file
·335 lines (259 loc) · 10.3 KB
/
fwoba.collector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Jackrabbit Relay
# 2021 Copyright © Robert APM Darin
# All rights reserved unconditionally.
# OrderBook collection and storage needs to be re-evaluated. Data requirements are extreme and CPU load is
# high. Realistically, a separate database (Redis with timed data expiration) will be needed to catalogue the
# extreme data flow.
import sys
sys.path.append('/home/JackrabbitRelay2/Base/Library')
import os
import gc
import json
import time
import datetime
import JackrabbitRelay as JRR
import JRRsupport
fwobaCFG='/home/fwoba/fwoba.cfg'
# Number of simultaneous processes used to handle pairs
NumberProcesses=os.cpu_count()
# Initialize the global signal interceptor to prevent file trashing on CTRL-C
Log=JRR.JackrabbitLog()
interceptor=JRRsupport.SignalInterceptor(Log=Log)
# For memory DB
fwobaTimeout=5*60
###
### Collector class
###
class Collector():
    """Collect and persist ticker data (and optionally the orderbook) for a
    single exchange/account/asset via Jackrabbit Relay.

    Ticker history is stored one JSON object per line in
    /home/fwoba/TickerData/<exchange>.<account>.<asset>.ticker, deduplicated
    on consecutive identical Bid/Ask pairs and truncated to `history` entries.
    """
    def __init__(self,exchange,account,asset,saveOB=False,history=None):
        # Initialize class operating requirements
        self.TickerDataDir='/home/fwoba/TickerData/'
        self.Exchange=exchange.lower()
        self.Account=account
        self.Asset=asset.upper()
        # Default retention window, in ticker entries. NOTE: this was 85400,
        # an apparent typo — ReadConfig() documents and enforces a minimum of
        # 86400, so the default now matches that.
        self.history=86400
        if history!=None:
            self.history=int(history)
        self.tickerfile=self.TickerDataDir+self.Exchange+'.'+self.Account+'.'+self.Asset.replace('/','').replace('-','').replace(':','')+'.ticker'
        self.filelock=JRRsupport.Locker(self.tickerfile)
        # Only honor saveOB when it is explicitly boolean True; any other
        # value leaves orderbook saving off (same effect as the original
        # False-then-conditionally-assign sequence).
        self.saveOB=(saveOB==True)
        self.TickerUpdate=False
        self.oldticker=None
        self.ticker=None
        self.orderbook=None
        self.TickerList=[]
        # Initialize per-process signal interceptor: prevent file trashing on CTRL-C
        self.Log=JRR.JackrabbitLog()
        self.interceptor=JRRsupport.SignalInterceptor(Log=self.Log)
        # Prime Jackrabbit Relay for THIS process. Each process MUST have its own SSL connection.
        self.relay=JRR.JackrabbitRelay(exchange=self.Exchange,account=self.Account,asset=self.Asset,Usage=self.Help)

    def Help(self,args,argslen):
        """Usage callback handed to JackrabbitRelay; prints usage and exits."""
        print("An exchange name, an account name, and an asset are required.")
        sys.exit(1)

    # These functions load each ticker file set into the Config as each pair
    # is loaded. This is done to prevent multiple loads of the same data.

    # Load ticker data
    def LoadTickerData(self):
        """Reload the ticker file, append the current ticker if it moved,
        truncate to the retention window, and save when an update occurred."""
        # Re-prime Jackrabbit Relay for THIS process.
        self.relay=JRR.JackrabbitRelay(exchange=self.Exchange,account=self.Account,asset=self.Asset,Usage=self.Help)
        self.GetTicker()
        # Reset and reload
        self.TickerList=[]
        self.TickerUpdate=False
        if os.path.exists(self.tickerfile)==True:
            # Read the whole file under the lock; parse outside it.
            self.interceptor.Critical(True)
            self.filelock.Lock()
            lines=JRRsupport.ReadFile(self.tickerfile).strip().split('\n')
            self.filelock.Unlock()
            self.interceptor.Critical(False)
            self.interceptor.SafeExit()
            for line in lines:
                line=line.strip()
                if line=='':
                    continue
                try:
                    td=json.loads(line)
                    self.TickerList.append(td)
                except: # damaged, skip
                    pass
            # get last ticker before adding current ticker
            # NOTE(review): `>1` leaves oldticker as None when exactly one
            # entry exists — possibly intentional, possibly off-by-one; kept
            # as-is pending confirmation.
            if len(self.TickerList)>1:
                self.oldticker=self.TickerList[-1]
        if self.TickerList!=[]:
            # Only append when BOTH Bid and Ask moved since the last entry.
            if self.ticker['Bid']!=self.TickerList[-1]['Bid'] \
            and self.ticker['Ask']!=self.TickerList[-1]['Ask']:
                self.TickerList.append(self.ticker)
                self.TickerUpdate=True
        else:
            self.TickerList.append(self.ticker)
            self.TickerUpdate=True
        # Keep only the newest `history` entries.
        self.TickerList=self.TickerList[-self.history:]
        # Store orderbook for volume and frequency weighted analysis.
        #self.GetOrderBook()
        if self.TickerUpdate==True:
            self.SaveTickerData()

    # Save ticker data
    def SaveTickerData(self):
        """Rewrite the ticker file (deduplicated on consecutive Bid/Ask) and,
        when enabled, append an orderbook snapshot to a per-day file."""
        self.interceptor.Critical(True)
        self.filelock.Lock()
        # A context manager guarantees the handle closes even if a write
        # fails. (An unused duplicate Locker on the ticker file was removed.)
        with open(self.tickerfile,"w") as fh:
            ot=None
            for ticker in self.TickerList:
                if ot==None \
                or (ticker['Bid']!=ot['Bid'] \
                and ticker['Ask']!=ot['Ask']):
                    fh.write(json.dumps(ticker)+'\n')
                    ot=ticker
        # Save orderbook
        if self.saveOB==True:
            self.GetOrderBook()
            dt=self.TickerList[-1]['DateTime'].split(' ')[0]
            tfn=self.TickerDataDir+dt+'.'+self.Exchange+'.'+self.Account+'.'+self.Asset.replace('/','')+'.orderbook'
            ob={}
            ob['DateTime']=self.TickerList[-1]['DateTime']
            ob['OrderBook']=self.orderbook
            tstr=json.dumps(ob)+'\n'
            JRRsupport.AppendFile(tfn,tstr)
        self.filelock.Unlock()
        self.interceptor.Critical(False)
        self.interceptor.SafeExit()

    def GetTicker(self):
        """Fetch the current ticker for this asset from the relay."""
        self.ticker=self.relay.GetTicker(symbol=self.Asset)

    def GetOrderBook(self):
        """Fetch the current orderbook for this asset from the relay."""
        self.orderbook=self.relay.GetOrderBook(symbol=self.Asset)

    def DuplicateTicker(self):
        """Return True when either ticker is missing or both Bid and Ask are
        unchanged since the previous ticker; False otherwise."""
        if self.oldticker==None or self.ticker==None:
            return True
        if self.oldticker['Bid']==self.ticker['Bid'] \
        and self.oldticker['Ask']==self.ticker['Ask']:
            return True
        else:
            return False
# Read the configuration file
# { "Exchange":"kraken","Account":"MAIN","Asset":"BTC/USDT","Percision":"1","History":"172800" }
# { "Exchange":"OANDA","Account":"CherryBlossom","Asset":"EUR/USD" }
def ReadConfig():
    """Parse the fwoba configuration file (one JSON object per line).

    Returns a dict keyed by "Exchange.Account.Asset"; each value is the
    parsed config line with Diagnostics/History defaulted and Key/Pair added.
    Exits the program on a damaged line or a missing required field.
    """
    required=[ "Exchange","Account","Asset","History","Diagnostics" ]
    fname=fwobaCFG
    Config={}
    lines=JRRsupport.ReadFile(fname).strip().split('\n')
    for line in lines:
        line=line.strip()
        # str.strip() always returns a str, so the original `line==None`
        # check was dead code; only blank lines and comments are skipped.
        if line=='' or line.startswith('#'):
            continue
        try:
            cfg=json.loads(line)
        except Exception:
            print(f'Config line damaged: {line}')
            sys.exit(1)
        # Diagnostics defaults to "no"; normalize to lowercase either way.
        cfg['Diagnostics']=cfg.get('Diagnostics','no').lower()
        # History defaults to one day of entries and is clamped to that minimum.
        cfg['History']=max(int(cfg.get('History',86400)),86400)
        for item in required:
            if item not in cfg:
                print(f'Item "{item}" is missing from this line:')
                print(f'{line}')
                sys.exit(1)
        cfg['Key']='fwoba'
        cfg['Pair']=cfg['Exchange']+'.'+cfg['Account']+'.'+cfg['Asset']
        Config[cfg['Pair']]=cfg
    return Config
def LoadSingleTicker(Config):
    """Run one collection sweep for the pair described by Config.

    Config['Relay'] holds this pair's Collector; the Relay connection is
    re-established on every sweep — a weird paradox, but it guards against
    man-in-the-middle attacks on a long-lived session.
    """
    collector=Config['Relay']
    collector.LoadTickerData()
    # In diagnostics mode, report only tickers that actually moved.
    if Config['Diagnostics']=='yes' and not collector.DuplicateTicker():
        print(Config['Pair'],collector.ticker['Bid'],collector.ticker['Spread'],collector.ticker['Ask'])
    # Drop the (potentially huge) ticker list and force a GC pass to keep
    # the OOM killer happy.
    collector.TickerList=None
    gc.collect()
def SpawnLoadSingleTicker(*args,**kwargs):
    """Child-process entry point: run one ticker sweep for the pair in
    kwargs, then mark the pair "Done" in shared memory so the parent can
    schedule it again."""
    Config=kwargs
    try:
        LoadSingleTicker(Config)
    except Exception as err:
        print(err)
    # Tell the parent we are done: flag this pair's memory slot under the
    # shared key lock.
    keyLock=JRRsupport.Locker(Config['Key'],Timeout=fwobaTimeout)
    pairMemory=JRRsupport.Locker(Config['Pair'],ID=Config['mID'],Timeout=fwobaTimeout)
    keyLock.Lock()
    pairMemory.Put(fwobaTimeout,"Done")
    keyLock.Unlock()
###
### Main driver
###
def main():
    """Main driver: forever sweep every configured pair, collecting tickers
    either in-process (single-process mode) or via spawned child processes.
    """
    global NumberProcesses
    # Optional argv[1] overrides the process count; non-positive values fall
    # back to the CPU count.
    if len(sys.argv)>1:
        NumberProcesses=int(sys.argv[1])
        if NumberProcesses<1:
            NumberProcesses=os.cpu_count()
    cfgName=fwobaCFG
    # Clear the config arguments, otherwise the Relay method will try to process it.
    # This is a royal PAIN IN THE ASS to debug.
    for i in range(1,len(sys.argv)):
        sys.argv.remove(sys.argv[1])
    # Per-pair shared-memory slots and Collector instances, keyed by pair name.
    fwobaMemory={}
    fwobaRelay={}
    # Lock guarding reads/writes of the shared-memory state below.
    fw=JRRsupport.Locker(cfgName)
    while True:
        # Re-read the config every sweep so edits are picked up live.
        Config=ReadConfig()
        for cfg in Config:
            if NumberProcesses>1:
                # Multi-process mode: lazily build one Collector per pair.
                if cfg not in fwobaRelay:
                    saveOB=False
                    if 'OrderBook' in Config[cfg]:
                        saveOB=True
                    fwobaRelay[cfg]=Collector(Config[cfg]['Exchange'],Config[cfg]['Account'],Config[cfg]['Asset'],history=Config[cfg]['History'],saveOB=saveOB)
                    JRRsupport.ElasticSleep(1)
                if cfg not in fwobaMemory:
                    fwobaMemory[cfg]=JRRsupport.Locker(cfg,Timeout=fwobaTimeout)
                # Get existing state of this pair
                fw.Lock()
                sData=json.loads(fwobaMemory[cfg].Get())
                fw.Unlock()
                status=None
                if 'DataStore' in sData:
                    status=sData['DataStore'].lower()
                else:
                    status=sData['Status'].lower()
                # If the status is NotFound or the DataStore status is Done, sweep this
                # pair again. This is absolutely critical to make sure only 1 process
                # is managing a single pair.
                if status=='notfound' or status=='done':
                    fw.Lock()
                    fwobaMemory[cfg].Put(fwobaTimeout,"Running")
                    Config[cfg]['Relay']=fwobaRelay[cfg]
                    Config[cfg]['mID']=fwobaMemory[cfg].ID
                    fw.Unlock()
                    interceptor.StartProcess(SpawnLoadSingleTicker,kwargs=Config[cfg])
                # Only allow "NumberProcesses" children to run at once.
                while interceptor.GetChildren()>(NumberProcesses-1):
                    JRRsupport.ElasticSleep(1)
            else:
                # Single-process mode: build a fresh Collector and sweep inline.
                saveOB=False
                if 'OrderBook' in Config[cfg]:
                    saveOB=True
                Config[cfg]['Relay']=Collector(Config[cfg]['Exchange'],Config[cfg]['Account'],Config[cfg]['Asset'],history=Config[cfg]['History'],saveOB=saveOB)
                LoadSingleTicker(Config[cfg])
        # Pace the outer sweep loop.
        JRRsupport.ElasticSleep(1)
# Script entry point.
if __name__ == '__main__':
    main()
###
### End of program
###