Skip to content

Commit

Permalink
MINOR: Optimise Snowflake Test Connection (#17779)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulixius9 committed Sep 19, 2024
1 parent 3084189 commit 60434fe
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from functools import partial
from typing import Optional

from google.api_core.exceptions import NotFound
from google.cloud.datacatalog_v1 import PolicyTagManagerClient
from sqlalchemy.engine import Engine

Expand Down Expand Up @@ -149,8 +150,8 @@ def test_connection_inner(engine):
test_fn = {
"CheckAccess": partial(test_connection_engine_step, engine),
"GetSchemas": partial(execute_inspector_func, engine, "get_schema_names"),
"GetTables": partial(execute_inspector_func, engine, "get_table_names"),
"GetViews": partial(execute_inspector_func, engine, "get_view_names"),
"GetTables": partial(get_table_view_names, engine),
"GetViews": partial(get_table_view_names, engine),
"GetTags": test_tags,
"GetQueries": partial(
test_query,
Expand All @@ -170,3 +171,28 @@ def test_connection_inner(engine):
)

test_connection_inner(engine)


def get_table_view_names(connection, schema=None):
    """Lightweight BigQuery access probe used by the test-connection flow.

    Lists datasets through the raw BigQuery client and, for each dataset
    (optionally restricted to ``schema``), fetches at most one table page
    (``page_size=1``) to confirm list permissions. NOTE: the returned list
    is always empty — this function only verifies access, it does not
    collect table names.

    :param connection: SQLAlchemy engine-like object exposing ``connect()``
    :param schema: optional dataset id; when set, only that dataset is probed
    :return: an empty list (kept for inspector-function signature parity)
    """
    relation_kinds = ["TABLE", "EXTERNAL", "VIEW", "MATERIALIZED_VIEW"]
    names = []
    with connection.connect() as conn:
        # Reach through the DBAPI connection to the underlying BigQuery client.
        client = conn.connection._client
        for dataset in client.list_datasets():
            # Skip datasets that do not match the requested schema, if any.
            if schema is not None and schema != dataset.dataset_id:
                continue
            try:
                # page_size=1 keeps the probe cheap: one table is enough
                # to prove we can list relations in this dataset.
                for probe in client.list_tables(dataset.reference, page_size=1):
                    if probe.table_type in relation_kinds:
                        break
            except NotFound:
                # The dataset may have been deleted between list_datasets()
                # and list_tables(). Best-effort probe — ignore it. See:
                # https://github.com/googleapis/python-bigquery-sqlalchemy/issues/105
                pass
        return names
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import Optional

from sqlalchemy.engine import Engine
from sqlalchemy.sql import text

from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
Expand All @@ -38,6 +39,7 @@
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import kill_active_connections
from metadata.ingestion.source.database.redshift.queries import (
REDSHIFT_GET_ALL_RELATIONS,
REDSHIFT_GET_DATABASE_NAMES,
REDSHIFT_TEST_GET_QUERIES,
REDSHIFT_TEST_PARTITION_DETAILS,
Expand Down Expand Up @@ -65,6 +67,11 @@ def test_connection(
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
table_and_view_query = text(
REDSHIFT_GET_ALL_RELATIONS.format(
schema_clause="", table_clause="", limit_clause="LIMIT 1"
)
)

def test_get_queries_permissions(engine_: Engine):
"""Check if we have the right permissions to list queries"""
Expand All @@ -78,8 +85,8 @@ def test_get_queries_permissions(engine_: Engine):
test_fn = {
"CheckAccess": partial(test_connection_engine_step, engine),
"GetSchemas": partial(execute_inspector_func, engine, "get_schema_names"),
"GetTables": partial(execute_inspector_func, engine, "get_table_names"),
"GetViews": partial(execute_inspector_func, engine, "get_view_names"),
"GetTables": partial(test_query, statement=table_and_view_query, engine=engine),
"GetViews": partial(test_query, statement=table_and_view_query, engine=engine),
"GetQueries": partial(test_get_queries_permissions, engine),
"GetDatabases": partial(
test_query, statement=REDSHIFT_GET_DATABASE_NAMES, engine=engine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@
# hence we are appending "create view <schema>.<table> as " to select query
# to generate the column level lineage
REDSHIFT_GET_ALL_RELATIONS = """
SELECT
(SELECT
c.relkind,
n.oid as "schema_oid",
n.nspname as "schema",
Expand All @@ -255,8 +255,9 @@
JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner
WHERE c.relkind IN ('r', 'v', 'm', 'S', 'f')
AND n.nspname !~ '^pg_' {schema_clause} {table_clause}
{limit_clause})
UNION
SELECT
(SELECT
'r' AS "relkind",
s.esoid AS "schema_oid",
s.schemaname AS "schema",
Expand All @@ -272,7 +273,8 @@
JOIN svv_external_schemas s ON s.schemaname = t.schemaname
JOIN pg_catalog.pg_user u ON u.usesysid = s.esowner
where 1 {schema_clause} {table_clause}
ORDER BY "relkind", "schema_oid", "schema";
ORDER BY "relkind", "schema_oid", "schema"
{limit_clause});
"""


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ def _get_all_relation_info(self, connection, **kw): # pylint: disable=unused-ar
result = connection.execute(
sa.text(
REDSHIFT_GET_ALL_RELATIONS.format(
schema_clause=schema_clause, table_clause=table_clause
schema_clause=schema_clause, table_clause=table_clause, limit_clause=""
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
SNOWFLAKE_TEST_FETCH_TAG,
SNOWFLAKE_TEST_GET_QUERIES,
SNOWFLAKE_TEST_GET_TABLES,
SNOWFLAKE_TEST_GET_VIEWS,
)
from metadata.utils.logger import ingestion_logger

Expand Down Expand Up @@ -176,7 +177,11 @@ def test_connection(
statement=SNOWFLAKE_TEST_GET_TABLES,
engine_wrapper=engine_wrapper,
),
"GetViews": partial(execute_inspector_func, engine_wrapper, "get_view_names"),
"GetViews": partial(
test_table_query,
statement=SNOWFLAKE_TEST_GET_VIEWS,
engine_wrapper=engine_wrapper,
),
"GetQueries": partial(
test_query, statement=SNOWFLAKE_TEST_GET_QUERIES, engine=engine
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@
SELECT TABLE_NAME FROM "{database_name}".information_schema.tables LIMIT 1
"""

SNOWFLAKE_TEST_GET_VIEWS = """
SELECT TABLE_NAME FROM "{database_name}".information_schema.views LIMIT 1
"""

SNOWFLAKE_GET_DATABASES = "SHOW DATABASES"


Expand Down

0 comments on commit 60434fe

Please sign in to comment.