Skip to content

Commit

Permalink
MINOR: Refactor output_handlers to a WorkflowOutputHandler class (#17149)
Browse files Browse the repository at this point in the history

* Refactor output_handlers to a WorkflowOutputHandler class

* Add old methods as deprecated to avoid breaking changes

* Extract WorkflowInitErrorHandler from workflow_output_handler

* Fix static checks

* Fix tests

* Fix tests

* Update code based on comments from PR

* Update comment
  • Loading branch information
IceS2 authored Jul 29, 2024
1 parent 20754ab commit c522f14
Show file tree
Hide file tree
Showing 50 changed files with 612 additions and 492 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import yaml
from airflow import DAG

from metadata.workflow.workflow_output_handler import print_status

try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
Expand Down Expand Up @@ -63,7 +61,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import yaml
from airflow import DAG

from metadata.workflow.workflow_output_handler import print_status

try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
Expand Down Expand Up @@ -72,7 +70,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()


Expand Down
4 changes: 1 addition & 3 deletions ingestion/examples/airflow/dags/airflow_sample_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import yaml
from airflow import DAG

from metadata.workflow.workflow_output_handler import print_status

try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
Expand Down Expand Up @@ -62,7 +60,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()


Expand Down
4 changes: 1 addition & 3 deletions ingestion/examples/airflow/dags/airflow_sample_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

from airflow import DAG

from metadata.workflow.workflow_output_handler import print_status

try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
Expand Down Expand Up @@ -88,7 +86,7 @@ def metadata_ingestion_workflow():
workflow = UsageWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()


Expand Down
3 changes: 1 addition & 2 deletions ingestion/operators/docker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.usage import UsageWorkflow
from metadata.workflow.workflow_output_handler import print_status

WORKFLOW_MAP = {
PipelineType.metadata.value: MetadataWorkflow,
Expand Down Expand Up @@ -109,7 +108,7 @@ def main():
workflow = workflow_class.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()


Expand Down
11 changes: 10 additions & 1 deletion ingestion/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,17 @@ ignore = [
"src/metadata/readers/*",
"src/metadata/timer/*",
"src/metadata/utils/*",
"src/metadata/workflow/*",
"src/metadata/workflow/base.py",
"src/metadata/workflow/application.py",
"src/metadata/workflow/data_insight.py",
"src/metadata/workflow/data_quality.py",
"src/metadata/workflow/ingestion.py",
"src/metadata/workflow/metadata.py",
"src/metadata/workflow/profiler.py",
"src/metadata/workflow/usage.py",
"src/metadata/workflow/workflow_status_mixin.py",
]

reportDeprecated = false
reportMissingTypeStubs = false
reportAny = false
3 changes: 1 addition & 2 deletions ingestion/src/metadata/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger
from metadata.workflow.application import ApplicationWorkflow
from metadata.workflow.application_output_handler import print_status

logger = cli_logger()

Expand All @@ -42,5 +41,5 @@ def run_app(config_path: Path) -> None:

workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()
15 changes: 8 additions & 7 deletions ingestion/src/metadata/cli/dataquality.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
from pathlib import Path

from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.workflow_output_handler import (
WorkflowType,
print_init_error,
print_status,
)
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler

logger = cli_logger()

Expand All @@ -42,10 +41,12 @@ def run_test(config_path: Path) -> None:
workflow = TestSuiteWorkflow.create(workflow_config_dict)
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, workflow_config_dict, WorkflowType.TEST)
WorkflowInitErrorHandler.print_init_error(
exc, workflow_config_dict, PipelineType.TestSuite
)
sys.exit(1)

workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()
15 changes: 8 additions & 7 deletions ingestion/src/metadata/cli/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
from pathlib import Path

from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import (
WorkflowType,
print_init_error,
print_status,
)
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler

logger = cli_logger()

Expand All @@ -42,10 +41,12 @@ def run_ingest(config_path: Path) -> None:
logger.debug(f"Using config: {workflow.config}")
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, config_dict, WorkflowType.INGEST)
WorkflowInitErrorHandler.print_init_error(
exc, config_dict, PipelineType.metadata
)
sys.exit(1)

workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()
15 changes: 8 additions & 7 deletions ingestion/src/metadata/cli/insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
from pathlib import Path

from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.workflow.data_insight import DataInsightWorkflow
from metadata.workflow.workflow_output_handler import (
WorkflowType,
print_init_error,
print_status,
)
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler

logger = cli_logger()

