From 69966569c839c61baa45c2a024df6f35ffc9d615 Mon Sep 17 00:00:00 2001 From: Pablo Takara Date: Tue, 23 Jul 2024 15:39:54 +0200 Subject: [PATCH 1/8] Refactor output_handlers to a WorkflowOutputHandler class --- .../dags/airflow_extended_sample_data.py | 4 +- .../dags/airflow_metadata_extraction.py | 4 +- .../airflow/dags/airflow_sample_data.py | 4 +- .../airflow/dags/airflow_sample_usage.py | 4 +- ingestion/operators/docker/main.py | 3 +- ingestion/pyproject.toml | 11 +- ingestion/src/metadata/cli/app.py | 3 +- ingestion/src/metadata/cli/dataquality.py | 8 +- ingestion/src/metadata/cli/ingest.py | 8 +- ingestion/src/metadata/cli/insight.py | 8 +- ingestion/src/metadata/cli/profile.py | 8 +- ingestion/src/metadata/cli/usage.py | 8 +- .../metadata/ingestion/ometa/client_utils.py | 6 +- ingestion/src/metadata/utils/class_helper.py | 7 +- .../workflow/application_output_handler.py | 34 --- ingestion/src/metadata/workflow/base.py | 42 ++- .../src/metadata/workflow/output_handler.py | 248 ---------------- .../workflow/workflow_output_handler.py | 265 +++++++++++++++--- .../workflow/workflow_status_mixin.py | 12 +- .../test_great_expectation_integration.py | 1 - .../system/test_bigquery_system_metrics.py | 1 - .../system/test_redshift_system_metrics.py | 1 - .../system/test_snowflake_system_metrics.py | 1 - .../test_datalake_profiler_e2e.py | 1 - .../orm_profiler/test_orm_profiler_e2e.py | 1 - .../profiler/test_nosql_profiler.py | 1 - ingestion/tests/integration/s3/conftest.py | 3 +- .../workflows/ingestion/application.py | 3 +- .../workflows/ingestion/common.py | 3 +- .../workflows/ingestion/data_insight.py | 3 +- .../workflows/ingestion/profiler.py | 3 +- .../workflows/ingestion/test_suite.py | 3 +- .../workflows/ingestion/usage.py | 3 +- .../v1.5/deployment/run-connectors-class.md | 2 +- .../connectors/ingestion/deployment/index.md | 4 +- .../pipeline/airflow/gcs-composer.md | 4 +- .../deployment/ingestion/external/airflow.md | 8 +- 
.../ingestion/external/credentials.md | 12 +- .../ingestion/external/gcs-composer.md | 4 +- .../ingestion/external/github-actions.md | 4 +- .../deployment/ingestion/external/index.md | 24 +- .../deployment/ingestion/external/mwaa.md | 8 +- .../deployment/upgrade/versions/110-to-120.md | 4 +- .../data-insights/airflow-sdk.md | 4 +- .../data-insights/elasticsearch-reindex.md | 4 +- .../profiler/external_workflow.md | 4 +- 46 files changed, 361 insertions(+), 440 deletions(-) delete mode 100644 ingestion/src/metadata/workflow/application_output_handler.py delete mode 100644 ingestion/src/metadata/workflow/output_handler.py diff --git a/ingestion/examples/airflow/dags/airflow_extended_sample_data.py b/ingestion/examples/airflow/dags/airflow_extended_sample_data.py index 9c449771bf2e..a593fe5da7a3 100644 --- a/ingestion/examples/airflow/dags/airflow_extended_sample_data.py +++ b/ingestion/examples/airflow/dags/airflow_extended_sample_data.py @@ -14,8 +14,6 @@ import yaml from airflow import DAG -from metadata.workflow.workflow_output_handler import print_status - try: from airflow.operators.python import PythonOperator except ModuleNotFoundError: @@ -63,7 +61,7 @@ def metadata_ingestion_workflow(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/ingestion/examples/airflow/dags/airflow_metadata_extraction.py b/ingestion/examples/airflow/dags/airflow_metadata_extraction.py index 94372e2ff782..e007810bd98f 100644 --- a/ingestion/examples/airflow/dags/airflow_metadata_extraction.py +++ b/ingestion/examples/airflow/dags/airflow_metadata_extraction.py @@ -20,8 +20,6 @@ import yaml from airflow import DAG -from metadata.workflow.workflow_output_handler import print_status - try: from airflow.operators.python import PythonOperator except ModuleNotFoundError: @@ -72,7 +70,7 @@ def metadata_ingestion_workflow(): workflow = 
MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/ingestion/examples/airflow/dags/airflow_sample_data.py b/ingestion/examples/airflow/dags/airflow_sample_data.py index f43dad6e1eba..6d2b31ec769b 100644 --- a/ingestion/examples/airflow/dags/airflow_sample_data.py +++ b/ingestion/examples/airflow/dags/airflow_sample_data.py @@ -14,8 +14,6 @@ import yaml from airflow import DAG -from metadata.workflow.workflow_output_handler import print_status - try: from airflow.operators.python import PythonOperator except ModuleNotFoundError: @@ -62,7 +60,7 @@ def metadata_ingestion_workflow(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/ingestion/examples/airflow/dags/airflow_sample_usage.py b/ingestion/examples/airflow/dags/airflow_sample_usage.py index 306b149ca596..a7420493f115 100644 --- a/ingestion/examples/airflow/dags/airflow_sample_usage.py +++ b/ingestion/examples/airflow/dags/airflow_sample_usage.py @@ -14,8 +14,6 @@ from airflow import DAG -from metadata.workflow.workflow_output_handler import print_status - try: from airflow.operators.python import PythonOperator except ModuleNotFoundError: @@ -88,7 +86,7 @@ def metadata_ingestion_workflow(): workflow = UsageWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/ingestion/operators/docker/main.py b/ingestion/operators/docker/main.py index eb494f727ebf..52b40e96b38d 100644 --- a/ingestion/operators/docker/main.py +++ b/ingestion/operators/docker/main.py @@ -25,7 +25,6 @@ from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow from metadata.workflow.usage import UsageWorkflow -from 
metadata.workflow.workflow_output_handler import print_status WORKFLOW_MAP = { PipelineType.metadata.value: MetadataWorkflow, @@ -109,7 +108,7 @@ def main(): workflow = workflow_class.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/ingestion/pyproject.toml b/ingestion/pyproject.toml index a7e3273384a7..add81a5aabb3 100644 --- a/ingestion/pyproject.toml +++ b/ingestion/pyproject.toml @@ -171,7 +171,16 @@ ignore = [ "src/metadata/readers/*", "src/metadata/timer/*", "src/metadata/utils/*", - "src/metadata/workflow/*", + "src/metadata/workflow/base.py", + "src/metadata/workflow/application.py", + "src/metadata/workflow/data_insight.py", + "src/metadata/workflow/data_quality.py", + "src/metadata/workflow/ingestion.py", + "src/metadata/workflow/metadata.py", + "src/metadata/workflow/profiler.py", + "src/metadata/workflow/usage.py", + "src/metadata/workflow/workflow_output_handler.py", + "src/metadata/workflow/workflow_status_mixin.py", ] reportDeprecated = false diff --git a/ingestion/src/metadata/cli/app.py b/ingestion/src/metadata/cli/app.py index 39a4a1b01da4..4d3ed02d1cd7 100644 --- a/ingestion/src/metadata/cli/app.py +++ b/ingestion/src/metadata/cli/app.py @@ -19,7 +19,6 @@ from metadata.config.common import load_config_file from metadata.utils.logger import cli_logger from metadata.workflow.application import ApplicationWorkflow -from metadata.workflow.application_output_handler import print_status logger = cli_logger() @@ -42,5 +41,5 @@ def run_app(config_path: Path) -> None: workflow.execute() workflow.stop() - print_status(workflow) + workflow.print_status() workflow.raise_from_status() diff --git a/ingestion/src/metadata/cli/dataquality.py b/ingestion/src/metadata/cli/dataquality.py index edff3964e25e..8159fb11a633 100644 --- a/ingestion/src/metadata/cli/dataquality.py +++ b/ingestion/src/metadata/cli/dataquality.py @@ -19,11 +19,7 @@ from 
metadata.config.common import load_config_file from metadata.utils.logger import cli_logger from metadata.workflow.data_quality import TestSuiteWorkflow -from metadata.workflow.workflow_output_handler import ( - WorkflowType, - print_init_error, - print_status, -) +from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error logger = cli_logger() @@ -47,5 +43,5 @@ def run_test(config_path: Path) -> None: workflow.execute() workflow.stop() - print_status(workflow) + workflow.print_status() workflow.raise_from_status() diff --git a/ingestion/src/metadata/cli/ingest.py b/ingestion/src/metadata/cli/ingest.py index 82c83f183fbb..41bda71061ed 100644 --- a/ingestion/src/metadata/cli/ingest.py +++ b/ingestion/src/metadata/cli/ingest.py @@ -19,11 +19,7 @@ from metadata.config.common import load_config_file from metadata.utils.logger import cli_logger from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import ( - WorkflowType, - print_init_error, - print_status, -) +from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error logger = cli_logger() @@ -47,5 +43,5 @@ def run_ingest(config_path: Path) -> None: workflow.execute() workflow.stop() - print_status(workflow) + workflow.print_status() workflow.raise_from_status() diff --git a/ingestion/src/metadata/cli/insight.py b/ingestion/src/metadata/cli/insight.py index fb026e32ddba..97cc3d8e613b 100644 --- a/ingestion/src/metadata/cli/insight.py +++ b/ingestion/src/metadata/cli/insight.py @@ -19,11 +19,7 @@ from metadata.config.common import load_config_file from metadata.utils.logger import cli_logger from metadata.workflow.data_insight import DataInsightWorkflow -from metadata.workflow.workflow_output_handler import ( - WorkflowType, - print_init_error, - print_status, -) +from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error logger = cli_logger() @@ -47,5 +43,5 @@ def run_insight(config_path: 
Path) -> None: workflow.execute() workflow.stop() - print_status(workflow) + workflow.print_status() workflow.raise_from_status() diff --git a/ingestion/src/metadata/cli/profile.py b/ingestion/src/metadata/cli/profile.py index 106780ef7f08..6cf3acb13171 100644 --- a/ingestion/src/metadata/cli/profile.py +++ b/ingestion/src/metadata/cli/profile.py @@ -19,11 +19,7 @@ from metadata.config.common import load_config_file from metadata.utils.logger import cli_logger from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import ( - WorkflowType, - print_init_error, - print_status, -) +from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error logger = cli_logger() @@ -47,5 +43,5 @@ def run_profiler(config_path: Path) -> None: workflow.execute() workflow.stop() - print_status(workflow) + workflow.print_status() workflow.raise_from_status() diff --git a/ingestion/src/metadata/cli/usage.py b/ingestion/src/metadata/cli/usage.py index 20a87a562730..f86c0bacff06 100644 --- a/ingestion/src/metadata/cli/usage.py +++ b/ingestion/src/metadata/cli/usage.py @@ -19,11 +19,7 @@ from metadata.config.common import load_config_file from metadata.utils.logger import cli_logger from metadata.workflow.usage import UsageWorkflow -from metadata.workflow.workflow_output_handler import ( - WorkflowType, - print_init_error, - print_status, -) +from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error logger = cli_logger() @@ -47,5 +43,5 @@ def run_usage(config_path: Path) -> None: workflow.execute() workflow.stop() - print_status(workflow) + workflow.print_status() workflow.raise_from_status() diff --git a/ingestion/src/metadata/ingestion/ometa/client_utils.py b/ingestion/src/metadata/ingestion/ometa/client_utils.py index 85cef0d4f9b7..bc5e54616d96 100644 --- a/ingestion/src/metadata/ingestion/ometa/client_utils.py +++ b/ingestion/src/metadata/ingestion/ometa/client_utils.py @@ -19,7 +19,7 @@ 
OpenMetadataConnection, ) from metadata.generated.schema.type.basic import FullyQualifiedEntityName -from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.ometa_api import C, OpenMetadata, T from metadata.utils import fqn from metadata.utils.logger import ometa_logger @@ -28,7 +28,7 @@ def create_ometa_client( metadata_config: OpenMetadataConnection, -) -> OpenMetadata: +) -> OpenMetadata[T, C]: # pyright: ignore[reportInvalidTypeVarUse] """Create an OpenMetadata client Args: @@ -38,7 +38,7 @@ def create_ometa_client( OpenMetadata: an OM client """ try: - metadata = OpenMetadata(metadata_config) + metadata = OpenMetadata[T, C](metadata_config) metadata.health_check() return metadata except Exception as exc: diff --git a/ingestion/src/metadata/utils/class_helper.py b/ingestion/src/metadata/utils/class_helper.py index 951ef4c64c30..8157e252d89a 100644 --- a/ingestion/src/metadata/utils/class_helper.py +++ b/ingestion/src/metadata/utils/class_helper.py @@ -62,6 +62,7 @@ from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( TestSuitePipeline, ) +from metadata.generated.schema.metadataIngestion.workflow import SourceConfig SERVICE_TYPE_REF = { ServiceType.Database.value: "databaseService", @@ -102,14 +103,14 @@ def _clean(source_type: str): return source_type -def get_pipeline_type_from_source_config(source_config_type) -> PipelineType: +def get_pipeline_type_from_source_config(source_config: SourceConfig) -> PipelineType: """From the YAML serviceType, get the Ingestion Pipeline Type""" pipeline_type = SOURCE_CONFIG_TYPE_INGESTION.get( - source_config_type.__class__.__name__ + source_config.config.__class__.__name__ ) if not pipeline_type: raise ValueError( - f"Cannot find Pipeline Type for SourceConfig {source_config_type}" + f"Cannot find Pipeline Type for SourceConfig {source_config.config}" ) return pipeline_type diff --git a/ingestion/src/metadata/workflow/application_output_handler.py 
b/ingestion/src/metadata/workflow/application_output_handler.py deleted file mode 100644 index e29c60ec5bb5..000000000000 --- a/ingestion/src/metadata/workflow/application_output_handler.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Module handles the output messages for applications -""" -import time - -from metadata.utils.helpers import pretty_print_time_duration -from metadata.utils.logger import ANSI, log_ansi_encoded_string -from metadata.workflow.output_handler import print_workflow_summary - - -def print_status(workflow: "ApplicationWorkflow") -> None: - """ - Print the workflow results - """ - print_workflow_summary(workflow) - - if workflow.runner.get_status().source_start_time: - log_ansi_encoded_string( - color=ANSI.BRIGHT_CYAN, - bold=True, - message="Workflow finished in time: " - f"{pretty_print_time_duration(time.time()-workflow.runner.get_status().source_start_time)}", - ) diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py index b0bbe0d24e2b..0bed1675c94c 100644 --- a/ingestion/src/metadata/workflow/base.py +++ b/ingestion/src/metadata/workflow/base.py @@ -12,6 +12,7 @@ Base workflow definition. 
""" +import traceback import uuid from abc import ABC, abstractmethod from datetime import datetime @@ -47,7 +48,7 @@ from metadata.utils.execution_time_tracker import ExecutionTimeTracker from metadata.utils.helpers import datetime_to_ts from metadata.utils.logger import ingestion_logger, set_loggers_level -from metadata.workflow.output_handler import report_ingestion_status +from metadata.workflow.workflow_output_handler import WorkflowOutputHandler from metadata.workflow.workflow_status_mixin import ( SUCCESS_THRESHOLD_VALUE, WorkflowStatusMixin, @@ -88,6 +89,7 @@ def __init__( """ Disabling pylint to wait for workflow reimplementation as a topology """ + self.output_handler = WorkflowOutputHandler() self.config = config self.service_type = service_type self._timer: Optional[RepeatedTimer] = None @@ -136,7 +138,7 @@ def timer(self) -> RepeatedTimer: """ if not self._timer: self._timer = RepeatedTimer( - REPORTS_INTERVAL_SECONDS, report_ingestion_status, logger, self + REPORTS_INTERVAL_SECONDS, self._report_ingestion_status ) return self._timer @@ -249,7 +251,7 @@ def get_or_create_ingestion_pipeline(self) -> Optional[IngestionPipeline]: ), ), pipelineType=get_pipeline_type_from_source_config( - self.config.source.sourceConfig.config + self.config.source.sourceConfig ), sourceConfig=self.config.source.sourceConfig, airflowConfig=AirflowConfig(), @@ -277,3 +279,37 @@ def _get_ingestion_pipeline_service(self) -> Optional[T]: entity=get_service_class_from_service_type(self.service_type), fqn=self.config.source.serviceName, ) + + def _report_ingestion_status(self): + """ + Given a logger, use it to INFO the workflow status + """ + try: + for step in self.workflow_steps(): + logger.info( + f"{step.name}: Processed {len(step.status.records)} records," + f" filtered {len(step.status.filtered)} records," + f" found {len(step.status.failures)} errors" + ) + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error(f"Wild exception reporting status - 
{exc}") + + def _is_debug_enabled(self) -> bool: + return ( + hasattr(self, "config") + and hasattr(self.config, "workflowConfig") + and hasattr(self.config.workflowConfig, "loggerLevel") + and self.config.workflowConfig.loggerLevel is LogLevels.DEBUG + ) + + def print_status(self): + start_time = self.workflow_steps()[0].get_status().source_start_time + + self.output_handler.print_status( + self.result_status(), + self.workflow_steps(), + start_time, + self._is_debug_enabled(), + ) diff --git a/ingestion/src/metadata/workflow/output_handler.py b/ingestion/src/metadata/workflow/output_handler.py deleted file mode 100644 index a8c0e6000d00..000000000000 --- a/ingestion/src/metadata/workflow/output_handler.py +++ /dev/null @@ -1,248 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -""" -Common Output Handling methods -""" -import traceback -from enum import Enum -from logging import Logger -from pathlib import Path -from typing import Dict, List - -from pydantic import BaseModel -from tabulate import tabulate - -from metadata.generated.schema.entity.services.ingestionPipelines.status import ( - StackTraceError, -) -from metadata.generated.schema.metadataIngestion.workflow import LogLevels -from metadata.ingestion.api.step import Summary -from metadata.ingestion.lineage.models import QueryParsingFailures -from metadata.utils.execution_time_tracker import ExecutionTimeTracker -from metadata.utils.helpers import pretty_print_time_duration -from metadata.utils.logger import ANSI, log_ansi_encoded_string - -WORKFLOW_FAILURE_MESSAGE = "Workflow finished with failures" -WORKFLOW_WARNING_MESSAGE = "Workflow finished with warnings" -WORKFLOW_SUCCESS_MESSAGE = "Workflow finished successfully" - - -class Failure(BaseModel): - """ - Auxiliary class to print the error per status - """ - - name: str - failures: List[StackTraceError] - - -class WorkflowType(Enum): - """ - Workflow type enums based on the `metadata` CLI commands - """ - - INGEST = "ingest" - PROFILE = "profile" - TEST = "test" - LINEAGE = "lineage" - USAGE = "usage" - INSIGHT = "insight" - APP = "application" - - -EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows" - -URLS = { - WorkflowType.INGEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/metadata", - WorkflowType.PROFILE: "https://docs.open-metadata.org/connectors/ingestion/workflows/profiler", - WorkflowType.TEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/data-quality", - WorkflowType.LINEAGE: "https://docs.open-metadata.org/connectors/ingestion/workflows/lineage", - WorkflowType.USAGE: "https://docs.open-metadata.org/connectors/ingestion/workflows/usage", -} - -DEFAULT_EXAMPLE_FILE = { - WorkflowType.INGEST: "bigquery", - WorkflowType.PROFILE: 
"bigquery_profiler", - WorkflowType.TEST: "test_suite", - WorkflowType.LINEAGE: "bigquery_lineage", - WorkflowType.USAGE: "bigquery_usage", -} - - -def print_more_info(workflow_type: WorkflowType) -> None: - """ - Print more information message - """ - log_ansi_encoded_string( - message=f"\nFor more information, please visit: {URLS[workflow_type]}" - "\nOr join us in Slack: https://slack.open-metadata.org/" - ) - - -def print_error_msg(msg: str) -> None: - """ - Print message with error style - """ - log_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}") - - -def get_failures(failure: Failure) -> List[Dict[str, str]]: - return [ - { - "From": failure.name, - "Entity Name": f.name, - "Message": f.error, - "Stack Trace": f.stackTrace, - } - for f in failure.failures - ] - - -def print_failures_if_apply(failures: List[Failure]) -> None: - # take only the ones that contain failures - failures = [f for f in failures if f.failures] - if failures: - # create a list of dictionaries' list - all_data = [get_failures(failure) for failure in failures] - # create a single of dictionaries - data = [f for fs in all_data for f in fs] - # create a dictionary with a key and a list of values from the list - error_table = {k: [dic[k] for dic in data] for k in data[0]} - if len(list(error_table.items())[0][1]) > 100: - log_ansi_encoded_string( - bold=True, message="Showing only the first 100 failures:" - ) - # truncate list if number of values are over 100 - error_table = {k: v[:100] for k, v in error_table.items()} - else: - log_ansi_encoded_string(bold=True, message="List of failures:") - - log_ansi_encoded_string( - message=f"\n{tabulate(error_table, headers='keys', tablefmt='grid')}" - ) - - -def is_debug_enabled(workflow) -> bool: - return ( - hasattr(workflow, "config") - and hasattr(workflow.config, "workflowConfig") - and hasattr(workflow.config.workflowConfig, "loggerLevel") - and workflow.config.workflowConfig.loggerLevel is LogLevels.DEBUG - ) - - -def 
print_execution_time_summary(): - """Log the ExecutionTimeTracker Summary.""" - tracker = ExecutionTimeTracker() - - summary_table = { - "Context": [], - "Execution Time Aggregate": [], - } - - for key in sorted(tracker.state.state.keys()): - summary_table["Context"].append(key) - summary_table["Execution Time Aggregate"].append( - pretty_print_time_duration(tracker.state.state[key]) - ) - - log_ansi_encoded_string(bold=True, message="Execution Time Summary") - log_ansi_encoded_string(message=f"\n{tabulate(summary_table, tablefmt='grid')}") - - -def print_query_parsing_issues(): - """Log the QueryParsingFailures Summary.""" - query_failures = QueryParsingFailures() - - summary_table = { - "Query": [], - "Error": [], - } - - for failure in query_failures: - summary_table["Query"].append(failure.query) - summary_table["Error"].append(failure.error) - - if summary_table["Query"]: - log_ansi_encoded_string(bold=True, message="Query Parsing Error Summary") - log_ansi_encoded_string( - message=f"\n{tabulate(summary_table, tablefmt='grid', headers=summary_table.keys())}" - ) - - -def print_workflow_summary(workflow: "BaseWorkflow") -> None: - """ - Args: - workflow: the workflow status to be printed - - Returns: - Print Workflow status when the workflow logger level is DEBUG - """ - - if is_debug_enabled(workflow): - print_workflow_status_debug(workflow) - print_execution_time_summary() - print_query_parsing_issues() - - failures = [] - total_records = 0 - total_errors = 0 - for step in workflow.workflow_steps(): - step_summary = Summary.from_step(step) - total_records += step_summary.records - total_errors += step_summary.errors - failures.append(Failure(name=step.name, failures=step.get_status().failures)) - - log_ansi_encoded_string(bold=True, message=f"Workflow {step.name} Summary:") - log_ansi_encoded_string(message=f"Processed records: {step_summary.records}") - log_ansi_encoded_string( - message=f"Updated records: {step_summary.updated_records}" - ) - 
log_ansi_encoded_string(message=f"Warnings: {step_summary.warnings}") - if step_summary.filtered: - log_ansi_encoded_string(message=f"Filtered: {step_summary.filtered}") - log_ansi_encoded_string(message=f"Errors: {step_summary.errors}") - - print_failures_if_apply(failures) - - total_success = max(total_records, 1) - log_ansi_encoded_string( - color=ANSI.BRIGHT_CYAN, - bold=True, - message=f"Success %: " - f"{round(total_success * 100 / (total_success + total_errors), 2)}", - ) - - -def print_workflow_status_debug(workflow: "BaseWorkflow") -> None: - """Print the statuses from each workflow step""" - log_ansi_encoded_string(bold=True, message="Statuses detailed info:") - for step in workflow.workflow_steps(): - log_ansi_encoded_string(bold=True, message=f"{step.name} Status:") - log_ansi_encoded_string(message=step.get_status().as_string()) - - -def report_ingestion_status(logger: Logger, workflow: "BaseWorkflow") -> None: - """ - Given a logger, use it to INFO the workflow status - """ - try: - for step in workflow.workflow_steps(): - logger.info( - f"{step.name}: Processed {len(step.status.records)} records," - f" filtered {len(step.status.filtered)} records," - f" found {len(step.status.failures)} errors" - ) - - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error(f"Wild exception reporting status - {exc}") diff --git a/ingestion/src/metadata/workflow/workflow_output_handler.py b/ingestion/src/metadata/workflow/workflow_output_handler.py index eaf0fe18c158..325b586519de 100644 --- a/ingestion/src/metadata/workflow/workflow_output_handler.py +++ b/ingestion/src/metadata/workflow/workflow_output_handler.py @@ -15,25 +15,228 @@ import time import traceback -from typing import Type, Union +from enum import Enum +from pathlib import Path +from typing import Any, Dict, List, Optional, Type, Union + +from pydantic import BaseModel +from tabulate import tabulate from metadata.config.common import ConfigurationError +from 
metadata.generated.schema.entity.services.ingestionPipelines.status import ( + StackTraceError, +) from metadata.ingestion.api.parser import ( InvalidWorkflowException, ParsingConfigurationError, ) +from metadata.ingestion.api.step import Step, Summary +from metadata.ingestion.lineage.models import QueryParsingFailures from metadata.utils.constants import UTF_8 +from metadata.utils.execution_time_tracker import ExecutionTimeTracker from metadata.utils.helpers import pretty_print_time_duration from metadata.utils.logger import ANSI, log_ansi_encoded_string -from metadata.workflow.output_handler import ( - DEFAULT_EXAMPLE_FILE, - EXAMPLES_WORKFLOW_PATH, - WORKFLOW_FAILURE_MESSAGE, - WorkflowType, - print_error_msg, - print_more_info, - print_workflow_summary, -) +from metadata.workflow.workflow_status_mixin import WorkflowResultStatus + +WORKFLOW_FAILURE_MESSAGE = "Workflow finished with failures" +WORKFLOW_WARNING_MESSAGE = "Workflow finished with warnings" +WORKFLOW_SUCCESS_MESSAGE = "Workflow finished successfully" + + +class Failure(BaseModel): + """ + Auxiliary class to print the error per status + """ + + name: str + failures: List[StackTraceError] + + +class WorkflowType(Enum): + """ + Workflow type enums based on the `metadata` CLI commands + """ + + INGEST = "ingest" + PROFILE = "profile" + TEST = "test" + LINEAGE = "lineage" + USAGE = "usage" + INSIGHT = "insight" + APP = "application" + + +EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows" + +URLS = { + WorkflowType.INGEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/metadata", + WorkflowType.PROFILE: "https://docs.open-metadata.org/connectors/ingestion/workflows/profiler", + WorkflowType.TEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/data-quality", + WorkflowType.LINEAGE: "https://docs.open-metadata.org/connectors/ingestion/workflows/lineage", + WorkflowType.USAGE: 
"https://docs.open-metadata.org/connectors/ingestion/workflows/usage", +} + +DEFAULT_EXAMPLE_FILE = { + WorkflowType.INGEST: "bigquery", + WorkflowType.PROFILE: "bigquery_profiler", + WorkflowType.TEST: "test_suite", + WorkflowType.LINEAGE: "bigquery_lineage", + WorkflowType.USAGE: "bigquery_usage", +} + + +class WorkflowOutputHandler: + """ Responsible for dealing with the Workflow Outputs """ + def print_status( + self, + result_status: WorkflowResultStatus, + steps: List[Step], + start_time: Optional[Any] = None, + debug: bool = False, + ): + """ + Print the workflow results + """ + self.print_summary(steps, debug) + + if start_time: + log_ansi_encoded_string( + color=ANSI.BRIGHT_CYAN, + bold=True, + message="Workflow finished in time: " + f"{pretty_print_time_duration(time.time() - start_time)}", + ) + + if result_status == WorkflowResultStatus.FAILURE: + log_ansi_encoded_string( + color=ANSI.BRIGHT_RED, + bold=True, + message=WORKFLOW_FAILURE_MESSAGE, + ) + + def print_summary(self, steps: List[Step], debug: bool = False): + if debug: + self._print_debug_summary(steps) + self._print_execution_time_summary() + self._print_query_parsing_issues() + + self._print_summary(steps) + + def _print_summary(self, steps: List[Step]): + failures = [] + total_records: int = 0 + total_errors: int = 0 + + for step in steps: + step_summary = Summary.from_step(step) + + total_records += step_summary.records or 0 + total_errors += step_summary.errors or 0 + failures.append( + Failure(name=step.name, failures=step.get_status().failures) + ) + + log_ansi_encoded_string(bold=True, message=f"Workflow {step.name} Summary:") + log_ansi_encoded_string( + message=f"Processed records: {step_summary.records}" + ) + log_ansi_encoded_string( + message=f"Updated records: {step_summary.updated_records}" + ) + log_ansi_encoded_string(message=f"Warnings: {step_summary.warnings}") + + if step_summary.filtered: + log_ansi_encoded_string(message=f"Filtered: {step_summary.filtered}") + + 
log_ansi_encoded_string(message=f"Errors: {step_summary.errors}") + + self._print_failures_if_apply(failures) + + total_success = max(total_records, 1) + log_ansi_encoded_string( + color=ANSI.BRIGHT_CYAN, + bold=True, + message=f"Success %: " + f"{round(total_success * 100 / (total_success + total_errors), 2)}", + ) + + def _print_debug_summary(self, steps: List[Step]): + log_ansi_encoded_string(bold=True, message="Statuses detailed info:") + + for step in steps: + log_ansi_encoded_string(bold=True, message=f"{step.name} Status:") + log_ansi_encoded_string(message=step.get_status().as_string()) + + def _print_execution_time_summary(self): + """Log the ExecutionTimeTracker Summary.""" + tracker = ExecutionTimeTracker() + + summary_table = { + "Context": [], + "Execution Time Aggregate": [], + } + + for key in sorted(tracker.state.state.keys()): + summary_table["Context"].append(key) + summary_table["Execution Time Aggregate"].append( + pretty_print_time_duration(tracker.state.state[key]) + ) + + log_ansi_encoded_string(bold=True, message="Execution Time Summary") + log_ansi_encoded_string(message=f"\n{tabulate(summary_table, tablefmt='grid')}") + + def _print_query_parsing_issues(self): + """Log the QueryParsingFailures Summary.""" + query_failures = QueryParsingFailures() + + summary_table = { + "Query": [], + "Error": [], + } + + for failure in query_failures: + summary_table["Query"].append(failure.query) + summary_table["Error"].append(failure.error) + + if summary_table["Query"]: + log_ansi_encoded_string(bold=True, message="Query Parsing Error Summary") + log_ansi_encoded_string( + message=f"\n{tabulate(summary_table, tablefmt='grid', headers=summary_table.keys())}" + ) + + def _get_failures(self, failure: Failure) -> List[Dict[str, str]]: + return [ + { + "From": failure.name, + "Entity Name": f.name, + "Message": f.error, + "Stack Trace": f.stackTrace, + } + for f in failure.failures + ] + + def _print_failures_if_apply(self, failures: List[Failure]) -> 
None: + # take only the ones that contain failures + failures = [f for f in failures if f.failures] + if failures: + # create a list of dictionaries' list + all_data = [self._get_failures(failure) for failure in failures] + # flatten into a single list of dictionaries + data = [f for fs in all_data for f in fs] + # create a dictionary with a key and a list of values from the list + error_table = {k: [dic[k] for dic in data] for k in data[0]} + if len(list(error_table.items())[0][1]) > 100: + log_ansi_encoded_string( + bold=True, message="Showing only the first 100 failures:" + ) + # truncate list if number of values are over 100 + error_table = {k: v[:100] for k, v in error_table.items()} + else: + log_ansi_encoded_string(bold=True, message="List of failures:") + + log_ansi_encoded_string( + message=f"\n{tabulate(error_table, headers='keys', tablefmt='grid')}" + ) def calculate_ingestion_type(source_type_name: str) -> WorkflowType: @@ -81,6 +284,23 @@ def print_file_example(source_type_name: str, workflow_type: WorkflowType): log_ansi_encoded_string(message="------------") +def print_more_info(workflow_type: WorkflowType) -> None: + """ + Print more information message + """ + log_ansi_encoded_string( + message=f"\nFor more information, please visit: {URLS[workflow_type]}" + "\nOr join us in Slack: https://slack.open-metadata.org/" + ) + + +def print_error_msg(msg: str) -> None: + """ + Print message with error style + """ + log_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}") + + def print_init_error( exc: Union[Exception, Type[Exception]], config: dict, @@ -113,28 +333,3 @@ def print_init_error( print_error_msg(f"\nError initializing {workflow_type.name}: {exc}") print_error_msg(traceback.format_exc()) print_more_info(workflow_type) - - -def print_status(workflow: "IngestionWorkflow") -> None: - """ - Print the workflow results - """ - - print_workflow_summary(workflow) - - # Get the time to execute the first step - first_step = 
workflow.workflow_steps()[0] - if first_step.get_status().source_start_time: - log_ansi_encoded_string( - color=ANSI.BRIGHT_CYAN, - bold=True, - message="Workflow finished in time: " - f"{pretty_print_time_duration(time.time()-first_step.get_status().source_start_time)}", - ) - - if workflow.result_status() == 1: - log_ansi_encoded_string( - color=ANSI.BRIGHT_RED, - bold=True, - message=WORKFLOW_FAILURE_MESSAGE, - ) diff --git a/ingestion/src/metadata/workflow/workflow_status_mixin.py b/ingestion/src/metadata/workflow/workflow_status_mixin.py index 9772857a1a8a..4e85791423a9 100644 --- a/ingestion/src/metadata/workflow/workflow_status_mixin.py +++ b/ingestion/src/metadata/workflow/workflow_status_mixin.py @@ -14,6 +14,7 @@ import traceback import uuid from datetime import datetime +from enum import Enum, auto from typing import Optional, Tuple from metadata.config.common import WorkflowExecutionError @@ -39,6 +40,11 @@ SUCCESS_THRESHOLD_VALUE = 90 +class WorkflowResultStatus(Enum): + SUCCESS = auto() + FAILURE = auto() + + class WorkflowStatusMixin: """ Helper methods to manage IngestionPipeline status @@ -127,13 +133,13 @@ def raise_from_status(self, raise_warnings=False): self.set_ingestion_pipeline_status(PipelineState.failed) raise err - def result_status(self) -> int: + def result_status(self) -> WorkflowResultStatus: """ Returns 1 if source status is failed, 0 otherwise. 
""" if self.get_failures(): - return 1 - return 0 + return WorkflowResultStatus.FAILURE + return WorkflowResultStatus.SUCCESS def build_ingestion_status(self) -> Optional[IngestionStatus]: """ diff --git a/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py b/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py index 62c727794e02..633bc803d5ff 100644 --- a/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py +++ b/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py @@ -38,7 +38,6 @@ get_end_of_day_timestamp_mill, ) from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status Base = declarative_base() diff --git a/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py index 06227197698d..5faa22644ce4 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py @@ -61,7 +61,6 @@ ) from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_status TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent BIGQUERY_CONFIG_FILE = "cli_e2e/database/bigquery/bigquery.yaml" diff --git a/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py index 734b588e88f5..0b953bbc85a3 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py @@ -47,7 +47,6 @@ ) from metadata.workflow.metadata import MetadataWorkflow from 
metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_status TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent REDSHIFT_CONFIG_FILE = "cli_e2e/database/redshift/redshift.yaml" diff --git a/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py index d64a08a065d7..961d0c399144 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py @@ -66,7 +66,6 @@ ) from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_status TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent SNOWFLAKE_CONFIG_FILE = "cli_e2e/database/snowflake/snowflake.yaml" diff --git a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py index b7d58cb09b88..c1bae7b23399 100644 --- a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py +++ b/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py @@ -41,7 +41,6 @@ ) from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_status SERVICE_NAME = Path(__file__).stem REGION = "us-west-1" diff --git a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py index b1b5001a3e69..28d069f08fda 100644 --- a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py +++ b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py @@ -44,7 +44,6 @@ ) from metadata.workflow.metadata import MetadataWorkflow from 
metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_status logging.basicConfig(level=logging.WARN) logger = logging.getLogger(__name__) diff --git a/ingestion/tests/integration/profiler/test_nosql_profiler.py b/ingestion/tests/integration/profiler/test_nosql_profiler.py index 7a428cbd08cc..e1c51e8de7a1 100644 --- a/ingestion/tests/integration/profiler/test_nosql_profiler.py +++ b/ingestion/tests/integration/profiler/test_nosql_profiler.py @@ -45,7 +45,6 @@ from metadata.utils.time_utils import get_end_of_day_timestamp_mill from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_status SERVICE_NAME = Path(__file__).stem diff --git a/ingestion/tests/integration/s3/conftest.py b/ingestion/tests/integration/s3/conftest.py index 615d742d7297..2c96e1b1cba9 100644 --- a/ingestion/tests/integration/s3/conftest.py +++ b/ingestion/tests/integration/s3/conftest.py @@ -20,7 +20,6 @@ from _openmetadata_testutils.ometa import OM_JWT, int_admin_ometa from metadata.generated.schema.entity.services.storageService import StorageService from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status from ..containers import MinioContainerConfigs, get_minio_container @@ -112,7 +111,7 @@ def ingest_s3_storage(minio, metadata, service_name, create_data): workflow = MetadataWorkflow.create(yaml.safe_load(config)) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() yield diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py index 2e344a064441..17a6c9e13751 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py +++ 
b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py @@ -34,7 +34,6 @@ ApplicationPipeline, ) from metadata.workflow.application import ApplicationWorkflow -from metadata.workflow.application_output_handler import print_status def application_workflow(workflow_config: OpenMetadataApplicationConfig): @@ -54,7 +53,7 @@ def application_workflow(workflow_config: OpenMetadataApplicationConfig): workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index e92b3f786b55..7b219873b69d 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -36,7 +36,6 @@ from metadata.generated.schema.type.basic import Timestamp, Uuid from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn -from metadata.workflow.workflow_output_handler import print_status # pylint: disable=ungrouped-imports try: @@ -207,7 +206,7 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig): workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py index d180e0896259..8fa9a40ed012 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py @@ -42,7 +42,6 @@ ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.workflow.data_insight import DataInsightWorkflow 
-from metadata.workflow.workflow_output_handler import print_status def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig): @@ -63,7 +62,7 @@ def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig): workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py index 891460368298..1053912be158 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py @@ -28,7 +28,6 @@ WorkflowConfig, ) from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_status def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig): @@ -48,7 +47,7 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig): workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py index 38263010e92b..8ed7d76f0ef8 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py @@ -28,7 +28,6 @@ WorkflowConfig, ) from metadata.workflow.data_quality import TestSuiteWorkflow -from metadata.workflow.workflow_output_handler import print_status def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig): @@ -48,7 +47,7 @@ def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig): workflow.execute() workflow.raise_from_status() - print_status(workflow) + 
workflow.print_status() workflow.stop() diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py index e4094f2d02e0..27d0292a26f8 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py @@ -32,7 +32,6 @@ Stage, ) from metadata.workflow.usage import UsageWorkflow -from metadata.workflow.workflow_output_handler import print_status def usage_workflow(workflow_config: OpenMetadataWorkflowConfig): @@ -52,7 +51,7 @@ def usage_workflow(workflow_config: OpenMetadataWorkflowConfig): workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/openmetadata-docs/content/partials/v1.5/deployment/run-connectors-class.md b/openmetadata-docs/content/partials/v1.5/deployment/run-connectors-class.md index 2182296b5769..3e397e7d78fa 100644 --- a/openmetadata-docs/content/partials/v1.5/deployment/run-connectors-class.md +++ b/openmetadata-docs/content/partials/v1.5/deployment/run-connectors-class.md @@ -15,7 +15,7 @@ def run(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() ``` diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/ingestion/deployment/index.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/ingestion/deployment/index.md index 25b6430a23e3..b042a842d22e 100644 --- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/ingestion/deployment/index.md +++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/ingestion/deployment/index.md @@ -327,7 +327,7 @@ we just need the following few lines of Python code: ```python from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import 
print_status + config = """ @@ -337,7 +337,7 @@ workflow_config = yaml.safe_load(config) workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() -print_status(workflow) +workflow.print_status() workflow.stop() ``` diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/pipeline/airflow/gcs-composer.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/pipeline/airflow/gcs-composer.md index fbd565104d5a..c676cbb29b77 100644 --- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/pipeline/airflow/gcs-composer.md +++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/pipeline/airflow/gcs-composer.md @@ -65,7 +65,7 @@ from airflow.utils.dates import days_ago from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status + default_args = { "owner": "user_name", @@ -108,7 +108,7 @@ def metadata_ingestion_workflow(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/airflow.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/airflow.md index 4f309f3f66bf..bbc7fffd5f90 100644 --- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/airflow.md +++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/airflow.md @@ -54,7 +54,7 @@ except ModuleNotFoundError: from metadata.config.common import load_config_file from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status + from airflow.utils.dates import days_ago @@ -76,7 +76,7 @@ def metadata_ingestion_workflow(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() 
with DAG( @@ -225,7 +225,7 @@ default_args = { def metadata_ingestion_workflow(): from metadata.workflow.metadata import MetadataWorkflow - from metadata.workflow.workflow_output_handler import print_status + import yaml config = """ ... @@ -235,7 +235,7 @@ def metadata_ingestion_workflow(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/credentials.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/credentials.md index 040fc5a2a93a..07e76f2bb116 100644 --- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/credentials.md +++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/credentials.md @@ -163,7 +163,7 @@ import yaml from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status + CONFIG = f""" source: @@ -182,7 +182,7 @@ def run(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() @@ -286,7 +286,7 @@ except ModuleNotFoundError: from metadata.config.common import load_config_file from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status + # Import the hook from airflow.hooks.base import BaseHook @@ -326,7 +326,7 @@ def metadata_ingestion_workflow(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() with DAG( @@ -363,7 +363,7 @@ except ModuleNotFoundError: from metadata.config.common import load_config_file from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status + 
config = """ source: @@ -388,7 +388,7 @@ def metadata_ingestion_workflow(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() with DAG( diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/gcs-composer.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/gcs-composer.md index f869d8a73a48..f181e83ab610 100644 --- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/gcs-composer.md +++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/gcs-composer.md @@ -52,7 +52,7 @@ from airflow.utils.dates import days_ago from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status + default_args = { "owner": "user_name", @@ -73,7 +73,7 @@ def metadata_ingestion_workflow(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/github-actions.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/github-actions.md index a048b4e98649..17741a2e9202 100644 --- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/github-actions.md +++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/github-actions.md @@ -38,7 +38,7 @@ import yaml from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status + CONFIG = f""" source: @@ -57,7 +57,7 @@ def run(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git 
a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/index.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/index.md index a5b2961aebe8..b7d659c2edaa 100644 --- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/index.md +++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/index.md @@ -23,7 +23,7 @@ ingestion from within a simple Python script: ```python from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status + # Specify your YAML configuration CONFIG = """ @@ -42,7 +42,7 @@ def run(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() @@ -353,7 +353,7 @@ import yaml ```python {% srNumber=1 %} from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status + ``` @@ -392,7 +392,7 @@ def run(): workflow = MetadataWorkflow.create(CONFIG) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() ``` @@ -464,7 +464,7 @@ import yaml ```python {% srNumber=1 %} from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status + ``` @@ -499,7 +499,7 @@ def run(): workflow = MetadataWorkflow.create(CONFIG) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() ``` @@ -570,7 +570,7 @@ import yaml ```python {% srNumber=1 %} from metadata.workflow.usage import UsageWorkflow -from metadata.workflow.workflow_output_handler import print_status + ``` @@ -613,7 +613,7 @@ def run(): workflow = UsageWorkflow.create(CONFIG) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() ``` @@ -686,7 +686,7 @@ import yaml 
```python {% srNumber=1 %} from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_status + ``` @@ -723,7 +723,7 @@ def run(): workflow = ProfilerWorkflow.create(CONFIG) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() ``` @@ -800,7 +800,7 @@ import yaml ```python {% srNumber=1 %} from metadata.workflow.data_quality import TestSuiteWorkflow -from metadata.workflow.workflow_output_handler import print_status + ``` @@ -836,7 +836,7 @@ def run(): workflow = TestSuiteWorkflow.create(CONFIG) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() ``` diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/mwaa.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/mwaa.md index acff0d0cd0a6..63f00344c06b 100644 --- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/mwaa.md +++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/mwaa.md @@ -54,7 +54,7 @@ except ModuleNotFoundError: from airflow.utils.dates import days_ago from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status + default_args = { "retries": 3, @@ -71,7 +71,7 @@ def metadata_ingestion_workflow(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() with DAG( @@ -392,7 +392,7 @@ default_args = { def metadata_ingestion_workflow(): from metadata.workflow.metadata import MetadataWorkflow - from metadata.workflow.workflow_output_handler import print_status + import yaml @@ -403,7 +403,7 @@ YAML config workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() 
workflow.stop() with DAG( diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/upgrade/versions/110-to-120.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/upgrade/versions/110-to-120.md index c4d742d006b5..f34b6430d36e 100644 --- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/upgrade/versions/110-to-120.md +++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/upgrade/versions/110-to-120.md @@ -230,12 +230,12 @@ We have reorganized the structure of the `Workflow` classes, which requires upda The `Workflow` class that you import can then be called as follows: ```python -from metadata.workflow.workflow_output_handler import print_status + workflow = workflow_class.create(workflow_config) workflow.execute() workflow.raise_from_status() -print_status(workflow) # This method has been updated. Before it was `workflow.print_status()` +workflow.print_status() # This method has been updated. Before it was `print_status(workflow)` workflow.stop() ``` diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/airflow-sdk.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/airflow-sdk.md index c1c0a36442eb..309a3ca10b11 100644 --- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/airflow-sdk.md +++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/airflow-sdk.md @@ -70,7 +70,7 @@ import yaml from datetime import timedelta from airflow import DAG from metadata.workflow.data_insight import DataInsightWorkflow -from metadata.workflow.workflow_output_handler import print_status + try: from airflow.operators.python import PythonOperator @@ -98,7 +98,7 @@ def metadata_ingestion_workflow(): workflow = DataInsightWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() with DAG( diff --git 
a/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/elasticsearch-reindex.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/elasticsearch-reindex.md index 8cbb29d0a6a8..1f139292e7ea 100644 --- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/elasticsearch-reindex.md +++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/elasticsearch-reindex.md @@ -50,7 +50,7 @@ except ModuleNotFoundError: from metadata.config.common import load_config_file from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status + from airflow.utils.dates import days_ago default_args = { @@ -71,7 +71,7 @@ def metadata_ingestion_workflow(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() with DAG( diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/quality-and-observability/profiler/external_workflow.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/quality-and-observability/profiler/external_workflow.md index 963ed15fe6cd..21aa5e9021c2 100644 --- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/quality-and-observability/profiler/external_workflow.md +++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/quality-and-observability/profiler/external_workflow.md @@ -169,7 +169,7 @@ If you'd rather have a Python script taking care of the execution, you can use: ```python from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_status + # Specify your YAML configuration CONFIG = """ @@ -188,7 +188,7 @@ def run(): workflow = ProfilerWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() From b1c9e8140e97b87fc308af72eb440ed3d8c95a5d Mon Sep 17 00:00:00 2001 From: Pablo Takara Date: Wed, 24 Jul 
2024 10:10:12 +0200 Subject: [PATCH 2/8] Add old methods as deprecated to avoid breaking changes --- .../workflow/application_output_handler.py | 21 +++++++++++++++++++ .../workflow/workflow_output_handler.py | 9 +++++++- .../test_great_expectation_integration.py | 2 +- .../system/test_bigquery_system_metrics.py | 2 +- .../system/test_redshift_system_metrics.py | 2 +- .../system/test_snowflake_system_metrics.py | 6 +++--- .../test_datalake_profiler_e2e.py | 2 +- .../orm_profiler/test_orm_profiler_e2e.py | 2 +- .../profiler/test_nosql_profiler.py | 2 +- 9 files changed, 38 insertions(+), 10 deletions(-) create mode 100644 ingestion/src/metadata/workflow/application_output_handler.py diff --git a/ingestion/src/metadata/workflow/application_output_handler.py b/ingestion/src/metadata/workflow/application_output_handler.py new file mode 100644 index 000000000000..ec5177b8fece --- /dev/null +++ b/ingestion/src/metadata/workflow/application_output_handler.py @@ -0,0 +1,21 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Generic Workflow entrypoint to execute Applications +""" + +from metadata.utils.deprecation import deprecated +from metadata.workflow.base import BaseWorkflow + + +@deprecated(message="Use 'workflow.print_status()' instead.", release="1.8") +def print_status(workflow: BaseWorkflow): + workflow.print_status() diff --git a/ingestion/src/metadata/workflow/workflow_output_handler.py b/ingestion/src/metadata/workflow/workflow_output_handler.py index 325b586519de..468bddac8c37 100644 --- a/ingestion/src/metadata/workflow/workflow_output_handler.py +++ b/ingestion/src/metadata/workflow/workflow_output_handler.py @@ -33,6 +33,7 @@ from metadata.ingestion.api.step import Step, Summary from metadata.ingestion.lineage.models import QueryParsingFailures from metadata.utils.constants import UTF_8 +from metadata.utils.deprecation import deprecated from metadata.utils.execution_time_tracker import ExecutionTimeTracker from metadata.utils.helpers import pretty_print_time_duration from metadata.utils.logger import ANSI, log_ansi_encoded_string @@ -85,8 +86,14 @@ class WorkflowType(Enum): } +@deprecated(message="Use 'workflow.print_status()' instead.", release="1.8") +def print_status(workflow: "BaseWorkflow"): # pyright: ignore[ReportUndefinedVariable] + workflow.print_status() + + class WorkflowOutputHandler: - """ Responsible for dealing with the Workflow Outputs """ + """Responsible for dealing with the Workflow Outputs""" + def print_status( self, result_status: WorkflowResultStatus, diff --git a/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py b/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py index 633bc803d5ff..6a8529f74899 100644 --- a/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py +++ b/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py @@ -148,7 +148,7 @@ def setUpClass(cls): ingestion_workflow = 
MetadataWorkflow.create(INGESTION_CONFIG) ingestion_workflow.execute() ingestion_workflow.raise_from_status() - print_status(ingestion_workflow) + ingestion_workflow.print_status() ingestion_workflow.stop() @classmethod diff --git a/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py index 5faa22644ce4..50cf2bba2ec8 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py @@ -142,7 +142,7 @@ def setUpClass(cls) -> None: ingestion_workflow = MetadataWorkflow.create(cls.config) ingestion_workflow.execute() ingestion_workflow.raise_from_status() - print_status(ingestion_workflow) + ingestion_workflow.print_status() ingestion_workflow.stop() # get table fqn diff --git a/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py index 0b953bbc85a3..0743911e8622 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py @@ -109,7 +109,7 @@ def setUpClass(cls) -> None: ingestion_workflow = MetadataWorkflow.create(cls.config) ingestion_workflow.execute() ingestion_workflow.raise_from_status() - print_status(ingestion_workflow) + ingestion_workflow.print_status() ingestion_workflow.stop() # get table fqn diff --git a/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py index 961d0c399144..6172f70869b0 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py @@ -23,12 +23,12 @@ We will need to 
perform at least one `DELETE`, `INSERT`, `UPDATE` on any table from the schema. query example: ``` - INSERT INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES + INSERT INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES (1, 'FOO'), (2, 'BAR'), (3, 'BAZZ') - INSERT OVERWRITE INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES + INSERT OVERWRITE INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES (4, 'FOOBAR'), (5, 'FOOBAZZ'), (6, 'BARBAZZ') @@ -133,7 +133,7 @@ def setUpClass(cls) -> None: ingestion_workflow = MetadataWorkflow.create(cls.config) ingestion_workflow.execute() ingestion_workflow.raise_from_status() - print_status(ingestion_workflow) + ingestion_workflow.print_status() ingestion_workflow.stop() # get table fqn diff --git a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py index c1bae7b23399..c2092669d6a4 100644 --- a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py +++ b/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py @@ -138,7 +138,7 @@ def setUp(self) -> None: ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG) ingestion_workflow.execute() ingestion_workflow.raise_from_status() - print_status(ingestion_workflow) + ingestion_workflow.print_status() ingestion_workflow.stop() def test_datalake_profiler_workflow(self): diff --git a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py index 28d069f08fda..c2b2e0835abc 100644 --- a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py +++ b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py @@ -184,7 +184,7 @@ def setUpClass(cls) -> None: ingestion_workflow = MetadataWorkflow.create(ingestion_config) ingestion_workflow.execute() ingestion_workflow.raise_from_status() - print_status(ingestion_workflow) + ingestion_workflow.print_status() ingestion_workflow.stop() @classmethod diff --git 
a/ingestion/tests/integration/profiler/test_nosql_profiler.py b/ingestion/tests/integration/profiler/test_nosql_profiler.py index e1c51e8de7a1..c86b1bb64e10 100644 --- a/ingestion/tests/integration/profiler/test_nosql_profiler.py +++ b/ingestion/tests/integration/profiler/test_nosql_profiler.py @@ -143,7 +143,7 @@ def setUpClass(cls) -> None: ) ingestion_workflow.execute() ingestion_workflow.raise_from_status() - print_status(ingestion_workflow) + ingestion_workflow.print_status() ingestion_workflow.stop() @classmethod From 0943c2625a5d1c207961cbcfa66fcbda77253008 Mon Sep 17 00:00:00 2001 From: Pablo Takara Date: Wed, 24 Jul 2024 11:34:55 +0200 Subject: [PATCH 3/8] Extract WorkflowInitErrorHandler from workflow_output_handler --- ingestion/pyproject.toml | 2 +- ingestion/src/metadata/cli/dataquality.py | 9 +- ingestion/src/metadata/cli/ingest.py | 7 +- ingestion/src/metadata/cli/insight.py | 9 +- ingestion/src/metadata/cli/lineage.py | 7 +- ingestion/src/metadata/cli/profile.py | 9 +- ingestion/src/metadata/cli/usage.py | 7 +- .../metadata/utils/execution_time_tracker.py | 4 +- ingestion/src/metadata/workflow/base.py | 12 +- .../workflow/workflow_init_error_handler.py | 183 +++++++++++++++++ .../workflow/workflow_output_handler.py | 184 ++++-------------- 11 files changed, 266 insertions(+), 167 deletions(-) create mode 100644 ingestion/src/metadata/workflow/workflow_init_error_handler.py diff --git a/ingestion/pyproject.toml b/ingestion/pyproject.toml index add81a5aabb3..9eca4ddd05cf 100644 --- a/ingestion/pyproject.toml +++ b/ingestion/pyproject.toml @@ -179,9 +179,9 @@ ignore = [ "src/metadata/workflow/metadata.py", "src/metadata/workflow/profiler.py", "src/metadata/workflow/usage.py", - "src/metadata/workflow/workflow_output_handler.py", "src/metadata/workflow/workflow_status_mixin.py", ] reportDeprecated = false reportMissingTypeStubs = false +reportAny = false diff --git a/ingestion/src/metadata/cli/dataquality.py b/ingestion/src/metadata/cli/dataquality.py 
index 8159fb11a633..b0eb28a61207 100644 --- a/ingestion/src/metadata/cli/dataquality.py +++ b/ingestion/src/metadata/cli/dataquality.py @@ -19,7 +19,10 @@ from metadata.config.common import load_config_file from metadata.utils.logger import cli_logger from metadata.workflow.data_quality import TestSuiteWorkflow -from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error +from metadata.workflow.workflow_init_error_handler import ( + WorkflowInitErrorHandler, + WorkflowType, +) logger = cli_logger() @@ -38,7 +41,9 @@ def run_test(config_path: Path) -> None: workflow = TestSuiteWorkflow.create(workflow_config_dict) except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, workflow_config_dict, WorkflowType.TEST) + WorkflowInitErrorHandler.print_init_error( + exc, workflow_config_dict, WorkflowType.TEST + ) sys.exit(1) workflow.execute() diff --git a/ingestion/src/metadata/cli/ingest.py b/ingestion/src/metadata/cli/ingest.py index 41bda71061ed..22cf8575c136 100644 --- a/ingestion/src/metadata/cli/ingest.py +++ b/ingestion/src/metadata/cli/ingest.py @@ -19,7 +19,10 @@ from metadata.config.common import load_config_file from metadata.utils.logger import cli_logger from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error +from metadata.workflow.workflow_init_error_handler import ( + WorkflowInitErrorHandler, + WorkflowType, +) logger = cli_logger() @@ -38,7 +41,7 @@ def run_ingest(config_path: Path) -> None: logger.debug(f"Using config: {workflow.config}") except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, config_dict, WorkflowType.INGEST) + WorkflowInitErrorHandler.print_init_error(exc, config_dict, WorkflowType.INGEST) sys.exit(1) workflow.execute() diff --git a/ingestion/src/metadata/cli/insight.py b/ingestion/src/metadata/cli/insight.py index 97cc3d8e613b..0daf7ab5addb 100644 --- 
a/ingestion/src/metadata/cli/insight.py +++ b/ingestion/src/metadata/cli/insight.py @@ -19,7 +19,10 @@ from metadata.config.common import load_config_file from metadata.utils.logger import cli_logger from metadata.workflow.data_insight import DataInsightWorkflow -from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error +from metadata.workflow.workflow_init_error_handler import ( + WorkflowInitErrorHandler, + WorkflowType, +) logger = cli_logger() @@ -38,7 +41,9 @@ def run_insight(config_path: Path) -> None: logger.debug(f"Using config: {workflow.config}") except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, config_dict, WorkflowType.INSIGHT) + WorkflowInitErrorHandler.print_init_error( + exc, config_dict, WorkflowType.INSIGHT + ) sys.exit(1) workflow.execute() diff --git a/ingestion/src/metadata/cli/lineage.py b/ingestion/src/metadata/cli/lineage.py index dba0ed612582..98af1900db63 100644 --- a/ingestion/src/metadata/cli/lineage.py +++ b/ingestion/src/metadata/cli/lineage.py @@ -25,7 +25,10 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import UTF_8 from metadata.utils.logger import cli_logger -from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error +from metadata.workflow.workflow_init_error_handler import ( + WorkflowInitErrorHandler, + WorkflowType, +) logger = cli_logger() @@ -53,7 +56,7 @@ def run_lineage(config_path: Path) -> None: except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, config_dict, WorkflowType.INGEST) + WorkflowInitErrorHandler.print_init_error(exc, config_dict, WorkflowType.INGEST) sys.exit(1) if workflow.filePath: diff --git a/ingestion/src/metadata/cli/profile.py b/ingestion/src/metadata/cli/profile.py index 6cf3acb13171..8c1db551c130 100644 --- a/ingestion/src/metadata/cli/profile.py +++ b/ingestion/src/metadata/cli/profile.py @@ -19,7 +19,10 @@ from metadata.config.common 
import load_config_file from metadata.utils.logger import cli_logger from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error +from metadata.workflow.workflow_init_error_handler import ( + WorkflowInitErrorHandler, + WorkflowType, +) logger = cli_logger() @@ -38,7 +41,9 @@ def run_profiler(config_path: Path) -> None: workflow = ProfilerWorkflow.create(workflow_config_dict) except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, workflow_config_dict, WorkflowType.PROFILE) + WorkflowInitErrorHandler.print_init_error( + exc, workflow_config_dict, WorkflowType.PROFILE + ) sys.exit(1) workflow.execute() diff --git a/ingestion/src/metadata/cli/usage.py b/ingestion/src/metadata/cli/usage.py index f86c0bacff06..aaeea33366c0 100644 --- a/ingestion/src/metadata/cli/usage.py +++ b/ingestion/src/metadata/cli/usage.py @@ -19,7 +19,10 @@ from metadata.config.common import load_config_file from metadata.utils.logger import cli_logger from metadata.workflow.usage import UsageWorkflow -from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error +from metadata.workflow.workflow_init_error_handler import ( + WorkflowInitErrorHandler, + WorkflowType, +) logger = cli_logger() @@ -38,7 +41,7 @@ def run_usage(config_path: Path) -> None: logger.debug(f"Using config: {workflow.config}") except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, config_dict, WorkflowType.INGEST) + WorkflowInitErrorHandler.print_init_error(exc, config_dict, WorkflowType.INGEST) sys.exit(1) workflow.execute() diff --git a/ingestion/src/metadata/utils/execution_time_tracker.py b/ingestion/src/metadata/utils/execution_time_tracker.py index bdac1676c0ed..e9f0e733808f 100644 --- a/ingestion/src/metadata/utils/execution_time_tracker.py +++ b/ingestion/src/metadata/utils/execution_time_tracker.py @@ -17,7 +17,7 @@ from copy import deepcopy from 
functools import wraps from time import perf_counter -from typing import List, Optional +from typing import Dict, List, Optional from pydantic import BaseModel @@ -81,7 +81,7 @@ class ExecutionTimeTrackerState(metaclass=Singleton): def __init__(self): """Initializes the state and the lock.""" - self.state = {} + self.state: Dict[str, float] = {} self.lock = threading.Lock() def add(self, context: ExecutionTimeTrackerContext, elapsed: float): diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py index 0bed1675c94c..79b506a859f3 100644 --- a/ingestion/src/metadata/workflow/base.py +++ b/ingestion/src/metadata/workflow/base.py @@ -89,7 +89,7 @@ def __init__( """ Disabling pylint to wait for workflow reimplementation as a topology """ - self.output_handler = WorkflowOutputHandler() + self._output_handler = None self.config = config self.service_type = service_type self._timer: Optional[RepeatedTimer] = None @@ -108,6 +108,12 @@ def __init__( self.post_init() + @property + def output_handler(self) -> WorkflowOutputHandler: + if not self._output_handler: + self._output_handler = WorkflowOutputHandler() + return self._output_handler + @property def ingestion_pipeline(self) -> Optional[IngestionPipeline]: """Get or create the Ingestion Pipeline from the configuration""" @@ -116,6 +122,10 @@ def ingestion_pipeline(self) -> Optional[IngestionPipeline]: return self._ingestion_pipeline + def with_output_handler(self, output_handler: WorkflowOutputHandler): + self._output_handler = output_handler + return self + def stop(self) -> None: """ Main stopping logic diff --git a/ingestion/src/metadata/workflow/workflow_init_error_handler.py b/ingestion/src/metadata/workflow/workflow_init_error_handler.py new file mode 100644 index 000000000000..d0f79abbaecf --- /dev/null +++ b/ingestion/src/metadata/workflow/workflow_init_error_handler.py @@ -0,0 +1,183 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the 
"License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Module handles the init error messages from different workflows +""" +import traceback +from enum import Enum +from pathlib import Path +from typing import Any, Dict, Optional, Type, Union + +from metadata.config.common import ConfigurationError +from metadata.ingestion.api.parser import ( + InvalidWorkflowException, + ParsingConfigurationError, +) +from metadata.utils.constants import UTF_8 +from metadata.utils.logger import ANSI, log_ansi_encoded_string + + +class WorkflowType(Enum): + """ + Workflow type enums based on the `metadata` CLI commands + """ + + INGEST = "ingest" + PROFILE = "profile" + TEST = "test" + LINEAGE = "lineage" + USAGE = "usage" + INSIGHT = "insight" + APP = "application" + + +EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows" + + +URLS = { + WorkflowType.INGEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/metadata", + WorkflowType.PROFILE: "https://docs.open-metadata.org/connectors/ingestion/workflows/profiler", + WorkflowType.TEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/data-quality", + WorkflowType.LINEAGE: "https://docs.open-metadata.org/connectors/ingestion/workflows/lineage", + WorkflowType.USAGE: "https://docs.open-metadata.org/connectors/ingestion/workflows/usage", +} + + +DEFAULT_EXAMPLE_FILE = { + WorkflowType.INGEST: "bigquery", + WorkflowType.PROFILE: "bigquery_profiler", + WorkflowType.TEST: "test_suite", + WorkflowType.LINEAGE: "bigquery_lineage", + 
WorkflowType.USAGE: "bigquery_usage", +} + + +class WorkflowInitErrorHandler: + """Responsible for handling the InitError flow, when a Workflow errors before even initializing.""" + + @staticmethod + def print_init_error( + exc: Union[Exception, Type[Exception]], + config: Dict[str, Any], + workflow_type: WorkflowType = WorkflowType.INGEST, + ) -> None: + """ + Print a workflow initialization error + """ + source_type_name = WorkflowInitErrorHandler._get_source_type_name(config) + workflow_type = WorkflowInitErrorHandler._update_workflow_type( + source_type_name, workflow_type + ) + + if isinstance( + exc, + (ParsingConfigurationError, ConfigurationError, InvalidWorkflowException), + ): + WorkflowInitErrorHandler._print_error_msg( + f"Error loading {workflow_type.name} configuration: {exc}" + ) + WorkflowInitErrorHandler._print_file_example( + source_type_name, workflow_type + ) + else: + WorkflowInitErrorHandler._print_error_msg( + f"\nError initializing {workflow_type.name}: {exc}" + ) + WorkflowInitErrorHandler._print_error_msg(traceback.format_exc()) + + WorkflowInitErrorHandler._print_more_info(workflow_type) + + @staticmethod + def _get_source_type_name(config: Dict[str, Any]) -> Optional[str]: + """Returns the Source Type Name based on the Configuration passed.""" + source_type_name = None + + if ( + config + and config.get("source", None) is not None + and config["source"].get("type", None) is not None + ): + source_type_name = config["source"].get("type") + source_type_name = source_type_name.replace("-", "-") + + return source_type_name + + @staticmethod + def _update_workflow_type( + source_type_name: Optional[str], workflow_type: WorkflowType + ) -> WorkflowType: + """Updates the WorkflowType if needed. + When WorkflowType.INGEST is received, it can also be LINEAGE or USAGE, depending on the Source Type Name.
+ """ + if source_type_name and workflow_type == WorkflowType.INGEST: + if source_type_name.endswith("lineage"): + return WorkflowType.LINEAGE + elif source_type_name.endswith("usage"): + return WorkflowType.USAGE + return workflow_type + + @staticmethod + def _print_file_example( + source_type_name: Optional[str], workflow_type: WorkflowType + ): + """ + Print an example file for a given configuration + """ + if source_type_name is not None: + example_file = WorkflowInitErrorHandler._calculate_example_file( + source_type_name, workflow_type + ) + example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml" + if not example_path.exists(): + example_file = DEFAULT_EXAMPLE_FILE[workflow_type] + example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml" + log_ansi_encoded_string( + message=f"\nMake sure you are following the following format e.g. '{example_file}':" + ) + log_ansi_encoded_string(message="------------") + with open(example_path, encoding=UTF_8) as file: + log_ansi_encoded_string(message=file.read()) + log_ansi_encoded_string(message="------------") + + @staticmethod + def _calculate_example_file( + source_type_name: str, workflow_type: WorkflowType + ) -> str: + """ + Calculates the ingestion type depending on the source type name and workflow_type + """ + if workflow_type == WorkflowType.USAGE: + return f"{source_type_name}_usage" + if workflow_type == WorkflowType.LINEAGE: + return f"{source_type_name}_lineage" + if workflow_type == WorkflowType.PROFILE: + return f"{source_type_name}_profiler" + if workflow_type == WorkflowType.TEST: + return DEFAULT_EXAMPLE_FILE[workflow_type] + return source_type_name + + @staticmethod + def _print_more_info(workflow_type: WorkflowType) -> None: + """ + Print more information message + """ + log_ansi_encoded_string( + message=f"\nFor more information, please visit: {URLS[workflow_type]}" + + "\nOr join us in Slack: https://slack.open-metadata.org/" + ) + + @staticmethod + def _print_error_msg(msg: str) -> 
None: + """ + Print message with error style + """ + log_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}") diff --git a/ingestion/src/metadata/workflow/workflow_output_handler.py b/ingestion/src/metadata/workflow/workflow_output_handler.py index 468bddac8c37..c88f03f3a618 100644 --- a/ingestion/src/metadata/workflow/workflow_output_handler.py +++ b/ingestion/src/metadata/workflow/workflow_output_handler.py @@ -14,29 +14,22 @@ """ import time -import traceback -from enum import Enum -from pathlib import Path from typing import Any, Dict, List, Optional, Type, Union from pydantic import BaseModel from tabulate import tabulate -from metadata.config.common import ConfigurationError -from metadata.generated.schema.entity.services.ingestionPipelines.status import ( - StackTraceError, -) -from metadata.ingestion.api.parser import ( - InvalidWorkflowException, - ParsingConfigurationError, -) +from metadata.ingestion.api.status import TruncatedStackTraceError from metadata.ingestion.api.step import Step, Summary from metadata.ingestion.lineage.models import QueryParsingFailures -from metadata.utils.constants import UTF_8 from metadata.utils.deprecation import deprecated from metadata.utils.execution_time_tracker import ExecutionTimeTracker from metadata.utils.helpers import pretty_print_time_duration from metadata.utils.logger import ANSI, log_ansi_encoded_string +from metadata.workflow.workflow_init_error_handler import ( + WorkflowInitErrorHandler, + WorkflowType, +) from metadata.workflow.workflow_status_mixin import WorkflowResultStatus WORKFLOW_FAILURE_MESSAGE = "Workflow finished with failures" @@ -50,45 +43,29 @@ class Failure(BaseModel): """ name: str - failures: List[StackTraceError] - - -class WorkflowType(Enum): - """ - Workflow type enums based on the `metadata` CLI commands - """ - - INGEST = "ingest" - PROFILE = "profile" - TEST = "test" - LINEAGE = "lineage" - USAGE = "usage" - INSIGHT = "insight" - APP = "application" - - 
-EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows" - -URLS = { - WorkflowType.INGEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/metadata", - WorkflowType.PROFILE: "https://docs.open-metadata.org/connectors/ingestion/workflows/profiler", - WorkflowType.TEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/data-quality", - WorkflowType.LINEAGE: "https://docs.open-metadata.org/connectors/ingestion/workflows/lineage", - WorkflowType.USAGE: "https://docs.open-metadata.org/connectors/ingestion/workflows/usage", -} - -DEFAULT_EXAMPLE_FILE = { - WorkflowType.INGEST: "bigquery", - WorkflowType.PROFILE: "bigquery_profiler", - WorkflowType.TEST: "test_suite", - WorkflowType.LINEAGE: "bigquery_lineage", - WorkflowType.USAGE: "bigquery_usage", -} + failures: List[TruncatedStackTraceError] @deprecated(message="Use 'workflow.print_status()' instead.", release="1.8") -def print_status(workflow: "BaseWorkflow"): # pyright: ignore[ReportUndefinedVariable] - workflow.print_status() +def print_status( + workflow: "BaseWorkflow", +): # pyright: ignore[reportUndefinedVariable, reportUnknownParameterType] + workflow.print_status() # pyright: ignore[reportUnknownMemberType] + + +@deprecated( + message=( + "Use 'WorkflowInitErrorHandler.print_init_error(exc, config, workflow_type)'" + " from 'metadata.workflow.workflow_init_error_handler'" + ), + release="1.8", +) +def print_init_error( + exc: Union[Exception, Type[Exception]], + config: Dict[str, Any], + workflow_type: WorkflowType = WorkflowType.INGEST, +): + WorkflowInitErrorHandler.print_init_error(exc, config, workflow_type) class WorkflowOutputHandler: @@ -111,7 +88,7 @@ def print_status( color=ANSI.BRIGHT_CYAN, bold=True, message="Workflow finished in time: " - f"{pretty_print_time_duration(time.time() - start_time)}", + + f"{pretty_print_time_duration(time.time() - start_time)}", ) if result_status == WorkflowResultStatus.FAILURE: @@ -122,6 +99,7 @@ def 
print_status( ) def print_summary(self, steps: List[Step], debug: bool = False): + """Prints the summary information for a Workflow Execution.""" if debug: self._print_debug_summary(steps) self._print_execution_time_summary() @@ -130,7 +108,7 @@ def print_summary(self, steps: List[Step], debug: bool = False): self._print_summary(steps) def _print_summary(self, steps: List[Step]): - failures = [] + failures: List[Failure] = [] total_records: int = 0 total_errors: int = 0 @@ -164,7 +142,7 @@ def _print_summary(self, steps: List[Step]): color=ANSI.BRIGHT_CYAN, bold=True, message=f"Success %: " - f"{round(total_success * 100 / (total_success + total_errors), 2)}", + + f"{round(total_success * 100 / (total_success + total_errors), 2)}", ) def _print_debug_summary(self, steps: List[Step]): @@ -178,7 +156,7 @@ def _print_execution_time_summary(self): """Log the ExecutionTimeTracker Summary.""" tracker = ExecutionTimeTracker() - summary_table = { + summary_table: Dict[str, List[Union[str, float]]] = { "Context": [], "Execution Time Aggregate": [], } @@ -196,7 +174,7 @@ def _print_query_parsing_issues(self): """Log the QueryParsingFailures Summary.""" query_failures = QueryParsingFailures() - summary_table = { + summary_table: Dict[str, List[Optional[str]]] = { "Query": [], "Error": [], } @@ -208,10 +186,10 @@ def _print_query_parsing_issues(self): if summary_table["Query"]: log_ansi_encoded_string(bold=True, message="Query Parsing Error Summary") log_ansi_encoded_string( - message=f"\n{tabulate(summary_table, tablefmt='grid', headers=summary_table.keys())}" + message=f"\n{tabulate(summary_table, tablefmt='grid', headers=list(summary_table.keys()))}" ) - def _get_failures(self, failure: Failure) -> List[Dict[str, str]]: + def _get_failures(self, failure: Failure) -> List[Dict[str, Optional[str]]]: return [ { "From": failure.name, @@ -244,99 +222,3 @@ def _print_failures_if_apply(self, failures: List[Failure]) -> None: log_ansi_encoded_string( 
message=f"\n{tabulate(error_table, headers='keys', tablefmt='grid')}" ) - - -def calculate_ingestion_type(source_type_name: str) -> WorkflowType: - """ - Calculates the ingestion type depending on the source type name - """ - if source_type_name.endswith("lineage"): - return WorkflowType.LINEAGE - if source_type_name.endswith("usage"): - return WorkflowType.USAGE - return WorkflowType.INGEST - - -def calculate_example_file(source_type_name: str, workflow_type: WorkflowType) -> str: - """ - Calculates the ingestion type depending on the source type name and workflow_type - """ - if workflow_type == WorkflowType.USAGE: - return f"{source_type_name}_usage" - if workflow_type == WorkflowType.LINEAGE: - return f"{source_type_name}_lineage" - if workflow_type == WorkflowType.PROFILE: - return f"{source_type_name}_profiler" - if workflow_type == WorkflowType.TEST: - return DEFAULT_EXAMPLE_FILE[workflow_type] - return source_type_name - - -def print_file_example(source_type_name: str, workflow_type: WorkflowType): - """ - Print an example file for a given configuration - """ - if source_type_name is not None: - example_file = calculate_example_file(source_type_name, workflow_type) - example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml" - if not example_path.exists(): - example_file = DEFAULT_EXAMPLE_FILE[workflow_type] - example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml" - log_ansi_encoded_string( - message=f"\nMake sure you are following the following format e.g. 
'{example_file}':" - ) - log_ansi_encoded_string(message="------------") - with open(example_path, encoding=UTF_8) as file: - log_ansi_encoded_string(message=file.read()) - log_ansi_encoded_string(message="------------") - - -def print_more_info(workflow_type: WorkflowType) -> None: - """ - Print more information message - """ - log_ansi_encoded_string( - message=f"\nFor more information, please visit: {URLS[workflow_type]}" - "\nOr join us in Slack: https://slack.open-metadata.org/" - ) - - -def print_error_msg(msg: str) -> None: - """ - Print message with error style - """ - log_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}") - - -def print_init_error( - exc: Union[Exception, Type[Exception]], - config: dict, - workflow_type: WorkflowType = WorkflowType.INGEST, -) -> None: - """ - Print a workflow initialization error - """ - source_type_name = None - if ( - config - and config.get("source", None) is not None - and config["source"].get("type", None) is not None - ): - source_type_name = config["source"].get("type") - source_type_name = source_type_name.replace("-", "-") - workflow_type = ( - calculate_ingestion_type(source_type_name) - if workflow_type == WorkflowType.INGEST - else workflow_type - ) - - if isinstance( - exc, (ParsingConfigurationError, ConfigurationError, InvalidWorkflowException) - ): - print_error_msg(f"Error loading {workflow_type.name} configuration: {exc}") - print_file_example(source_type_name, workflow_type) - print_more_info(workflow_type) - else: - print_error_msg(f"\nError initializing {workflow_type.name}: {exc}") - print_error_msg(traceback.format_exc()) - print_more_info(workflow_type) From 7795560a074e6301feede490cd4e658954d74535 Mon Sep 17 00:00:00 2001 From: Pablo Takara Date: Wed, 24 Jul 2024 12:47:35 +0200 Subject: [PATCH 4/8] Fix static checks --- .../src/metadata/workflow/workflow_init_error_handler.py | 2 +- ingestion/src/metadata/workflow/workflow_output_handler.py | 6 +++--- 2 files changed, 4 
insertions(+), 4 deletions(-) diff --git a/ingestion/src/metadata/workflow/workflow_init_error_handler.py b/ingestion/src/metadata/workflow/workflow_init_error_handler.py index d0f79abbaecf..b6ded8223050 100644 --- a/ingestion/src/metadata/workflow/workflow_init_error_handler.py +++ b/ingestion/src/metadata/workflow/workflow_init_error_handler.py @@ -121,7 +121,7 @@ def _update_workflow_type( if source_type_name and workflow_type == WorkflowType.INGEST: if source_type_name.endswith("lineage"): return WorkflowType.LINEAGE - elif source_type_name.endswith("usage"): + if source_type_name.endswith("usage"): return WorkflowType.USAGE return workflow_type diff --git a/ingestion/src/metadata/workflow/workflow_output_handler.py b/ingestion/src/metadata/workflow/workflow_output_handler.py index c88f03f3a618..87e64c4f48ef 100644 --- a/ingestion/src/metadata/workflow/workflow_output_handler.py +++ b/ingestion/src/metadata/workflow/workflow_output_handler.py @@ -48,8 +48,8 @@ class Failure(BaseModel): @deprecated(message="Use 'workflow.print_status()' instead.", release="1.8") def print_status( - workflow: "BaseWorkflow", -): # pyright: ignore[reportUndefinedVariable, reportUnknownParameterType] + workflow: "BaseWorkflow", # pyright: ignore[reportUndefinedVariable,reportUnknownParameterType] +): workflow.print_status() # pyright: ignore[reportUnknownMemberType] @@ -141,7 +141,7 @@ def _print_summary(self, steps: List[Step]): log_ansi_encoded_string( color=ANSI.BRIGHT_CYAN, bold=True, - message=f"Success %: " + message="Success %: " + f"{round(total_success * 100 / (total_success + total_errors), 2)}", ) From 0038b8c3e946f16d366a624b664c62aebd2b706a Mon Sep 17 00:00:00 2001 From: Pablo Takara Date: Wed, 24 Jul 2024 14:50:35 +0200 Subject: [PATCH 5/8] Fix tests --- .../system/test_bigquery_system_metrics.py | 2 +- .../system/test_redshift_system_metrics.py | 2 +- .../system/test_snowflake_system_metrics.py | 2 +- .../orm_profiler/test_orm_profiler_e2e.py | 14 +++++++------- 4 
files changed, 10 insertions(+), 10 deletions(-) diff --git a/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py index 50cf2bba2ec8..17349416ac6e 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py @@ -167,7 +167,7 @@ def test_bigquery_system_metrics(self): profiler_workflow = ProfilerWorkflow.create(config) profiler_workflow.execute() profiler_workflow.raise_from_status() - print_status(profiler_workflow) + profiler_workflow.print_status() profiler_workflow.stop() # get latest profile metrics diff --git a/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py index 0743911e8622..64779d1f5ca9 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py @@ -134,7 +134,7 @@ def test_redshift_system_metrics(self): profiler_workflow = ProfilerWorkflow.create(config) profiler_workflow.execute() profiler_workflow.raise_from_status() - print_status(profiler_workflow) + profiler_workflow.print_status() profiler_workflow.stop() # get latest profile metrics diff --git a/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py index 6172f70869b0..20ef0e47502b 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py @@ -158,7 +158,7 @@ def test_snowflake_system_metrics(self): profiler_workflow = ProfilerWorkflow.create(config) profiler_workflow.execute() profiler_workflow.raise_from_status() - 
print_status(profiler_workflow) + profiler_workflow.print_status() profiler_workflow.stop() # get latest profile metrics diff --git a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py index c2b2e0835abc..8b8c6ec34e8d 100644 --- a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py +++ b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py @@ -308,7 +308,7 @@ def test_workflow_sample_profile(self): profiler_workflow = ProfilerWorkflow.create(workflow_config) profiler_workflow.execute() - print_status(profiler_workflow) + profiler_workflow.print_status() profiler_workflow.stop() table = self.metadata.get_by_name( @@ -360,7 +360,7 @@ def test_workflow_datetime_partition(self): profiler_workflow = ProfilerWorkflow.create(workflow_config) profiler_workflow.execute() - print_status(profiler_workflow) + profiler_workflow.print_status() profiler_workflow.stop() table = self.metadata.get_by_name( @@ -399,7 +399,7 @@ def test_workflow_datetime_partition(self): profiler_workflow = ProfilerWorkflow.create(workflow_config) profiler_workflow.execute() - print_status(profiler_workflow) + profiler_workflow.print_status() profiler_workflow.stop() table = self.metadata.get_by_name( @@ -447,7 +447,7 @@ def test_workflow_integer_range_partition(self): profiler_workflow = ProfilerWorkflow.create(workflow_config) profiler_workflow.execute() - print_status(profiler_workflow) + profiler_workflow.print_status() profiler_workflow.stop() table = self.metadata.get_by_name( @@ -487,7 +487,7 @@ def test_workflow_integer_range_partition(self): profiler_workflow = ProfilerWorkflow.create(workflow_config) profiler_workflow.execute() - print_status(profiler_workflow) + profiler_workflow.print_status() profiler_workflow.stop() table = self.metadata.get_by_name( @@ -534,7 +534,7 @@ def test_workflow_values_partition(self): profiler_workflow = ProfilerWorkflow.create(workflow_config) 
profiler_workflow.execute() - print_status(profiler_workflow) + profiler_workflow.print_status() profiler_workflow.stop() table = self.metadata.get_by_name( @@ -574,7 +574,7 @@ def test_workflow_values_partition(self): profiler_workflow = ProfilerWorkflow.create(workflow_config) profiler_workflow.execute() - print_status(profiler_workflow) + profiler_workflow.print_status() profiler_workflow.stop() table = self.metadata.get_by_name( From 00611e34968da3446f01064a3a45745ec93306c1 Mon Sep 17 00:00:00 2001 From: Pablo Takara Date: Wed, 24 Jul 2024 17:14:35 +0200 Subject: [PATCH 6/8] Fix tests --- .../src/metadata/workflow/workflow_status_mixin.py | 6 +++--- .../orm_profiler/test_datalake_profiler_e2e.py | 11 ++++++----- .../integration/orm_profiler/test_orm_profiler_e2e.py | 9 +++++---- .../tests/integration/profiler/test_nosql_profiler.py | 3 ++- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/ingestion/src/metadata/workflow/workflow_status_mixin.py b/ingestion/src/metadata/workflow/workflow_status_mixin.py index 4e85791423a9..e648ed00d439 100644 --- a/ingestion/src/metadata/workflow/workflow_status_mixin.py +++ b/ingestion/src/metadata/workflow/workflow_status_mixin.py @@ -14,7 +14,7 @@ import traceback import uuid from datetime import datetime -from enum import Enum, auto +from enum import Enum from typing import Optional, Tuple from metadata.config.common import WorkflowExecutionError @@ -41,8 +41,8 @@ class WorkflowResultStatus(Enum): - SUCCESS = auto() - FAILURE = auto() + SUCCESS = 0 + FAILURE = 1 class WorkflowStatusMixin: diff --git a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py index c2092669d6a4..7106be330ee0 100644 --- a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py +++ b/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py @@ -41,6 +41,7 @@ ) from metadata.workflow.metadata import MetadataWorkflow 
from metadata.workflow.profiler import ProfilerWorkflow +from metadata.workflow.workflow_output_handler import WorkflowResultStatus SERVICE_NAME = Path(__file__).stem REGION = "us-west-1" @@ -158,7 +159,7 @@ def test_datalake_profiler_workflow(self): status = profiler_workflow.result_status() profiler_workflow.stop() - assert status == 0 + assert status == WorkflowResultStatus.SUCCESS table_profile = self.metadata.get_profile_data( f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv"', @@ -206,7 +207,7 @@ def test_values_partitioned_datalake_profiler_workflow(self): status = profiler_workflow.result_status() profiler_workflow.stop() - assert status == 0 + assert status == WorkflowResultStatus.SUCCESS table = self.metadata.get_by_name( entity=Table, @@ -251,7 +252,7 @@ def test_datetime_partitioned_datalake_profiler_workflow(self): status = profiler_workflow.result_status() profiler_workflow.stop() - assert status == 0 + assert status == WorkflowResultStatus.SUCCESS table = self.metadata.get_by_name( entity=Table, @@ -297,7 +298,7 @@ def test_integer_range_partitioned_datalake_profiler_workflow(self): status = profiler_workflow.result_status() profiler_workflow.stop() - assert status == 0 + assert status == WorkflowResultStatus.SUCCESS table = self.metadata.get_by_name( entity=Table, @@ -354,7 +355,7 @@ def test_datalake_profiler_workflow_with_custom_profiler_config(self): status = profiler_workflow.result_status() profiler_workflow.stop() - assert status == 0 + assert status == WorkflowResultStatus.SUCCESS table = self.metadata.get_by_name( entity=Table, diff --git a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py index 8b8c6ec34e8d..21d08bd95cbb 100644 --- a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py +++ b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py @@ -44,6 +44,7 @@ ) from metadata.workflow.metadata import MetadataWorkflow from 
metadata.workflow.profiler import ProfilerWorkflow +from metadata.workflow.workflow_output_handler import WorkflowResultStatus logging.basicConfig(level=logging.WARN) logger = logging.getLogger(__name__) @@ -252,7 +253,7 @@ def test_profiler_workflow(self): status = profiler_workflow.result_status() profiler_workflow.stop() - assert status == 0 + assert status == WorkflowResultStatus.SUCCESS table = self.metadata.get_by_name( entity=Table, @@ -277,7 +278,7 @@ def test_profiler_workflow(self): status = profiler_workflow.result_status() profiler_workflow.stop() - assert status == 0 + assert status == WorkflowResultStatus.SUCCESS table = self.metadata.get_by_name( entity=Table, @@ -632,7 +633,7 @@ def test_datalake_profiler_workflow_with_custom_profiler_config(self): status = profiler_workflow.result_status() profiler_workflow.stop() - assert status == 0 + assert status == WorkflowResultStatus.SUCCESS table = self.metadata.get_by_name( entity=Table, @@ -725,7 +726,7 @@ def test_sample_data_ingestion(self): status = profiler_workflow.result_status() profiler_workflow.stop() - assert status == 0 + assert status == WorkflowResultStatus.SUCCESS table = self.metadata.get_by_name( entity=Table, diff --git a/ingestion/tests/integration/profiler/test_nosql_profiler.py b/ingestion/tests/integration/profiler/test_nosql_profiler.py index c86b1bb64e10..08cea47bc782 100644 --- a/ingestion/tests/integration/profiler/test_nosql_profiler.py +++ b/ingestion/tests/integration/profiler/test_nosql_profiler.py @@ -45,6 +45,7 @@ from metadata.utils.time_utils import get_end_of_day_timestamp_mill from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow +from metadata.workflow.workflow_output_handler import WorkflowResultStatus SERVICE_NAME = Path(__file__).stem @@ -175,7 +176,7 @@ def run_profiler_workflow(self, config): profiler_workflow.execute() status = profiler_workflow.result_status() profiler_workflow.stop() - assert status == 0 
+ assert status == WorkflowResultStatus.SUCCESS def test_simple(self): workflow_config = deepcopy(self.ingestion_config) From a8f4e25705f9cc0c4d2356a4d26fade6026b33be Mon Sep 17 00:00:00 2001 From: Pablo Takara Date: Fri, 26 Jul 2024 10:31:52 +0200 Subject: [PATCH 7/8] Update code based on comments from PR --- ingestion/src/metadata/cli/dataquality.py | 10 +- ingestion/src/metadata/cli/ingest.py | 12 ++- ingestion/src/metadata/cli/insight.py | 10 +- ingestion/src/metadata/cli/lineage.py | 12 ++- ingestion/src/metadata/cli/profile.py | 10 +- ingestion/src/metadata/cli/usage.py | 10 +- .../workflow/application_output_handler.py | 2 +- ingestion/src/metadata/workflow/base.py | 13 +-- .../src/metadata/workflow/output_handler.py | 69 ++++++++++++++ .../workflow/workflow_init_error_handler.py | 95 +++++++------------ .../workflow/workflow_output_handler.py | 17 +++- .../test_deprecated_workflow_functions.py | 27 ++++++ 12 files changed, 178 insertions(+), 109 deletions(-) create mode 100644 ingestion/src/metadata/workflow/output_handler.py create mode 100644 ingestion/tests/unit/workflow/test_deprecated_workflow_functions.py diff --git a/ingestion/src/metadata/cli/dataquality.py b/ingestion/src/metadata/cli/dataquality.py index b0eb28a61207..b4b53799008b 100644 --- a/ingestion/src/metadata/cli/dataquality.py +++ b/ingestion/src/metadata/cli/dataquality.py @@ -17,12 +17,12 @@ from pathlib import Path from metadata.config.common import load_config_file +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.utils.logger import cli_logger from metadata.workflow.data_quality import TestSuiteWorkflow -from metadata.workflow.workflow_init_error_handler import ( - WorkflowInitErrorHandler, - WorkflowType, -) +from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler logger = cli_logger() @@ -42,7 +42,7 @@ def run_test(config_path: Path) -> None: except Exception as exc: 
logger.debug(traceback.format_exc()) WorkflowInitErrorHandler.print_init_error( - exc, workflow_config_dict, WorkflowType.TEST + exc, workflow_config_dict, PipelineType.TestSuite ) sys.exit(1) diff --git a/ingestion/src/metadata/cli/ingest.py b/ingestion/src/metadata/cli/ingest.py index 22cf8575c136..e9ee62c00af6 100644 --- a/ingestion/src/metadata/cli/ingest.py +++ b/ingestion/src/metadata/cli/ingest.py @@ -17,12 +17,12 @@ from pathlib import Path from metadata.config.common import load_config_file +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.utils.logger import cli_logger from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_init_error_handler import ( - WorkflowInitErrorHandler, - WorkflowType, -) +from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler logger = cli_logger() @@ -41,7 +41,9 @@ def run_ingest(config_path: Path) -> None: logger.debug(f"Using config: {workflow.config}") except Exception as exc: logger.debug(traceback.format_exc()) - WorkflowInitErrorHandler.print_init_error(exc, config_dict, WorkflowType.INGEST) + WorkflowInitErrorHandler.print_init_error( + exc, config_dict, PipelineType.metadata + ) sys.exit(1) workflow.execute() diff --git a/ingestion/src/metadata/cli/insight.py b/ingestion/src/metadata/cli/insight.py index 0daf7ab5addb..6e54dac787bc 100644 --- a/ingestion/src/metadata/cli/insight.py +++ b/ingestion/src/metadata/cli/insight.py @@ -17,12 +17,12 @@ from pathlib import Path from metadata.config.common import load_config_file +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.utils.logger import cli_logger from metadata.workflow.data_insight import DataInsightWorkflow -from metadata.workflow.workflow_init_error_handler import ( - WorkflowInitErrorHandler, - WorkflowType, -) +from 
metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler logger = cli_logger() @@ -42,7 +42,7 @@ def run_insight(config_path: Path) -> None: except Exception as exc: logger.debug(traceback.format_exc()) WorkflowInitErrorHandler.print_init_error( - exc, config_dict, WorkflowType.INSIGHT + exc, config_dict, PipelineType.dataInsight ) sys.exit(1) diff --git a/ingestion/src/metadata/cli/lineage.py b/ingestion/src/metadata/cli/lineage.py index 98af1900db63..f2246fa1cfaa 100644 --- a/ingestion/src/metadata/cli/lineage.py +++ b/ingestion/src/metadata/cli/lineage.py @@ -21,14 +21,14 @@ from metadata.config.common import load_config_file from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import UTF_8 from metadata.utils.logger import cli_logger -from metadata.workflow.workflow_init_error_handler import ( - WorkflowInitErrorHandler, - WorkflowType, -) +from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler logger = cli_logger() @@ -56,7 +56,9 @@ def run_lineage(config_path: Path) -> None: except Exception as exc: logger.debug(traceback.format_exc()) - WorkflowInitErrorHandler.print_init_error(exc, config_dict, WorkflowType.INGEST) + WorkflowInitErrorHandler.print_init_error( + exc, config_dict, PipelineType.lineage + ) sys.exit(1) if workflow.filePath: diff --git a/ingestion/src/metadata/cli/profile.py b/ingestion/src/metadata/cli/profile.py index 8c1db551c130..473f35ef983f 100644 --- a/ingestion/src/metadata/cli/profile.py +++ b/ingestion/src/metadata/cli/profile.py @@ -17,12 +17,12 @@ from pathlib import Path from metadata.config.common import load_config_file +from 
metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.utils.logger import cli_logger from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_init_error_handler import ( - WorkflowInitErrorHandler, - WorkflowType, -) +from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler logger = cli_logger() @@ -42,7 +42,7 @@ def run_profiler(config_path: Path) -> None: except Exception as exc: logger.debug(traceback.format_exc()) WorkflowInitErrorHandler.print_init_error( - exc, workflow_config_dict, WorkflowType.PROFILE + exc, workflow_config_dict, PipelineType.profiler ) sys.exit(1) diff --git a/ingestion/src/metadata/cli/usage.py b/ingestion/src/metadata/cli/usage.py index aaeea33366c0..251b67b367c0 100644 --- a/ingestion/src/metadata/cli/usage.py +++ b/ingestion/src/metadata/cli/usage.py @@ -17,12 +17,12 @@ from pathlib import Path from metadata.config.common import load_config_file +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.utils.logger import cli_logger from metadata.workflow.usage import UsageWorkflow -from metadata.workflow.workflow_init_error_handler import ( - WorkflowInitErrorHandler, - WorkflowType, -) +from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler logger = cli_logger() @@ -41,7 +41,7 @@ def run_usage(config_path: Path) -> None: logger.debug(f"Using config: {workflow.config}") except Exception as exc: logger.debug(traceback.format_exc()) - WorkflowInitErrorHandler.print_init_error(exc, config_dict, WorkflowType.INGEST) + WorkflowInitErrorHandler.print_init_error(exc, config_dict, PipelineType.usage) sys.exit(1) workflow.execute() diff --git a/ingestion/src/metadata/workflow/application_output_handler.py b/ingestion/src/metadata/workflow/application_output_handler.py index ec5177b8fece..9cc55c410d0e 100644 --- 
a/ingestion/src/metadata/workflow/application_output_handler.py +++ b/ingestion/src/metadata/workflow/application_output_handler.py @@ -16,6 +16,6 @@ from metadata.workflow.base import BaseWorkflow -@deprecated(message="Use 'workflow.print_status()' instead.", release="1.8") +@deprecated(message="Use 'workflow.print_status()' instead.", release="1.6") def print_status(workflow: BaseWorkflow): workflow.print_status() diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py index 79b506a859f3..573f970de2d9 100644 --- a/ingestion/src/metadata/workflow/base.py +++ b/ingestion/src/metadata/workflow/base.py @@ -85,11 +85,12 @@ def __init__( log_level: LogLevels, metadata_config: OpenMetadataConnection, service_type: ServiceType, + output_handler: WorkflowOutputHandler = WorkflowOutputHandler(), ): """ Disabling pylint to wait for workflow reimplementation as a topology """ - self._output_handler = None + self.output_handler = output_handler self.config = config self.service_type = service_type self._timer: Optional[RepeatedTimer] = None @@ -108,12 +109,6 @@ def __init__( self.post_init() - @property - def output_handler(self) -> WorkflowOutputHandler: - if not self._output_handler: - self._output_handler = WorkflowOutputHandler() - return self._output_handler - @property def ingestion_pipeline(self) -> Optional[IngestionPipeline]: """Get or create the Ingestion Pipeline from the configuration""" @@ -122,10 +117,6 @@ def ingestion_pipeline(self) -> Optional[IngestionPipeline]: return self._ingestion_pipeline - def with_output_handler(self, output_handler: WorkflowOutputHandler): - self._output_handler = output_handler - return self - def stop(self) -> None: """ Main stopping logic diff --git a/ingestion/src/metadata/workflow/output_handler.py b/ingestion/src/metadata/workflow/output_handler.py new file mode 100644 index 000000000000..94286a337b4e --- /dev/null +++ b/ingestion/src/metadata/workflow/output_handler.py @@ -0,0 +1,69 @@ +# 
Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Module that handles the legacy WorkflowType until deprecation +""" +from enum import Enum +from typing import Optional + +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) +from metadata.utils.deprecation import deprecated + + +@deprecated( + message="Use 'PipelineType' in 'metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline'", + release="1.6", +) +class WorkflowType(Enum): + """ + Workflow type enums based on the `metadata` CLI commands + """ + + INGEST = "ingest" + PROFILE = "profile" + TEST = "test" + LINEAGE = "lineage" + USAGE = "usage" + INSIGHT = "insight" + APP = "application" + + +# TODO: Delete this method after the removal of WorkflowType in release 1.6 +# Remember to remove it where it is being used +def workflow_type_to_pipeline_type( + workflow_type: WorkflowType, source_type_name: Optional[str] +) -> PipelineType: + """Helper Function to Map between the Deprecated WorkflowType to PipelineType.""" + + def _fix_ingest_type() -> PipelineType: + """Helper Function to Map between the Deprecated WorkflowType.INGESTION and the + correct PipelineType.""" + if source_type_name: + if source_type_name.endswith("lineage"): + return PipelineType.lineage + if source_type_name.endswith("usage"): + return PipelineType.usage + return PipelineType.metadata + + map_ = { + WorkflowType.INGEST: _fix_ingest_type(), + 
WorkflowType.PROFILE: PipelineType.profiler, + WorkflowType.TEST: PipelineType.TestSuite, + WorkflowType.LINEAGE: PipelineType.lineage, + WorkflowType.USAGE: PipelineType.usage, + WorkflowType.INSIGHT: PipelineType.dataInsight, + WorkflowType.APP: PipelineType.application, + } + + return map_.get(workflow_type, PipelineType.metadata) diff --git a/ingestion/src/metadata/workflow/workflow_init_error_handler.py b/ingestion/src/metadata/workflow/workflow_init_error_handler.py index b6ded8223050..ff1cffa26e0f 100644 --- a/ingestion/src/metadata/workflow/workflow_init_error_handler.py +++ b/ingestion/src/metadata/workflow/workflow_init_error_handler.py @@ -13,11 +13,13 @@ Module handles the init error messages from different workflows """ import traceback -from enum import Enum from pathlib import Path from typing import Any, Dict, Optional, Type, Union from metadata.config.common import ConfigurationError +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.ingestion.api.parser import ( InvalidWorkflowException, ParsingConfigurationError, @@ -25,39 +27,25 @@ from metadata.utils.constants import UTF_8 from metadata.utils.logger import ANSI, log_ansi_encoded_string - -class WorkflowType(Enum): - """ - Workflow type enums based on the `metadata` CLI commands - """ - - INGEST = "ingest" - PROFILE = "profile" - TEST = "test" - LINEAGE = "lineage" - USAGE = "usage" - INSIGHT = "insight" - APP = "application" - - EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows" -URLS = { - WorkflowType.INGEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/metadata", - WorkflowType.PROFILE: "https://docs.open-metadata.org/connectors/ingestion/workflows/profiler", - WorkflowType.TEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/data-quality", - WorkflowType.LINEAGE: "https://docs.open-metadata.org/connectors/ingestion/workflows/lineage", - 
WorkflowType.USAGE: "https://docs.open-metadata.org/connectors/ingestion/workflows/usage", +URLS: Dict[PipelineType, str] = { + PipelineType.metadata: "https://docs.open-metadata.org/connectors/ingestion/workflows/metadata", + PipelineType.profiler: "https://docs.open-metadata.org/connectors/ingestion/workflows/profiler", + PipelineType.TestSuite: "https://docs.open-metadata.org/connectors/ingestion/workflows/data-quality", + PipelineType.lineage: "https://docs.open-metadata.org/connectors/ingestion/workflows/lineage", + PipelineType.usage: "https://docs.open-metadata.org/connectors/ingestion/workflows/usage", + PipelineType.dbt: "https://docs.open-metadata.org/connectors/ingestion/workflows/dbt", } -DEFAULT_EXAMPLE_FILE = { - WorkflowType.INGEST: "bigquery", - WorkflowType.PROFILE: "bigquery_profiler", - WorkflowType.TEST: "test_suite", - WorkflowType.LINEAGE: "bigquery_lineage", - WorkflowType.USAGE: "bigquery_usage", +DEFAULT_EXAMPLE_FILE: Dict[PipelineType, str] = { + PipelineType.metadata: "bigquery", + PipelineType.profiler: "bigquery_profiler", + PipelineType.TestSuite: "test_suite", + PipelineType.lineage: "bigquery_lineage", + PipelineType.usage: "bigquery_usage", } @@ -68,33 +56,30 @@ class WorkflowInitErrorHandler: def print_init_error( exc: Union[Exception, Type[Exception]], config: Dict[str, Any], - workflow_type: WorkflowType = WorkflowType.INGEST, - ) -> None: + pipeline_type: PipelineType = PipelineType.metadata, + ): """ Print a workflow initialization error """ source_type_name = WorkflowInitErrorHandler._get_source_type_name(config) - workflow_type = WorkflowInitErrorHandler._update_workflow_type( - source_type_name, workflow_type - ) if isinstance( exc, (ParsingConfigurationError, ConfigurationError, InvalidWorkflowException), ): WorkflowInitErrorHandler._print_error_msg( - f"Error loading {workflow_type.name} configuration: {exc}" + f"Error loading {pipeline_type.name} configuration: {exc}" ) WorkflowInitErrorHandler._print_file_example( - 
source_type_name, workflow_type + source_type_name, pipeline_type ) else: WorkflowInitErrorHandler._print_error_msg( - f"\nError initializing {workflow_type.name}: {exc}" + f"\nError initializing {pipeline_type.name}: {exc}" ) WorkflowInitErrorHandler._print_error_msg(traceback.format_exc()) - WorkflowInitErrorHandler._print_more_info(workflow_type) + WorkflowInitErrorHandler._print_more_info(pipeline_type) @staticmethod def _get_source_type_name(config: Dict[str, Any]) -> Optional[str]: @@ -111,34 +96,20 @@ def _get_source_type_name(config: Dict[str, Any]) -> Optional[str]: return source_type_name - @staticmethod - def _update_workflow_type( - source_type_name: Optional[str], workflow_type: WorkflowType - ) -> WorkflowType: - """Updates the WorkflowType if needed. - When WorkflowType.INGEST is received, it can be algo LINEAGE or USAGE, depending on the Source Type Name. - """ - if source_type_name and workflow_type == WorkflowType.INGEST: - if source_type_name.endswith("lineage"): - return WorkflowType.LINEAGE - if source_type_name.endswith("usage"): - return WorkflowType.USAGE - return workflow_type - @staticmethod def _print_file_example( - source_type_name: Optional[str], workflow_type: WorkflowType + source_type_name: Optional[str], pipeline_type: PipelineType ): """ Print an example file for a given configuration """ if source_type_name is not None: example_file = WorkflowInitErrorHandler._calculate_example_file( - source_type_name, workflow_type + source_type_name, pipeline_type ) example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml" if not example_path.exists(): - example_file = DEFAULT_EXAMPLE_FILE[workflow_type] + example_file = DEFAULT_EXAMPLE_FILE[pipeline_type] example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml" log_ansi_encoded_string( message=f"\nMake sure you are following the following format e.g. 
'{example_file}':" @@ -150,28 +121,28 @@ def _print_file_example( @staticmethod def _calculate_example_file( - source_type_name: str, workflow_type: WorkflowType + source_type_name: str, pipeline_type: PipelineType ) -> str: """ Calculates the ingestion type depending on the source type name and workflow_type """ - if workflow_type == WorkflowType.USAGE: + if pipeline_type == PipelineType.usage: return f"{source_type_name}_usage" - if workflow_type == WorkflowType.LINEAGE: + if pipeline_type == PipelineType.lineage: return f"{source_type_name}_lineage" - if workflow_type == WorkflowType.PROFILE: + if pipeline_type == PipelineType.profiler: return f"{source_type_name}_profiler" - if workflow_type == WorkflowType.TEST: - return DEFAULT_EXAMPLE_FILE[workflow_type] + if pipeline_type == PipelineType.TestSuite: + return DEFAULT_EXAMPLE_FILE[pipeline_type] return source_type_name @staticmethod - def _print_more_info(workflow_type: WorkflowType) -> None: + def _print_more_info(pipeline_type: PipelineType) -> None: """ Print more information message """ log_ansi_encoded_string( - message=f"\nFor more information, please visit: {URLS[workflow_type]}" + message=f"\nFor more information, please visit: {URLS[pipeline_type]}" + "\nOr join us in Slack: https://slack.open-metadata.org/" ) diff --git a/ingestion/src/metadata/workflow/workflow_output_handler.py b/ingestion/src/metadata/workflow/workflow_output_handler.py index 87e64c4f48ef..c5b0469b14cc 100644 --- a/ingestion/src/metadata/workflow/workflow_output_handler.py +++ b/ingestion/src/metadata/workflow/workflow_output_handler.py @@ -26,10 +26,11 @@ from metadata.utils.execution_time_tracker import ExecutionTimeTracker from metadata.utils.helpers import pretty_print_time_duration from metadata.utils.logger import ANSI, log_ansi_encoded_string -from metadata.workflow.workflow_init_error_handler import ( - WorkflowInitErrorHandler, +from metadata.workflow.output_handler import ( WorkflowType, + workflow_type_to_pipeline_type, 
) +from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler from metadata.workflow.workflow_status_mixin import WorkflowResultStatus WORKFLOW_FAILURE_MESSAGE = "Workflow finished with failures" @@ -46,7 +47,7 @@ class Failure(BaseModel): failures: List[TruncatedStackTraceError] -@deprecated(message="Use 'workflow.print_status()' instead.", release="1.8") +@deprecated(message="Use 'workflow.print_status()' instead.", release="1.6") def print_status( workflow: "BaseWorkflow", # pyright: ignore[reportUndefinedVariable,reportUnknownParameterType] ): @@ -58,14 +59,20 @@ def print_status( "Use 'WorkflowInitErrorHandler.print_init_error(exc, config, workflow_type)'" " from 'metadata.workflow.workflow_init_error_handler'" ), - release="1.8", + release="1.6", ) def print_init_error( exc: Union[Exception, Type[Exception]], config: Dict[str, Any], workflow_type: WorkflowType = WorkflowType.INGEST, ): - WorkflowInitErrorHandler.print_init_error(exc, config, workflow_type) + # pylint: disable=W0212 + source_type_name = WorkflowInitErrorHandler._get_source_type_name( # type: ignore[reportPrivateUsage] + config + ) + WorkflowInitErrorHandler.print_init_error( + exc, config, workflow_type_to_pipeline_type(workflow_type, source_type_name) + ) class WorkflowOutputHandler: diff --git a/ingestion/tests/unit/workflow/test_deprecated_workflow_functions.py b/ingestion/tests/unit/workflow/test_deprecated_workflow_functions.py new file mode 100644 index 000000000000..a5639218639e --- /dev/null +++ b/ingestion/tests/unit/workflow/test_deprecated_workflow_functions.py @@ -0,0 +1,27 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Validate the deprecated functions still work. +""" +from metadata.workflow.workflow_output_handler import print_init_error, print_status + +from .test_base_workflow import SimpleWorkflow, config + + +# TODO: remove after the print_status and print_init_error functions are removed in Release 1.6 +class TestDeprecatedSimpleWorkflow: + def test_workflow_print_status(self): + workflow = SimpleWorkflow(config=config) + workflow.execute() + print_status(workflow) + + def test_workflow_print_init_error(self): + print_init_error(Exception(), config.model_dump()) From 8b9a89e29c12348d9d08da86fe4463d4d7cd53ba Mon Sep 17 00:00:00 2001 From: Pablo Takara Date: Fri, 26 Jul 2024 11:00:43 +0200 Subject: [PATCH 8/8] Update comment --- ingestion/src/metadata/workflow/workflow_output_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/src/metadata/workflow/workflow_output_handler.py b/ingestion/src/metadata/workflow/workflow_output_handler.py index c5b0469b14cc..d1a2070e2a7e 100644 --- a/ingestion/src/metadata/workflow/workflow_output_handler.py +++ b/ingestion/src/metadata/workflow/workflow_output_handler.py @@ -67,7 +67,7 @@ def print_init_error( workflow_type: WorkflowType = WorkflowType.INGEST, ): # pylint: disable=W0212 - source_type_name = WorkflowInitErrorHandler._get_source_type_name( # type: ignore[reportPrivateUsage] + source_type_name = WorkflowInitErrorHandler._get_source_type_name( # pyright: ignore[reportPrivateUsage] config ) WorkflowInitErrorHandler.print_init_error(