-
Notifications
You must be signed in to change notification settings - Fork 2
/
vecdbsrv.dyalog
462 lines (368 loc) · 15.1 KB
/
vecdbsrv.dyalog
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
:Namespace vecdbsrv
⍝ Uses #.vecdbclt
(⎕IO ⎕ML)←1 1
RUNTIME←0 ⍝ Use runtimes?
NEXTPORT←8000
fromJSON←7159⌶
∇ {r}←Start(folder port);sink;data;event;obj;rc;wait;z;cmd;name
⍝ Run a vecdb Server - based on CONGA RPCServer sample
NEXTPORT←port+1
{}##.DRC.Init''
CONFIG←fromJSON⊃⎕NGET folder,'config.json'
Init CONFIG
{}##.DRC.Close name←'VECSRV'
:If 0=1⊃r←##.DRC.Srv name''port'Command'
1 Log'Server ''',name,''', listening on port ',⍕port
2 Log'Handler thread started: ',⍕Run&name port
:Else
3 Log'Server failed to start: ',,⍕r
:EndIf
∇
∇ {r}←Shutdown msg;db;i;j;slave
⍝ Shutdown
⍝ /// Should validate user authorisation
⍝ /// Should broadcast msg to all users
:For i :In ⍳≢DBs ⍝ Close all slaves
db←i⊃DBs
:For j :In ⍳≢db.Slaves
slave←j⊃db.Slaves
#.vecdbclt.SrvDo slave.Connection('Shutdown'TOKEN)
{}#.DRC.Close slave.Connection
:EndFor
:EndFor
done←1 ⍝ Global flag to shut down
r←⍬ ⍝ Need a result
∇
∇ {r}←Init config;db;i;j;slave;path;ws;tn
⍝ Intialise the vecdb server
∘∘∘
:Trap 6 ⋄ source←SALT_Data.SourceFile
:Else ⋄ source←⎕WSID
:EndTrap
path←{(-⌊/(⌽⍵)⍳'\/')↓⍵}source
CONNS←TASKS←USERS←TOKENS←⍬
NEXTTASK←1000
LOGLEVEL←0
(DBs Server)←config.(DBs Server)
TOKEN←{⎕RL←0 ⋄ ⎕PP←10 ⋄ 2↓⍕?0}0
DBFolders←DBs.Folder
:For i :In ⍳≢DBs ⍝ Launch all the processes
db←i⊃DBs
tn←(db.Folder,'meta.vecdb') ⎕FSTIE 0
db.((_Columns _Types) (ShardFn ShardCols))←⎕FREAD tn (5 7)
⎕funtie tn
:For j :In ⍳≢db.Slaves
slave←j⊃db.Slaves
slave.Port←NEXTPORT
:Select slave.Launch.Type
:Case 'local'
slave.Address←'127.0.0.1' ⍝ /// for now
slave.UserId←¯1 ⍝ /// ditto
ws←path,'/vecdbboot.dws'
slave.Proc←slave.Shards Launch slave.Folder slave.Port RUNTIME ws
:Case 'ssh'
slave.Address←slave.Launch.Host
slave.UserId←¯1
ws←({(-⌊/(⌽⍵)⍳'\/')↓⍵}¯1↓slave.Folder),'/vecdbboot.dws'
slave.Proc←slave.Shards Launch slave.Folder slave.Port slave.Launch.(Host User KeyFile Cmd) ws
:Else
∘∘∘ ⍝ Unknown launch type
:EndSelect
NEXTPORT←NEXTPORT+1
:EndFor
:EndFor
:For i :In ⍳≢DBs ⍝ Now try to connect to them all
⍝ /// in future perhaps launch a thread for each one and just check status?
db←i⊃DBs
:For j :In ⍳≢db.Slaves
slave←j⊃db.Slaves
:If 0=⊃r←''#.vecdbclt.Connect slave.(Address Port UserId)
slave.Connection←2⊃r
#.vecdbclt.SrvDo slave.Connection('SetToken'TOKEN)
:Else
∘∘∘ ⍝ start up failed
:EndIf
:EndFor
:EndFor
r←⍬ ⍝ Need a result
∇
∇ (sdata is)←SlavePartition(cmd slaves data);ixs;cols;new;slave;slave_recs;cix
⍝ Return sdata: command to send to each slave
⍝ is: indices into slaves
Shards←∊slaves.Shards
ShardSlaves←(≢¨slaves.Shards)/⍳≢slaves.Shards
:Select cmd
:Case 'Read'
(ixs cols)←data
slave←ShardSlaves[Shards⍳ixs[;1]]
sdata←↓(⍪slave{⊂⍵}⌸ixs),⊂cols
is←⍳≢sdata
:Case 'Update'
(ixs cols new)←data
slave←ShardSlaves[Shards⍳ixs[;1]]
slave_recs←(≢¨ixs[;2])/slave
:If 1=≡,cols ⍝ Simple column name
sdata←↓(slave{⊂⍵}⌸ixs),(⊂cols),⍪slave_recs{⊂⍵}⌸new
:Else ⍝ Multiple columns
sdata←↓(slave{⊂⍵}⌸ixs),(⊂cols),⍪{slave_recs{⊂⍵}⌸⍵}¨new
∘∘∘
:EndIf
is←⍳≢sdata
:Case 'Append'
(cols new)←data
:If ∧/db.ShardCols∊cix←db._Columns⍳cols
slave←ShardSlaves⍳(⍎db.ShardFn) new[cix⍳db.ShardCols]
sdata←{slave{⍺,⊂⍵}⌸⍵}¨new
is←(⊃sdata)[;1] ⍝ slave indices
sdata←↓(⊂cols),⍪↓⊃,/0 1∘↓¨sdata ⍝ data pertaining to each slave
:Else
∘∘∘ ⍝ Unable to shard the data
:EndIf
:Else
sdata←,⊂data ⋄ is←(≢slaves)/1
:EndSelect
∇
∇ Process(obj data);r;CONNECTION;cmd;arg;close;txt;db;i;slave;rs;sdata;ixs;s;slaves;cmds
⍝ Process a call. data[1] contains function name, data[2] an argument
⍝ {}##.DRC.Progress obj(' Thread ',(⍕⎕TID),' started to run: ',,⍕data) ⍝ Send progress report
CONNECTION←obj
Conn←1↓⊃(obj='.')⊂obj
(cmd arg)←2↑data
close←0
:If (⊂cmd)∊'Open' 'SetUser' 'Shutdown' ⍝ Non-DB commands
:Trap 9999
r←0(⍎cmd,' obj arg')
:Else ⋄ r←⎕EN ⎕DM
:EndTrap
:ElseIf (⊂cmd)∊'Append' 'Count' 'Query' 'Update' 'Read'
:If (≢DBs)<i←DBFolders⍳arg[1]
r←999('Database not found: ',⊃arg)
:Else
slaves←(db←i⊃DBs).Slaves
(sdata ixs)←SlavePartition cmd slaves(2⊃arg)
cmds←(≢slaves)⍴⊂''
:For s :In ⍳≢slaves ⍝ Send all the commands
slave←s⊃slaves
(s⊃cmds)←#.vecdbclt.SrvSend slave.Connection(cmd((s⊃ixs)⊃sdata))
:EndFor
rs←⍬
:For s :In cmds ⍝ Receive all the results
rs,←⊂#.vecdbclt.SrvRcv s
:EndFor
r←0 rs
:EndIf
:Else
r←999('Unsupported command: ',cmd)
:EndIf
{}##.DRC.Respond obj r
:If close
⍝ /// {{}##.DRC.Close ⍵⊣⎕DL 1}&Conn ⍝ Start thread which waits 1s then closes
:EndIf
∇
∇ proc←{shards}Launch(target port runtime ws);path;runtime;args;slave;source
⍝ Launch a full vecdbsrv or, if shards is defined, a slave
:If 0=≢runtime ⋄ runtime←RUNTIME ⋄ :EndIf
:If slave←2=⎕NC'shards'
args←'VECDBSLAVE="',target,'" SHARDS="',(⍕shards),'" PORT=',(⍕port),' TOKEN="',TOKEN,'"'
:Else
args←'VECDBSRV="',target,'" PORT=',(⍕port)
:EndIf
proc←⎕NEW ##.APLProcess(ws args runtime)
∇
∇ Connect cmd;task;conn
⍝ Connection Arrived
conn←1↓⊃(cmd='.')⊂cmd
CONNS,←⊂conn
TASKS,←task←NEXTTASK
NEXTTASK←10000|NEXTTASK+1
USERS←USERS,0
0 Log'New connection ',conn,' assigned task id ',⍕task
∇
∇ Disconnect obj;m;i;held;task;conn
⍝ Connection Lost
conn←1↓⊃(obj='.')⊂obj
0 Log'Connection ',conn,' disconnected'
:If (⍴m)≥i←(m←~CONNS∊⊂conn)⍳0
task←i⊃TASKS
:If 0≠⍴held←(HELDBY=task)/RESOURCES
Release¨↓(⊂obj),[1.5]held ⍝ Release all held resources
:EndIf
QUEUES←{(⍵[;1]∊task)⌿⍵}¨QUEUES ⍝ Remove task from queues
CONNS←m/CONNS
TASKS←m/TASKS
USERS←m/USERS
:EndIf
∇
∇ level Log message
→(level<LOGLEVEL)⍴0
⎕←(,'ZI2,<:>,ZI2,<:>,ZI2,<.>,ZI3'⎕FMT 1 4⍴3↓⎕TS),' ',message
∇
∇ MockTest;assert;START;resources;nprocesses;nresources;nevents;i;conns;conn;z;s
assert←{'Assertion failed'⎕SIGNAL(⍵=0)/11}
InitLocks 0
LOGLEVEL←3 ⍝ Log everything
MOCK←1
Connect'C1'
assert(1 0)≡TASKS,USERS
SetUser'C1' 1234
assert(1 1234)≡TASKS,USERS
Connect'C2'
SetUser'C2' 4321
assert 0=Lock'C1' '/ALLOC10' ⍝ Granted
assert HELDBY≡,1 ⍝ Held by Task 1
Release'C1' '/ALLOC10' ⍝ Release
assert HELDBY≡,0 ⍝ Should now be free
assert 0=Lock'C1' '/ALLOC10' ⍝ Granted
assert HELDBY≡,1 ⍝ Held by Task 1
assert 1=Lock'C2' '/ALLOC10' ⍝ Queued
assert(2 'C2')≡2⍴⊃QUEUES ⍝ Task 2 is in the queue
Release'C1' '/ALLOC10'
assert HELDBY≡,2 ⍝ Should now be held by Task 2
assert 0=⊃⍴⊃QUEUES
Disconnect'C2'
assert 1=⍴TASKS
assert HELDBY≡,0 ⍝ Should now be free
Disconnect'C1'
assert 0=⍴TASKS
⍝ --- performance test ---
LOGLEVEL←3 ⍝ Erors only
nprocesses←10
nevents←1000×2×nprocesses
⎕←'Testing performance...'
Connect¨conns←'C'∘,¨⍕¨⍳nprocesses
SetUser¨↓conns,[1.5]⍳nprocesses
resources←nevents⍴('/BLAH/BLAH/ALLOC'∘,¨⍕¨⍳nprocesses),nprocesses⍴⊂'/BLAH/BLAH/ALLOC0'
START←3⊃⎕AI
:For i :In ⍳nprocesses+nevents
conn←(1+nprocesses|i-1)⊃conns
:If i≤nevents ⋄ z←Lock conn(i⊃resources) ⋄ :EndIf
:If i>nprocesses ⋄ z←Release conn((i-nprocesses)⊃resources) ⋄ :EndIf
:EndFor
s←0.001×(3⊃⎕AI)-START
⎕←(⍕nevents),' released & locked in',(1⍕s),'s (',(,' '~⍨,'CI12'⎕FMT nevents÷s),' locks/s)'
∇
∇ Notify(cmd Resource info);Conn;task
⍝ Notify connection that resource has been granted
LOCKSGRANTED+←1
:If LOGLEVEL=0
Conn←1↓⊃(cmd='.')⊂cmd
task←(CONNS⍳⊂Conn)⊃TASKS
0 Log'Lock for ',Resource,' granted to task ',task
:EndIf
:If ~MOCK
:If 0≠⊃r←#.DRC.Respond cmd(0(Resource info))
1 Log'Respond to ',cmd,' failed'
:EndIf
:EndIf
∇
∇ r←Run(name port);sink;data;event;obj;rc;wait;z;cmd
⍝ Run the Lock Server - based on CONGA RPCServer sample
:If 0=⎕NC'start' ⋄ start←1 ⋄ :EndIf
{}##.DRC.Init''
0 Log'Thread ',(⍕⎕TID),' is now handing server ''',name,'''.'
done←0 ⍝ done←1 in function "End"
:While ~done
rc obj event data←4↑wait←##.DRC.Wait name 3000 ⍝ Time out now and again
:Select rc
:Case 0
:Select event
:Case 'Error'
:If 1119≢data ⋄ 3 Log'Error ',(⍕data),' on ',obj ⋄ :EndIf
:If ~done∨←name≡obj ⍝ Error on the listener itself?
{}##.DRC.Close obj ⍝ Close connection in error
Disconnect obj ⍝ Let logic know
:EndIf
:Case 'Receive'
:If 2≠⍴data ⍝ Command is expected to be (function name)(argument)
{}##.DRC.Respond obj(99999 'Bad command format') ⋄ :Leave
:EndIf
Process obj data ⍝ NB Single-threaded
:Case 'Connect' ⍝ Ignored
Connect obj
:Else ⍝ Unexpected result?
∘
:EndSelect
:Case 100 ⍝ Time out - Insert code for housekeeping tasks here (deadlocks?)
:Case 1010 ⍝ Object Not Found
3 Log'Object ''',name,''' has been closed - RPC Server shutting down' ⋄ done←1
:Else
3 Log'Error in RPC.Wait: ',⍕wait
:EndSelect
:EndWhile
⎕DL 1 ⍝ Give responses time to complete
{}##.DRC.Close name
0 Log'Server ',name,' terminated.'
:If 2=⎕NC'#.AUTOSHUT'
:AndIf 0≠#.AUTOSHUT
⎕OFF
:EndIf
∇
∇ r←Open(cmd folder);i;Conn
⍝ Check whether a folder is serve-able
Conn←1↓⊃(cmd='.')⊂cmd
:If (⊂folder)∊DBFolders
r←0 'OK'
:Else
r←999('Database folder not found: ',folder)
:EndIf
∇
∇ task←SetUser(cmd User);i;Conn
⍝ Return task ID
Conn←1↓⊃(cmd='.')⊂cmd
:If (⍴CONNS)<i←CONNS⍳⊂Conn
3 Log'SetUser ',(⍕User),' for unknown connection ',Conn
:Else
0 Log'User set to ',(⍕User),' on connection ',Conn
(i⊃USERS)←User
:EndIf
task←i⊃TASKS
∇
∇ Test;assert;START;resources;nprocesses;nresources;nevents;i;conns;conn;z;s
⍝ This should be a stand-alone test of vecdbsrv
⍝ Assumes existence of #.TestVecdbSrv
assert←{'Assertion failed'⎕SIGNAL(⍵=0)/11}
#.TestVecdbSrv.CreateTestConfig folder,'config.json'
InitLocks 0
LOGLEVEL←3 ⍝ Log everything
MOCK←1 ⍝ Do not send CONGA messages
Connect'.C1'
assert(1 0)≡TASKS,USERS
{}SetUser'.C1' 1234
assert(1 1234)≡TASKS,USERS
Connect'.C2'
{}SetUser'.C2' 4321
assert 0=Lock'.C1' '/ALLOC10' ⍝ Granted
assert HELDBY≡,1 ⍝ Held by Task 1
Release'.C1' '/ALLOC10' ⍝ Release
assert HELDBY≡,0 ⍝ Should now be free
assert 0=Lock'.C1' '/ALLOC10' ⍝ Granted
assert HELDBY≡,1 ⍝ Held by Task 1
assert 1=Lock'.C2' '/ALLOC10' ⍝ Queued
assert(2 '.C2')≡2⍴⊃QUEUES ⍝ Task 2 is in the queue
Release'.C1' '/ALLOC10'
assert HELDBY≡,2 ⍝ Should now be held by Task 2
assert 0=⊃⍴⊃QUEUES
Disconnect'.C2'
assert 1=⍴TASKS
assert HELDBY≡,0 ⍝ Should now be free
Disconnect'.C1'
assert 0=⍴TASKS
⍝ --- performance test ---
LOGLEVEL←3 ⍝ Erors only
nprocesses←10
nevents←1000×2×nprocesses
⎕←'Testing performance...'
Connect¨conns←'.C'∘,¨⍕¨⍳nprocesses
{}SetUser¨↓conns,[1.5]⍳nprocesses
resources←nevents⍴('/BLAH/BLAH/ALLOC'∘,¨⍕¨⍳nprocesses),nprocesses⍴⊂'/BLAH/BLAH/ALLOC0'
START←3⊃⎕AI
:For i :In ⍳nprocesses+nevents
conn←(1+nprocesses|i-1)⊃conns
:If i≤nevents ⋄ z←Lock conn(i⊃resources) ⋄ :EndIf
:If i>nprocesses ⋄ z←Release conn((i-nprocesses)⊃resources) ⋄ :EndIf
:EndFor
s←0.001×(3⊃⎕AI)-START
⎕←(⍕nevents),' released & locked in',(1⍕s),'s (',(,' '~⍨,'CI12'⎕FMT nevents÷s),' locks/s)'
∇
assert←{'Assertion failed'⎕SIGNAL(⍵=0)/11}
:EndNamespace