Skip to content

Commit

Permalink
Merge branch 'main' into gen-1322
Browse files Browse the repository at this point in the history
  • Loading branch information
Sachin-chaurasiya authored Sep 29, 2024
2 parents 142b7ca + b9e6360 commit 781c753
Show file tree
Hide file tree
Showing 147 changed files with 22,157 additions and 673 deletions.
2 changes: 2 additions & 0 deletions ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@
VERSIONS["lkml"],
"gitpython~=3.1.34",
VERSIONS["giturlparse"],
"python-liquid",
},
"mlflow": {"mlflow-skinny>=2.3.0"},
"mongo": {VERSIONS["mongo"], VERSIONS["pandas"], VERSIONS["numpy"]},
Expand Down Expand Up @@ -299,6 +300,7 @@
"psycopg2-binary",
VERSIONS["geoalchemy2"],
},
"mstr": {"mstr-rest-requests==0.14.1"},
"sagemaker": {VERSIONS["boto3"]},
"salesforce": {"simple_salesforce~=1.11"},
"sample-data": {VERSIONS["avro"], VERSIONS["grpc-tools"]},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import giturlparse
import lkml
from liquid import Template
from looker_sdk.sdk.api40.methods import Looker40SDK
from looker_sdk.sdk.api40.models import Dashboard as LookerDashboard
from looker_sdk.sdk.api40.models import (
Expand Down Expand Up @@ -450,11 +451,11 @@ def yield_bulk_datamodel(
view.name, "Data model (View) filtered out."
)
continue

view_name = view.from_ if view.from_ else view.name
yield from self._process_view(
view_name=ViewName(view.name), explore=model
view_name=ViewName(view_name), explore=model
)
if len(model.joins) == 0 and model.sql_table_name:
if model.view_name:
yield from self._process_view(
view_name=ViewName(model.view_name), explore=model
)
Expand Down Expand Up @@ -570,7 +571,8 @@ def add_view_lineage(
db_service_names = self.get_db_service_names()

if view.sql_table_name:
source_table_name = self._clean_table_name(view.sql_table_name)
sql_table_name = self._render_table_name(view.sql_table_name)
source_table_name = self._clean_table_name(sql_table_name)

# View to the source is only there if we are informing the dbServiceNames
for db_service_name in db_service_names or []:
Expand Down Expand Up @@ -726,6 +728,33 @@ def _clean_table_name(table_name: str) -> str:

return table_name.lower().split(" as ")[0].strip()

@staticmethod
def _render_table_name(table_name: str) -> str:
"""
sql_table_names might contain Liquid templates
when defining an explore. e.g,:
sql_table_name:
{% if openmetadata %}
event
{% elsif event.created_week._in_query %}
event_by_week
{% else %}
event
{% endif %} ;;
we should render the template and give the option
to render a specific value during metadata ingestion
using the "openmetadata" context argument
:param table_name: table name with possible templating
:return: rendered table name
"""
try:
context = {"openmetadata": True}
template = Template(table_name)
sql_table_name = template.render(context)
except Exception:
sql_table_name = table_name
return sql_table_name

@staticmethod
def get_dashboard_sources(dashboard_details: LookerDashboard) -> Set[str]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,12 +544,12 @@ def create_datamodel_report_lineage(
to_entity=report_entity, from_entity=datamodel_entity
)

# create the lineage between table and datamodel
yield from self.create_table_datamodel_lineage(
db_service_name=db_service_name,
tables=dataset.tables,
datamodel_entity=datamodel_entity,
)
for table in dataset.tables or []:
yield self._get_table_and_datamodel_lineage(
db_service_name=db_service_name,
table=table,
datamodel_entity=datamodel_entity,
)

# create the lineage between table and datamodel using the pbit files
if self.client.file_client:
Expand Down Expand Up @@ -678,20 +678,6 @@ def create_table_datamodel_lineage_from_files(
)
)

def create_table_datamodel_lineage(
self,
db_service_name: str,
tables: Optional[List[PowerBiTable]],
datamodel_entity: Optional[DashboardDataModel],
) -> Iterable[Either[CreateDashboardRequest]]:
"""Method to create lineage between table and datamodels"""
for table in tables or []:
yield self._get_table_and_datamodel_lineage(
db_service_name=db_service_name,
table=table,
datamodel_entity=datamodel_entity,
)

def yield_dashboard_lineage_details(
self,
dashboard_details: Union[PowerBIDashboard, PowerBIReport],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def remove_manifest_non_required_keys(self, manifest_dict: dict):
"compiled_sql",
"raw_code",
"raw_sql",
"language",
}

for node, value in manifest_dict.get( # pylint: disable=unused-variable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def get_catalog(cls, catalog: IcebergCatalog) -> Catalog:

parameters = {
"warehouse": catalog.warehouseLocation,
"uri": catalog.connection.uri,
"uri": str(catalog.connection.uri),
"credential": credential,
"token": catalog.connection.token.get_secret_value()
if catalog.connection.token
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ def fn(self, res: Dict[str, Any]) -> Optional[float]:
results of other Metrics
"""

count = res.get(Count.name())
null_count = res.get(NullCount.name())
if count + null_count == 0:
count = res.get(Count.name(), 0)
null_count = res.get(NullCount.name(), 0)
total = count + null_count
if total == 0:
return None
return null_count / (null_count + count)
return null_count / total
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ def compute(self):
)

rest = self._runner._session.execute(query).first()
if not rest:
return None
if rest.rowCount is None:
# if we don't have any row count, fallback to the base logic
return super().compute()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
NULL_BYTE = "\x00"


class HexByteString(TypeDecorator):
Expand Down Expand Up @@ -63,10 +64,22 @@ def process_result_value(self, value: Optional[bytes], dialect) -> Optional[str]
detected_encoding = chardet.detect(bytes_value).get("encoding")
if detected_encoding:
try:
value = bytes_value.decode(encoding=detected_encoding)
return value
# Decode the bytes value with the detected encoding and replace errors with "?"
# if bytes cannot be decoded e.g. b"\x66\x67\x67\x9c", if detected_encoding="utf-8"
# will result in 'foo�' (instead of failing)
str_value = bytes_value.decode(
encoding=detected_encoding, errors="replace"
)
# Replace NULL_BYTE with empty string to avoid errors with
# the database client (should be O(n))
str_value = (
str_value.replace(NULL_BYTE, "")
if NULL_BYTE in str_value
else str_value
)
return str_value
except Exception as exc:
logger.debug("Failed to parse bytes valud as string: %s", exc)
logger.debug("Failed to parse bytes value as string: %s", exc)
logger.debug(traceback.format_exc())

return value.hex()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class SnowflakeSampler(SQASampler):
run the query in the whole table.
"""

# pylint: disable=too-many-arguments
def __init__(
self,
client,
Expand Down
Loading

0 comments on commit 781c753

Please sign in to comment.