Skip to content

Commit

Permalink
improve table creation
Browse files Browse the repository at this point in the history
  • Loading branch information
Iakov Gan authored and darken99 committed Apr 22, 2022
1 parent 3c2c3a2 commit 88afd9a
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 32 deletions.
4 changes: 2 additions & 2 deletions cid/builtin/core/data/queries/shared/business_units_map.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ SELECT *
FROM
(
VALUES
ROW ('111111111', '111111111', 'Business Unit 1')
, ROW ('222222222', '111111111', 'Business Unit 1')
ROW ('111111111', 'account1', 'Business Unit 1')
, ROW ('222222222', 'account2', 'Business Unit 2')
) ignored_table_name (account_id, account_name,bu)
40 changes: 11 additions & 29 deletions cid/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import os
import sys
import time

import click
import requests
Expand Down Expand Up @@ -842,7 +841,6 @@ def create_view(self, view_name: str) -> None:
self.accountMap.create(view_name)
return
# Create a view
print(f'\nCreating view: {view_name}')
logger.info(f'Creating view: {view_name}')
logger.info(f'Getting view definition')
view_definition = self.resources.get('views').get(view_name, dict())
Expand All @@ -856,38 +854,22 @@ def create_view(self, view_name: str) -> None:
dep = dependency_views.copy().pop()
# for dep in dependency_views:
if dep not in self.athena._metadata.keys():
print(f'Missing dependency view: {dep}, trying to create')
logger.info(f'Missing dependency view: {dep}, trying to create')
print(f'Missing dependency view: {dep}, creating')
logger.info(f'Missing dependency view: {dep}, creating')
self.create_view(dep)
dependency_views.remove(dep)
view_query = self.get_view_query(view_name=view_name)
if view_definition.get('type') == 'Glue_Table':
self.create_glue_table(view_name, view_query)
if view_name in self.athena._metadata.keys():
logger.debug(f'View "{view_name}" exists')
return
else:
self.athena.execute_query(view_query)
print(f'\nView "{view_name}" created')


def create_glue_table(self, view_name: str, view_query: str, fore_recreate=False) -> None:
poll_interval = 1
max_timeout = 60
try:
self.glue.create_table(json.loads(view_query))
except self.glue.client.exceptions.AlreadyExistsException:
print(f'Glue table "{view_name}" exists')
logger.error(f'Glue table "{view_name}" exists')
deadline = time.time() + max_timeout
while time.time() < deadline:
self.athena.discover_views([view_name])
if view_name in self.athena._metadata.keys():
print(f'Glue table {view_name} is created')
logger.info(f'Glue table {view_name} is created')
break
logger.info(f'Creating view: {view_name}')
if view_definition.get('type') == 'Glue_Table':
self.glue.ensure_glue_table_created(view_name, view_query)
else:
time.sleep(poll_interval)
else:
logger.error(f'Glue table {view_name} is not created before timeout')
return None
self.athena.execute_query(view_query)
assert self.athena.wait_for_view(view_name), f"Failed to create a view {view_name}"
logger.info(f'View "{view_name}" created')


def get_view_query(self, view_name: str) -> str:
Expand Down
14 changes: 14 additions & 0 deletions cid/helpers/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,17 @@ def discover_views(self, views: dict={}) -> None:
self.get_table_metadata(TableName=view_name)
except self.client.exceptions.MetadataException:
pass


def wait_for_view(self, view_name: str, poll_interval=1, timeout=60) -> bool:
    """Poll until ``view_name`` is present in ``self._metadata`` or time runs out.

    On every iteration ``discover_views`` is asked to look the view up again,
    then ``self._metadata`` is checked for the view name.

    Args:
        view_name: name of the view (or table) to wait for.
        poll_interval: seconds to sleep between two lookups.
        timeout: maximum number of seconds to keep polling.

    Returns:
        True as soon as the view is found, False if ``timeout`` elapsed first.
    """
    # A monotonic clock keeps the deadline valid even if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        self.discover_views([view_name])
        if view_name in self._metadata:
            logger.info(f'view {view_name} exists')
            return True
        time.sleep(poll_interval)
    # Timed out: report the failure instead of claiming that the view exists.
    logger.warning(f'view {view_name} was not found within {timeout} seconds')
    return False
9 changes: 8 additions & 1 deletion cid/helpers/glue.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import json
import logging

logger = logging.getLogger(__name__)


class Glue():

def __init__(self, session):
def __init__(self, session):
self.region = session.region_name

# Glue client
Expand All @@ -15,3 +16,9 @@ def __init__(self, session):
def create_table(self, table: dict) -> dict:
""" Creates an AWS Glue table """
return self.client.create_table(**table)

def ensure_glue_table_created(self, view_name: str, view_query: str) -> None:
    """Create a Glue table from its JSON definition, tolerating an existing one.

    Args:
        view_name: table name, used only for the log message.
        view_query: JSON document that is decoded and passed to
            ``create_table`` as the table definition.

    Raises:
        json.JSONDecodeError: if ``view_query`` is not valid JSON.
    """
    # Decode first so a malformed definition is not mistaken for a client error.
    table_definition = json.loads(view_query)
    try:
        self.create_table(table_definition)
    except self.client.exceptions.AlreadyExistsException:
        # The exception class lives on this helper's own client (self.client);
        # an already existing table is treated as success.
        logger.info(f'Glue table "{view_name}" exists')

0 comments on commit 88afd9a

Please sign in to comment.