Add support for Virtual datasets & jinja2 templating for Superset #19115

Open · wants to merge 2 commits into base: main
@@ -42,7 +42,13 @@
    Markdown,
    SourceUrl,
)
from jinja2 import Template


from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import get_column_fqn, search_table_entities
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.superset.mixin import SupersetSourceMixin
from metadata.ingestion.source.dashboard.superset.models import (
@@ -67,6 +73,49 @@

logger = ingestion_logger()

def get_jinja_context():
    """
    Build a static Jinja rendering context that mimics Superset's template
    processor, so that virtual dataset SQL containing Superset macros can be
    rendered into plain SQL for lineage parsing.
    """

    # Define placeholder static functions and variables
    def get_time_filter():
        return None

    columns = []
    filter = ""
    from_dttm = None  # Deprecated
    to_dttm = None  # Deprecated
    groupby = []  # Deprecated
    metrics = []
    row_limit = 1000
    row_offset = 0
    table_columns = []
    time_column = None
    time_grain = None

    # Static empty function definitions for Jinja parsing
    def results():
        return ""

    # Add context for rendering
    context = {
        "columns": columns,
        "filter": filter,
        "get_filters": lambda *_, remove_filter=False: [],
        "filter_values": lambda *_, remove_filter=False: [],
        "from_dttm": from_dttm,
        "to_dttm": to_dttm,
        "groupby": groupby,
        "metrics": metrics,
        "row_limit": row_limit,
        "row_offset": row_offset,
        "table_columns": table_columns,
        "time_column": time_column,
        "time_grain": time_grain,
        "get_time_filter": get_time_filter,
        "flags": {},
        "results": results,
    }

    return context


class SupersetDBSource(SupersetSourceMixin):
    """
@@ -75,6 +124,7 @@ class SupersetDBSource(SupersetSourceMixin):

    def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
        super().__init__(config, metadata)

        self.engine: Engine = self.client

    def prepare(self):
@@ -161,6 +211,43 @@ def yield_dashboard(
                )
            )

    def _get_source_table_for_lineage(
        self, chart_json: FetchChart, db_service_entity: DatabaseService
    ):
        """
        Render the virtual dataset SQL with the static Jinja context and parse
        it to collect every source table entity referenced by the query.
        """
        tables_list = []
        if chart_json.sql:
            tpl = Template(source=chart_json.sql)
            rendered_sql = tpl.render(get_jinja_context())

            lineage_parser = LineageParser(
                rendered_sql,
                ConnectionTypeDialectMapper.dialect_of(
                    db_service_entity.serviceType.value
                )
                if db_service_entity
                else None,
            )

            for source_table in lineage_parser.source_tables or []:
                database_schema_table = fqn.split_table_name(str(source_table))
                database_name = get_database_name_for_lineage(
                    db_service_entity, database_schema_table.get("database")
                )
                schema_name = self.check_database_schema_name(
                    database_schema_table.get("database_schema")
                )
                table_name = database_schema_table.get("table")
                from_entities = search_table_entities(
                    metadata=self.metadata,
                    database=database_name,
                    service_name=db_service_entity.fullyQualifiedName.root,
                    database_schema=schema_name,
                    table=table_name,
                )
                tables_list.extend(from_entities)

        return tables_list

    def _get_datasource_fqn_for_lineage(
        self, chart_json: FetchChart, db_service_entity: DatabaseService
    ):
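For reviewers, here is a minimal, self-contained sketch of what the rendering step above does: the placeholder context makes Superset's Jinja macros resolve to neutral values, so a virtual dataset's templated SQL turns into plain SQL that a lineage parser can read. The sample query and table names are made up, and the inline context is a trimmed-down stand-in for `get_jinja_context()`; the real code hands the rendered string to `LineageParser` rather than printing it.

```python
from jinja2 import Template

# Trimmed-down stand-in for get_jinja_context(): macros resolve to
# neutral values because the SQL is only parsed, never executed.
jinja_context = {
    "filter_values": lambda *_, remove_filter=False: [],
    "get_filters": lambda *_, remove_filter=False: [],
    "from_dttm": None,
    "to_dttm": None,
}

# Hypothetical virtual dataset SQL using Superset's Jinja macros
virtual_dataset_sql = """
SELECT order_id, amount
FROM sales.orders
WHERE order_date >= '{{ from_dttm }}'
  AND region IN ('{{ filter_values('region') | join("','") }}')
"""

rendered_sql = Template(source=virtual_dataset_sql).render(jinja_context)
print(rendered_sql)
# All Jinja syntax is gone after rendering, so the lineage parser can
# extract `sales.orders` as the source table.
```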
@@ -162,15 +162,26 @@ def yield_dashboard_lineage_details(
            chart_json = self.all_charts.get(chart_id)
            if chart_json:
                try:
                    datasource_fqn = self._get_datasource_fqn_for_lineage(
                        chart_json, db_service_entity
                    )
                    if not datasource_fqn:
                        continue
                    from_entity = self.metadata.get_by_name(
                        entity=Table,
                        fqn=datasource_fqn,
                    )
                    from_entities = []

                    if chart_json.sql:
                        # Virtual dataset: the SQL may reference multiple source tables
                        from_entities = self._get_source_table_for_lineage(
                            chart_json, db_service_entity
                        )
                    else:
                        datasource_fqn = self._get_datasource_fqn_for_lineage(
                            chart_json, db_service_entity
                        )
                        if not datasource_fqn:
                            logger.debug("Skipping chart, datasource fqn not found")
                            continue
                        from_entities = [
                            self.metadata.get_by_name(
                                entity=Table,
                                fqn=datasource_fqn,
                            )
                        ]
                    datamodel_fqn = fqn.build(
                        self.metadata,
                        entity_type=DashboardDataModel,
@@ -183,10 +194,17 @@
                    )

                    columns_list = self._get_columns_list_for_lineage(chart_json)
                    column_lineage = self._get_column_lineage(
                        from_entity, to_entity, columns_list
                    )
                    if from_entity and to_entity:

                    # Only emit lineage when both ends of the edge were found
                    from_entities = [entity for entity in from_entities or [] if entity]
                    if not from_entities:
                        logger.debug("No source (from) entity found for lineage")
                    if not to_entity:
                        logger.debug("Missing target (to) entity, skipping lineage")
                        from_entities = []
                    for from_entity in from_entities:
                        column_lineage = self._get_column_lineage(
                            from_entity, to_entity, columns_list
                        )
                        yield self._get_add_lineage_request(
                            to_entity=to_entity,
                            from_entity=from_entity,
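One design note on the branch above: `FetchChart.sql` is `Optional[str]`, so charts built on physical datasets arrive with `sql=None` and must stay on the single-table FQN path; a truthiness check covers both `None` and the empty string. The toy sketch below (table names and the parsed-SQL result are made up) only illustrates how a virtual dataset fans out into one lineage edge per source table, while a physical dataset yields a single edge.

```python
from typing import Iterable, List, Optional, Tuple


def lineage_edges(
    sql: Optional[str], dataset_table: str, to_datamodel: str
) -> Iterable[Tuple[str, str]]:
    """Yield (source_table, datamodel) edges, mirroring the branching above."""
    if sql:
        # Virtual dataset: stand-in for what LineageParser would extract from the SQL
        source_tables: List[str] = ["sales.orders", "sales.customers"]
    else:
        # Physical dataset: single edge from the dataset's own table
        source_tables = [dataset_table]
    for source in source_tables:
        yield (source, to_datamodel)


# Virtual dataset chart: two edges
print(list(lineage_edges("SELECT ...", "unused", "superset.sales_chart_model")))
# Physical dataset chart: one edge
print(list(lineage_edges(None, "sales.orders", "superset.sales_chart_model")))
```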
@@ -149,6 +149,7 @@ class FetchChart(BaseModel):
    database_name: Optional[str] = None
    sqlalchemy_uri: Optional[str] = None
    viz_type: Optional[str] = None
    sql: Optional[str] = None
    datasource_id: Optional[int] = None


@@ -21,6 +21,7 @@
s.datasource_id,
s.viz_type,
t.table_name,
t.sql,
t.schema,
db.database_name,
db.sqlalchemy_uri
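A small sketch of why the new query column and model field are optional: Superset only stores SQL for virtual datasets, so rows for physical datasets come back with `sql` as NULL and must still validate. The model below is a local stand-in mirroring only the `FetchChart` fields shown in this diff, with made-up values.

```python
from typing import Optional

from pydantic import BaseModel


class ChartRow(BaseModel):
    """Local stand-in mirroring the FetchChart fields touched by this PR."""

    table_name: Optional[str] = None
    sql: Optional[str] = None
    viz_type: Optional[str] = None
    datasource_id: Optional[int] = None


# Physical dataset: Superset stores no SQL, so `sql` stays None
physical = ChartRow(table_name="orders", viz_type="table", datasource_id=1)

# Virtual dataset: the saved query is carried through for lineage parsing
virtual = ChartRow(
    table_name="virtual_orders",
    sql="SELECT * FROM sales.orders WHERE amount > 0",
    viz_type="big_number",
    datasource_id=2,
)

assert physical.sql is None
assert virtual.sql is not None
```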