Skip to content

Commit

Permalink
Create ometa client once and pass it around & improve pycln config (#13310)
Browse files Browse the repository at this point in the history

* Create ometa client once and pass it around & improve pycln config

* Fix

* Fix

* Fix tests

* Fix maven ci

* Fix tests

* Fix tests

* Fix tests

* Format

* Fix DI
  • Loading branch information
pmbrull authored Oct 4, 2023
1 parent 1a90c5c commit 0282574
Show file tree
Hide file tree
Showing 134 changed files with 452 additions and 711 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ lint: ## Run pylint on the Python sources to analyze the codebase

.PHONY: py_format
py_format: ## Run pycln, isort and black to format the Python codebase
pycln ingestion/ openmetadata-airflow-apis/ --extend-exclude $(PY_SOURCE)/metadata/generated
pycln ingestion/ openmetadata-airflow-apis/ --extend-exclude $(PY_SOURCE)/metadata/generated --all
isort ingestion/ openmetadata-airflow-apis/ --skip $(PY_SOURCE)/metadata/generated --skip ingestion/env --skip ingestion/build --skip openmetadata-airflow-apis/build --profile black --multi-line 3
black ingestion/ openmetadata-airflow-apis/ --extend-exclude $(PY_SOURCE)/metadata/generated

.PHONY: py_format_check
py_format_check: ## Check if Python sources are correctly formatted
pycln ingestion/ openmetadata-airflow-apis/ --diff --extend-exclude $(PY_SOURCE)/metadata/generated
isort --check-only ingestion/ openmetadata-airflow-apis/ --skip $(PY_SOURCE)/metadata/generated --skip ingestion/build --profile black --multi-line 3
pycln ingestion/ openmetadata-airflow-apis/ --diff --extend-exclude $(PY_SOURCE)/metadata/generated --all
isort --check-only ingestion/ openmetadata-airflow-apis/ --skip $(PY_SOURCE)/metadata/generated --skip ingestion/build --skip openmetadata-airflow-apis/build --profile black --multi-line 3
black --check --diff ingestion/ openmetadata-airflow-apis/ --extend-exclude $(PY_SOURCE)/metadata/generated
PYTHONPATH="${PYTHONPATH}:./ingestion/plugins" pylint --fail-under=10 $(PY_SOURCE)/metadata --ignore-paths $(PY_SOURCE)/metadata/generated || (echo "PyLint error code $$?"; exit 1)

Expand Down
8 changes: 2 additions & 6 deletions ingestion/src/metadata/ingestion/api/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
from abc import ABC, abstractmethod
from typing import Iterable, Optional

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.ingestion.api.closeable import Closeable
from metadata.ingestion.api.models import Either, Entity, StackTraceError
from metadata.ingestion.api.status import Status
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
Expand All @@ -44,9 +42,7 @@ def __init__(self):

@classmethod
@abstractmethod
def create(
cls, config_dict: dict, metadata_config: OpenMetadataConnection
) -> "Step":
def create(cls, config_dict: dict, metadata: OpenMetadata) -> "Step":
pass

def get_status(self) -> Status:
Expand Down
12 changes: 4 additions & 8 deletions ingestion/src/metadata/ingestion/bulksink/metadata_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@
Table,
TableJoins,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.lifeCycle import AccessDetails, LifeCycle
from metadata.generated.schema.type.tableUsageCount import TableColumn, TableUsageCount
Expand Down Expand Up @@ -78,22 +75,21 @@ class MetadataUsageBulkSink(BulkSink):
def __init__(
self,
config: MetadataUsageSinkConfig,
metadata_config: OpenMetadataConnection,
metadata: OpenMetadata,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.service_name = None
self.wrote_something = False
self.metadata = OpenMetadata(self.metadata_config)
self.metadata = metadata
self.table_join_dict = {}
self.table_usage_map = {}
self.today = datetime.today().strftime("%Y-%m-%d")

@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
def create(cls, config_dict: dict, metadata: OpenMetadata):
config = MetadataUsageSinkConfig.parse_obj(config_dict)
return cls(config, metadata_config)
return cls(config, metadata)

def __populate_table_usage_map(
self, table_entity: Table, table_usage: TableUsageCount
Expand Down
14 changes: 5 additions & 9 deletions ingestion/src/metadata/ingestion/processor/query_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@
from typing import Optional

from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.type.basic import DateTime
from metadata.generated.schema.type.queryParserData import ParsedData, QueryParserData
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.steps import Processor
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger
from metadata.utils.time_utils import convert_timestamp_to_milliseconds

Expand Down Expand Up @@ -78,21 +76,19 @@ class QueryParserProcessor(Processor):
def __init__(
self,
config: ConfigModel,
metadata_config: OpenMetadataConnection,
metadata: OpenMetadata,
connection_type: str,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.metadata = metadata
self.connection_type = connection_type

@classmethod
def create(
cls, config_dict: dict, metadata_config: OpenMetadataConnection, **kwargs
):
def create(cls, config_dict: dict, metadata: OpenMetadata, **kwargs):
config = ConfigModel.parse_obj(config_dict)
connection_type = kwargs.pop("connection_type", "")
return cls(config, metadata_config, connection_type)
return cls(config, metadata, connection_type)

def _run(self, record: TableQueries) -> Optional[Either[QueryParserData]]:
if record and record.queries:
Expand Down
10 changes: 3 additions & 7 deletions ingestion/src/metadata/ingestion/sink/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
import pathlib

from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import Sink
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import UTF_8
from metadata.utils.logger import get_log_name, ingestion_logger

Expand All @@ -41,21 +39,19 @@ class FileSink(Sink):
def __init__(
self,
config: FileSinkConfig,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
fpath = pathlib.Path(self.config.filename)
# pylint: disable=consider-using-with
self.file = fpath.open("w", encoding=UTF_8)
self.file.write("[\n")
self.wrote_something = False

@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
def create(cls, config_dict: dict, _: OpenMetadata):
config = FileSinkConfig.parse_obj(config_dict)
return cls(config, metadata_config)
return cls(config)

def _run(self, record: Entity, *_, **__) -> Either[str]:
if self.wrote_something:
Expand Down
14 changes: 4 additions & 10 deletions ingestion/src/metadata/ingestion/sink/metadata_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@
)
from metadata.generated.schema.entity.data.table import DataModel, Table
from metadata.generated.schema.entity.data.topic import TopicSampleData
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.teams.role import Role
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
Expand Down Expand Up @@ -96,22 +93,19 @@ class MetadataRestSink(Sink):
# We want to catch any errors that might happen during the sink
# pylint: disable=broad-except

def __init__(
self, config: MetadataRestSinkConfig, metadata_config: OpenMetadataConnection
):
def __init__(self, config: MetadataRestSinkConfig, metadata: OpenMetadata):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.wrote_something = False
self.charts_dict = {}
self.metadata = OpenMetadata(self.metadata_config)
self.metadata = metadata
self.role_entities = {}
self.team_entities = {}

@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
def create(cls, config_dict: dict, metadata: OpenMetadata):
config = MetadataRestSinkConfig.parse_obj(config_dict)
return cls(config, metadata_config)
return cls(config, metadata)

@singledispatchmethod
def _run_dispatch(self, record: Entity) -> Either[Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDataModel
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.dashboardService import (
DashboardConnection,
DashboardService,
Expand Down Expand Up @@ -200,12 +197,11 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
metadata: OpenMetadata,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.metadata = metadata
self.service_connection = self.config.serviceConnection.__root__.config
self.source_config: DashboardServiceMetadataPipeline = (
self.config.sourceConfig.config
Expand All @@ -216,8 +212,6 @@ def __init__(
self.connection_obj = self.client
self.test_connection()

self.metadata_client = OpenMetadata(self.metadata_config)

@abstractmethod
def yield_dashboard(
self, dashboard_details: Any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
Expand All @@ -57,14 +58,14 @@ class DomodashboardSource(DashboardServiceSource):
metadata_config: OpenMetadataConnection

@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
def create(cls, config_dict, metadata: OpenMetadata):
config = WorkflowSource.parse_obj(config_dict)
connection: DomoDashboardConnection = config.serviceConnection.__root__.config
if not isinstance(connection, DomoDashboardConnection):
raise InvalidSourceException(
f"Expected DomoDashboardConnection, but got {connection}"
)
return cls(config, metadata_config)
return cls(config, metadata)

def get_dashboards_list(self) -> Optional[List[DomoDashboardDetails]]:
dashboards = self.client.domo.page_list()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.ingestion.source.dashboard.lightdash.models import (
LightdashChart,
Expand All @@ -49,21 +50,21 @@ class LightdashSource(DashboardServiceSource):
metadata_config: OpenMetadataConnection

@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
def create(cls, config_dict, metadata: OpenMetadata):
config = WorkflowSource.parse_obj(config_dict)
connection: LightdashConnection = config.serviceConnection.__root__.config
if not isinstance(connection, LightdashConnection):
raise InvalidSourceException(
f"Expected LightdashConnection, but got {connection}"
)
return cls(config, metadata_config)
return cls(config, metadata)

def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
metadata: OpenMetadata,
):
super().__init__(config, metadata_config)
super().__init__(config, metadata)
self.charts: List[LightdashChart] = []

def prepare(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@
from metadata.generated.schema.entity.services.connections.dashboard.lookerConnection import (
LookerConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType,
)
Expand All @@ -74,6 +71,7 @@
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import (
DashboardServiceSource,
DashboardUsage,
Expand Down Expand Up @@ -137,15 +135,15 @@ class LookerSource(DashboardServiceSource):
"""

config: WorkflowSource
metadata_config: OpenMetadataConnection
metadata: OpenMetadata
client: Looker40SDK

def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
metadata: OpenMetadata,
):
super().__init__(config, metadata_config)
super().__init__(config, metadata)
self.today = datetime.now().strftime("%Y-%m-%d")

self._explores_cache = {}
Expand All @@ -154,16 +152,14 @@ def __init__(
self._project_parsers: Optional[Dict[str, LkmlParser]] = None

@classmethod
def create(
cls, config_dict: dict, metadata_config: OpenMetadataConnection
) -> "LookerSource":
def create(cls, config_dict: dict, metadata: OpenMetadata) -> "LookerSource":
config = WorkflowSource.parse_obj(config_dict)
connection: LookerConnection = config.serviceConnection.__root__.config
if not isinstance(connection, LookerConnection):
raise InvalidSourceException(
f"Expected LookerConnection, but got {connection}"
)
return cls(config, metadata_config)
return cls(config, metadata)

@property
def parser(self) -> Optional[Dict[str, LkmlParser]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import search_table_entities
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.ingestion.source.dashboard.metabase.models import (
MetabaseChart,
Expand Down Expand Up @@ -61,21 +62,21 @@ class MetabaseSource(DashboardServiceSource):
metadata_config: OpenMetadataConnection

@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
def create(cls, config_dict, metadata: OpenMetadata):
config = WorkflowSource.parse_obj(config_dict)
connection: MetabaseConnection = config.serviceConnection.__root__.config
if not isinstance(connection, MetabaseConnection):
raise InvalidSourceException(
f"Expected MetabaseConnection, but got {connection}"
)
return cls(config, metadata_config)
return cls(config, metadata)

def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
metadata: OpenMetadata,
):
super().__init__(config, metadata_config)
super().__init__(config, metadata)
self.collections: List[MetabaseCollection] = []

def prepare(self):
Expand Down
Loading

0 comments on commit 0282574

Please sign in to comment.