From 60434fe46cde6fe10aaefb6270952e66eef02c10 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Fri, 13 Sep 2024 12:55:55 +0530 Subject: [PATCH] MINOR: Optimise Snowflake Test Connection (#17779) --- .../source/database/bigquery/connection.py | 30 +++++++++++++++++-- .../source/database/redshift/connection.py | 11 +++++-- .../source/database/redshift/queries.py | 8 +++-- .../source/database/redshift/utils.py | 2 +- .../source/database/snowflake/connection.py | 7 ++++- .../source/database/snowflake/queries.py | 4 +++ 6 files changed, 53 insertions(+), 9 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/connection.py b/ingestion/src/metadata/ingestion/source/database/bigquery/connection.py index a4b71998e829..06bc25a031aa 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/connection.py @@ -17,6 +17,7 @@ from functools import partial from typing import Optional +from google.api_core.exceptions import NotFound from google.cloud.datacatalog_v1 import PolicyTagManagerClient from sqlalchemy.engine import Engine @@ -149,8 +150,8 @@ def test_connection_inner(engine): test_fn = { "CheckAccess": partial(test_connection_engine_step, engine), "GetSchemas": partial(execute_inspector_func, engine, "get_schema_names"), - "GetTables": partial(execute_inspector_func, engine, "get_table_names"), - "GetViews": partial(execute_inspector_func, engine, "get_view_names"), + "GetTables": partial(get_table_view_names, engine), + "GetViews": partial(get_table_view_names, engine), "GetTags": test_tags, "GetQueries": partial( test_query, @@ -170,3 +171,28 @@ def test_connection_inner(engine): ) test_connection_inner(engine) + + +def get_table_view_names(connection, schema=None): + with connection.connect() as conn: + current_schema = schema + client = conn.connection._client + item_types = ["TABLE", "EXTERNAL", "VIEW", "MATERIALIZED_VIEW"] + datasets = client.list_datasets() + result = [] + for dataset in datasets: + if current_schema is not None and current_schema != dataset.dataset_id: + continue + + try: + tables = client.list_tables(dataset.reference, page_size=1) + for table in tables: + if table.table_type in item_types: + break + except NotFound: + # It's possible that the dataset was deleted between when we + # fetched the list of datasets and when we try to list the + # tables from it. See: + # https://github.com/googleapis/python-bigquery-sqlalchemy/issues/105 + pass + return result diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/connection.py b/ingestion/src/metadata/ingestion/source/database/redshift/connection.py index 3bb2bb7a103d..a9193d5be298 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/connection.py @@ -16,6 +16,7 @@ from typing import Optional from sqlalchemy.engine import Engine +from sqlalchemy.sql import text from metadata.generated.schema.entity.automations.workflow import ( Workflow as AutomationWorkflow, @@ -38,6 +39,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import kill_active_connections from metadata.ingestion.source.database.redshift.queries import ( + REDSHIFT_GET_ALL_RELATIONS, REDSHIFT_GET_DATABASE_NAMES, REDSHIFT_TEST_GET_QUERIES, REDSHIFT_TEST_PARTITION_DETAILS, @@ -65,6 +67,11 @@ def test_connection( Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ + table_and_view_query = text( + REDSHIFT_GET_ALL_RELATIONS.format( + schema_clause="", table_clause="", limit_clause="LIMIT 1" + ) + ) def test_get_queries_permissions(engine_: Engine): """Check if we have the right permissions to list queries""" @@ -78,8 +85,8 @@ def test_get_queries_permissions(engine_: Engine): test_fn = { "CheckAccess": partial(test_connection_engine_step, engine), "GetSchemas": partial(execute_inspector_func, engine, "get_schema_names"), - "GetTables": partial(execute_inspector_func, engine, "get_table_names"), - "GetViews": partial(execute_inspector_func, engine, "get_view_names"), + "GetTables": partial(test_query, statement=table_and_view_query, engine=engine), + "GetViews": partial(test_query, statement=table_and_view_query, engine=engine), "GetQueries": partial(test_get_queries_permissions, engine), "GetDatabases": partial( test_query, statement=REDSHIFT_GET_DATABASE_NAMES, engine=engine diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/queries.py b/ingestion/src/metadata/ingestion/source/database/redshift/queries.py index f35cd6d1678d..6a2210626ea8 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/queries.py @@ -236,7 +236,7 @@ # hence we are appending "create view . as " to select query # to generate the column level lineage REDSHIFT_GET_ALL_RELATIONS = """ - SELECT + (SELECT c.relkind, n.oid as "schema_oid", n.nspname as "schema", @@ -255,8 +255,9 @@ JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner WHERE c.relkind IN ('r', 'v', 'm', 'S', 'f') AND n.nspname !~ '^pg_' {schema_clause} {table_clause} + {limit_clause}) UNION - SELECT + (SELECT 'r' AS "relkind", s.esoid AS "schema_oid", s.schemaname AS "schema", @@ -272,7 +273,8 @@ JOIN svv_external_schemas s ON s.schemaname = t.schemaname JOIN pg_catalog.pg_user u ON u.usesysid = s.esowner where 1 {schema_clause} {table_clause} - ORDER BY "relkind", "schema_oid", "schema"; + ORDER BY "relkind", "schema_oid", "schema" + {limit_clause}); """ diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/utils.py b/ingestion/src/metadata/ingestion/source/database/redshift/utils.py index 92bc5a2ef821..0329792017fd 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/utils.py @@ -386,7 +386,7 @@ def _get_all_relation_info(self, connection, **kw): # pylint: disable=unused-ar result = connection.execute( sa.text( REDSHIFT_GET_ALL_RELATIONS.format( - schema_clause=schema_clause, table_clause=table_clause + schema_clause=schema_clause, table_clause=table_clause, limit_clause="" ) ) ) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py index 4fcfe6cb908c..599a3f3815e3 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py @@ -45,6 +45,7 @@ SNOWFLAKE_TEST_FETCH_TAG, SNOWFLAKE_TEST_GET_QUERIES, SNOWFLAKE_TEST_GET_TABLES, + SNOWFLAKE_TEST_GET_VIEWS, ) from metadata.utils.logger import ingestion_logger @@ -176,7 +177,11 @@ def test_connection( statement=SNOWFLAKE_TEST_GET_TABLES, engine_wrapper=engine_wrapper, ), - "GetViews": partial(execute_inspector_func, engine_wrapper, "get_view_names"), + "GetViews": partial( + test_table_query, + statement=SNOWFLAKE_TEST_GET_VIEWS, + engine_wrapper=engine_wrapper, + ), "GetQueries": partial( test_query, statement=SNOWFLAKE_TEST_GET_QUERIES, engine=engine ), diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py index 104c3ccaf599..842f32d0c5d5 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py @@ -245,6 +245,10 @@ SELECT TABLE_NAME FROM "{database_name}".information_schema.tables LIMIT 1 """ +SNOWFLAKE_TEST_GET_VIEWS = """ +SELECT TABLE_NAME FROM "{database_name}".information_schema.views LIMIT 1 +""" + SNOWFLAKE_GET_DATABASES = "SHOW DATABASES"