
Commit

update
Amit Jaiswal committed Oct 14, 2024
1 parent 98f3f45 commit 393f32c
Showing 4 changed files with 142 additions and 119 deletions.
47 changes: 33 additions & 14 deletions python-packages/core/src/omigo_core/dataframe.py
@@ -37,7 +37,7 @@ def __init__(self, header_fields, data_fields):
utils.warn("Zero length header fields:" + str(self.header_fields))

# create hashmap
- for i in range(self.num_rows()):
+ for i in range(len(self.header_fields)):
h = self.header_fields[i]

# validation
@@ -47,10 +47,20 @@ def __init__(self, header_fields, data_fields):
self.header_map[h] = i
self.header_index_map[i] = h

+ # workaround
+ if (len(self.header_fields) == 1):
+     if (len(data_fields) > 0):
+         if (isinstance(data_fields[0], (str))):
+             data_fields = list([[t] for t in data_fields])

# basic validation
- if (len(data_fields) > 0 and len(data_fields[0]) != len(self.header_fields)):
-     raise Exception("Header length: {} is not matching with data length: {}".format(
-         len(self.header_fields), len(data_fields[0])))
+ if (len(self.data_fields) > 0):
+     if (len(self.header_fields) > 1):
+         if (len(data_fields[0]) != len(self.header_fields)):
+             raise Exception("Header length: {} is not matching with data length: {}".format(len(self.header_fields), len(data_fields[0])))
+     else:
+         if (len(data_fields[0]) != 1):
+             raise Exception("Header length: {} is not matching with data length: {}".format(len(self.header_fields), len(data_fields[0])))

# debugging
def to_string(self):
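
The workaround above lets a single-column DataFrame be built from a flat list of strings, and the reworked validation checks row width against the header. A minimal usage sketch, assuming the package imports as omigo_core (as the source path python-packages/core/src/omigo_core suggests); the values are illustrative:

from omigo_core import dataframe

# single header column: plain strings are wrapped into one-element rows
df1 = dataframe.DataFrame(["name"], ["alice", "bob"])         # stored as [["alice"], ["bob"]]
df2 = dataframe.DataFrame(["name"], [["alice"], ["bob"]])      # already nested, kept as is

# multi-column data must still match the header length,
# otherwise the constructor raises an Exception
df3 = dataframe.DataFrame(["name", "age"], [["alice", "30"], ["bob", "40"]])
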
@@ -1477,7 +1487,7 @@ def to_maps(self, resolve_url_encoded_cols = False, dmsg = ""):
mp = {}

# iterate over all header columns
- for i in range(self.num_rows()):
+ for i in range(len(self.header_fields)):
key = self.header_fields[i]
value = str(fields[i])

@@ -1538,7 +1548,7 @@ def add_seq_num(self, new_col, start = 1, dmsg = ""):
for fields in self.data_fields:
counter = counter + 1
utils.report_progress("add_seq_num: [1/1] adding new column", dmsg, counter, self.num_rows())
- new_data.append([str(counter)] + fields)
+ new_data_fields.append([str(counter)] + fields)

# return
return DataFrame(new_header_fields, new_data_fields)
@@ -2007,7 +2017,7 @@ def to_json_records(self, new_col = "json"):
# iterate
for fields in self.data_fields:
mp = {}
- for i in range(self.num_rows()):
+ for i in range(len(self.header_fields)):
mp[self.header_fields[i]] = fields[i]
new_data_fields.append([json.dumps(mp)])
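
The same index fix applies to to_json_records(): the inner loop walks the header fields to build one JSON object per data row. A rough usage sketch, assuming the constructor shown above (illustrative values):

from omigo_core import dataframe

df = dataframe.DataFrame(["name", "age"], [["alice", "30"], ["bob", "40"]])

# each row becomes one JSON object keyed by the header fields,
# e.g. {"name": "alice", "age": "30"}, placed in a new column (named "json" by default)
json_df = df.to_json_records()
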

@@ -2065,7 +2075,7 @@ def resolve_all_url_encoded_cols(self, dmsg = ""):

def union(self, tsv_or_that_arr):
# check if this is a single element TSV or an array
- if (type(tsv_or_that_arr) == TSV):
+ if (type(tsv_or_that_arr) == DataFrame):
that_arr = [tsv_or_that_arr]
else:
that_arr = tsv_or_that_arr
Expand Down Expand Up @@ -2103,6 +2113,7 @@ def union(self, tsv_or_that_arr):
raise Exception("Invalid input data. Fields size are not same as header: header: {}, fields: {}".format(self.header_fields, fields))
new_data_fields.append(fields)

+ # return
return DataFrame(self.header_fields, new_data_fields)

# this method finds the set difference between this and that. if cols is None, then all columns are taken
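
With the corrected type check, union() should accept either a single DataFrame or a list of DataFrames. A small usage sketch (illustrative values, constructor as above):

from omigo_core import dataframe

df1 = dataframe.DataFrame(["name"], [["alice"]])
df2 = dataframe.DataFrame(["name"], [["bob"]])
df3 = dataframe.DataFrame(["name"], [["carol"]])

merged_one = df1.union(df2)           # a single DataFrame is wrapped into a list internally
merged_many = df1.union([df2, df3])   # a list of DataFrames works as before
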
@@ -3239,7 +3250,7 @@ def __join__(self, that, lkeys, rkeys = None, join_type = "inner", lsuffix = Non
new_header_copy_fields_map[lkeys[rkey_index]] = rkey

# add the left side columns
- for i in range(self.num_rows()):
+ for i in range(len(self.header_fields)):
if (self.header_fields[i] not in lkeys):
if (lsuffix is not None):
new_header_fields.append(self.header_fields[i] + ":" + lsuffix)
@@ -3555,7 +3566,7 @@ def __map_join__(self, that, lkeys, rkeys = None, join_type = "inner", lsuffix =
new_header_copy_fields_map[lkeys[rkey_index]] = rkey

# add the left side columns
- for i in range(self.num_rows()):
+ for i in range(len(self.header_fields)):
if (self.header_fields[i] not in lkeys):
if (lsuffix is not None):
new_header_fields.append(self.header_fields[i] + ":" + lsuffix)
@@ -3696,7 +3707,7 @@ def __split_batches_randomly__(self, num_batches, preserve_order = False, seed =
batch_index = (i + seed) % effective_batches

# append to the splits data
- data_list[batch_index].append(self.data[i])
+ data_list[batch_index].append(self.data_fields[i])

# create list of xtsvs
for i in range(effective_batches):
@@ -3923,7 +3934,7 @@ def explode(self, cols, exp_func, prefix, default_val = None, collapse = True, i
# create header
new_header_fields = []
if (collapse == True):
- for j in range(self.num_rows()):
+ for j in range(len(self.header_fields)):
if (j not in indexes):
new_header_fields.append(self.header_fields[j])
else:
@@ -5175,7 +5186,7 @@ def from_maps(mps, accepted_cols = None, excluded_cols = None, url_encoded_cols
for mp in mps:
header_fields = ["json"]
fields = [utils.url_encode(json.dumps(mp))]
- xtsvs.append(DataFrame(header_fields, fields))
+ xtsvs.append(DataFrame(header_fields, [fields]))

# use explode
result = merge_union(xtsvs) \
@@ -5188,7 +5199,15 @@

def from_tsv(xtsv):
header_fields = xtsv.get_header_fields()
- data_fields = list([t.split("\t") for t in xtsv.get_data()])

+ # special condition where only single column is present
+ data_fields = None
+ if (len(header_fields) == 1):
+     data_fields = list([[t] for t in xtsv.get_data()])
+ else:
+     data_fields = list([list(t.split("\t")) for t in xtsv.get_data()])

+ # return
return DataFrame(header_fields, data_fields)

def enable_debug_mode():
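
The from_tsv() change mirrors the constructor workaround: with a single header column the data lines are not split on tabs, and each line becomes a one-element row. A sketch of the conversion, assuming a TSV(header, data) constructor that stores a tab-separated header string and a list of tab-separated lines, consistent with the accessors shown in tsv.py below (the constructor itself is not part of this diff; values are illustrative):

from omigo_core import dataframe, tsv

xtsv1 = tsv.TSV("name", ["alice", "bob"])      # single-column TSV
df1 = dataframe.from_tsv(xtsv1)                # rows become [["alice"], ["bob"]]

xtsv2 = tsv.TSV("name\tage", ["alice\t30"])    # multi-column TSV, split on tabs
df2 = dataframe.from_tsv(xtsv2)                # rows become [["alice", "30"]]
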
6 changes: 3 additions & 3 deletions python-packages/core/src/omigo_core/tsv.py
@@ -274,13 +274,13 @@ def rename(self, *args, **kwargs):
return dataframe.from_tsv(self.header, self.data).rename(*args, **kwargs)

def get_header(self, *args, **kwargs):
- return dataframe.from_tsv(self.header, self.data).get_header(*args, **kwargs)
+ return self.header

def get_data(self, *args, **kwargs):
- return dataframe.from_tsv(self.header, self.data).get_data(*args, **kwargs)
+ return self.data

def get_header_map(self, *args, **kwargs):
- return dataframe.from_tsv(self.header, self.data).get_header_map(*args, **kwargs)
+ raise Exception("get_header_map: not supported")

def num_rows(self, *args, **kwargs):
return dataframe.from_tsv(self.header, self.data).num_rows(*args, **kwargs)
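
After this change, TSV.get_header() and TSV.get_data() return the stored header string and data lines directly instead of round-tripping through a DataFrame, and get_header_map() now raises. A small sketch, again assuming the TSV(header, data) constructor described above (illustrative values):

from omigo_core import tsv

xtsv = tsv.TSV("name\tage", ["alice\t30", "bob\t40"])
header = xtsv.get_header()    # "name\tage"
data = xtsv.get_data()        # ["alice\t30", "bob\t40"]
# xtsv.get_header_map()       # now raises Exception("get_header_map: not supported")
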
8 changes: 6 additions & 2 deletions python-packages/core/src/omigo_core/tsvutils.py
@@ -46,14 +46,14 @@ def merge(tsv_list, def_val_map = None):
tsv_list[i].show_transpose(1, title = "merge: big tsv")

# check for valid headers
- header = tsv_list[0].get_header()
+ # header = tsv_list[0].get_header()
+ header_fields = tsv_list[0].get_header_fields()

# iterate to check mismatch in header
index = 0
for t in tsv_list:
# Use a different method for merging if the header is different
- if (header != t.get_header()):
+ if (header_fields != t.get_header_fields()):
header_diffs = get_diffs_in_headers(tsv_list)

# display warning according to kind of differences found
@@ -79,6 +79,7 @@

def split_headers_in_common_and_diff(tsv_list):
common = {}

# get the counts for each header field
for t in tsv_list:
for h in t.get_header_fields():
@@ -181,12 +182,15 @@ def merge_intersect(tsv_list, def_val_map = None):
for t in tsv_list:
new_tsvs.append(t.select(same_cols))

+ # return
return new_tsvs[0].union(new_tsvs[1:])
else:
# probably landed here because of mismatch in headers position
tsv_list2 = []
for t in tsv_list:
tsv_list2.append(t.select(same_cols))

+ # return
return merge(tsv_list2)

def read(input_file_or_files, sep = None, def_val_map = None, username = None, password = None, num_par = 0, s3_region = None, aws_profile = None):
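
In merge(), the header mismatch check now compares the parsed header field lists rather than the raw header strings, matching the get_header_fields() accessor used throughout tsvutils. A minimal usage sketch (illustrative values, TSV constructor assumed as above):

from omigo_core import tsv, tsvutils

t1 = tsv.TSV("name\tage", ["alice\t30"])
t2 = tsv.TSV("name\tage", ["bob\t40"])

# identical header fields, so the header-difference handling is not triggered
merged = tsvutils.merge([t1, t2])
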