Merge branch 'main' into fixed-service-ingesion-aut
ShaileshParmar11 authored Nov 28, 2024
2 parents 3baa746 + e22fc6d commit 5e0b399
Showing 219 changed files with 6,831 additions and 2,257 deletions.
26 changes: 26 additions & 0 deletions bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql
@@ -1747,3 +1747,29 @@ WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata';
UPDATE ingestion_pipeline_entity
SET json = JSON_REMOVE(json, '$.sourceConfig.config.processPiiSensitive', '$.sourceConfig.config.confidence', '$.sourceConfig.config.generateSampleData')
WHERE JSON_EXTRACT(json, '$.pipelineType') = 'profiler';

-- Rename 'jobId' to 'jobIds', wrapping any existing 'jobId' value in an array, and add 'projectIds' for dbt cloud
UPDATE pipeline_service_entity
SET json = JSON_SET(
JSON_REMOVE(
json,
'$.connection.config.jobId'
),
'$.connection.config.jobIds',
IF(
JSON_CONTAINS_PATH(json, 'one', '$.connection.config.jobIds'),
JSON_EXTRACT(json, '$.connection.config.jobIds'),
IF(
JSON_EXTRACT(json, '$.connection.config.jobId') IS NOT NULL,
JSON_ARRAY(JSON_UNQUOTE(JSON_EXTRACT(json, '$.connection.config.jobId'))),
JSON_ARRAY()
)
),
'$.connection.config.projectIds',
IF(
JSON_CONTAINS_PATH(json, 'one', '$.connection.config.projectIds'),
JSON_EXTRACT(json, '$.connection.config.projectIds'),
JSON_ARRAY()
)
)
WHERE serviceType = 'DBTCloud';
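
For readers who don't want to trace the nested JSON_SET/JSON_REMOVE calls, here is a minimal plain-Python sketch of the per-record transformation the statement above applies to a DBTCloud connection config. The function name and sample values are illustrative only and are not part of the migration.

# Illustrative sketch only: mirrors the MySQL logic on a connection.config dict.
def migrate_dbt_cloud_config(config: dict) -> dict:
    job_id = config.pop("jobId", None)           # JSON_REMOVE '$.connection.config.jobId'
    if "jobIds" not in config:                   # keep an existing jobIds array untouched
        config["jobIds"] = [job_id] if job_id is not None else []
    config.setdefault("projectIds", [])          # default projectIds to an empty array
    return config

assert migrate_dbt_cloud_config({"jobId": "123"}) == {"jobIds": ["123"], "projectIds": []}
assert migrate_dbt_cloud_config({}) == {"jobIds": [], "projectIds": []}
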
24 changes: 23 additions & 1 deletion bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql
@@ -1733,4 +1733,26 @@ WHERE json #>> '{pipelineType}' = 'metadata';
-- classification and sampling configs from the profiler pipelines
UPDATE ingestion_pipeline_entity
SET json = json::jsonb #- '{sourceConfig,config,processPiiSensitive}' #- '{sourceConfig,config,confidence}' #- '{sourceConfig,config,generateSampleData}'
WHERE json #>> '{pipelineType}' = 'profiler';

-- Set 'jobIds' to an array built from the existing 'jobId' value (or an empty array) for dbt cloud
UPDATE pipeline_service_entity
SET json = (case when json#>>'{connection, config, jobId}' IS NOT null
then
jsonb_set(json, '{connection, config, jobIds}', to_jsonb(ARRAY[json#>>'{connection, config, jobId}']), true)
else
jsonb_set(json, '{connection, config, jobIds}', '[]', true)
end
)
WHERE servicetype = 'DBTCloud';

-- remove 'jobId' after setting 'jobIds' for dbt cloud
UPDATE pipeline_service_entity
SET json = json::jsonb #- '{connection,config,jobId}'
WHERE json#>>'{connection, config, jobId}' IS NOT null
and servicetype = 'DBTCloud';

-- add 'projectIds' for dbt cloud
UPDATE pipeline_service_entity
SET json = jsonb_set(json, '{connection, config, projectIds}', '[]', true)
WHERE servicetype = 'DBTCloud';
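
Net effect of both the MySQL and Postgres paths above, sketched on an assumed minimal config (only the affected keys are shown; the values are made up):

# Assumed before/after shapes of a DBTCloud service record; not taken from real data.
before = {"connection": {"config": {"jobId": "123"}}}
after  = {"connection": {"config": {"jobIds": ["123"], "projectIds": []}}}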

This file was deleted.

This file was deleted.

@@ -14,8 +14,8 @@
Validators are test classes (e.g. columnValuesToBeBetween, etc.)
"""

from abc import ABC, abstractmethod
from datetime import datetime, timezone
from enum import Enum
from typing import TYPE_CHECKING, Set, Type, Union

from metadata.data_quality.validations.base_test_handler import BaseTestValidator
@@ -25,48 +25,67 @@
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.type.basic import Timestamp
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils import importer
from metadata.utils.importer import import_test_case_class

if TYPE_CHECKING:
from pandas import DataFrame


class IValidatorBuilder(ABC):
"""Interface for validator builders"""
class TestCaseImporter:
def import_test_case_validator(
self,
test_type: str,
runner_type: str,
test_definition: str,
) -> Type[BaseTestValidator]:
return import_test_case_class(test_type, runner_type, test_definition)

@property
def test_case(self):
"""Return the test case object"""
return self._test_case

@property
def validator(self):
"""Return the validator object"""
return self._validator
class SourceType(Enum):
PANDAS = "pandas"
SQL = "sqlalchemy"


class ValidatorBuilder(TestCaseImporter):
"""Interface for validator builders"""

def __init__(
self,
runner: Union[QueryRunner, "DataFrame"],
test_case: TestCase,
source_type: SourceType,
entity_type: str,
) -> None:
"""Builder object for SQA validators. This builder is used to create a validator object
Args:
runner (QueryRunner): The runner object
test_case (TestCase): The test case object
source_type (SourceType): The source type
entity_type (str): one of COLUMN or TABLE -- fetched from the test definition
"""
super().__init__()
self._test_case = test_case
self.runner = runner
# TODO this will be removed on https://github.com/open-metadata/OpenMetadata/pull/18716
self.validator_cls: Type[BaseTestValidator] = importer.import_test_case_class(
self.validator_cls: Type[
BaseTestValidator
] = super().import_test_case_validator(
entity_type,
self._get_source_type(),
self.test_case.testDefinition.fullyQualifiedName, # type: ignore
source_type.value,
self.test_case.testDefinition.fullyQualifiedName,
)
self.reset()

@property
def test_case(self):
"""Return the test case object"""
return self._test_case

@property
def validator(self):
"""Return the validator object"""
return self._validator

def set_runtime_params(self, runtime_params_setters: Set[RuntimeParameterSetter]):
"""Set the runtime parameters for the validator object
@@ -93,8 +112,3 @@ def reset(self):
int(datetime.now(tz=timezone.utc).timestamp() * 1000)
),
)

@abstractmethod
def _get_source_type(self):
"""Get the source type"""
raise NotImplementedError
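
The net effect of this file's change: the abstract IValidatorBuilder and its per-source subclasses become a single ValidatorBuilder parameterized by SourceType. A hedged usage sketch follows; the helper function and its placeholder arguments are illustrative, not code from this commit.

from metadata.data_quality.builders.validator_builder import SourceType, ValidatorBuilder

def build_validator(runner, test_case, entity_type: str, is_dataframe: bool = False):
    """Illustrative helper: wire a ValidatorBuilder the way the test suite interfaces below do."""
    builder = ValidatorBuilder(
        runner=runner,              # QueryRunner for SQL sources, sampler dataset for pandas
        test_case=test_case,        # TestCase whose testDefinition.fullyQualifiedName names the test
        source_type=SourceType.PANDAS if is_dataframe else SourceType.SQL,
        entity_type=entity_type,    # "COLUMN" or "TABLE", taken from the test definition
    )
    return builder.validator        # concrete BaseTestValidator instance, already reset
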
@@ -13,9 +13,10 @@
Interfaces with database for all database engine
supporting sqlalchemy abstraction layer
"""
from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder
from metadata.data_quality.builders.pandas_validator_builder import (
PandasValidatorBuilder,

from metadata.data_quality.builders.validator_builder import (
SourceType,
ValidatorBuilder,
)
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.generated.schema.entity.data.table import Table
@@ -44,27 +45,30 @@ def __init__(
ometa_client: OpenMetadata,
sampler: SamplerInterface,
table_entity: Table,
**__,
**kwargs,
):
super().__init__(
service_connection_config,
ometa_client,
sampler,
table_entity,
**kwargs,
)

(
self.table_sample_query,
self.table_sample_config,
self.table_partition_config,
self.sample_query,
self.profile_sample_config,
self.partition_details,
) = self._get_table_config()

# add partition logic to test suite
self.dfs = self.sampler.table
if self.dfs and self.table_partition_config:
self.dfs = self.get_partitioned_df(self.dfs)
self.dataset = self.sampler.get_dataset()

def _get_validator_builder(
self, test_case: TestCase, entity_type: str
) -> IValidatorBuilder:
return PandasValidatorBuilder(self.dfs, test_case, entity_type)
) -> ValidatorBuilder:
return self.validator_builder_class(
runner=self.dataset,
test_case=test_case,
entity_type=entity_type,
source_type=SourceType.PANDAS,
)
@@ -19,8 +19,10 @@
from sqlalchemy.orm import DeclarativeMeta
from sqlalchemy.orm.util import AliasedClass

from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder
from metadata.data_quality.builders.sqa_validator_builder import SQAValidatorBuilder
from metadata.data_quality.builders.validator_builder import (
SourceType,
ValidatorBuilder,
)
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
@@ -51,16 +53,13 @@ def __init__(
ometa_client: OpenMetadata,
sampler: SamplerInterface,
table_entity: Table = None,
orm_table=None,
**kwargs,
):
super().__init__(
service_connection_config,
ometa_client,
sampler,
table_entity,
service_connection_config, ometa_client, sampler, table_entity, **kwargs
)
self.source_type = SourceType.SQL
self.create_session()
self._table = orm_table

(
self.table_sample_query,
@@ -76,7 +75,7 @@ def create_session(self):
)

@property
def sample(self) -> Union[DeclarativeMeta, AliasedClass]:
def dataset(self) -> Union[DeclarativeMeta, AliasedClass]:
"""_summary_
Returns:
@@ -87,7 +86,7 @@ def sample(self) -> Union[DeclarativeMeta, AliasedClass]:
"You must create a sampler first `<instance>.create_sampler(...)`."
)

return self.sampler.random_sample()
return self.sampler.get_dataset()

@property
def runner(self) -> QueryRunner:
@@ -98,29 +97,24 @@ def runner(self) -> QueryRunner:
"""
return self._runner

@property
def table(self):
"""getter method for the table object
Returns:
Table: table object
"""
return self._table

def _create_runner(self) -> None:
def _create_runner(self) -> QueryRunner:
"""Create a QueryRunner Instance"""

return cls_timeout(TEN_MIN)(
QueryRunner(
session=self.session,
table=self.table,
sample=self.sample,
dataset=self.dataset,
partition_details=self.table_partition_config,
profile_sample_query=self.table_sample_query,
)
)

def _get_validator_builder(
self, test_case: TestCase, entity_type: str
) -> IValidatorBuilder:
return SQAValidatorBuilder(self.runner, test_case, entity_type)
) -> ValidatorBuilder:
return self.validator_builder_class(
runner=self.runner,
test_case=test_case,
entity_type=entity_type,
source_type=self.source_type,
)
