Skip to content

Commit

Permalink
add parameters via athena (#605)
Browse files Browse the repository at this point in the history
* add parameters via athena

* wip

* better error handling
  • Loading branch information
iakov-aws authored Aug 23, 2023
1 parent 76fffb4 commit fed7f94
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions cid/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,10 @@ def getPlugin(self, plugin) -> dict:
return self.plugins.get(plugin)


def get_definition(self, type: str, name: str=None, id: str=None) -> dict:
""" return resource definition that matches parameters """
def get_definition(self, type: str, name: str=None, id: str=None, noparams: bool=False) -> dict:
""" return resource definition that matches parameters
:noparams: do not process parameters as they may not exist by this time
"""
res = None
if type not in ['dashboard', 'dataset', 'view', 'schedule']:
raise ValueError(f'{type} is not a valid definition type')
Expand All @@ -240,7 +242,7 @@ def get_definition(self, type: str, name: str=None, id: str=None) -> dict:
break

# template
if isinstance(res, dict):
if isinstance(res, dict) and not noparams:
name = name or res.get('name')
params = self.get_template_parameters(res.get('parameters', {}), param_prefix=f'{type}-{name}-')
# FIXME: can be recursive?
Expand Down Expand Up @@ -300,7 +302,7 @@ def load_resources(self):
self.resources = self.resources_with_global_parameters(self.resources)


def get_template_parameters(self, parameters: dict, param_prefix: str='', others: dict={}):
def get_template_parameters(self, parameters: dict, param_prefix: str='', others: dict=None):
""" Get template parameters. """
params = get_parameters()
for key, value in parameters.items():
Expand All @@ -314,10 +316,21 @@ def get_template_parameters(self, parameters: dict, param_prefix: str='', others
message=f"Required parameter: {key} ({value.get('description')})",
choices=self.cur.tag_and_cost_category_fields + ["'none'"],
)
elif isinstance(value, dict) and value.get('type') == 'athena':
if 'query' not in value:
raise CidCritical(f'Failed fetching parameter {prefix}{key}: paramter with type ahena must have query value.')
query = value['query']
try:
res = self.athena.query(query)[0]
except (self.athena.client.exceptions.ClientError, CidError, CidCritical) as exc:
raise CidCritical(f'Failed fetching parameter {prefix}{key}: {exc}') from exc
if not res:
raise CidCritical(f'Failed fetching parameter {prefix}{key}, {value}. Athena returns empty result')
params[key] = res[0]
elif isinstance(value, dict):
params[key] = value.get('value')
while params[key] == None:
if value.get('silentDefault') != None and get_parameters().get(key) == None:
while params[key] is None:
if value.get('silentDefault') is not None and get_parameters().get(key) is None:
params[key] = value.get('silentDefault')
else:
params[key] = get_parameter(
Expand All @@ -328,7 +341,7 @@ def get_template_parameters(self, parameters: dict, param_prefix: str='', others
)
else:
raise CidCritical(f'Unknown parameter type for "{key}". Must be a string or a dict with value or with default key')
return always_merger.merge(params, others)
return always_merger.merge(params, others or {})


@command
Expand Down Expand Up @@ -1083,7 +1096,7 @@ def create_datasets(self, _datasets: list, known_datasets: dict={}, recursive: b
print('\nLooking by DataSetId defined in template...', end='')
for dataset_name in missing_datasets[:]:
try:
dataset_definition = self.get_definition(type='dataset', name=dataset_name)
dataset_definition = self.get_definition(type='dataset', name=dataset_name, noparams=True)
raw_template = self.get_data_from_definition('dataset', dataset_definition)
if raw_template:
ds = self.qs.describe_dataset(raw_template.get('DataSetId'))
Expand Down Expand Up @@ -1339,8 +1352,13 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non
columns_tpl,
)
logger.debug(columns_tpl)

compiled_dataset = json.loads(template.safe_substitute(columns_tpl))
compiled_dataset_text = template.safe_substitute(columns_tpl)
try:
compiled_dataset = json.loads(compiled_dataset_text)
except json.JSONDecodeError as exc:
logger.error('The json of dataset is not correct. Please check parameters of the dasbhoard.')
logger.debug(compiled_dataset_text)
raise
if dataset_id:
compiled_dataset.update({'DataSetId': dataset_id})

Expand Down

0 comments on commit fed7f94

Please sign in to comment.