Skip to content

Commit

Permalink
Merge branch 'main' into alert-bar
Browse files Browse the repository at this point in the history
  • Loading branch information
Sachin-chaurasiya authored Aug 1, 2024
2 parents 2e6ad18 + 5784f54 commit feae113
Show file tree
Hide file tree
Showing 51 changed files with 422 additions and 655 deletions.
5 changes: 5 additions & 0 deletions bootstrap/sql/migrations/native/1.5.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,8 @@ SET json = JSON_SET(
)
WHERE JSON_CONTAINS_PATH(json, 'one', '$.owner')
AND JSON_TYPE(JSON_EXTRACT(json, '$.owner')) <> 'ARRAY';

-- Set the `templates` key of the stored email configuration to 'openmetadata'
-- so that emailTemplates can be fetched. Only the row whose configType is
-- 'emailConfiguration' is touched.
UPDATE openmetadata_settings
SET json = JSON_SET(json, '$.templates', 'openmetadata')
WHERE configType = 'emailConfiguration';
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,8 @@ SET json = jsonb_set(
WHERE jsonb_path_exists(json, '$.owner')
AND jsonb_path_query_first(json, '$.owner ? (@ != null)') IS NOT null
AND jsonb_typeof(json->'owner') <> 'array';

-- Set the `templates` key of the stored email configuration to the JSON string
-- "openmetadata" so that emailTemplates can be fetched. Only the row whose
-- configType is 'emailConfiguration' is touched.
UPDATE openmetadata_settings
SET json = jsonb_set(json, '{templates}', '"openmetadata"')
WHERE configType = 'emailConfiguration';
1 change: 1 addition & 0 deletions conf/openmetadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ email:
username: ${SMTP_SERVER_USERNAME:-""}
password: ${SMTP_SERVER_PWD:-""}
transportationStrategy: ${SMTP_SERVER_STRATEGY:-"SMTP_TLS"}
templates: ${TEMPLATES:-"openmetadata"}

limits:
enable: ${LIMITS_ENABLED:-false}
Expand Down
2 changes: 1 addition & 1 deletion ingestion/src/metadata/examples/workflows/saperp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ source:
apiKey: api_key
databaseName: databaseName
databaseSchema: databaseSchema
paginationLimit: 10
paginationLimit: 100
sourceConfig:
config:
type: DatabaseMetadata
Expand Down
25 changes: 16 additions & 9 deletions ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ class OMetaLineageMixin(Generic[T]):
def _merge_column_lineage(
self, original: List[Dict[str, Any]], updated: List[Dict[str, Any]]
):
temp_result = []
flat_original_result = set()
flat_updated_result = set()
try:
for column in original or []:
if column.get("toColumn") and column.get("fromColumns"):
temp_result.append(
flat_original_result.add(
(*column.get("fromColumns", []), column.get("toColumn"))
)
for column in updated or []:
Expand All @@ -65,15 +66,18 @@ def _merge_column_lineage(
else:
data = column
if data.get("toColumn") and data.get("fromColumns"):
temp_result.append(
flat_updated_result.add(
(*data.get("fromColumns", []), data.get("toColumn"))
)
except Exception as exc:
logger.debug(f"Error while merging column lineage: {exc}")
logger.debug(traceback.format_exc())
union_result = flat_original_result.union(flat_updated_result)
if flat_original_result == union_result:
return original
return [
{"fromColumns": list(col_data[:-1]), "toColumn": col_data[-1]}
for col_data in set(temp_result)
for col_data in union_result
]

def _update_cache(self, request: AddLineageRequest, response: Dict[str, Any]):
Expand Down Expand Up @@ -119,7 +123,10 @@ def add_lineage(
"columnsLineage", []
)
original.edge.lineageDetails.pipeline = (
EntityReference(**edge["edge"].get("pipeline"))
EntityReference(
id=edge["edge"]["pipeline"]["id"],
type=edge["edge"]["pipeline"]["type"],
)
if edge["edge"].get("pipeline")
else None
)
Expand All @@ -141,7 +148,7 @@ def add_lineage(
original.edge.lineageDetails.pipeline
)
patch = self.patch_lineage_edge(original=original, updated=data)
if patch is not None:
if patch:
patch_op_success = True

if patch_op_success is False:
Expand Down Expand Up @@ -203,7 +210,7 @@ def patch_lineage_edge(
self,
original: AddLineageRequest,
updated: AddLineageRequest,
) -> Optional[str]:
) -> Optional[bool]:
"""
Patches a lineage edge between two entities.
Expand All @@ -229,14 +236,14 @@ def patch_lineage_edge(
f"/{original.edge.toEntity.id.root}",
data=str(patch),
)
return str(patch)
return True
except APIError as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Error Patching Lineage Edge {err.status_code} "
f"for {original.edge.fromEntity.fullyQualifiedName}"
)
return None
return False

def get_lineage_by_id(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MSSQL constants
"""

# Pattern used when the server's date format is unknown or not mapped below.
DEFAULT_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Maps a three-letter date-part order (d = day, m = month, y = four-digit year)
# to the matching strftime pattern: the date parts joined by "-" in that order,
# followed by a fixed "HH:MM:SS" time portion.
MSSQL_DATEFORMAT_DATETIME_MAP = {
    date_order: "-".join(
        "%Y" if part == "y" else f"%{part}" for part in date_order
    )
    + " %H:%M:%S"
    for date_order in ("dmy", "dym", "ymd", "ydm", "mdy", "myd")
}
24 changes: 24 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/mssql/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,18 @@
"""
MSSQL lineage module
"""
from datetime import datetime

from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.mssql.constants import (
DEFAULT_DATETIME_FORMAT,
MSSQL_DATEFORMAT_DATETIME_MAP,
)
from metadata.ingestion.source.database.mssql.queries import MSSQL_SQL_STATEMENT
from metadata.ingestion.source.database.mssql.query_parser import MssqlQueryParserSource
from metadata.ingestion.source.database.mssql.utils import (
get_sqlalchemy_engine_dateformat,
)


class MssqlLineageSource(MssqlQueryParserSource, LineageSource):
Expand All @@ -35,3 +44,18 @@ def __init__(self, *args, **kwargs):
AND lower(t.text) NOT LIKE '%%create%%function%%'
AND lower(t.text) NOT LIKE '%%declare%%'
"""

def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str:
    """
    Build the SQL statement used to fetch query logs.

    The start and end bounds are rendered with the datetime pattern mapped
    to the date format reported for ``self.engine``; when that format has
    no entry in MSSQL_DATEFORMAT_DATETIME_MAP (or none is reported),
    DEFAULT_DATETIME_FORMAT is used instead.
    """
    engine_date_order = get_sqlalchemy_engine_dateformat(self.engine)
    timestamp_pattern = MSSQL_DATEFORMAT_DATETIME_MAP.get(
        engine_date_order, DEFAULT_DATETIME_FORMAT
    )
    render_statement = self.sql_stmt.format
    statement_params = {
        "start_time": start_time.strftime(timestamp_pattern),
        "end_time": end_time.strftime(timestamp_pattern),
        "filters": self.get_filters(),
        "result_limit": self.source_config.resultLimit,
    }
    return render_statement(**statement_params)
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.mssql.constants import (
DEFAULT_DATETIME_FORMAT,
MSSQL_DATEFORMAT_DATETIME_MAP,
)
from metadata.ingestion.source.database.mssql.models import (
STORED_PROC_LANGUAGE_MAP,
MssqlStoredProcedure,
Expand All @@ -48,6 +52,7 @@
get_columns,
get_foreign_keys,
get_pk_constraint,
get_sqlalchemy_engine_dateformat,
get_table_comment,
get_table_names,
get_unique_constraints,
Expand Down Expand Up @@ -215,8 +220,13 @@ def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]
queries they triggered
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
server_date_format = get_sqlalchemy_engine_dateformat(self.engine)
current_datetime_format = MSSQL_DATEFORMAT_DATETIME_MAP.get(
server_date_format, DEFAULT_DATETIME_FORMAT
)
start = start.replace(tzinfo=None).strftime(current_datetime_format)
query = MSSQL_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start.replace(tzinfo=None),
start_date=start,
)
try:
queries_dict = self.procedure_queries_dict(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,5 @@
;
"""
)

GET_DB_CONFIGS = textwrap.dedent("DBCC USEROPTIONS;")
17 changes: 16 additions & 1 deletion ingestion/src/metadata/ingestion/source/database/mssql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
MSSQL SQLAlchemy Helper Methods
"""

from typing import Optional

from sqlalchemy import Column, Integer, MetaData, String, Table, alias, sql, text
from sqlalchemy import types as sqltypes
from sqlalchemy import util
Expand All @@ -29,12 +31,13 @@
_switch_db,
update_wrapper,
)
from sqlalchemy.engine import reflection
from sqlalchemy.engine import Engine, reflection
from sqlalchemy.sql import func
from sqlalchemy.types import NVARCHAR
from sqlalchemy.util import compat

from metadata.ingestion.source.database.mssql.queries import (
GET_DB_CONFIGS,
MSSQL_ALL_VIEW_DEFINITIONS,
MSSQL_GET_FOREIGN_KEY,
MSSQL_GET_TABLE_COMMENTS,
Expand Down Expand Up @@ -488,3 +491,15 @@ def get_view_names(
)
view_names = [r[0] for r in connection.execute(query_)]
return view_names


def get_sqlalchemy_engine_dateformat(engine: Engine) -> Optional[str]:
    """
    Return the date format reported by the SQLAlchemy engine's config query.

    Runs GET_DB_CONFIGS on the engine and scans the returned rows for the
    one whose "Set Option" column equals "dateformat".

    Args:
        engine: SQLAlchemy engine used to execute the config query.

    Returns:
        The "Value" column of the "dateformat" row, or None when the query
        returns no such row.
    """
    result = engine.execute(GET_DB_CONFIGS)
    try:
        for row in result:
            row_dict = dict(row)
            if row_dict.get("Set Option") == "dateformat":
                return row_dict.get("Value")
        return None
    finally:
        # The early return above leaves rows unread, so the result would
        # otherwise stay open; close it explicitly to release the cursor.
        result.close()
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(self, config: SapErpConnection):
api_version="v1",
allow_redirects=True,
retry_codes=[500, 504],
retry_wait=5,
retry_wait=2,
verify=get_verify_ssl(config.sslConfig),
)
self.client = REST(client_config)
Expand Down
46 changes: 29 additions & 17 deletions ingestion/src/metadata/ingestion/source/database/saperp/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
SAP ERP source module
"""
import traceback
from typing import Iterable, List, Optional
from typing import Iterable, List, Optional, Tuple

from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
Expand Down Expand Up @@ -127,23 +127,20 @@ def get_tables_name_and_type(self) -> Optional[Iterable[SapErpTable]]:
)

def _check_col_length( # pylint: disable=arguments-differ
self, datatype: str, col_length: Optional[str]
) -> Optional[int]:
self, datatype: str, col_length: Optional[str], col_decimals: Optional[str]
) -> Tuple[Optional[int], Optional[int]]:
"""
return the column length for the dataLength attribute
"""
try:
if datatype and datatype.upper() in {
"CHAR",
"VARCHAR",
"BINARY",
"VARBINARY",
}:
return int(col_length) if col_length else None
return (
int(col_length) if col_length else None,
int(col_decimals) if col_decimals else None,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to fetch column length: {exc}")
return None
return None, None

def _get_table_constraints(
self, columns: Optional[List[Column]]
Expand Down Expand Up @@ -190,14 +187,21 @@ def _get_column_constraint(self, column: SapErpColumn, pk_columns: List[str]):
return Constraint.NOT_NULL if column.notnull == "X" else Constraint.NULL

def _get_display_datatype( # pylint: disable=arguments-differ
self, column_type: str, col_data_length: Optional[int]
self,
column_type: str,
col_data_length: Optional[int],
decimals: Optional[int],
sap_column_type: Optional[str],
) -> str:
"""
Method to get the display datatype
"""
column_type_name = sap_column_type if sap_column_type else column_type
if col_data_length and decimals:
return f"{column_type_name}({str(col_data_length)},{str(decimals)})"
if col_data_length:
return f"{column_type}({str(col_data_length)})"
return column_type
return f"{column_type_name}({str(col_data_length)})"
return column_type_name

def get_columns_and_constraints( # pylint: disable=arguments-differ
self, table_name: str
Expand All @@ -211,8 +215,10 @@ def get_columns_and_constraints( # pylint: disable=arguments-differ
for sap_column in sap_columns or []:
try:
column_type = ColumnTypeParser.get_column_type(sap_column.datatype)
col_data_length = self._check_col_length(
datatype=column_type, col_length=sap_column.leng
col_data_length, col_decimal_length = self._check_col_length(
datatype=column_type,
col_length=sap_column.leng,
col_decimals=sap_column.decimals,
)
column_name = (
f"{sap_column.fieldname}({sap_column.precfield})"
Expand All @@ -226,7 +232,10 @@ def get_columns_and_constraints( # pylint: disable=arguments-differ
f"Unknown type {repr(sap_column.datatype)}: {sap_column.fieldname}"
)
data_type_display = self._get_display_datatype(
column_type, col_data_length
column_type,
col_data_length,
col_decimal_length,
sap_column.datatype,
)
col_data_length = 1 if col_data_length is None else col_data_length
om_column = Column(
Expand All @@ -251,6 +260,9 @@ def get_columns_and_constraints( # pylint: disable=arguments-differ
)
if column_type == DataType.ARRAY.value:
om_column.arrayDataType = DataType.UNKNOWN
if col_data_length and col_decimal_length:
om_column.precision = col_data_length
om_column.scale = col_decimal_length
om_columns.append(om_column)
except Exception as exc:
logger.debug(traceback.format_exc())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class SapErpColumn(BaseModel):
i_ddtext: Optional[str] = None
dd_text: Optional[str] = None
leng: Optional[str] = None
decimals: Optional[str] = None


class SapErpTableList(BaseModel):
Expand Down
4 changes: 2 additions & 2 deletions ingestion/tests/cli_e2e/base/test_cli_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def test_entities(self) -> None:
self.assertIsNotNone(data_model.upstream)
self.assertIsNotNone(data_model.description)
self.assertIsNotNone(table.description)
self.assertIsNotNone(data_model.owner)
self.assertIsNotNone(table.owner)
self.assertIsNotNone(data_model.owners)
self.assertIsNotNone(table.owners)
self.assertTrue(len(data_model.tags) > 0)
self.assertTrue(len(table.tags) > 0)

Expand Down
Loading

0 comments on commit feae113

Please sign in to comment.