diff --git a/cid/common.py b/cid/common.py
index 89da4664..28056d3d 100644
--- a/cid/common.py
+++ b/cid/common.py
@@ -256,7 +256,7 @@ def get_definition(self, type: str, name: str=None, id: str=None, noparams: bool
 
     @command
     def export(self, **kwargs):
-        export_analysis(self.qs, self.athena)
+        export_analysis(self.qs, self.athena, glue=self.glue, s3=self.s3)
 
     def track(self, action, dashboard_id):
         """ Send dashboard_id and account_id to adoption tracker """
@@ -523,9 +523,9 @@ def _deploy(self, dashboard_id: str=None, recursive=True, update=False, **kwargs
                 src_fields = source_template.datasets.get(ds_map.get(dataset_name, dataset_name) )
                 required_fields = {col.get('Name'): col.get('DataType') for col in src_fields}
                 unmatched = {}
-                for k, v in required_fields.items():
-                    if k not in dataset_fields or dataset_fields[k] != v:
-                        unmatched.update({k: {'expected': v, 'found': dataset_fields.get(k)}})
+                for field_name, field_type in required_fields.items():
+                    if field_name not in dataset_fields or dataset_fields[field_name] != field_type:
+                        unmatched.update({field_name: {'expected': field_type, 'found': dataset_fields.get(field_name)}})
                 logger.debug(f'unmatched_fields={unmatched}')
                 if unmatched:
                     logger.warning(f'Found Dataset "{dataset_name}" ({ds.id}) but it is missing required fields. {(unmatched)}')
@@ -1328,62 +1328,62 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non
                     f'quicksight-datasource-id={datasource_id} not found or not in a valid state.'
                 ) from exc
         else:
-                # We have no explicit DataSource in parameters
-                # QuickSight DataSources are not obvious for customer so we will try to do our best guess
-                # - if there is just one? -> silently take that one
-                # - if DataSource is references in existing DataSet? -> silently take that one
-                # - if athena WorkGroup defined -> Try to find a DataSource with this WorkGroup
-                # - and if still nothing -> ask an explicit choice from the user
-                pre_compiled_dataset = json.loads(template.safe_substitute())
-                dataset_name = pre_compiled_dataset.get('Name')
-
-                # let's find the schema/database and workgroup name
-                schemas = []
-                datasources = []
-                if dataset_id:
-                    schemas = self.qs.get_datasets(id=dataset_id)[0].schemas
-                    datasources = self.qs.get_datasets(id=dataset_id)[0].datasources
-                else: # try to find dataset and get athena database
-                    found_datasets = self.qs.get_datasets(name=dataset_name)
-                    logger.debug(f'Related to dataset {dataset_name}: {[ds.id for ds in found_datasets]}')
-                    if found_datasets:
-                        schemas = list(set(sum([d.schemas for d in found_datasets], [])))
-                        datasources = list(set(sum([d.datasources for d in found_datasets], [])))
-                logger.debug(f'Found following schemas={schemas}, related to dataset with name {dataset_name}')
-                logger.info(f'Found {len(datasources)} Athena DataSources related to the DataSet {dataset_name}')
-
-                if not get_parameters().get('athena-database') and len(schemas) == 1 and schemas[0]:
-                    logger.debug(f'Picking the database={schemas[0]}')
-                    self.athena.DatabaseName = schemas[0]
-                # else user will be suggested to choose database anyway
-
-                if len(datasources) == 1 and datasources[0] in self.qs.athena_datasources:
-                    athena_datasource = self.qs.get_datasources(id=datasources[0])[0]
-                    logger.info(f'Silently selecting the only available DataSources from other datasets: {datasources[0]}.')
+            # We have no explicit DataSource in parameters
+            # QuickSight DataSources are not obvious for customers, so we will try to make our best guess:
+            # - if there is just one -> silently take that one
+            # - if a DataSource is referenced in an existing DataSet -> silently take that one
+            # - if an Athena WorkGroup is defined -> try to find a DataSource with this WorkGroup
+            # - and if still nothing -> ask the user for an explicit choice
+            pre_compiled_dataset = json.loads(template.safe_substitute())
+            dataset_name = pre_compiled_dataset.get('Name')
+
+            # let's find the schema/database and workgroup name
+            schemas = []
+            datasources = []
+            if dataset_id:
+                schemas = self.qs.get_datasets(id=dataset_id)[0].schemas
+                datasources = self.qs.get_datasets(id=dataset_id)[0].datasources
+            else: # try to find dataset and get athena database
+                found_datasets = self.qs.get_datasets(name=dataset_name)
+                logger.debug(f'Related to dataset {dataset_name}: {[ds.id for ds in found_datasets]}')
+                if found_datasets:
+                    schemas = list(set(sum([d.schemas for d in found_datasets], [])))
+                    datasources = list(set(sum([d.datasources for d in found_datasets], [])))
+            logger.debug(f'Found following schemas={schemas}, related to dataset with name {dataset_name}')
+            logger.info(f'Found {len(datasources)} Athena DataSources related to the DataSet {dataset_name}')
+
+            if not get_parameters().get('athena-database') and len(schemas) == 1 and schemas[0]:
+                logger.debug(f'Picking the database={schemas[0]}')
+                self.athena.DatabaseName = schemas[0]
+            # otherwise the user will be asked to choose a database anyway
+
+            if len(datasources) == 1 and datasources[0] in self.qs.athena_datasources:
+                athena_datasource = self.qs.get_datasources(id=datasources[0])[0]
+                logger.info(f'Silently selecting the only DataSource available from other datasets: {datasources[0]}.')
+            else:
+                # Ask the user to choose the datasource
+                # Narrow the choice to only datasources with the given workgroup
+                datasources_with_workgroup = self.qs.get_datasources(athena_workgroup_name=self.athena.WorkGroup)
+                logger.info(f'Found {len(datasources_with_workgroup)} Athena DataSources with WorkGroup={self.athena.WorkGroup}.')
+                datasource_choices = {
+                    f"{datasource.name} {datasource.id} (workgroup={datasource.AthenaParameters.get('WorkGroup')})": datasource.id
+                    for datasource in datasources_with_workgroup
+                }
+                if 'CID-CMD-Athena' not in list(datasource_choices.values()):
+                    datasource_choices['CID-CMD-Athena '] = 'Create New DataSource'
+                #TODO: add possibility to update datasource and role
+                datasource_id = get_parameter(
+                    param_name='quicksight-datasource-id',
+                    message="Please choose DataSource (select the first one if not sure)",
+                    choices=datasource_choices,
+                )
+                if not datasource_id or datasource_id == 'Create New DataSource':
+                    datasource_id = 'CID-CMD-Athena'
+                    logger.info(f'Creating DataSource {datasource_id}')
+                    athena_datasource = self.create_datasource(datasource_id)
                 else:
-                    # Ask user to choose the datasource
-                    # Narrow the choice to only datasources with the given workgroup
-                    datasources_with_workgroup = self.qs.get_datasources(athena_workgroup_name=self.athena.WorkGroup)
-                    logger.info(f'Found {len(datasources_with_workgroup)} Athena DataSources with WorkGroup={self.athena.WorkGroup}.')
-                    datasource_choices = {
-                        f"{datasource.name} {datasource.id} (workgroup={datasource.AthenaParameters.get('WorkGroup')})": datasource.id
-                        for datasource in datasources_with_workgroup
-                    }
-                    if 'CID-CMD-Athena' not in list(datasource_choices.values()):
-                        datasource_choices['CID-CMD-Athena '] = 'Create New DataSource'
-                    #TODO: add possibility to update datasource and role
-                    datasource_id = get_parameter(
-                        param_name='quicksight-datasource-id',
-                        message=f"Please choose DataSource (Select the first one if not sure)",
message=f"Please choose DataSource (Select the first one if not sure)", - choices=datasource_choices, - ) - if not datasource_id or datasource_id == 'Create New DataSource': - datasource_id = 'CID-CMD-Athena' - logger.info(f'Creating DataSource {datasource_id}') - athena_datasource = self.create_datasource(datasource_id) - else: - athena_datasource = self.qs.get_datasources(id=datasource_id)[0] - logger.info(f'Using DataSource = {athena_datasource.id}') + athena_datasource = self.qs.get_datasources(id=datasource_id)[0] + logger.info(f'Using DataSource = {athena_datasource.id}') if not get_parameters().get('athena-workgroup'): # set default workgroup from datasource if not provided via parameters if isinstance(athena_datasource, Datasource) and athena_datasource.AthenaParameters.get('WorkGroup', None): diff --git a/cid/export.py b/cid/export.py index 5397707e..bdd3bd47 100644 --- a/cid/export.py +++ b/cid/export.py @@ -16,7 +16,7 @@ import yaml import boto3 -from cid.helpers import Dataset, QuickSight, Athena, Glue +from cid.helpers import Dataset, QuickSight, Athena, Glue, S3 from cid.helpers import CUR from cid.utils import get_parameter, get_parameters, cid_print from cid.exceptions import CidCritical @@ -70,7 +70,7 @@ def choose_analysis(qs): return choices[choice]['AnalysisId'] -def export_analysis(qs, athena, glue): +def export_analysis(qs, athena, glue, s3): """ Export analysis to yaml resource File """ @@ -95,9 +95,11 @@ def export_analysis(qs, athena, glue): resources = {} resources['dashboards'] = {} resources['datasets'] = {} + resources['crawlers'] = {} - cur_helper = CUR(session=athena.session) + cur_helper = CUR(athena=athena, glue=glue, s3=s3) + resources_datasets = [] dataset_references = [] @@ -221,8 +223,8 @@ def export_analysis(qs, athena, glue): logger.debug(f'{dep_view_name} skipping as not in the views list') non_cur_dep_views.append(dep_view_name) if deps.get('views'): - deps['views'] = non_cur_dep_views - if not deps['views']: + deps['views'] = non_cur_dep_views + if not deps['views']: del deps['views'] logger.debug(f'cur_tables = {cur_tables}') @@ -238,23 +240,35 @@ def export_analysis(qs, athena, glue): crawler_name = crawler_names[0] crawler = {'data': glue.get_crawler(crawler_name)} + # Post processing + for crawler_key in crawler['data'].keys(): + if crawler_key not in ( + 'Name', 'Description', 'Role', 'DatabaseName', 'Targets', + 'SchemaChangePolicy', 'RecrawlPolicy', 'Schedule', 'Configuration' + ): # remove all keys that are not needed + crawler['data'].pop(crawler_key, None) + if 'Schedule' in crawler['data']['Schedule']: + crawler['data']['Schedule'] = crawler['data']['Schedule']['ScheduleExpression'] + crawler['data']['Role'] = '${crawler_role_arn}' + crawler['data']['DatabaseName'] = "${athena_database_name}" for index, target in enumerate(crawler['data'].get('Targets', [])): path = target.get('Path') - logger.info(f'Please replace manually location bucket with a parameter: {path}') default = get_parameter( f'{key}-s3path', 'Please provide default value. 
                    default=re.sub(r'(\d{12})', '{account_id}', path),
                     template_variables={'account_id': '{account_id}'},
                 )
-                crawler['parameters'] = {
-                    f's3path': {
+                crawler['parameters'] = {  #FIXME: the path should be the same as for the table
+                    's3path': {
                         'default': default,
                         'description': f"S3 Path for {key} table",
                     }
                 }
                 crawler['data']['Targets'][index]['Path'] = '${s3path}'
-            crawlers.append(crawler)
+            crawler_name = key
+            resources['crawlers'][crawler_name] = crawler
+            view_data['crawler'] = crawler_name
 
 
         # replace location with variables
@@ -268,13 +282,16 @@ def export_analysis(qs, athena, glue):
                 template_variables={'account_id': '{account_id}'},
             )
             view_data['parameters'] = {
-                f's3path': {
+                's3path': {
                     'default': default,
                     'description': f"S3 Path for {key} table",
                 }
             }
             view_data['data'] = view_data['data'].replace(location, '${s3path}')
 
+        if re.findall(r'PARTITION', view_data.get('data', '')) and 'crawler' not in view_data:
+            logger.warning(f'The table {key} is partitioned but there is no crawler info. Please make sure partitions are managed after the install.')
+
         resources['views'][key] = view_data
 
     logger.debug('Building dashboard resource')
@@ -292,7 +309,7 @@ def export_analysis(qs, athena, glue):
     dashboard_resource['dependsOn'] = {
         # Historically CID uses dataset names as dataset reference. IDs of manually created resources have uuid format.
         # We can potentially reconsider this and use IDs at some point
-        'datasets': sorted(list(set([dataset_name for dataset_name in datasets] + resources_datasets)))
+        'datasets': sorted(list(set(datasets + resources_datasets)))
     }
     dashboard_resource['name'] = analysis['Name']
     dashboard_resource['dashboardId'] = dashboard_id
@@ -409,11 +426,17 @@ def export_analysis(qs, athena, glue):
     cid_print(f'Output: {output}')
 
 
-if __name__ == "__main__": # for testing
+def quick_try():
+    ''' ad-hoc manual test of the export flow
+    '''
     logging.basicConfig(level=logging.INFO)
     logging.getLogger('cid').setLevel(logging.DEBUG)
     identity = boto3.client('sts').get_caller_identity()
     qs = QuickSight(boto3.session.Session(), identity)
-    athena = Athena(boto3.session.Session(), identity)
-    glue = Glue(boto3.session.Session(), identity)
-    export_analysis(qs, athena, glue)
+    athena = Athena(boto3.session.Session())
+    glue = Glue(boto3.session.Session())
+    s3 = S3(boto3.session.Session())
+    export_analysis(qs, athena, glue, s3)
+
+if __name__ == "__main__": # for testing
+    quick_try()
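
With this change export_analysis() requires all four helpers: the Glue helper drives the new crawler export into resources['crawlers'], and the S3 helper is passed through to the CUR helper. A minimal sketch of driving the new entry point directly, mirroring the quick_try() helper added above (it assumes the cid package is installed and AWS credentials are configured):

    import boto3
    from cid.helpers import QuickSight, Athena, Glue, S3
    from cid.export import export_analysis

    session = boto3.session.Session()
    identity = boto3.client('sts').get_caller_identity()

    qs = QuickSight(session, identity)  # QuickSight still takes the caller identity
    athena = Athena(session)            # Athena/Glue/S3 helpers take only a session now
    glue = Glue(session)
    s3 = S3(session)

    # All four helpers are required by the new signature.
    export_analysis(qs, athena, glue, s3)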
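One detail worth calling out in the crawler post-processing hunk: the loop iterates over list(crawler['data'].keys()) rather than the live keys() view, because popping entries from a dict while iterating its keys() view raises RuntimeError. A standalone sketch of the same pruning step (prune_crawler_data and ALLOWED_CRAWLER_KEYS are illustrative names, not part of the codebase):

    ALLOWED_CRAWLER_KEYS = (
        'Name', 'Description', 'Role', 'DatabaseName', 'Targets',
        'SchemaChangePolicy', 'RecrawlPolicy', 'Schedule', 'Configuration',
    )

    def prune_crawler_data(data: dict) -> dict:
        # Take a snapshot of the keys so mutation during the loop is safe.
        for key in list(data.keys()):
            if key not in ALLOWED_CRAWLER_KEYS:
                data.pop(key, None)
        return data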