From 7c300325b2cf9483df02b2aa21736bec9ad6fd74 Mon Sep 17 00:00:00 2001 From: Honza Stepanovsky Date: Tue, 17 Dec 2024 15:13:08 +0100 Subject: [PATCH 1/2] Add support for Virtual Superset datasets with jinja2 templating --- .../source/dashboard/superset/db_source.py | 87 +++++++++++++++++++ .../source/dashboard/superset/mixin.py | 50 ++++++++--- .../source/dashboard/superset/models.py | 1 + .../source/dashboard/superset/queries.py | 1 + 4 files changed, 126 insertions(+), 13 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py index a9a2e9ef8454..849d8066f6b3 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py @@ -42,7 +42,13 @@ Markdown, SourceUrl, ) +from jinja2 import Template + + from metadata.ingestion.api.models import Either +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper +from metadata.ingestion.lineage.parser import LineageParser +from metadata.ingestion.lineage.sql_lineage import get_column_fqn, search_table_entities from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.superset.mixin import SupersetSourceMixin from metadata.ingestion.source.dashboard.superset.models import ( @@ -67,6 +73,49 @@ logger = ingestion_logger() +def get_jinja_context(): + # Define placeholder static functions and variables + def get_time_filter(): + return None + + columns = [] + filter = "" + from_dttm = None # Deprecated + to_dttm = None # Deprecated + groupby = [] # Deprecated + metrics = [] + row_limit = 1000 + row_offset = 0 + table_columns = [] + time_column = None + time_grain = None + + # Static empty function definitions for Jinja parsing + def results(): + return "" + + # Add context for rendering + context = { + "columns": columns, + "filter": filter, + "get_filters": lambda *_, remove_filter=False: [], + "filter_values": lambda *_, remove_filter=False: [], + "from_dttm": from_dttm, + "to_dttm": to_dttm, + "groupby": groupby, + "metrics": metrics, + "row_limit": row_limit, + "row_offset": row_offset, + "table_columns": table_columns, + "time_column": time_column, + "time_grain": time_grain, + "get_time_filter": get_time_filter, + "flags": {}, + "results": results, + } + + return context + class SupersetDBSource(SupersetSourceMixin): """ @@ -75,6 +124,7 @@ class SupersetDBSource(SupersetSourceMixin): def __init__(self, config: WorkflowSource, metadata: OpenMetadata): super().__init__(config, metadata) + self.engine: Engine = self.client def prepare(self): @@ -161,6 +211,43 @@ def yield_dashboard( ) ) + def _get_source_table_for_lineage( + self, chart_json: FetchChart, db_service_entity: DatabaseService + ): + if chart_json.sql: + tpl = Template(source=chart_json.sql) + rendered_sql = tpl.render(get_jinja_context()) + + lineage_parser = LineageParser( + rendered_sql, + ConnectionTypeDialectMapper.dialect_of( + db_service_entity.serviceType.value + ) + if db_service_entity + else None, + ) + + tables_list = [] + for source_table in lineage_parser.source_tables or []: + database_schema_table = fqn.split_table_name(str(source_table)) + database_name = get_database_name_for_lineage( + db_service_entity, database_schema_table.get("database") + ) + schema_name = self.check_database_schema_name( + database_schema_table.get("database_schema") + ) + table_name = database_schema_table.get("table") + from_entities = search_table_entities( + metadata=self.metadata, + database=database_name, + service_name=db_service_entity.fullyQualifiedName.root, + database_schema=schema_name, + table=table_name, + ) + tables_list.extend(from_entities) + + return tables_list; + def _get_datasource_fqn_for_lineage( self, chart_json: FetchChart, db_service_entity: DatabaseService ): diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py index 9eb49593dd46..84bd87549734 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py @@ -21,6 +21,12 @@ from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import ( SupersetConnection, ) +from metadata.generated.schema.entity.services.connections.database import postgresConnection +from metadata.generated.schema.entity.services.connections.database.common import basicAuth + +from metadata.generated.schema.entity.services.connections.dashboard.customDashboardConnection import ( + CustomDashboardConnection, +) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) @@ -162,15 +168,26 @@ def yield_dashboard_lineage_details( chart_json = self.all_charts.get(chart_id) if chart_json: try: - datasource_fqn = self._get_datasource_fqn_for_lineage( - chart_json, db_service_entity - ) - if not datasource_fqn: - continue - from_entity = self.metadata.get_by_name( - entity=Table, - fqn=datasource_fqn, - ) + from_entities = [] + + if chart_json.sql != "": + # multiple entities + from_entities = self._get_source_table_for_lineage( + chart_json, db_service_entity + ) + else: + datasource_fqn = self._get_datasource_fqn_for_lineage( + chart_json, db_service_entity + ) + if not datasource_fqn: + print("Skipping, did not find datasource_fqn.") + continue + from_entities = [ + self.metadata.get_by_name( + entity=Table, + fqn=datasource_fqn, + ) + ] datamodel_fqn = fqn.build( self.metadata, entity_type=DashboardDataModel, @@ -183,10 +200,17 @@ def yield_dashboard_lineage_details( ) columns_list = self._get_columns_list_for_lineage(chart_json) - column_lineage = self._get_column_lineage( - from_entity, to_entity, columns_list - ) - if from_entity and to_entity: + + if from_entities == None or len(from_entities) == 0: + print("Missing from entity") + from_entities = [] + if not to_entity: + print("Missing to entity") + for from_entity in from_entities: + print('Found both - from & To entity, adding lineage') + column_lineage = self._get_column_lineage( + from_entity, to_entity, columns_list + ) yield self._get_add_lineage_request( to_entity=to_entity, from_entity=from_entity, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/models.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/models.py index 76977f2b575d..54cb081652b6 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/models.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/models.py @@ -149,6 +149,7 @@ class FetchChart(BaseModel): database_name: Optional[str] = None sqlalchemy_uri: Optional[str] = None viz_type: Optional[str] = None + sql: Optional[str] = None datasource_id: Optional[int] = None diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/queries.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/queries.py index 0897777f9af0..d9d01b0ba056 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/queries.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/queries.py @@ -21,6 +21,7 @@ s.datasource_id, s.viz_type, t.table_name, + t.sql, t.schema, db.database_name, db.sqlalchemy_uri From e4364b1676e18b00e4233cfcd27217d83586af65 Mon Sep 17 00:00:00 2001 From: Honza Stepanovsky Date: Tue, 17 Dec 2024 15:52:35 +0100 Subject: [PATCH 2/2] cleanup --- .../metadata/ingestion/source/dashboard/superset/mixin.py | 6 ------ .../metadata/ingestion/source/dashboard/superset/queries.py | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py index 84bd87549734..d71bea6dd377 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py @@ -21,12 +21,6 @@ from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import ( SupersetConnection, ) -from metadata.generated.schema.entity.services.connections.database import postgresConnection -from metadata.generated.schema.entity.services.connections.database.common import basicAuth - -from metadata.generated.schema.entity.services.connections.dashboard.customDashboardConnection import ( - CustomDashboardConnection, -) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/queries.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/queries.py index d9d01b0ba056..7a872175cf89 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/queries.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/queries.py @@ -21,7 +21,7 @@ s.datasource_id, s.viz_type, t.table_name, - t.sql, + t.sql, t.schema, db.database_name, db.sqlalchemy_uri