From 88afd9aa9a924fb24606608f915458cae0c79483 Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Thu, 21 Apr 2022 23:09:01 +0200 Subject: [PATCH] improve table creation --- .../queries/shared/business_units_map.sql | 4 +- cid/common.py | 40 +++++-------------- cid/helpers/athena.py | 14 +++++++ cid/helpers/glue.py | 9 ++++- 4 files changed, 35 insertions(+), 32 deletions(-) diff --git a/cid/builtin/core/data/queries/shared/business_units_map.sql b/cid/builtin/core/data/queries/shared/business_units_map.sql index eadddc55..9b106c1e 100644 --- a/cid/builtin/core/data/queries/shared/business_units_map.sql +++ b/cid/builtin/core/data/queries/shared/business_units_map.sql @@ -3,6 +3,6 @@ SELECT * FROM ( VALUES - ROW ('111111111', '111111111', 'Business Unit 1') - , ROW ('222222222', '111111111', 'Business Unit 1') + ROW ('111111111', 'account1', 'Business Unit 1') + , ROW ('222222222', 'account2', 'Business Unit 2') ) ignored_table_name (account_id, account_name,bu) \ No newline at end of file diff --git a/cid/common.py b/cid/common.py index 4b932bcc..7bb7b744 100644 --- a/cid/common.py +++ b/cid/common.py @@ -8,7 +8,6 @@ import os import sys -import time import click import requests @@ -842,7 +841,6 @@ def create_view(self, view_name: str) -> None: self.accountMap.create(view_name) return # Create a view - print(f'\nCreating view: {view_name}') logger.info(f'Creating view: {view_name}') logger.info(f'Getting view definition') view_definition = self.resources.get('views').get(view_name, dict()) @@ -856,38 +854,22 @@ def create_view(self, view_name: str) -> None: dep = dependency_views.copy().pop() # for dep in dependency_views: if dep not in self.athena._metadata.keys(): - print(f'Missing dependency view: {dep}, trying to create') - logger.info(f'Missing dependency view: {dep}, trying to create') + print(f'Missing dependency view: {dep}, creating') + logger.info(f'Missing dependency view: {dep}, creating') self.create_view(dep) dependency_views.remove(dep) view_query = self.get_view_query(view_name=view_name) - if view_definition.get('type') == 'Glue_Table': - self.create_glue_table(view_name, view_query) + if view_name in self.athena._metadata.keys(): + logger.debug(f'View "{view_name}" exists') + return else: - self.athena.execute_query(view_query) - print(f'\nView "{view_name}" created') - - - def create_glue_table(self, view_name: str, view_query: str, fore_recreate=False) -> None: - poll_interval = 1 - max_timeout = 60 - try: - self.glue.create_table(json.loads(view_query)) - except self.glue.client.exceptions.AlreadyExistsException: - print(f'Glue table "{view_name}" exists') - logger.error(f'Glue table "{view_name}" exists') - deadline = time.time() + max_timeout - while time.time() < deadline: - self.athena.discover_views([view_name]) - if view_name in self.athena._metadata.keys(): - print(f'Glue table {view_name} is created') - logger.info(f'Glue table {view_name} is created') - break + logger.info(f'Creating view: {view_name}') + if view_definition.get('type') == 'Glue_Table': + self.glue.ensure_glue_table_created(view_name, view_query) else: - time.sleep(poll_interval) - else: - logger.error(f'Glue table {view_name} is not created before timeout') - return None + self.athena.execute_query(view_query) + assert self.athena.wait_for_view(view_name), f"Failed to create a view {view_name}" + logger.info(f'View "{view_name}" created') def get_view_query(self, view_name: str) -> str: diff --git a/cid/helpers/athena.py b/cid/helpers/athena.py index c43b86b0..a7efd9c2 100644 --- a/cid/helpers/athena.py +++ b/cid/helpers/athena.py @@ -300,3 +300,17 @@ def discover_views(self, views: dict={}) -> None: self.get_table_metadata(TableName=view_name) except self.client.exceptions.MetadataException: pass + + + def wait_for_view(self, view_name: str, poll_interval=1, timeout=60) -> None: + deadline = time.time() + timeout + while time.time() < deadline: + self.discover_views([view_name]) + if view_name in self._metadata.keys(): + logger.info(f'view {view_name} exists') + return True + else: + time.sleep(poll_interval) + else: + logger.info(f'view {view_name} exists') + return False diff --git a/cid/helpers/glue.py b/cid/helpers/glue.py index 68e282b4..c80da540 100644 --- a/cid/helpers/glue.py +++ b/cid/helpers/glue.py @@ -1,3 +1,4 @@ +import json import logging logger = logging.getLogger(__name__) @@ -5,7 +6,7 @@ class Glue(): - def __init__(self, session): + def __init__(self, session): self.region = session.region_name # QuickSight client @@ -15,3 +16,9 @@ def __init__(self, session): def create_table(self, table: dict) -> dict: """ Creates an AWS Glue table """ return self.client.create_table(**table) + + def ensure_glue_table_created(self, view_name: str, view_query: str) -> None: + try: + self.create_table(json.loads(view_query)) + except self.glue.client.exceptions.AlreadyExistsException: + logger.info(f'Glue table "{view_name}" exists')