Commit: refactor export
iakov-aws committed Dec 28, 2023
1 parent 16d427f commit 30f623b
Showing 2 changed files with 97 additions and 74 deletions.
118 changes: 59 additions & 59 deletions cid/common.py
@@ -256,7 +256,7 @@ def get_definition(self, type: str, name: str=None, id: str=None, noparams: bool

@command
def export(self, **kwargs):
export_analysis(self.qs, self.athena)
export_analysis(self.qs, self.athena, glue=self.glue, s3=self.s3)

def track(self, action, dashboard_id):
""" Send dashboard_id and account_id to adoption tracker """
@@ -523,9 +523,9 @@ def _deploy(self, dashboard_id: str=None, recursive=True, update=False, **kwargs
src_fields = source_template.datasets.get(ds_map.get(dataset_name, dataset_name) )
required_fields = {col.get('Name'): col.get('DataType') for col in src_fields}
unmatched = {}
for k, v in required_fields.items():
if k not in dataset_fields or dataset_fields[k] != v:
unmatched.update({k: {'expected': v, 'found': dataset_fields.get(k)}})
for field_name, field_type in required_fields.items():
if field_name not in dataset_fields or dataset_fields[field_name] != field_type:
unmatched.update({field_name: {'expected': field_type, 'found': dataset_fields.get(field_name)}})
logger.debug(f'unmatched_fields={unmatched}')
if unmatched:
logger.warning(f'Found Dataset "{dataset_name}" ({ds.id}) but it is missing required fields. {(unmatched)}')
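For illustration, the renamed loop above builds a mapping of mismatches like the following (hypothetical field data, not taken from the commit):

# Hypothetical illustration of the field check in the hunk above.
required_fields = {'line_item_usage_amount': 'DECIMAL', 'year': 'STRING'}
dataset_fields = {'line_item_usage_amount': 'DECIMAL'}
unmatched = {}
for field_name, field_type in required_fields.items():
    if field_name not in dataset_fields or dataset_fields[field_name] != field_type:
        unmatched.update({field_name: {'expected': field_type, 'found': dataset_fields.get(field_name)}})
print(unmatched)  # {'year': {'expected': 'STRING', 'found': None}}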
@@ -1328,62 +1328,62 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=None
f'quicksight-datasource-id={datasource_id} not found or not in a valid state.'
) from exc
else:
# We have no explicit DataSource in parameters.
# QuickSight DataSources are not obvious for the customer, so we will try to make our best guess:
# - if there is just one -> silently take that one
# - if a DataSource is referenced in an existing DataSet -> silently take that one
# - if an Athena WorkGroup is defined -> try to find a DataSource with this WorkGroup
# - and if still nothing -> ask the user for an explicit choice
pre_compiled_dataset = json.loads(template.safe_substitute())
dataset_name = pre_compiled_dataset.get('Name')

# let's find the schema/database and workgroup name
schemas = []
datasources = []
if dataset_id:
schemas = self.qs.get_datasets(id=dataset_id)[0].schemas
datasources = self.qs.get_datasets(id=dataset_id)[0].datasources
else: # try to find dataset and get athena database
found_datasets = self.qs.get_datasets(name=dataset_name)
logger.debug(f'Related to dataset {dataset_name}: {[ds.id for ds in found_datasets]}')
if found_datasets:
schemas = list(set(sum([d.schemas for d in found_datasets], [])))
datasources = list(set(sum([d.datasources for d in found_datasets], [])))
logger.debug(f'Found following schemas={schemas}, related to dataset with name {dataset_name}')
logger.info(f'Found {len(datasources)} Athena DataSources related to the DataSet {dataset_name}')

if not get_parameters().get('athena-database') and len(schemas) == 1 and schemas[0]:
logger.debug(f'Picking the database={schemas[0]}')
self.athena.DatabaseName = schemas[0]
# else user will be suggested to choose database anyway

if len(datasources) == 1 and datasources[0] in self.qs.athena_datasources:
athena_datasource = self.qs.get_datasources(id=datasources[0])[0]
logger.info(f'Silently selecting the only available DataSource referenced by other datasets: {datasources[0]}.')
# We have no explicit DataSource in parameters.
# QuickSight DataSources are not obvious for the customer, so we will try to make our best guess:
# - if there is just one -> silently take that one
# - if a DataSource is referenced in an existing DataSet -> silently take that one
# - if an Athena WorkGroup is defined -> try to find a DataSource with this WorkGroup
# - and if still nothing -> ask the user for an explicit choice
pre_compiled_dataset = json.loads(template.safe_substitute())
dataset_name = pre_compiled_dataset.get('Name')

# let's find the schema/database and workgroup name
schemas = []
datasources = []
if dataset_id:
schemas = self.qs.get_datasets(id=dataset_id)[0].schemas
datasources = self.qs.get_datasets(id=dataset_id)[0].datasources
else: # try to find dataset and get athena database
found_datasets = self.qs.get_datasets(name=dataset_name)
logger.debug(f'Related to dataset {dataset_name}: {[ds.id for ds in found_datasets]}')
if found_datasets:
schemas = list(set(sum([d.schemas for d in found_datasets], [])))
datasources = list(set(sum([d.datasources for d in found_datasets], [])))
logger.debug(f'Found following schemas={schemas}, related to dataset with name {dataset_name}')
logger.info(f'Found {len(datasources)} Athena DataSources related to the DataSet {dataset_name}')

if not get_parameters().get('athena-database') and len(schemas) == 1 and schemas[0]:
logger.debug(f'Picking the database={schemas[0]}')
self.athena.DatabaseName = schemas[0]
# else user will be suggested to choose database anyway

if len(datasources) == 1 and datasources[0] in self.qs.athena_datasources:
athena_datasource = self.qs.get_datasources(id=datasources[0])[0]
logger.info(f'Silently selecting the only available DataSource referenced by other datasets: {datasources[0]}.')
else:
# Ask user to choose the datasource
# Narrow the choice to only datasources with the given workgroup
datasources_with_workgroup = self.qs.get_datasources(athena_workgroup_name=self.athena.WorkGroup)
logger.info(f'Found {len(datasources_with_workgroup)} Athena DataSources with WorkGroup={self.athena.WorkGroup}.')
datasource_choices = {
f"{datasource.name} {datasource.id} (workgroup={datasource.AthenaParameters.get('WorkGroup')})": datasource.id
for datasource in datasources_with_workgroup
}
if 'CID-CMD-Athena' not in list(datasource_choices.values()):
datasource_choices['CID-CMD-Athena <CREATE NEW DATASOURCE>'] = 'Create New DataSource'
#TODO: add possibility to update datasource and role
datasource_id = get_parameter(
param_name='quicksight-datasource-id',
message=f"Please choose DataSource (Select the first one if not sure)",
choices=datasource_choices,
)
if not datasource_id or datasource_id == 'Create New DataSource':
datasource_id = 'CID-CMD-Athena'
logger.info(f'Creating DataSource {datasource_id}')
athena_datasource = self.create_datasource(datasource_id)
else:
# Ask user to choose the datasource
# Narrow the choice to only datasources with the given workgroup
datasources_with_workgroup = self.qs.get_datasources(athena_workgroup_name=self.athena.WorkGroup)
logger.info(f'Found {len(datasources_with_workgroup)} Athena DataSources with WorkGroup={self.athena.WorkGroup}.')
datasource_choices = {
f"{datasource.name} {datasource.id} (workgroup={datasource.AthenaParameters.get('WorkGroup')})": datasource.id
for datasource in datasources_with_workgroup
}
if 'CID-CMD-Athena' not in list(datasource_choices.values()):
datasource_choices['CID-CMD-Athena <CREATE NEW DATASOURCE>'] = 'Create New DataSource'
#TODO: add possibility to update datasource and role
datasource_id = get_parameter(
param_name='quicksight-datasource-id',
message=f"Please choose DataSource (Select the first one if not sure)",
choices=datasource_choices,
)
if not datasource_id or datasource_id == 'Create New DataSource':
datasource_id = 'CID-CMD-Athena'
logger.info(f'Creating DataSource {datasource_id}')
athena_datasource = self.create_datasource(datasource_id)
else:
athena_datasource = self.qs.get_datasources(id=datasource_id)[0]
logger.info(f'Using DataSource = {athena_datasource.id}')
athena_datasource = self.qs.get_datasources(id=datasource_id)[0]
logger.info(f'Using DataSource = {athena_datasource.id}')
if not get_parameters().get('athena-workgroup'):
# set default workgroup from datasource if not provided via parameters
if isinstance(athena_datasource, Datasource) and athena_datasource.AthenaParameters.get('WorkGroup', None):
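The comment block in the hunk above describes a guess cascade for picking a QuickSight DataSource. A minimal sketch of that decision order follows; qs, athena, and get_parameter are hypothetical stand-ins for the cid helpers, and this is an illustration rather than the committed implementation:

# Sketch of the DataSource guess cascade (hypothetical helper objects).
def guess_datasource(qs, athena, datasources, get_parameter):
    """Pick an Athena DataSource following the commented heuristic."""
    # 1. Exactly one candidate referenced by existing DataSets: take it silently.
    if len(datasources) == 1 and datasources[0] in qs.athena_datasources:
        return qs.get_datasources(id=datasources[0])[0]
    # 2. Narrow the choice to DataSources that use the configured Athena WorkGroup.
    candidates = qs.get_datasources(athena_workgroup_name=athena.WorkGroup)
    choices = {f'{d.name} {d.id}': d.id for d in candidates}
    choices.setdefault('CID-CMD-Athena <CREATE NEW DATASOURCE>', None)
    # 3. Still ambiguous: ask the user for an explicit choice.
    chosen = get_parameter(
        param_name='quicksight-datasource-id',
        message='Please choose DataSource (Select the first one if not sure)',
        choices=choices,
    )
    if not chosen:
        return None  # the real code creates a new 'CID-CMD-Athena' DataSource here
    return qs.get_datasources(id=chosen)[0]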
53 changes: 38 additions & 15 deletions cid/export.py
@@ -16,7 +16,7 @@
import yaml
import boto3

from cid.helpers import Dataset, QuickSight, Athena, Glue
from cid.helpers import Dataset, QuickSight, Athena, Glue, S3
from cid.helpers import CUR
from cid.utils import get_parameter, get_parameters, cid_print
from cid.exceptions import CidCritical
@@ -70,7 +70,7 @@ def choose_analysis(qs):
return choices[choice]['AnalysisId']


def export_analysis(qs, athena, glue):
def export_analysis(qs, athena, glue, s3):

Check notice: Code scanning / CodeGuru Reviewer Scanner

Low maintainability with high cyclomatic complexity (severity: Low)

The cyclomatic complexity of this function is 51. By comparison, 99% of the functions in the CodeGuru reference dataset have a lower cyclomatic complexity. This indicates that the function has a high number of decisions, which can make the logic difficult to understand and test.

We recommend that you simplify this function or break it into multiple functions. For example, consider extracting the code block on lines 418-426 into a separate function.
""" Export analysis to yaml resource File
"""

@@ -95,9 +95,11 @@ def export_analysis(qs, athena, glue):
resources = {}
resources['dashboards'] = {}
resources['datasets'] = {}
resources['crawlers'] = {}


cur_helper = CUR(session=athena.session)
cur_helper = CUR(athena=athena, glue=glue, s3=s3)

resources_datasets = []

dataset_references = []
@@ -221,8 +223,8 @@ def export_analysis(qs, athena, glue):
logger.debug(f'{dep_view_name} skipping as not in the views list')
non_cur_dep_views.append(dep_view_name)
if deps.get('views'):
deps['views'] = non_cur_dep_views
if not deps['views']:
deps['views'] = non_cur_dep_views
if not deps['views']:
del deps['views']

logger.debug(f'cur_tables = {cur_tables}')
@@ -238,23 +240,35 @@ def export_analysis(qs, athena, glue):
crawler_name = crawler_names[0]
crawler = {'data': glue.get_crawler(crawler_name)}

# Post processing
for crawler_key in list(crawler['data']):  # snapshot the keys so we can pop while iterating
if crawler_key not in (
'Name', 'Description', 'Role', 'DatabaseName', 'Targets',
'SchemaChangePolicy', 'RecrawlPolicy', 'Schedule', 'Configuration'
): # remove all keys that are not needed
crawler['data'].pop(crawler_key, None)
if isinstance(crawler['data'].get('Schedule'), dict):
crawler['data']['Schedule'] = crawler['data']['Schedule']['ScheduleExpression']
crawler['data']['Role'] = '${crawler_role_arn}'
crawler['data']['DatabaseName'] = "${athena_database_name}"
for index, target in enumerate(crawler['data'].get('Targets', {}).get('S3Targets', [])):
path = target.get('Path')
logger.info(f'Please replace manually location bucket with a parameter: {path}')
default = get_parameter(
f'{key}-s3path',
'Please provide a default value. (You can use the {account_id} variable if needed)',
default=re.sub(r'(\d{12})', '{account_id}', path),
template_variables={'account_id': '{account_id}'},
)
crawler['parameters'] = {
f's3path': {
crawler['parameters'] = { #FIXME: path should be the same as for the table
's3path': {
'default': default,
'description': f"S3 Path for {key} table",
}
}
crawler['data']['Targets']['S3Targets'][index]['Path'] = '${s3path}'
crawlers.append(crawler)
crawler_name = key
resources['crawlers'][crawler_name] = crawler
view_data['crawler'] = crawler_name


# replace location with variables
@@ -268,13 +282,16 @@ def export_analysis(qs, athena, glue):
template_variables={'account_id': '{account_id}'},
)
view_data['parameters'] = {
f's3path': {
's3path': {
'default': default,
'description': f"S3 Path for {key} table",
}
}
view_data['data'] = view_data['data'].replace(location, '${s3path}')

if re.findall(r"PARTITION", view_data.get('data')) and 'crawler' not in view_data:
logger.warning(f'The table {key} is partitioned but there is no crawler info. Please make sure partitions are managed in some way after installation.')

resources['views'][key] = view_data
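After the post-processing and templating above, an entry stored under resources['crawlers'] could look roughly like this (a hypothetical example with made-up names, shown as a Python literal):

# Hypothetical shape of one exported crawler resource after templating.
exported_crawler = {
    'data': {
        'Name': 'example_cur_crawler',  # made-up name
        'Role': '${crawler_role_arn}',  # resolved at install time
        'DatabaseName': '${athena_database_name}',
        'Schedule': 'cron(0 4 * * ? *)',
        'Targets': {'S3Targets': [{'Path': '${s3path}'}]},
    },
    'parameters': {
        's3path': {
            'default': 's3://cid-{account_id}-cur/cur/',  # made-up default
            'description': 'S3 Path for example_cur table',
        },
    },
}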

logger.debug('Building dashboard resource')
Expand All @@ -292,7 +309,7 @@ def export_analysis(qs, athena, glue):
dashboard_resource['dependsOn'] = {
# Historically CID uses dataset names as dataset reference. IDs of manually created resources have uuid format.
# We can potentially reconsider this and use IDs at some point
'datasets': sorted(list(set([dataset_name for dataset_name in datasets] + resources_datasets)))
'datasets': sorted(list(set(datasets + resources_datasets)))
}
dashboard_resource['name'] = analysis['Name']
dashboard_resource['dashboardId'] = dashboard_id
@@ -409,11 +426,17 @@ def export_analysis(qs, athena, glue):
cid_print(f'Output: <BOLD>{output}<END>')


if __name__ == "__main__": # for testing
def quick_try():
''' just trying the export
'''
logging.basicConfig(level=logging.INFO)
logging.getLogger('cid').setLevel(logging.DEBUG)
identity = boto3.client('sts').get_caller_identity()
qs = QuickSight(boto3.session.Session(), identity)
athena = Athena(boto3.session.Session(), identity)
glue = Glue(boto3.session.Session(), identity)
export_analysis(qs, athena, glue)
athena = Athena(boto3.session.Session())
glue = Glue(boto3.session.Session())
s3 = S3(boto3.session.Session())
export_analysis(qs, athena, glue, s3)

if __name__ == "__main__": # for testing
quick_try()
