forked from mitdbg/aurum-datadiscovery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ddapi.py
619 lines (554 loc) · 23.4 KB
/
ddapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
from modelstore.elasticstore import StoreHandler
from modelstore.elasticstore import KWType
from api.apiutils import Operation
from api.apiutils import OP
from api.apiutils import Relation
from api.apiutils import DRS
from api.apiutils import DRSMode
from api.apiutils import Hit
from api.apiutils import compute_field_id as id_from
# Module-wide handle to the keyword store. It stays None until
# API.init_store() rebinds it to a StoreHandler instance; the search methods
# and ResultFormatter below read it as a global.
store_client = None
class DDAPI:
    """
    Data-discovery API built on top of a field network and the keyword store.

    Every query returns a DRS so results can be chained and combined. The
    network object answers table, neighbor and path queries; keyword lookups
    go through the module-level ``store_client``.
    """

    __network = None

    def __init__(self, network):
        """
        :param network: the network object used for table, neighbor and path queries
        """
        self.__network = network

    # ------------------------------------------------------------------
    # Seed API
    # ------------------------------------------------------------------

    def drs_from_raw_field(self, field: (str, str, str)) -> DRS:
        """
        Given a field and source name, it returns a DRS with its representation
        :param field: a tuple with the name of the field, (db_name, source_name, field_name)
        :return: a DRS with the source-field internal representation
        """
        db_name, source_name, field_name = field
        nid = id_from(db_name, source_name, field_name)
        hit = Hit(nid, db_name, source_name, field_name, 0)
        return self.drs_from_hit(hit)

    def drs_from_hit(self, hit: Hit) -> DRS:
        """
        Wraps a single hit in a DRS tagged with the ORIGIN operation
        :param hit: the hit to wrap
        :return: a DRS that contains only the given hit
        """
        return DRS([hit], Operation(OP.ORIGIN))

    def drs_from_hits(self, hits: [Hit]) -> DRS:
        """
        Wraps a collection of hits in a DRS tagged with the ORIGIN operation
        :param hits: the hits to wrap
        :return: a DRS built from the given hits
        """
        return DRS(hits, Operation(OP.ORIGIN))

    def drs_from_table(self, source: str) -> DRS:
        """
        Given a source, it retrieves all fields of the source and returns them
        in the internal representation
        :param source: string with the name of the table
        :return: a DRS with the source-field internal representation
        """
        hits = self.__network.get_hits_from_table(source)
        return DRS(list(hits), Operation(OP.ORIGIN))

    def drs_from_table_hit(self, hit: Hit) -> DRS:
        """
        Retrieves all fields of the table the given hit belongs to
        :param hit: a hit whose source_name identifies the table
        :return: a DRS tagged with the TABLE operation, with the hit as its parameter
        """
        hits = self.__network.get_hits_from_table(hit.source_name)
        return DRS(list(hits), Operation(OP.TABLE, params=[hit]))

    def drs_expand_to_table(self, drs: DRS) -> DRS:
        """
        For every element of the input, retrieves all fields of its table and
        accumulates them in a single DRS
        :param drs: the input DRS
        :return: a DRS with the fields of every table referenced by the input
        """
        o_drs = DRS([], Operation(OP.NONE))
        for h in drs:
            hits = self.__network.get_hits_from_table(h.source_name)
            # A separate local keeps the parameter intact, and the result of
            # absorb is kept, as every other accumulation in this class does.
            table_drs = DRS(list(hits), Operation(OP.TABLE, params=[h]))
            o_drs = o_drs.absorb(table_drs)
        return o_drs

    def reverse_lookup(self, nid) -> [str]:
        """
        Asks the network for the information it stores about a node id
        :param nid: the node id to look up
        :return: whatever the network's get_info_for returns for [nid]
        """
        return self.__network.get_info_for([nid])

    # ------------------------------------------------------------------
    # View API
    # ------------------------------------------------------------------

    def fields(self, drs: DRS) -> DRS:
        """
        Given a DRS, it configures it to field view (default)
        :param drs: the DRS to configure
        :return: the same DRS in the fields mode
        """
        drs.set_fields_mode()
        return drs

    def table(self, drs: DRS) -> DRS:
        """
        Given a DRS, it configures it to the table view
        :param drs: the DRS to configure
        :return: the same DRS in the table mode
        """
        drs.set_table_mode()
        return drs

    # ------------------------------------------------------------------
    # Primitive API
    # ------------------------------------------------------------------

    def keyword_search(self, kw: str, max_results=10) -> DRS:
        """
        Performs a keyword search over the content of the data
        :param kw: the keyword to search
        :param max_results: the maximum number of results to return
        :return: returns a DRS
        """
        hits = store_client.search_keywords(kw, KWType.KW_CONTENT, max_results)
        # list() materializes the result before it is handed to the DRS
        return DRS(list(hits), Operation(OP.KW_LOOKUP, params=[kw]))

    def keywords_search(self, kws: [str], max_results=10) -> DRS:
        """
        Given a collection of keywords, it returns the matches in the internal representation
        :param kws: collection (iterable) of keywords (strings)
        :param max_results: the maximum number of results to return per keyword
        :return: the matches in the internal representation
        """
        o_drs = DRS([], Operation(OP.NONE))
        for kw in kws:
            res_drs = self.keyword_search(kw, max_results=max_results)
            o_drs = o_drs.absorb(res_drs)
        return o_drs

    def schema_name_search(self, kw: str, max_results=10) -> DRS:
        """
        Performs a keyword search over the attribute/field names of the data
        :param kw: the keyword to search
        :param max_results: the maximum number of results to return
        :return: returns a DRS
        """
        hits = store_client.search_keywords(kw, KWType.KW_SCHEMA, max_results)
        return DRS(list(hits), Operation(OP.SCHNAME_LOOKUP, params=[kw]))

    def schema_names_search(self, kws: [str], max_results=10) -> DRS:
        """
        Given a collection of schema names, it returns the matches in the internal representation
        :param kws: collection (iterable) of keywords (strings)
        :param max_results: the maximum number of results to return per keyword
        :return: a DRS
        """
        o_drs = DRS([], Operation(OP.NONE))
        for kw in kws:
            res_drs = self.schema_name_search(kw, max_results=max_results)
            o_drs = o_drs.absorb(res_drs)
        return o_drs

    def table_name_search(self, kw: str, max_results=10) -> DRS:
        """
        Performs a keyword search over the names of the tables
        :param kw: the keyword to search
        :param max_results: the maximum number of results to return
        :return: returns a DRS
        """
        hits = store_client.search_keywords(kw, KWType.KW_TABLE, max_results)
        return DRS(list(hits), Operation(OP.KW_LOOKUP, params=[kw]))

    def table_names_search(self, kws: [str], max_results=10) -> DRS:
        """
        Given a collection of table names, it returns the matches in the internal representation
        :param kws: collection (iterable) of keywords (strings)
        :param max_results: the maximum number of results to return per keyword
        :return: a DRS
        """
        o_drs = DRS([], Operation(OP.NONE))
        for kw in kws:
            res_drs = self.table_name_search(kw, max_results=max_results)
            o_drs = o_drs.absorb(res_drs)
        return o_drs

    def entity_search(self, kw: str, max_results=10) -> DRS:
        """
        Performs a keyword search over the entities represented by the data
        :param kw: the keyword to search
        :param max_results: the maximum number of results to return
        :return: returns a DRS
        """
        hits = store_client.search_keywords(
            kw, KWType.KW_ENTITIES, max_results)
        return DRS(list(hits), Operation(OP.ENTITY_LOOKUP, params=[kw]))

    def schema_neighbors(self, field: (str, str, str)) -> DRS:
        """
        Returns all the other attributes/fields that appear in the same relation than the provided field
        :param field: the provided field, as (db_name, source_name, field_name)
        :return: a DRS tagged with the TABLE operation
        """
        db_name, source_name, field_name = field
        hits = self.__network.get_hits_from_table(source_name)
        origin_hit = Hit(id_from(db_name, source_name, field_name),
                         db_name, source_name, field_name, 0)
        return DRS(list(hits), Operation(OP.TABLE, params=[origin_hit]))

    def _fields_view_of(self, i_drs: DRS) -> DRS:
        """
        Prepares a DRS to be walked field by field. A DRS in TABLE mode is
        switched to fields mode and absorbs the fields of the table of each
        of its elements; a DRS in any other mode is returned untouched.
        """
        if i_drs.mode == DRSMode.TABLE:
            i_drs.set_fields_mode()
            for h in i_drs:
                fields_table = self.drs_from_table_hit(h)
                i_drs = i_drs.absorb(fields_table)
        return i_drs

    def _neighbor_search(self, i_drs: DRS, relation) -> DRS:
        """
        Collects, for every field of the input, its neighbors in the network
        under the given relation, carrying over the provenance of the input.
        """
        o_drs = DRS([], Operation(OP.NONE))
        o_drs = o_drs.absorb_provenance(i_drs)
        i_drs = self._fields_view_of(i_drs)
        for h in i_drs:
            hits_drs = self.__network.neighbors_id(h, relation)
            o_drs = o_drs.absorb(hits_drs)
        return o_drs

    def schema_neighbors_of(self, i_drs: DRS) -> DRS:
        """
        Given a DRS it returns another DRS with all the fields of the tables
        the input elements belong to
        :param i_drs: the input DRS
        :return: DRS
        """
        o_drs = DRS([], Operation(OP.NONE))
        o_drs = o_drs.absorb_provenance(i_drs)
        i_drs = self._fields_view_of(i_drs)
        for h in i_drs:
            hits = self.__network.get_hits_from_table(h.source_name)
            hits_drs = DRS(list(hits), Operation(OP.TABLE, params=[h]))
            o_drs = o_drs.absorb(hits_drs)
        return o_drs

    def similar_schema_name_to_field(self, field: (str, str, str)) -> DRS:
        """
        Returns all the attributes/fields with schema names similar to the provided field
        :param field: the provided field, as (db_name, source_name, field_name)
        :return: DRS
        """
        field_drs = self.drs_from_raw_field(field)
        return self.similar_schema_name_to(field_drs)

    def similar_schema_name_to_table(self, table: str) -> DRS:
        """
        Returns all the attributes/fields with schema names similar to the fields of the given table
        :param table: the given table
        :return: DRS
        """
        fields = self.drs_from_table(table)
        return self.similar_schema_name_to(fields)

    def similar_schema_name_to(self, i_drs: DRS) -> DRS:
        """
        Given a DRS it returns another DRS with the SCHEMA_SIM neighbors of the fields of the input
        :param i_drs: the input DRS
        :return: DRS
        """
        return self._neighbor_search(i_drs, Relation.SCHEMA_SIM)

    def similar_content_to_field(self, field: (str, str, str)) -> DRS:
        """
        Returns all the attributes/fields with content similar to the provided field
        :param field: the provided field, as (db_name, source_name, field_name)
        :return: DRS
        """
        field_drs = self.drs_from_raw_field(field)
        return self.similar_content_to(field_drs)

    def similar_content_to_table(self, table: str) -> DRS:
        """
        Returns all the attributes/fields with content similar to the fields of the given table
        :param table: the given table
        :return: DRS
        """
        fields = self.drs_from_table(table)
        return self.similar_content_to(fields)

    def similar_content_to(self, i_drs: DRS) -> DRS:
        """
        Given a DRS it returns another DRS with the CONTENT_SIM neighbors of the fields of the input
        :param i_drs: the input DRS
        :return: DRS
        """
        return self._neighbor_search(i_drs, Relation.CONTENT_SIM)

    def inclusion_dependency_to(self, i_drs: DRS) -> DRS:
        """
        Given a DRS it returns another DRS with the INCLUSION_DEPENDENCY neighbors of the fields of the input
        :param i_drs: the input DRS
        :return: DRS
        """
        return self._neighbor_search(i_drs, Relation.INCLUSION_DEPENDENCY)

    def pkfk_field(self, field: (str, str, str)) -> DRS:
        """
        Returns all the attributes/fields that are primary-key or foreign-key candidates with respect to the
        provided field
        :param field: the provided field, as (db_name, source_name, field_name)
        :return: DRS
        """
        field_drs = self.drs_from_raw_field(field)
        return self.pkfk_of(field_drs)

    def pkfk_table(self, table: str) -> DRS:
        """
        Returns all the attributes/fields that are primary-key or foreign-key candidates with respect to the
        fields of the given table
        :param table: the given table
        :return: DRS
        """
        fields = self.drs_from_table(table)
        return self.pkfk_of(fields)

    def pkfk_of(self, i_drs: DRS) -> DRS:
        """
        Given a DRS it returns another DRS with the PKFK neighbors of the fields of the input
        :param i_drs: the input DRS
        :return: DRS
        """
        return self._neighbor_search(i_drs, Relation.PKFK)

    # ------------------------------------------------------------------
    # Combiner API
    # ------------------------------------------------------------------

    def intersection(self, a: DRS, b: DRS) -> DRS:
        """
        Returns elements that are both in a and b
        :param a: a DRS
        :param b: another DRS, in the same mode as a
        :return: the intersection of the two provided DRS
        """
        assert a.mode == b.mode, "Input parameters are not in the same mode (fields, table)"
        return a.intersection(b)

    def union(self, a: DRS, b: DRS) -> DRS:
        """
        Returns elements that are in either a or b
        :param a: a DRS
        :param b: another DRS, in the same mode as a
        :return: the union of the two provided DRS
        """
        assert a.mode == b.mode, "Input parameters are not in the same mode (fields, table)"
        return a.union(b)

    def difference(self, a: DRS, b: DRS) -> DRS:
        """
        Returns the set difference of a and b, as computed by a.set_difference(b)
        :param a: a DRS
        :param b: another DRS, in the same mode as a
        :return: the difference of the two provided DRS
        """
        assert a.mode == b.mode, "Input parameters are not in the same mode (fields, table)"
        return a.set_difference(b)

    # ------------------------------------------------------------------
    # TC Primitive API
    # ------------------------------------------------------------------

    def paths_between(self, a: DRS, b: DRS, primitives, max_hops=2) -> DRS:
        """
        Is there a transitive relationship between any element in a with any element in b?
        This functions finds the answer constrained on the primitive (singular for now) that is passed
        as a parameter.
        :param a: the DRS with the source elements
        :param b: the DRS with the target elements, in the same mode as a
        :param primitives: the relation the paths are constrained to
        :param max_hops: forwarded to the network's path search
        :return: a DRS with the paths found for every (source, target) pair
        """
        assert a.mode == b.mode, "Input parameters are not in the same mode (fields, table)"
        o_drs = DRS([], Operation(OP.NONE))
        o_drs = o_drs.absorb_provenance(a)
        o_drs = o_drs.absorb_provenance(b)
        if a.mode == DRSMode.FIELDS:
            for h1 in a:  # h1 is a Hit
                for h2 in b:  # h2 is a Hit
                    if h1 == h2:
                        # Same source and target field: skip this pair only,
                        # so the remaining pairs are still explored.
                        continue
                    res_drs = self.__network.find_path_hit(
                        h1, h2, primitives, max_hops=max_hops)
                    o_drs = o_drs.absorb(res_drs)
        elif a.mode == DRSMode.TABLE:
            for h1 in a:  # h1 is a table: str
                for h2 in b:  # h2 is a table: str
                    if h1 == h2:
                        # Same source and target table: skip this pair only.
                        continue
                    res_drs = self.__network.find_path_table(
                        h1, h2, primitives, self, max_hops=max_hops)
                    o_drs = o_drs.absorb(res_drs)
        return o_drs

    def paths(self, a: DRS, primitives) -> DRS:
        """
        Is there any transitive relationship between any two elements in a?
        This function finds the answer constrained on the primitive (singular for now) passed as parameter
        :param a: the DRS whose elements are paired with each other
        :param primitives: the relation the paths are constrained to
        :return: a DRS with the paths found
        """
        o_drs = DRS([], Operation(OP.NONE))
        o_drs = o_drs.absorb_provenance(a)
        if a.mode == DRSMode.FIELDS:
            for h1 in a:  # h1 is a Hit
                for h2 in a:  # h2 is a Hit
                    if h1 == h2:
                        continue
                    res_drs = self.__network.find_path_hit(h1, h2, primitives)
                    o_drs = o_drs.absorb(res_drs)
        elif a.mode == DRSMode.TABLE:
            for h1 in a:  # h1 is a table: str
                for h2 in a:  # h2 is a table: str
                    res_drs = self.__network.find_path_table(
                        h1, h2, primitives, self)
                    o_drs = o_drs.absorb(res_drs)
        return o_drs

    def traverse(self, a: DRS, primitives, max_hops) -> DRS:
        """
        Expands the input by repeatedly following the given relation, one
        round per hop
        :param a: the input DRS; TABLE mode is not supported
        :param primitives: the relation to follow
        :param max_hops: the number of expansion rounds
        :return: a DRS with everything reached; for an input in TABLE mode an
            error is printed and an empty list is returned instead
        """
        o_drs = DRS([], Operation(OP.NONE))
        if a.mode == DRSMode.TABLE:
            print("ERROR: input mode TABLE not supported")
            return []
        fringe = list(a)
        o_drs = o_drs.absorb_provenance(a)
        while max_hops > 0:
            max_hops = max_hops - 1
            for h in fringe:
                hits_drs = self.__network.neighbors_id(h, primitives)
                o_drs = self.union(o_drs, hits_drs)
            # The next round starts from everything reached so far
            fringe = list(o_drs)
        return o_drs

    # ------------------------------------------------------------------
    # Convenience functions
    # ------------------------------------------------------------------

    @staticmethod
    def serialize_sources_drs(drs: DRS, basedir: str, output_json: str):
        """
        Given a DRS as input, it produces a JSON file with the contents of the
        DRS, shaped as {"CSV": {"dir": basedir, "table": "<src1>,<src2>,..."}}
        :param drs: iterable of elements whose last three values are
            (source_name, field_name, score)
        :param basedir: the value written under the "dir" key
        :param output_json: path of the JSON file to write
        """
        # Local import: the module does not import json at file level.
        import json

        sources = []
        for element in drs:
            # Unpack from the tail so that both 4-value tuples and the
            # 5-value hits built by this class (which carry a db name) fit.
            *_, source_name, _field_name, _score = element
            sources.append(str(source_name))
        json_dict = {"CSV": {"dir": basedir, "table": ",".join(sources)}}
        with open(output_json, 'w') as f:
            f.write(json.dumps(json_dict))

    def output_raw(self, result_set):
        """
        Given an iterable object it prints the raw elements
        :param result_set: an iterable object
        """
        for r in result_set:
            print(str(r))

    def output(self, result_set):
        """
        Given an iterable object of elements whose last three values are
        (source_name, field_name, score) it prints the source and field names
        for every element in the iterable
        :param result_set: an iterable object
        """
        for r in result_set:
            # Tail unpacking accepts both (nid, source, field, score) and
            # (nid, db, source, field, score).
            *_, sn, fn, _score = r
            print("source: " + str(sn) + "\t\t\t\t\t field: " + str(fn))

    def help(self):
        """
        Prints the general help menu as Markdown (requires IPython)
        """
        from IPython.display import Markdown, display

        def print_md(string):
            display(Markdown(string))

        print_md("### Help Menu")
        print_md("You can use the system through an **API** object. API objects are returned "
                 "by the *init_system* function, so you can get one by doing:")
        print_md("***your_api_object = init_system('path_to_stored_model')***")
        print_md("Once you have access to an API object there are a few concepts that are useful "
                 "to use the API. **content** refers to actual values of a given field. For "
                 "example, if you have a table with an attribute called __Name__ and values *Olu, Mike, Sam*, content "
                 "refers to the actual values, e.g. Mike, Sam, Olu.")
        print_md("**schema** refers to the name of a given field. In the previous example, schema refers to the word "
                 "__Name__ as that's how the field is called.")
        print_md("Finally, **entity** refers to the *semantic type* of the content. This is in experimental state. For "
                 "the previous example it would return *'person'* as that's what those names refer to.")
        print_md("Certain functions require a *field* as input. In general a field is specified by the source name ("
                 "e.g. table name) and the field name (e.g. attribute name). For example, if we are interested in "
                 "finding content similar to the one of the attribute *year* in the table *Employee* we can provide "
                 "the field in the following way:")
        print(
            "field = ('Employee', 'year') # field = [<source_name>, <field_name>)")
