Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
iakov-aws committed Apr 30, 2024
1 parent cac1ad6 commit 9a5a6fc
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 21 deletions.
6 changes: 3 additions & 3 deletions cid/builtin/core/data/resources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,9 @@ views:
- savings_plan_savings_plan_a_r_n

hourly_view:
spriFile: cudos/hourly_view_sp_ri.sql
spFile: cudos/hourly_view_sp.sql
riFile: cudos/hourly_view_ri.sql
#spriFile: cudos/hourly_view_sp_ri.sql
#spFile: cudos/hourly_view_sp.sql
#riFile: cudos/hourly_view_ri.sql
File: cudos/hourly_view.sql
dependsOn:
cur:
Expand Down
20 changes: 11 additions & 9 deletions cid/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ def s3(self) -> S3:

@cached_property
def cur1(self):
return ProxyCur(self.cur, target_cur_version='1')
return ProxyCUR(self.cur, target_cur_version='1')

@cached_property
def cur2(self):
return ProxyCur(self.cur, target_cur_version='2')
return ProxyCUR(self.cur, target_cur_version='2')

@property
def cur(self) -> CUR:
Expand All @@ -139,6 +139,7 @@ def cur(self) -> CUR:
self._clients['cur'] = _cur
break
except CidCritical as exc:
logger.exception(exc)
cid_print(f'CUR not found in {self.athena.DatabaseName}. If you have S3 bucket with CUR in this account you can create a CUR table with Crawler.')
self.create_cur_table()
return self._clients['cur']
Expand All @@ -149,7 +150,6 @@ def accountMap(self) -> AccountMap:
_account_map = AccountMap(self.base.session)
_account_map.athena = self.athena
_account_map.cur = self.cur

self._clients.update({
'accountMap': _account_map
})
Expand Down Expand Up @@ -1487,8 +1487,8 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non
columns_tpl = {
'athena_datasource_arn': athena_datasource.arn,
'athena_database_name': self.athena.DatabaseName,
'cur_table_name': self.cur1.get_table_name(version='1') if cur_required else None,
'cur2_table_name': self.cur2.get_table_name(version='2') if cur2_required else None,
'cur_table_name': self.cur1.table_name if cur_required else None,
'cur2_table_name': self.cur2.table_name if cur2_required else None,
}