Expand All @@ -42,10 +41,12 @@ def run_insight(config_path: Path) -> None:
logger.debug(f"Using config: {workflow.config}")
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, config_dict, WorkflowType.INSIGHT)
WorkflowInitErrorHandler.print_init_error(
exc, config_dict, PipelineType.dataInsight
)
sys.exit(1)

workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()
9 changes: 7 additions & 2 deletions ingestion/src/metadata/cli/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@

from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import UTF_8
from metadata.utils.logger import cli_logger
from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler

logger = cli_logger()

Expand Down Expand Up @@ -53,7 +56,9 @@ def run_lineage(config_path: Path) -> None:

except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, config_dict, WorkflowType.INGEST)
WorkflowInitErrorHandler.print_init_error(
exc, config_dict, PipelineType.lineage
)
sys.exit(1)

if workflow.filePath:
Expand Down
15 changes: 8 additions & 7 deletions ingestion/src/metadata/cli/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
from pathlib import Path

from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import (
WorkflowType,
print_init_error,
print_status,
)
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler

logger = cli_logger()

Expand All @@ -42,10 +41,12 @@ def run_profiler(config_path: Path) -> None:
workflow = ProfilerWorkflow.create(workflow_config_dict)
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, workflow_config_dict, WorkflowType.PROFILE)
WorkflowInitErrorHandler.print_init_error(
exc, workflow_config_dict, PipelineType.profiler
)
sys.exit(1)

workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()
13 changes: 6 additions & 7 deletions ingestion/src/metadata/cli/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
from pathlib import Path

from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.workflow.usage import UsageWorkflow
from metadata.workflow.workflow_output_handler import (
WorkflowType,
print_init_error,
print_status,
)
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler

logger = cli_logger()

Expand All @@ -42,10 +41,10 @@ def run_usage(config_path: Path) -> None:
logger.debug(f"Using config: {workflow.config}")
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, config_dict, WorkflowType.INGEST)
WorkflowInitErrorHandler.print_init_error(exc, config_dict, PipelineType.usage)
sys.exit(1)

workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()
6 changes: 3 additions & 3 deletions ingestion/src/metadata/ingestion/ometa/client_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
OpenMetadataConnection,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.ometa_api import C, OpenMetadata, T
from metadata.utils import fqn
from metadata.utils.logger import ometa_logger

Expand All @@ -28,7 +28,7 @@

def create_ometa_client(
metadata_config: OpenMetadataConnection,
) -> OpenMetadata:
) -> OpenMetadata[T, C]: # pyright: ignore[reportInvalidTypeVarUse]
"""Create an OpenMetadata client
Args:
Expand All @@ -38,7 +38,7 @@ def create_ometa_client(
OpenMetadata: an OM client
"""
try:
metadata = OpenMetadata(metadata_config)
metadata = OpenMetadata[T, C](metadata_config)
metadata.health_check()
return metadata
except Exception as exc:
Expand Down
7 changes: 4 additions & 3 deletions ingestion/src/metadata/utils/class_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuitePipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import SourceConfig

SERVICE_TYPE_REF = {
ServiceType.Database.value: "databaseService",
Expand Down Expand Up @@ -102,14 +103,14 @@ def _clean(source_type: str):
return source_type


def get_pipeline_type_from_source_config(source_config_type) -> PipelineType:
def get_pipeline_type_from_source_config(source_config: SourceConfig) -> PipelineType:
"""From the YAML serviceType, get the Ingestion Pipeline Type"""
pipeline_type = SOURCE_CONFIG_TYPE_INGESTION.get(
source_config_type.__class__.__name__
source_config.config.__class__.__name__
)
if not pipeline_type:
raise ValueError(
f"Cannot find Pipeline Type for SourceConfig {source_config_type}"
f"Cannot find Pipeline Type for SourceConfig {source_config.config}"
)
return pipeline_type

Expand Down
4 changes: 2 additions & 2 deletions ingestion/src/metadata/utils/execution_time_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from copy import deepcopy
from functools import wraps
from time import perf_counter
from typing import List, Optional
from typing import Dict, List, Optional

from pydantic import BaseModel

Expand Down Expand Up @@ -81,7 +81,7 @@ class ExecutionTimeTrackerState(metaclass=Singleton):

def __init__(self):
"""Initializes the state and the lock."""
self.state = {}
self.state: Dict[str, float] = {}
self.lock = threading.Lock()

def add(self, context: ExecutionTimeTrackerContext, elapsed: float):
Expand Down
Loading

0 comments on commit c522f14

Please sign in to comment.