class ResultFormatter:
    """
    Turns raw query output into the list-of-dicts layout sent to the web
    client, reading schema and sample values from the module-level
    ``store_client``.
    """

    @staticmethod
    def format_output_for_webclient(raw_output, consider_col_sel):
        """
        Format raw output into something client understands,
        mostly, enrich the data with schema and samples
        :param raw_output: iterable of (filename, column_name) pairs
        :param consider_col_sel: when truthy, the matched columns are flagged 'Y'
        :return: a list with one {'filename', 'schema'} dict per filename
        """
        def describe_columns(source_name, chosen, consider_col_sel):
            def selection_flag(col):
                return 'Y' if consider_col_sel and col in chosen else 'N'

            # One description per field the store knows for this source
            return [
                {
                    'colname': fn,
                    'samples': store_client.peek_values((sn, fn), 15),
                    'selected': selection_flag(fn),
                }
                for (nid, sn, fn) in store_client.get_all_fields_of_source(source_name)
            ]

        # Group results into a dict with file -> [column]
        columns_by_file = dict()
        for (fname, cname) in raw_output:
            columns_by_file.setdefault(fname, []).append(cname)

        # Create entry per filename
        return [
            {'filename': fname,
             'schema': describe_columns(fname, columns, consider_col_sel)}
            for fname, columns in columns_by_file.items()
        ]

    @staticmethod
    def format_output_for_webclient_ss(raw_output, consider_col_sel):
        """
        Format raw output into something client understands.
        The output in this case is the result of a table search.
        :param raw_output: iterable of (filename, [(column_name, score), ...]) pairs
        :param consider_col_sel: when truthy, the matched columns are flagged 'Y'
        :return: a list with one {'filename', 'schema'} dict per input pair
        """
        def describe_columns(source_name, chosen, consider_col_sel):
            def selection_flag(col):
                return 'Y' if consider_col_sel and col in chosen else 'N'

            # Fields known to the store first, then the matched columns
            names = [fn for (nid, sn, fn) in
                     store_client.get_all_fields_of_source(source_name)]
            names.extend(chosen)
            return [
                {
                    'colname': name,
                    'samples': store_client.peek_values((source_name, name), 15),
                    'selected': selection_flag(name),
                }
                for name in names
            ]

        # Create entry per filename
        formatted = []
        for fname, column_scores in raw_output:
            matched = [c for (c, _) in column_scores]
            formatted.append({
                'filename': fname,
                'schema': describe_columns(fname, matched, consider_col_sel),
            })
        return formatted
class API(DDAPI):
    """
    DDAPI plus the ability to initialise the module-level store client.
    """

    def __init__(self, *args, **kwargs):
        """
        Forwards every argument unchanged to DDAPI.__init__
        """
        super().__init__(*args, **kwargs)

    def init_store(self):
        """
        Creates a StoreHandler and publishes it as the module-level
        ``store_client``
        """
        global store_client
        store_client = StoreHandler()
if __name__ == '__main__':
    # Running the module directly only prints a banner; the API is meant to
    # be imported.
    print("Aurum API")