Skip to content

Commit

Permalink
copy db
Browse files (browse the repository at this point in the history)
  • Loading branch information
pmbrull committed Nov 14, 2024
1 parent da82e91 commit c8956cb
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class ProfilerSource(ProfilerSourceInterface):
def __init__(
self,
config: OpenMetadataWorkflowConfig,
database: DatabaseService,
database: Database,
ometa_client: OpenMetadata,
global_profiler_configuration: ProfilerConfiguration,
):
Expand Down Expand Up @@ -114,7 +114,7 @@ def _build_table_orm(self, entity: Table) -> Optional[DeclarativeMeta]:
return None

def _copy_service_config(
self, config: OpenMetadataWorkflowConfig, database: DatabaseService
self, config: OpenMetadataWorkflowConfig, database: Database
) -> DatabaseConnection:
"""Make a copy of the service config and update the database name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@

from copy import deepcopy

from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
Expand All @@ -34,14 +34,14 @@ class BigQueryProfilerSource(ProfilerSource):
"""override the base profiler source to handle BigQuery specific connection configs"""

def _copy_service_config(
self, config: OpenMetadataWorkflowConfig, database: DatabaseService
self, config: OpenMetadataWorkflowConfig, database: Database
) -> BigQueryConnection:
"""Make a copy of the database connection config. If MultiProjectId is used, replace it
with SingleProjectId with the database name being profiled. We iterate over all non filtered
database in workflow.py `def execute`.
Args:
database (DatabaseService): a database entity
database (Database): a database entity
Returns:
DatabaseConnection
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
"""Extend the ProfilerSource class to add support for Databricks is_disconnect SQA method"""

from metadata.generated.schema.configuration.profilerConfiguration import (
ProfilerConfiguration,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
Expand All @@ -25,7 +24,7 @@ class DataBricksProfilerSource(ProfilerSource):
def __init__(
self,
config: OpenMetadataWorkflowConfig,
database: DatabaseService,
database: Database,
ometa_client: OpenMetadata,
global_profiler_config: ProfilerConfiguration,
):
Expand Down
14 changes: 7 additions & 7 deletions ingestion/src/metadata/sampler/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
from sqlalchemy import MetaData
from sqlalchemy.orm import DeclarativeMeta

from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
Expand Down Expand Up @@ -84,10 +82,12 @@ def _run(self, record: ProfilerSourceAndEntity) -> Either[SamplerResponse]:

try:
entity = cast(Table, record.entity)
schema_entity, database_entity, db_service = get_context_entities(
schema_entity, database_entity, _ = get_context_entities(
entity=entity, metadata=self.metadata
)
service_conn_config = self._copy_service_config(self.config, db_service)
service_conn_config = self._copy_service_config(
self.config, database_entity
)
sqa_metadata = _get_sqa_metadata(str(service_conn_config.type.value))

_orm = self._build_table_orm(entity, sqa_metadata)
Expand Down Expand Up @@ -147,7 +147,7 @@ def _build_table_orm(
)

def _copy_service_config(
self, config: OpenMetadataWorkflowConfig, database: DatabaseService
self, config: OpenMetadataWorkflowConfig, database: Database
) -> DatabaseConnection:
"""Make a copy of the service config and update the database name
Expand Down
2 changes: 1 addition & 1 deletion ingestion/src/metadata/sampler/sampler_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,4 @@ def generate_sample_data(self) -> Optional[TableData]:
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(f"Error fetching sample data: {err}")
return None
raise err

0 comments on commit c8956cb

Please sign in to comment.