Merge branch 'cloud-run' into feature/add-pytest
eyalbenivri authored Jul 29, 2024
2 parents 381f9ae + 273d0ea commit 5840516
Showing 48 changed files with 520 additions and 548 deletions.
14 changes: 5 additions & 9 deletions BigQueryUtils.py
@@ -126,7 +126,7 @@ def copy_tag(self, tag_creator_account, tag_invoker_account, job_uuid, table_nam
asset_name = tagged_table

asset_name = asset_name.replace("/datasets/", "/dataset/").replace("/tables/", "/table/")
print('asset_name: ', asset_name)
#print('asset_name: ', asset_name)

success = self.insert_history_row(tag_creator_account, tag_invoker_account, job_uuid, table_id, asset_name, tagged_values)

@@ -390,12 +390,6 @@ def create_history_table(self, dataset_id, table_name, fields):
# writes tag history record
def insert_history_row(self, tag_creator_account, tag_invoker_account, job_uuid, table_id, asset_name, tagged_values):

print('enter insert_history_row')
print('job_uuid:', job_uuid)
print('table_id:', table_id)
print('asset_name:', asset_name)
print('tagged_values:', tagged_values)

success = True

row = {'event_time': datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f') + ' UTC', 'asset_name': asset_name,
@@ -419,14 +413,16 @@ def insert_history_row(self, tag_creator_account, tag_invoker_account, job_uuid,
row[tagged_value['field_id']]= json.dumps(tagged_value['field_value'], default=str)
row[tagged_value['field_id']]= tagged_value['field_value']

print('insert row: ' + str(row))
#print('insert row: ' + str(row))
row_to_insert = [row,]

try:
status = self.client.insert_rows_json(table_id, row_to_insert)

if len(status) > 0:
print('Inserted row into tag history table. Return status: ', status)
print('Inserted row into tag history table. Return status: ', status)
else:
print('Inserted row into tag history table.')

except Exception as e:
print('Error while writing to tag history table:', e)
128 changes: 79 additions & 49 deletions DataCatalogController.py
@@ -40,10 +40,6 @@

BIGQUERY_REGION = config['DEFAULT']['BIGQUERY_REGION']

# this variable is needed for tagging filesets
if 'CLOUD_STORAGE_REGION' in config['DEFAULT']:
CLOUD_STORAGE_REGION = config['DEFAULT']['CLOUD_STORAGE_REGION']

USER_AGENT = 'cloud-solutions/datacatalog-tag-engine-v2'

class DataCatalogController:
@@ -158,7 +154,7 @@ def get_template(self, included_fields=None):

def check_if_tag_exists(self, parent, column=None):

print('enter check_if_tag_exists')
print(f'enter check_if_tag_exists, parent: {parent}')

tag_exists = False
tag_id = ""
@@ -375,20 +371,20 @@ def apply_dynamic_column_config(self, fields, columns_query, uri, job_uuid, conf

target_columns = [] # columns in the table which need to be tagged
columns_query = self.parse_query_expression(uri, columns_query)
#print('columns_query:', columns_query)
print('columns_query:', columns_query)

rows = self.bq_client.query(columns_query).result()

num_columns = 0
for row in rows:
for column in row:
#print('column:', column)
print('column:', column)
target_columns.append(column)
num_columns += 1

if num_columns == 0:
# no columns to tag
msg = f"Error could not find columns to tag. Please check column_query parameter in your config. Current value: {column_query}"
msg = f"Error could not find columns to tag. Please check column_query parameter in your config. Current value: {columns_query}"
log_error(msg, None, job_uuid)
op_status = constants.ERROR
return op_status
@@ -1216,8 +1212,10 @@ def apply_export_config(self, config_uuid, target_project, target_dataset, targe
return export_status


def apply_import_config(self, job_uuid, config_uuid, tag_dict, tag_history, overwrite=False):
def apply_import_config(self, job_uuid, config_uuid, data_asset_type, data_asset_region, tag_dict, tag_history, overwrite=False):

print(f'apply_import_config: {job_uuid}, {config_uuid}, {data_asset_type}, {data_asset_region}, {tag_dict}, {tag_history}')

op_status = constants.SUCCESS

if 'project' in tag_dict:
@@ -1228,48 +1226,71 @@ def apply_import_config(self, job_uuid, config_uuid, tag_dict, tag_history, over
op_status = constants.ERROR
return op_status

if ('dataset' not in tag_dict):
if ('entry_group' not in tag_dict or 'fileset' not in tag_dict):
msg = "Error: could not find required fields in CSV. Expecting either dataset or entry_group in CSV"
if data_asset_type == constants.BQ_ASSET:
if 'dataset' not in tag_dict:
msg = "Error: could not find the required dataset field in the CSV"
log_error_tag_dict(msg, None, job_uuid, tag_dict)
op_status = constants.ERROR
return op_status
else:
entry_type = constants.DATASET
dataset = tag_dict['dataset']

if 'table' in tag_dict:
table = tag_dict['table']
entry_type = constants.BQ_TABLE

if 'dataset' in tag_dict:
dataset = tag_dict['dataset']
entry_type = constants.DATASET

if 'table' in tag_dict:
table = tag_dict['table']
entry_type = constants.TABLE

if 'entry_group' in tag_dict:
entry_group = tag_dict['entry_group']
entry_type = constants.FILESET

if 'fileset' in tag_dict:
fileset = tag_dict['fileset']
if data_asset_type == constants.FILESET_ASSET:
if 'entry_group' not in tag_dict or 'fileset' not in tag_dict:
msg = "Error: could not find the required fields in the CSV. Missing entry_group or fileset or both"
log_error_tag_dict(msg, None, job_uuid, tag_dict)
op_status = constants.ERROR
return op_status
else:
msg = "Error: could not find required fields in CSV. Expecting entry_group and fileset in CSV"
entry_type = constants.FILESET
entry_group = tag_dict['entry_group']
fileset = tag_dict['fileset']

if data_asset_type == constants.SPAN_ASSET:
if 'instance' not in tag_dict or 'database' not in tag_dict or 'table' not in tag_dict:
msg = "Error: could not find the required fields in the CSV. The required fields for Spanner are instance, database, and table"
log_error_tag_dict(msg, None, job_uuid, tag_dict)
op_status = constants.ERROR
return op_status

else:
entry_type = constants.SPAN_TABLE
instance = tag_dict['instance']
database = tag_dict['database']

if 'schema' in tag_dict:
schema = tag_dict['schema']
table = tag_dict['table']
table = f"`{schema}.{table}`"
else:
table = tag_dict['table']

if entry_type == constants.DATASET:
resource = '//bigquery.googleapis.com/projects/{}/datasets/{}'.format(project, dataset)
resource = f'//bigquery.googleapis.com/projects/{project}/datasets/{dataset}'
request = datacatalog.LookupEntryRequest()
request.linked_resource=resource

if entry_type == constants.TABLE:
resource = '//bigquery.googleapis.com/projects/{}/datasets/{}/tables/{}'.format(project, dataset, table)
if entry_type == constants.BQ_TABLE:
resource = f'//bigquery.googleapis.com/projects/{project}/datasets/{dataset}/tables/{table}'
request = datacatalog.LookupEntryRequest()
request.linked_resource=resource

if entry_type == constants.FILESET:
resource = '//datacatalog.googleapis.com/projects/{}/locations/{}/entryGroups/{}/entries/{}'.format(project, CLOUD_STORAGE_REGION, entry_group, fileset)
resource = f'//datacatalog.googleapis.com/projects/{project}/locations/{data_asset_region}/entryGroups/{entry_group}/entries/{fileset}'
request = datacatalog.LookupEntryRequest()
request.linked_resource=resource


if entry_type == constants.SPAN_TABLE:
resource = f'spanner:{project}.regional-{data_asset_region}.{instance}.{database}.{table}'
request = datacatalog.LookupEntryRequest()
request.fully_qualified_name=resource
request.project=project
request.location=data_asset_region

try:
entry = self.client.lookup_entry(request)
except Exception as e:
@@ -1278,6 +1299,16 @@ def apply_import_config(self, job_uuid, config_uuid, tag_dict, tag_history, over
op_status = constants.ERROR
return op_status

# format uri for storing in tag history table
if data_asset_type == constants.BQ_ASSET:
uri = entry.linked_resource.replace('//bigquery.googleapis.com/projects/', '')
if data_asset_type == constants.SPAN_ASSET:
uri = entry.linked_resource.replace('///projects/', '').replace('instances', 'instance').replace('databases', 'database') + '/table/' + table.replace('`', '')
if data_asset_type == constants.FILESET_ASSET:
uri = entry.linked_resource.replace('//datacatalog.googleapis.com/projects/', '').replace('locations', 'location').replace('entryGroups', 'entry_group').replace('entries', 'entry')

target_column = None

if 'column' in tag_dict:
target_column = tag_dict['column']

@@ -1289,11 +1320,8 @@ def apply_import_config(self, job_uuid, config_uuid, tag_dict, tag_history, over
op_status = constants.ERROR
return op_status

uri = entry.linked_resource.replace('//bigquery.googleapis.com/projects/', '') + '/column/' + target_column
else:
target_column = None
uri = entry.linked_resource.replace('//bigquery.googleapis.com/projects/', '')

uri = uri + '/column/' + target_column

try:
tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name, column=target_column)

@@ -1315,7 +1343,8 @@ def apply_import_config(self, job_uuid, config_uuid, tag_dict, tag_history, over
for field_name in tag_dict:

if field_name == 'project' or field_name == 'dataset' or field_name == 'table' or \
field_name == 'column' or field_name == 'entry_group' or field_name == 'fileset':
field_name == 'column' or field_name == 'entry_group' or field_name == 'fileset' or \
field_name == 'instance' or field_name == 'database' or field_name == 'schema':
continue

field_type = None
@@ -2163,18 +2192,19 @@ def update_tag_subset(self, template_id, template_project, template_region, entr
target_scopes=SCOPES,
lifetime=1200)

template_id = 'data_governance'
template_id = 'data_sensitivity'
template_project = 'tag-engine-run'
template_region = 'us-central1'

fields = [{'field_type': 'enum', 'field_id': 'data_domain', 'enum_values': ['ENG', 'PRODUCT', 'OPERATIONS', 'LOGISTICS', 'FINANCE', 'HR', 'LEGAL', 'MARKETING', 'SALES', 'CONSUMER', 'GOVERNMENT'], 'is_required': True, 'display_name': 'Data Domain', 'order': 10, 'query_expression': "select 'LOGISTICS'"}, {'field_type': 'enum', 'field_id': 'broad_data_category', 'enum_values': ['CONTENT', 'METADATA', 'CONFIGURATION'], 'is_required': True, 'display_name': 'Broad Data Category', 'order': 9, 'query_expression': "select 'CONTENT'"}]
columns_query = "select 'unique_key', 'created_date', 'incident.city', 'incident.county'"
uri = 'tag-engine-run/datasets/cities_311/tables/austin_311_service_requests'
job_uuid = '3291b93804d211ef9d2549bd5e1feaa2'
config_uuid = '6fb997443e0311ef9f5242004e494300'
template_uuid = 'fa8aa3007f1711eebe2b4f918967d564'
tag_history = False
job_uuid = 'df0ddb3e477511ef95dc42004e494300'
config_uuid = '3404d03a477a11ef995442004e494300'
data_asset_type = 'fileset'
data_asset_region = 'us-central1'
tag_dict = {'project': 'tag-engine-run', 'entry_group': 'sakila_eg', 'fileset': 'staff', 'column': 'first_name', 'sensitive_field': 'TRUE', 'sensitive_type': 'Sensitive_Personal_Identifiable_Information'}
tag_history = True
overwrite = True

dcu = DataCatalogController(credentials, target_service_account, 'scohen@gcp.solutions', template_id, template_project, template_region)
dcu.apply_dynamic_column_config(fields, columns_query, uri, job_uuid, config_uuid, template_uuid, tag_history, batch_mode=False)

dcu.apply_import_config(job_uuid, config_uuid, data_asset_type, data_asset_region, tag_dict, tag_history, overwrite)
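
Note: the widened apply_import_config signature in this diff now takes the data asset type and region explicitly, and the Spanner path requires instance, database, and table keys in the tag_dict (with schema optional). A minimal usage sketch for that path is below, reusing the dcu, job_uuid, and config_uuid defined in the test harness above; the project, instance, database, field values, and the 'spanner' asset-type string are illustrative assumptions, not values taken from this commit.

```python
# Hypothetical sketch only -- the names and values below are assumptions
# for illustration and do not come from this commit.
tag_dict = {
    'project': 'my-project',      # assumed project id
    'instance': 'my-instance',    # Spanner instance (required for the Spanner path in this change)
    'database': 'my-database',    # Spanner database (required)
    'schema': 'sales',            # optional; table is then qualified as `sales.customers`
    'table': 'customers',         # Spanner table (required)
    'sensitive_field': 'TRUE',    # illustrative template field values
    'sensitive_type': 'Sensitive_Personal_Identifiable_Information',
}

# 'spanner' is assumed to be the string form of constants.SPAN_ASSET.
dcu.apply_import_config(job_uuid, config_uuid, 'spanner', 'us-central1',
                        tag_dict, tag_history=True, overwrite=True)
```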