logger.debug(f'dataset_id={dataset_id}')
Expand Down Expand Up @@ -1599,15 +1599,17 @@ def create_or_update_view(self, view_name: str, recursive: bool=True, update: bo
dependencies = view_definition.get('dependsOn', {})

# Process CUR columns
if dependencies.get('cur1'):
self.cur1.ensure_columns(dependencies.get('cur1'))
if dependencies.get('cur'):
self.cur1.ensure_columns(dependencies.get('cur'))
if dependencies.get('cur2'):
self.cur2.ensure_columns(dependencies.get('cur2'))

if recursive:
dependency_views = dependencies.get('views', [])
if 'cur' in dependency_views:
dependency_views.remove('cur')
if 'cur2' in dependency_views:
dependency_views.remove('cur2')
# Discover dependency views (may not be discovered earlier)
self.athena.discover_views(dependency_views)
logger.info(f"Dependency views: {', '.join(dependency_views)}" if dependency_views else 'No dependency views')
Expand Down Expand Up @@ -1758,8 +1760,8 @@ def get_view_query(self, view_name: str) -> str:

# Prepare template parameters
columns_tpl = {
'cur_table_name': self.cur.get_table_name(version='1') if cur_required else None,
'cur2_table_name': self.cur.get_table_name(version='2') if cur2_required else None,
'cur_table_name': self.cur1.table_name if cur_required else None,
'cur2_table_name': self.cur2.table_name if cur2_required else None,
'athenaTableName': view_name,
'athena_database_name': self.athena.DatabaseName,
}
Expand Down
4 changes: 2 additions & 2 deletions cid/helpers/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def execute_query(self, sql_query, sleep_duration=1, database: str=None, catalog
raise CidCritical(f'InvalidRequestException: {exc}') from exc
except Exception as exc:
logger.debug(f'Full query: {sql_query}')
raise CidCritical(f'Athena query failed: {exc}') from exc
raise CidCritical(f'Query:\n{sql_query}\n\nAthena query failed: {exc}') from exc

current_status = query_status['QueryExecution']['Status']['State']

Expand All @@ -330,7 +330,7 @@ def execute_query(self, sql_query, sleep_duration=1, database: str=None, catalog
logger.info(f'Athena query failed: {failure_reason}')
logger.debug(f'Full query: {sql_query}')
if fail:
raise CidCritical(f'Athena query failed: {failure_reason}')
raise CidCritical(f'Query:\n{sql_query}\n\nAthena query status failed : {failure_reason}')
return False

def get_query_results(self, query_id):
Expand Down
9 changes: 7 additions & 2 deletions cid/helpers/cur.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,20 @@ def ensure_column(self, column: str, column_type: str=None):

def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=False) -> bool:
""" return True if table metadata fits CUR definition. """

print(name)
try:
table = table or self.athena.get_table_metadata(name)
except Exception as exc: #pylint: disable=broad-exception-caught
logger.debug(exc)
logger.critical(exc)
return False if not return_reason else (False, f'cannot get table {name}. {exc}.')

table_name = table.get('Name')
if '_proxy' in table_name:
return False if not return_reason else (False, f"Table {table_name} most likely is a proxy.")
columns = [col.get('Name') for col in table.get('Columns')]
missing_columns = [col for col in self.cur_minimal_required_columns if col not in columns]
logger.critical(missing_columns)
if missing_columns:
return False if not return_reason else (False, f"Table {table_name} does not contain columns: {','.join(missing_columns)}. You can try ALTER TABLE {table_name} ADD COLUMNS (missing_column string).")

Expand Down Expand Up @@ -184,7 +187,7 @@ def find_cur(self):
metadata = self.athena.get_table_metadata(table_name)
except self.athena.client.exceptions.ResourceNotFoundException as exc:
raise CidCritical(f'Provided cur-table-name "{table_name}" is not found. Please make sure the table exists.') from exc
res, message = self.table_is_cur(table=self._metadata, return_reason=True)
res, message = self.table_is_cur(table=metadata, return_reason=True)
if not res:
raise CidCritical(f'Table {table_name} does not look like CUR. {message}')
else:
Expand Down Expand Up @@ -270,7 +273,9 @@ def metadata(self) -> dict:
return self._metadata

def ensure_columns(self, columns):
    """Expose the given CUR columns through the proxy view.

    For every name in ``columns`` the column type is looked up with
    ``self.cur.get_type_of_column`` and stored in
    ``self.proxy.fields_to_expose`` under that name. Afterwards the proxy
    view is created or updated a single time.

    Args:
        columns: iterable of column names that must be exposed.
    """
    # Property read kept from the original purely for its side effect
    # (presumably it loads/caches the table metadata -- TODO confirm).
    self.metadata
    for column in columns:
        self.proxy.fields_to_expose[column] = self.cur.get_type_of_column(column)
    # NOTE: the leftover debugging ``print(columns)`` was removed here so the
    # method no longer writes the raw column list to stdout.
    self.proxy.create_or_update_view()
10 changes: 5 additions & 5 deletions cid/helpers/cur_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@
"concat('name', line_item_usage_account_id)": 'line_item_usage_account_name',
}


# various types require various empty
empty = {
'string': 'cast (null as varchar)',
Expand All @@ -198,7 +199,7 @@ def __init__(self, cur, target_cur_version, fields_to_expose=None):
self.target_cur_version = target_cur_version
self.current_cur_version = self.cur.version
logger.debug(f'CUR proxy from {self.current_cur_version } to {self.target_cur_version }')
self.fields_to_expose = fields_to_expose or []
self.fields_to_expose = fields_to_expose or {}
self.athena = self.cur.athena
self.name = 'cur1_proxy'
self.exposed_fields = []
Expand Down Expand Up @@ -238,7 +239,7 @@ def get_sql_expression(self, field, field_type):
'''
cur1to2mapping = {value: key for key, value in cur2to1mapping.items()}
if field in cur1to2mapping:
return f'{cur1to2mapping[field]}'
return f'{cur1to2mapping.ge(field, field)}'
else:
raise NotImplementedError(f'WARNING: {field} has not known equivalent')

Expand All @@ -247,7 +248,7 @@ def get_sql_expression(self, field, field_type):
return f"resource_tags['{field[len('resource_tags_'):]}']"
if field.startswith('cost_category_'):
return f"cost_category['{field[len('cost_category_'):]}']"
return cur2to1mapping[field]
return cur2to1mapping.get(field, field)

def create_or_update_view(self):
self.read_from_athena()
Expand All @@ -257,7 +258,6 @@ def create_or_update_view(self):
all_fields[field] = self.fields_to_expose[field]
lines = []
for field, field_type in all_fields.items():

mapped_expression = self.get_sql_expression(field, field_type)
requirement = mapped_expression.split('[')[0]
if not re.match(r'^[a-zA-Z0-9_]+$', requirement) or self.cur.column_exists(requirement):
Expand All @@ -274,7 +274,7 @@ def create_or_update_view(self):
"{self.cur.table_name}"
''')

logging.debug(query)
logging.critical(query)
res = self.athena.query(query)
logging.debug(res)

Expand Down

0 comments on commit 9a5a6fc

Please sign in to comment.