Skip to content

Commit

Permalink
optimized dynamic column tagging for massive scale
Browse files Browse the repository at this point in the history
  • Loading branch information
shirleycohen committed Mar 31, 2024
1 parent eba858f commit a1629a9
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 103 deletions.
95 changes: 93 additions & 2 deletions BigQueryUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,65 @@ def copy_tag(self, tag_creator_account, tag_invoker_account, job_uuid, table_nam
return success


# API method used by tag history function
def copy_tags(self, tag_creator_account, tag_invoker_account, job_uuid, table_name, table_fields, asset_table, column_fields_list):
    """Record tag history for several tagged columns of one table in a single load.

    One history row is built per entry of ``column_fields_list`` and all rows
    are handed to ``self.load_history_rows`` in one call.

    Args:
        tag_creator_account: Stored in each row under ``tag_creator_account``.
        tag_invoker_account: Stored in each row under ``tag_invoker_account``.
        job_uuid: Stored in each row under ``job_uuid``.
        table_name: Name of the tag history table.
        table_fields: Field definitions passed to ``self.create_history_table``
            when the history table does not exist yet.
        asset_table: Resource path of the tagged table. It must not already
            contain ``/column/``; the column name is appended to it here.
        column_fields_list: Iterable of dicts, each with a ``'column'`` name and
            a ``'fields'`` list of ``{'field_id': ..., 'field_value': ...}`` dicts.

    Returns:
        True when the rows were handed off and loaded successfully (or there was
        nothing to load); False when the history dataset could not be created,
        a column entry is invalid, or the load failed.
    """
    print('enter BigQueryUtils.copy_tags')
    print('asset_table:', asset_table)
    print('column_fields_list:', column_fields_list)

    exists, table_id, settings = self.history_table_exists(table_name)

    if not exists:
        success, dataset_id = self.create_dataset(settings['bigquery_project'], settings['bigquery_dataset'])

        if not success:
            # Without a dataset there is no table to write to, so stop here
            # rather than attempting a load against an unusable table_id.
            print('Error creating tag_history dataset')
            return False

        table_id = self.create_history_table(dataset_id, table_name, table_fields)

    rows_to_insert = []

    for column_fields in column_fields_list:

        # .get() so that a missing 'column' key is reported by the branch below
        # instead of surfacing as a KeyError.
        column = column_fields.get('column')
        fields = column_fields['fields']

        if not column or "/column/" in asset_table:
            print("Error: could not find the tagged column in column_fields_list, therefore skipping tag history.")
            return False

        asset_name = "{}/column/{}".format(asset_table, column)
        asset_name = asset_name.replace("/datasets/", "/dataset/").replace("/tables/", "/table/")

        # Aware UTC timestamp; formats identically to the deprecated utcnow().
        event_time = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f') + ' UTC'

        row = {
            'event_time': event_time,
            'asset_name': asset_name,
            'tag_creator_account': tag_creator_account,
            'tag_invoker_account': tag_invoker_account,
            'job_uuid': job_uuid,
        }

        for field in fields:
            field_id = field['field_id']
            field_value = field['field_value']

            if isinstance(field_value, decimal.Decimal):
                row[field_id] = float(field_value)
            elif isinstance(field_value, (datetime.datetime, datetime.date)):
                row[field_id] = field_value.isoformat()
            else:
                row[field_id] = json.dumps(field_value, default=str)

        rows_to_insert.append(row)

    if not rows_to_insert:
        # Nothing to record; avoid issuing an empty load job.
        return True

    return self.load_history_rows(tag_creator_account, tag_invoker_account, table_id, rows_to_insert, job_uuid)


# API method used by job metadata function
def write_job_metadata(self, job_uuid, table_name, metadata):

Expand Down Expand Up @@ -200,7 +259,6 @@ def report_table_create(self, project, dataset, table, table_type):

return created


# used by tag export function
def get_report_dataset_schema(self):

Expand Down Expand Up @@ -386,6 +444,39 @@ def insert_history_row(self, tag_creator_account, tag_invoker_account, job_uuid,
return success


# writes tag history records
def load_history_rows(self, tag_creator_account, tag_invoker_account, table_id, rows_to_insert, job_uuid):
    """Append rows to the tag history table with a JSON load job.

    The table's current schema is fetched and used for the load. If the first
    attempt fails with an error whose text contains ``404``, the method sleeps
    for 5 seconds and tries once more.

    Args:
        tag_creator_account: Not used by this method.
        tag_invoker_account: Not used by this method.
        table_id: Identifier of the destination history table.
        rows_to_insert: List of JSON-serializable row dicts to append.
        job_uuid: Not used by this method.

    Returns:
        True if a load attempt completed without raising, False otherwise.
    """
    print('enter load_history_rows')

    def _run_load():
        # Look up the table on every attempt so that a retry does not depend on
        # state (schema / job config) that the failed attempt may never have set.
        table = self.client.get_table(table_id)
        job_config = bigquery.LoadJobConfig(
            schema=table.schema,
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            write_disposition='WRITE_APPEND',
        )
        load_job = self.client.load_table_from_json(rows_to_insert, destination=table_id, job_config=job_config)
        load_job.result()

        if load_job.errors:
            print('Errors while writing to tag history table:', load_job.errors)

    try:
        _run_load()
    except Exception as e:
        print('Error while writing to tag history table:', e)

        if '404' not in str(e):
            # Not the "table not ready" case: report the failure to the caller
            # instead of returning success after a failed write.
            return False

        # table isn't open for writes (it may have been just created)
        print('Tag history table not ready to be written to. Sleeping for 5 seconds.')
        time.sleep(5)

        try:
            _run_load()
        except Exception as retry_error:
            print('Error occurred while writing to tag history table: {}'.format(retry_error))
            return False

    return True

# used by job metadata function
def job_metadata_table_exists(self, table_name):

Expand Down Expand Up @@ -435,7 +526,7 @@ def create_job_metadata_table(self, dataset_id, table_name):
def insert_job_metadata_row(self, table_id, job_uuid, metadata):

print('enter insert_job_metadata_row')
print('job_uuid:', job_uuid)
#print('job_uuid:', job_uuid)

success = True

Expand Down
Loading

0 comments on commit a1629a9

Please sign in to comment.