diff --git a/ingestion/examples/airflow/dags/airflow_extended_sample_data.py b/ingestion/examples/airflow/dags/airflow_extended_sample_data.py index 9c449771bf2e..a593fe5da7a3 100644 --- a/ingestion/examples/airflow/dags/airflow_extended_sample_data.py +++ b/ingestion/examples/airflow/dags/airflow_extended_sample_data.py @@ -14,8 +14,6 @@ import yaml from airflow import DAG -from metadata.workflow.workflow_output_handler import print_status - try: from airflow.operators.python import PythonOperator except ModuleNotFoundError: @@ -63,7 +61,7 @@ def metadata_ingestion_workflow(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/ingestion/examples/airflow/dags/airflow_metadata_extraction.py b/ingestion/examples/airflow/dags/airflow_metadata_extraction.py index 94372e2ff782..e007810bd98f 100644 --- a/ingestion/examples/airflow/dags/airflow_metadata_extraction.py +++ b/ingestion/examples/airflow/dags/airflow_metadata_extraction.py @@ -20,8 +20,6 @@ import yaml from airflow import DAG -from metadata.workflow.workflow_output_handler import print_status - try: from airflow.operators.python import PythonOperator except ModuleNotFoundError: @@ -72,7 +70,7 @@ def metadata_ingestion_workflow(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/ingestion/examples/airflow/dags/airflow_sample_data.py b/ingestion/examples/airflow/dags/airflow_sample_data.py index f43dad6e1eba..6d2b31ec769b 100644 --- a/ingestion/examples/airflow/dags/airflow_sample_data.py +++ b/ingestion/examples/airflow/dags/airflow_sample_data.py @@ -14,8 +14,6 @@ import yaml from airflow import DAG -from metadata.workflow.workflow_output_handler import print_status - try: from airflow.operators.python import PythonOperator except ModuleNotFoundError: @@ -62,7 +60,7 @@ def metadata_ingestion_workflow(): workflow = MetadataWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/ingestion/examples/airflow/dags/airflow_sample_usage.py b/ingestion/examples/airflow/dags/airflow_sample_usage.py index 306b149ca596..a7420493f115 100644 --- a/ingestion/examples/airflow/dags/airflow_sample_usage.py +++ b/ingestion/examples/airflow/dags/airflow_sample_usage.py @@ -14,8 +14,6 @@ from airflow import DAG -from metadata.workflow.workflow_output_handler import print_status - try: from airflow.operators.python import PythonOperator except ModuleNotFoundError: @@ -88,7 +86,7 @@ def metadata_ingestion_workflow(): workflow = UsageWorkflow.create(workflow_config) workflow.execute() workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/ingestion/operators/docker/main.py b/ingestion/operators/docker/main.py index eb494f727ebf..52b40e96b38d 100644 --- a/ingestion/operators/docker/main.py +++ b/ingestion/operators/docker/main.py @@ -25,7 +25,6 @@ from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow from metadata.workflow.usage import UsageWorkflow -from metadata.workflow.workflow_output_handler import print_status WORKFLOW_MAP = { PipelineType.metadata.value: MetadataWorkflow, @@ -109,7 +108,7 @@ def main(): workflow = workflow_class.create(workflow_config) workflow.execute() 
workflow.raise_from_status() - print_status(workflow) + workflow.print_status() workflow.stop() diff --git a/ingestion/pyproject.toml b/ingestion/pyproject.toml index a7e3273384a7..9eca4ddd05cf 100644 --- a/ingestion/pyproject.toml +++ b/ingestion/pyproject.toml @@ -171,8 +171,17 @@ ignore = [ "src/metadata/readers/*", "src/metadata/timer/*", "src/metadata/utils/*", - "src/metadata/workflow/*", + "src/metadata/workflow/base.py", + "src/metadata/workflow/application.py", + "src/metadata/workflow/data_insight.py", + "src/metadata/workflow/data_quality.py", + "src/metadata/workflow/ingestion.py", + "src/metadata/workflow/metadata.py", + "src/metadata/workflow/profiler.py", + "src/metadata/workflow/usage.py", + "src/metadata/workflow/workflow_status_mixin.py", ] reportDeprecated = false reportMissingTypeStubs = false +reportAny = false diff --git a/ingestion/src/metadata/cli/app.py b/ingestion/src/metadata/cli/app.py index 39a4a1b01da4..4d3ed02d1cd7 100644 --- a/ingestion/src/metadata/cli/app.py +++ b/ingestion/src/metadata/cli/app.py @@ -19,7 +19,6 @@ from metadata.config.common import load_config_file from metadata.utils.logger import cli_logger from metadata.workflow.application import ApplicationWorkflow -from metadata.workflow.application_output_handler import print_status logger = cli_logger() @@ -42,5 +41,5 @@ def run_app(config_path: Path) -> None: workflow.execute() workflow.stop() - print_status(workflow) + workflow.print_status() workflow.raise_from_status() diff --git a/ingestion/src/metadata/cli/dataquality.py b/ingestion/src/metadata/cli/dataquality.py index edff3964e25e..b4b53799008b 100644 --- a/ingestion/src/metadata/cli/dataquality.py +++ b/ingestion/src/metadata/cli/dataquality.py @@ -17,13 +17,12 @@ from pathlib import Path from metadata.config.common import load_config_file +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.utils.logger import cli_logger from metadata.workflow.data_quality import TestSuiteWorkflow -from metadata.workflow.workflow_output_handler import ( - WorkflowType, - print_init_error, - print_status, -) +from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler logger = cli_logger() @@ -42,10 +41,12 @@ def run_test(config_path: Path) -> None: workflow = TestSuiteWorkflow.create(workflow_config_dict) except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, workflow_config_dict, WorkflowType.TEST) + WorkflowInitErrorHandler.print_init_error( + exc, workflow_config_dict, PipelineType.TestSuite + ) sys.exit(1) workflow.execute() workflow.stop() - print_status(workflow) + workflow.print_status() workflow.raise_from_status() diff --git a/ingestion/src/metadata/cli/ingest.py b/ingestion/src/metadata/cli/ingest.py index 82c83f183fbb..e9ee62c00af6 100644 --- a/ingestion/src/metadata/cli/ingest.py +++ b/ingestion/src/metadata/cli/ingest.py @@ -17,13 +17,12 @@ from pathlib import Path from metadata.config.common import load_config_file +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.utils.logger import cli_logger from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import ( - WorkflowType, - print_init_error, - print_status, -) +from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler logger = cli_logger() @@ -42,10 +41,12 @@ def run_ingest(config_path: Path) -> None: 
logger.debug(f"Using config: {workflow.config}") except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, config_dict, WorkflowType.INGEST) + WorkflowInitErrorHandler.print_init_error( + exc, config_dict, PipelineType.metadata + ) sys.exit(1) workflow.execute() workflow.stop() - print_status(workflow) + workflow.print_status() workflow.raise_from_status() diff --git a/ingestion/src/metadata/cli/insight.py b/ingestion/src/metadata/cli/insight.py index fb026e32ddba..6e54dac787bc 100644 --- a/ingestion/src/metadata/cli/insight.py +++ b/ingestion/src/metadata/cli/insight.py @@ -17,13 +17,12 @@ from pathlib import Path from metadata.config.common import load_config_file +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.utils.logger import cli_logger from metadata.workflow.data_insight import DataInsightWorkflow -from metadata.workflow.workflow_output_handler import ( - WorkflowType, - print_init_error, - print_status, -) +from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler logger = cli_logger() @@ -42,10 +41,12 @@ def run_insight(config_path: Path) -> None: logger.debug(f"Using config: {workflow.config}") except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, config_dict, WorkflowType.INSIGHT) + WorkflowInitErrorHandler.print_init_error( + exc, config_dict, PipelineType.dataInsight + ) sys.exit(1) workflow.execute() workflow.stop() - print_status(workflow) + workflow.print_status() workflow.raise_from_status() diff --git a/ingestion/src/metadata/cli/lineage.py b/ingestion/src/metadata/cli/lineage.py index dba0ed612582..f2246fa1cfaa 100644 --- a/ingestion/src/metadata/cli/lineage.py +++ b/ingestion/src/metadata/cli/lineage.py @@ -21,11 +21,14 @@ from metadata.config.common import load_config_file from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import UTF_8 from metadata.utils.logger import cli_logger -from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error +from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler logger = cli_logger() @@ -53,7 +56,9 @@ def run_lineage(config_path: Path) -> None: except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, config_dict, WorkflowType.INGEST) + WorkflowInitErrorHandler.print_init_error( + exc, config_dict, PipelineType.lineage + ) sys.exit(1) if workflow.filePath: diff --git a/ingestion/src/metadata/cli/profile.py b/ingestion/src/metadata/cli/profile.py index 106780ef7f08..473f35ef983f 100644 --- a/ingestion/src/metadata/cli/profile.py +++ b/ingestion/src/metadata/cli/profile.py @@ -17,13 +17,12 @@ from pathlib import Path from metadata.config.common import load_config_file +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.utils.logger import cli_logger from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import ( - WorkflowType, - print_init_error, - print_status, -) +from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler logger = 
cli_logger() @@ -42,10 +41,12 @@ def run_profiler(config_path: Path) -> None: workflow = ProfilerWorkflow.create(workflow_config_dict) except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, workflow_config_dict, WorkflowType.PROFILE) + WorkflowInitErrorHandler.print_init_error( + exc, workflow_config_dict, PipelineType.profiler + ) sys.exit(1) workflow.execute() workflow.stop() - print_status(workflow) + workflow.print_status() workflow.raise_from_status() diff --git a/ingestion/src/metadata/cli/usage.py b/ingestion/src/metadata/cli/usage.py index 20a87a562730..251b67b367c0 100644 --- a/ingestion/src/metadata/cli/usage.py +++ b/ingestion/src/metadata/cli/usage.py @@ -17,13 +17,12 @@ from pathlib import Path from metadata.config.common import load_config_file +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.utils.logger import cli_logger from metadata.workflow.usage import UsageWorkflow -from metadata.workflow.workflow_output_handler import ( - WorkflowType, - print_init_error, - print_status, -) +from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler logger = cli_logger() @@ -42,10 +41,10 @@ def run_usage(config_path: Path) -> None: logger.debug(f"Using config: {workflow.config}") except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, config_dict, WorkflowType.INGEST) + WorkflowInitErrorHandler.print_init_error(exc, config_dict, PipelineType.usage) sys.exit(1) workflow.execute() workflow.stop() - print_status(workflow) + workflow.print_status() workflow.raise_from_status() diff --git a/ingestion/src/metadata/ingestion/ometa/client_utils.py b/ingestion/src/metadata/ingestion/ometa/client_utils.py index 85cef0d4f9b7..bc5e54616d96 100644 --- a/ingestion/src/metadata/ingestion/ometa/client_utils.py +++ b/ingestion/src/metadata/ingestion/ometa/client_utils.py @@ -19,7 +19,7 @@ OpenMetadataConnection, ) from metadata.generated.schema.type.basic import FullyQualifiedEntityName -from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.ometa_api import C, OpenMetadata, T from metadata.utils import fqn from metadata.utils.logger import ometa_logger @@ -28,7 +28,7 @@ def create_ometa_client( metadata_config: OpenMetadataConnection, -) -> OpenMetadata: +) -> OpenMetadata[T, C]: # pyright: ignore[reportInvalidTypeVarUse] """Create an OpenMetadata client Args: @@ -38,7 +38,7 @@ def create_ometa_client( OpenMetadata: an OM client """ try: - metadata = OpenMetadata(metadata_config) + metadata = OpenMetadata[T, C](metadata_config) metadata.health_check() return metadata except Exception as exc: diff --git a/ingestion/src/metadata/utils/class_helper.py b/ingestion/src/metadata/utils/class_helper.py index 951ef4c64c30..8157e252d89a 100644 --- a/ingestion/src/metadata/utils/class_helper.py +++ b/ingestion/src/metadata/utils/class_helper.py @@ -62,6 +62,7 @@ from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( TestSuitePipeline, ) +from metadata.generated.schema.metadataIngestion.workflow import SourceConfig SERVICE_TYPE_REF = { ServiceType.Database.value: "databaseService", @@ -102,14 +103,14 @@ def _clean(source_type: str): return source_type -def get_pipeline_type_from_source_config(source_config_type) -> PipelineType: +def get_pipeline_type_from_source_config(source_config: SourceConfig) -> PipelineType: """From the YAML serviceType, get the Ingestion Pipeline Type""" pipeline_type = 
SOURCE_CONFIG_TYPE_INGESTION.get( - source_config_type.__class__.__name__ + source_config.config.__class__.__name__ ) if not pipeline_type: raise ValueError( - f"Cannot find Pipeline Type for SourceConfig {source_config_type}" + f"Cannot find Pipeline Type for SourceConfig {source_config.config}" ) return pipeline_type diff --git a/ingestion/src/metadata/utils/execution_time_tracker.py b/ingestion/src/metadata/utils/execution_time_tracker.py index bdac1676c0ed..e9f0e733808f 100644 --- a/ingestion/src/metadata/utils/execution_time_tracker.py +++ b/ingestion/src/metadata/utils/execution_time_tracker.py @@ -17,7 +17,7 @@ from copy import deepcopy from functools import wraps from time import perf_counter -from typing import List, Optional +from typing import Dict, List, Optional from pydantic import BaseModel @@ -81,7 +81,7 @@ class ExecutionTimeTrackerState(metaclass=Singleton): def __init__(self): """Initializes the state and the lock.""" - self.state = {} + self.state: Dict[str, float] = {} self.lock = threading.Lock() def add(self, context: ExecutionTimeTrackerContext, elapsed: float): diff --git a/ingestion/src/metadata/workflow/application_output_handler.py b/ingestion/src/metadata/workflow/application_output_handler.py index e29c60ec5bb5..9cc55c410d0e 100644 --- a/ingestion/src/metadata/workflow/application_output_handler.py +++ b/ingestion/src/metadata/workflow/application_output_handler.py @@ -8,27 +8,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ -Module handles the output messages for applications +Deprecated module that handles the output messages for Applications """ -import time - -from metadata.utils.helpers import pretty_print_time_duration -from metadata.utils.logger import ANSI, log_ansi_encoded_string -from metadata.workflow.output_handler import print_workflow_summary +from metadata.utils.deprecation import deprecated +from metadata.workflow.base import BaseWorkflow -def print_status(workflow: "ApplicationWorkflow") -> None: - """ - Print the workflow results - """ - print_workflow_summary(workflow) - if workflow.runner.get_status().source_start_time: - log_ansi_encoded_string( - color=ANSI.BRIGHT_CYAN, - bold=True, - message="Workflow finished in time: " - f"{pretty_print_time_duration(time.time()-workflow.runner.get_status().source_start_time)}", - ) +@deprecated(message="Use 'workflow.print_status()' instead.", release="1.6") +def print_status(workflow: BaseWorkflow): + workflow.print_status() diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py index b0bbe0d24e2b..573f970de2d9 100644 --- a/ingestion/src/metadata/workflow/base.py +++ b/ingestion/src/metadata/workflow/base.py @@ -12,6 +12,7 @@ Base workflow definition.
""" +import traceback import uuid from abc import ABC, abstractmethod from datetime import datetime @@ -47,7 +48,7 @@ from metadata.utils.execution_time_tracker import ExecutionTimeTracker from metadata.utils.helpers import datetime_to_ts from metadata.utils.logger import ingestion_logger, set_loggers_level -from metadata.workflow.output_handler import report_ingestion_status +from metadata.workflow.workflow_output_handler import WorkflowOutputHandler from metadata.workflow.workflow_status_mixin import ( SUCCESS_THRESHOLD_VALUE, WorkflowStatusMixin, @@ -84,10 +85,12 @@ def __init__( log_level: LogLevels, metadata_config: OpenMetadataConnection, service_type: ServiceType, + output_handler: WorkflowOutputHandler = WorkflowOutputHandler(), ): """ Disabling pylint to wait for workflow reimplementation as a topology """ + self.output_handler = output_handler self.config = config self.service_type = service_type self._timer: Optional[RepeatedTimer] = None @@ -136,7 +139,7 @@ def timer(self) -> RepeatedTimer: """ if not self._timer: self._timer = RepeatedTimer( - REPORTS_INTERVAL_SECONDS, report_ingestion_status, logger, self + REPORTS_INTERVAL_SECONDS, self._report_ingestion_status ) return self._timer @@ -249,7 +252,7 @@ def get_or_create_ingestion_pipeline(self) -> Optional[IngestionPipeline]: ), ), pipelineType=get_pipeline_type_from_source_config( - self.config.source.sourceConfig.config + self.config.source.sourceConfig ), sourceConfig=self.config.source.sourceConfig, airflowConfig=AirflowConfig(), @@ -277,3 +280,37 @@ def _get_ingestion_pipeline_service(self) -> Optional[T]: entity=get_service_class_from_service_type(self.service_type), fqn=self.config.source.serviceName, ) + + def _report_ingestion_status(self): + """ + Log each step's processed, filtered, and error counts at INFO level + """ + try: + for step in self.workflow_steps(): + logger.info( + f"{step.name}: Processed {len(step.status.records)} records," + f" filtered {len(step.status.filtered)} records," + f" found {len(step.status.failures)} errors" + ) + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error(f"Wild exception reporting status - {exc}") + + def _is_debug_enabled(self) -> bool: + return ( + hasattr(self, "config") + and hasattr(self.config, "workflowConfig") + and hasattr(self.config.workflowConfig, "loggerLevel") + and self.config.workflowConfig.loggerLevel is LogLevels.DEBUG + ) + + def print_status(self): + start_time = self.workflow_steps()[0].get_status().source_start_time + + self.output_handler.print_status( + self.result_status(), + self.workflow_steps(), + start_time, + self._is_debug_enabled(), + ) diff --git a/ingestion/src/metadata/workflow/output_handler.py b/ingestion/src/metadata/workflow/output_handler.py index 8f429f798c92..94286a337b4e 100644 --- a/ingestion/src/metadata/workflow/output_handler.py +++ b/ingestion/src/metadata/workflow/output_handler.py @@ -10,41 +10,21 @@
""" -Common Output Handling methods +Module that handles the legacy WorkflowType until deprecation """ -import traceback from enum import Enum -from logging import Logger -from pathlib import Path -from typing import Dict, List +from typing import Optional -from pydantic import BaseModel -from tabulate import tabulate - -from metadata.generated.schema.entity.services.ingestionPipelines.status import ( - StackTraceError, +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, ) -from metadata.generated.schema.metadataIngestion.workflow import LogLevels -from metadata.ingestion.api.step import Summary -from metadata.ingestion.lineage.models import QueryParsingFailures -from metadata.utils.execution_time_tracker import ExecutionTimeTracker -from metadata.utils.helpers import pretty_print_time_duration -from metadata.utils.logger import ANSI, log_ansi_encoded_string - -WORKFLOW_FAILURE_MESSAGE = "Workflow finished with failures" -WORKFLOW_WARNING_MESSAGE = "Workflow finished with warnings" -WORKFLOW_SUCCESS_MESSAGE = "Workflow finished successfully" - - -class Failure(BaseModel): - """ - Auxiliary class to print the error per status - """ - - name: str - failures: List[StackTraceError] +from metadata.utils.deprecation import deprecated +@deprecated( + message="Use 'PipelineType' in 'metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline'", + release="1.6", +) class WorkflowType(Enum): """ Workflow type enums based on the `metadata` CLI commands @@ -59,190 +39,31 @@ class WorkflowType(Enum): APP = "application" -EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows" - -URLS = { - WorkflowType.INGEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/metadata", - WorkflowType.PROFILE: "https://docs.open-metadata.org/how-to-guides/data-quality-observability/profiler/workflow", - WorkflowType.TEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/data-quality", - WorkflowType.LINEAGE: "https://docs.open-metadata.org/connectors/ingestion/workflows/lineage", - WorkflowType.USAGE: "https://docs.open-metadata.org/connectors/ingestion/workflows/usage", -} - -DEFAULT_EXAMPLE_FILE = { - WorkflowType.INGEST: "bigquery", - WorkflowType.PROFILE: "bigquery_profiler", - WorkflowType.TEST: "test_suite", - WorkflowType.LINEAGE: "bigquery_lineage", - WorkflowType.USAGE: "bigquery_usage", -} - - -def print_more_info(workflow_type: WorkflowType) -> None: - """ - Print more information message - """ - log_ansi_encoded_string( - message=f"\nFor more information, please visit: {URLS[workflow_type]}" - "\nOr join us in Slack: https://slack.open-metadata.org/" - ) - - -def print_error_msg(msg: str) -> None: - """ - Print message with error style - """ - log_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}") - - -def get_failures(failure: Failure) -> List[Dict[str, str]]: - return [ - { - "From": failure.name, - "Entity Name": f.name, - "Message": f.error, - "Stack Trace": f.stackTrace, - } - for f in failure.failures - ] - - -def print_failures_if_apply(failures: List[Failure]) -> None: - # take only the ones that contain failures - failures = [f for f in failures if f.failures] - if failures: - # create a list of dictionaries' list - all_data = [get_failures(failure) for failure in failures] - # create a single of dictionaries - data = [f for fs in all_data for f in fs] - # create a dictionary with a key and a list of values from the list - error_table = {k: [dic[k] 
for dic in data] for k in data[0]} - if len(list(error_table.items())[0][1]) > 100: - log_ansi_encoded_string( - bold=True, message="Showing only the first 100 failures:" - ) - # truncate list if number of values are over 100 - error_table = {k: v[:100] for k, v in error_table.items()} - else: - log_ansi_encoded_string(bold=True, message="List of failures:") - - log_ansi_encoded_string( - message=f"\n{tabulate(error_table, headers='keys', tablefmt='grid')}" - ) - - -def is_debug_enabled(workflow) -> bool: - return ( - hasattr(workflow, "config") - and hasattr(workflow.config, "workflowConfig") - and hasattr(workflow.config.workflowConfig, "loggerLevel") - and workflow.config.workflowConfig.loggerLevel is LogLevels.DEBUG - ) - - -def print_execution_time_summary(): - """Log the ExecutionTimeTracker Summary.""" - tracker = ExecutionTimeTracker() - - summary_table = { - "Context": [], - "Execution Time Aggregate": [], +# TODO: Delete this method after the removal of WorkflowType in release 1.6 +# Remember to remove it where it is being used +def workflow_type_to_pipeline_type( + workflow_type: WorkflowType, source_type_name: Optional[str] +) -> PipelineType: + """Helper Function to Map between the Deprecated WorkflowType to PipelineType.""" + + def _fix_ingest_type() -> PipelineType: + """Helper Function to Map between the Deprecated WorkflowType.INGEST and the + correct PipelineType.""" + if source_type_name: + if source_type_name.endswith("lineage"): + return PipelineType.lineage + if source_type_name.endswith("usage"): + return PipelineType.usage + return PipelineType.metadata + + map_ = { + WorkflowType.INGEST: _fix_ingest_type(), + WorkflowType.PROFILE: PipelineType.profiler, + WorkflowType.TEST: PipelineType.TestSuite, + WorkflowType.LINEAGE: PipelineType.lineage, + WorkflowType.USAGE: PipelineType.usage, + WorkflowType.INSIGHT: PipelineType.dataInsight, + WorkflowType.APP: PipelineType.application, } - for key in sorted(tracker.state.state.keys()): - summary_table["Context"].append(key) - summary_table["Execution Time Aggregate"].append( - pretty_print_time_duration(tracker.state.state[key]) - ) - - log_ansi_encoded_string(bold=True, message="Execution Time Summary") - log_ansi_encoded_string(message=f"\n{tabulate(summary_table, tablefmt='grid')}") - - -def print_query_parsing_issues(): - """Log the QueryParsingFailures Summary.""" - query_failures = QueryParsingFailures() - - summary_table = { - "Query": [], - "Error": [], - } - - for failure in query_failures: - summary_table["Query"].append(failure.query) - summary_table["Error"].append(failure.error) - - if summary_table["Query"]: - log_ansi_encoded_string(bold=True, message="Query Parsing Error Summary") - log_ansi_encoded_string( - message=f"\n{tabulate(summary_table, tablefmt='grid', headers=summary_table.keys())}" - ) - - -def print_workflow_summary(workflow: "BaseWorkflow") -> None: - """ - Args: - workflow: the workflow status to be printed - - Returns: - Print Workflow status when the workflow logger level is DEBUG - """ - - if is_debug_enabled(workflow): - print_workflow_status_debug(workflow) - print_execution_time_summary() - print_query_parsing_issues() - - failures = [] - total_records = 0 - total_errors = 0 - for step in workflow.workflow_steps(): - step_summary = Summary.from_step(step) - total_records += step_summary.records - total_errors += step_summary.errors - failures.append(Failure(name=step.name, failures=step.get_status().failures)) - - log_ansi_encoded_string(bold=True, message=f"Workflow {step.name} 
Summary:") - log_ansi_encoded_string(message=f"Processed records: {step_summary.records}") - log_ansi_encoded_string( - message=f"Updated records: {step_summary.updated_records}" - ) - log_ansi_encoded_string(message=f"Warnings: {step_summary.warnings}") - if step_summary.filtered: - log_ansi_encoded_string(message=f"Filtered: {step_summary.filtered}") - log_ansi_encoded_string(message=f"Errors: {step_summary.errors}") - - print_failures_if_apply(failures) - - total_success = max(total_records, 1) - log_ansi_encoded_string( - color=ANSI.BRIGHT_CYAN, - bold=True, - message=f"Success %: " - f"{round(total_success * 100 / (total_success + total_errors), 2)}", - ) - - -def print_workflow_status_debug(workflow: "BaseWorkflow") -> None: - """Print the statuses from each workflow step""" - log_ansi_encoded_string(bold=True, message="Statuses detailed info:") - for step in workflow.workflow_steps(): - log_ansi_encoded_string(bold=True, message=f"{step.name} Status:") - log_ansi_encoded_string(message=step.get_status().as_string()) - - -def report_ingestion_status(logger: Logger, workflow: "BaseWorkflow") -> None: - """ - Given a logger, use it to INFO the workflow status - """ - try: - for step in workflow.workflow_steps(): - logger.info( - f"{step.name}: Processed {len(step.status.records)} records," - f" filtered {len(step.status.filtered)} records," - f" found {len(step.status.failures)} errors" - ) - - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error(f"Wild exception reporting status - {exc}") + return map_.get(workflow_type, PipelineType.metadata) diff --git a/ingestion/src/metadata/workflow/workflow_init_error_handler.py b/ingestion/src/metadata/workflow/workflow_init_error_handler.py new file mode 100644 index 000000000000..ff1cffa26e0f --- /dev/null +++ b/ingestion/src/metadata/workflow/workflow_init_error_handler.py @@ -0,0 +1,154 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Module handles the init error messages from different workflows +""" +import traceback +from pathlib import Path +from typing import Any, Dict, Optional, Type, Union + +from metadata.config.common import ConfigurationError +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) +from metadata.ingestion.api.parser import ( + InvalidWorkflowException, + ParsingConfigurationError, +) +from metadata.utils.constants import UTF_8 +from metadata.utils.logger import ANSI, log_ansi_encoded_string + +EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows" + + +URLS: Dict[PipelineType, str] = { + PipelineType.metadata: "https://docs.open-metadata.org/connectors/ingestion/workflows/metadata", + PipelineType.profiler: "https://docs.open-metadata.org/connectors/ingestion/workflows/profiler", + PipelineType.TestSuite: "https://docs.open-metadata.org/connectors/ingestion/workflows/data-quality", + PipelineType.lineage: "https://docs.open-metadata.org/connectors/ingestion/workflows/lineage", + PipelineType.usage: "https://docs.open-metadata.org/connectors/ingestion/workflows/usage", + PipelineType.dbt: "https://docs.open-metadata.org/connectors/ingestion/workflows/dbt", +} + + +DEFAULT_EXAMPLE_FILE: Dict[PipelineType, str] = { + PipelineType.metadata: "bigquery", + PipelineType.profiler: "bigquery_profiler", + PipelineType.TestSuite: "test_suite", + PipelineType.lineage: "bigquery_lineage", + PipelineType.usage: "bigquery_usage", +} + + +class WorkflowInitErrorHandler: + """Responsible for handling the InitError flow, when a Workflow errors before even initializing.""" + + @staticmethod + def print_init_error( + exc: Union[Exception, Type[Exception]], + config: Dict[str, Any], + pipeline_type: PipelineType = PipelineType.metadata, + ): + """ + Print a workflow initialization error + """ + source_type_name = WorkflowInitErrorHandler._get_source_type_name(config) + + if isinstance( + exc, + (ParsingConfigurationError, ConfigurationError, InvalidWorkflowException), + ): + WorkflowInitErrorHandler._print_error_msg( + f"Error loading {pipeline_type.name} configuration: {exc}" + ) + WorkflowInitErrorHandler._print_file_example( + source_type_name, pipeline_type + ) + else: + WorkflowInitErrorHandler._print_error_msg( + f"\nError initializing {pipeline_type.name}: {exc}" + ) + WorkflowInitErrorHandler._print_error_msg(traceback.format_exc()) + + WorkflowInitErrorHandler._print_more_info(pipeline_type) + + @staticmethod + def _get_source_type_name(config: Dict[str, Any]) -> Optional[str]: + """Returns the Source Type Name based on the Configuration passed.""" + source_type_name = None + + if ( + config + and config.get("source", None) is not None + and config["source"].get("type", None) is not None + ): + source_type_name = config["source"].get("type") + source_type_name = source_type_name.replace("-", "-") + + return source_type_name + + @staticmethod + def _print_file_example( + source_type_name: Optional[str], pipeline_type: PipelineType + ): + """ + Print an example file for a given configuration + """ + if source_type_name is not None: + example_file = WorkflowInitErrorHandler._calculate_example_file( + source_type_name, pipeline_type + ) + example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml" + if not example_path.exists(): + example_file = DEFAULT_EXAMPLE_FILE[pipeline_type] + example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml" + log_ansi_encoded_string( + message=f"\nMake sure you are following 
the following format e.g. '{example_file}':" + ) + log_ansi_encoded_string(message="------------") + with open(example_path, encoding=UTF_8) as file: + log_ansi_encoded_string(message=file.read()) + log_ansi_encoded_string(message="------------") + + @staticmethod + def _calculate_example_file( + source_type_name: str, pipeline_type: PipelineType + ) -> str: + """ + Calculates the ingestion type depending on the source type name and workflow_type + """ + if pipeline_type == PipelineType.usage: + return f"{source_type_name}_usage" + if pipeline_type == PipelineType.lineage: + return f"{source_type_name}_lineage" + if pipeline_type == PipelineType.profiler: + return f"{source_type_name}_profiler" + if pipeline_type == PipelineType.TestSuite: + return DEFAULT_EXAMPLE_FILE[pipeline_type] + return source_type_name + + @staticmethod + def _print_more_info(pipeline_type: PipelineType) -> None: + """ + Print more information message + """ + log_ansi_encoded_string( + message=f"\nFor more information, please visit: {URLS[pipeline_type]}" + + "\nOr join us in Slack: https://slack.open-metadata.org/" + ) + + @staticmethod + def _print_error_msg(msg: str) -> None: + """ + Print message with error style + """ + log_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}") diff --git a/ingestion/src/metadata/workflow/workflow_output_handler.py b/ingestion/src/metadata/workflow/workflow_output_handler.py index eaf0fe18c158..d1a2070e2a7e 100644 --- a/ingestion/src/metadata/workflow/workflow_output_handler.py +++ b/ingestion/src/metadata/workflow/workflow_output_handler.py @@ -14,127 +14,218 @@ """ import time -import traceback -from typing import Type, Union +from typing import Any, Dict, List, Optional, Type, Union -from metadata.config.common import ConfigurationError -from metadata.ingestion.api.parser import ( - InvalidWorkflowException, - ParsingConfigurationError, -) -from metadata.utils.constants import UTF_8 +from pydantic import BaseModel +from tabulate import tabulate + +from metadata.ingestion.api.status import TruncatedStackTraceError +from metadata.ingestion.api.step import Step, Summary +from metadata.ingestion.lineage.models import QueryParsingFailures +from metadata.utils.deprecation import deprecated +from metadata.utils.execution_time_tracker import ExecutionTimeTracker from metadata.utils.helpers import pretty_print_time_duration from metadata.utils.logger import ANSI, log_ansi_encoded_string from metadata.workflow.output_handler import ( - DEFAULT_EXAMPLE_FILE, - EXAMPLES_WORKFLOW_PATH, - WORKFLOW_FAILURE_MESSAGE, WorkflowType, - print_error_msg, - print_more_info, - print_workflow_summary, + workflow_type_to_pipeline_type, ) +from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler +from metadata.workflow.workflow_status_mixin import WorkflowResultStatus + +WORKFLOW_FAILURE_MESSAGE = "Workflow finished with failures" +WORKFLOW_WARNING_MESSAGE = "Workflow finished with warnings" +WORKFLOW_SUCCESS_MESSAGE = "Workflow finished successfully" -def calculate_ingestion_type(source_type_name: str) -> WorkflowType: +class Failure(BaseModel): """ - Calculates the ingestion type depending on the source type name + Auxiliary class to print the error per status """ - if source_type_name.endswith("lineage"): - return WorkflowType.LINEAGE - if source_type_name.endswith("usage"): - return WorkflowType.USAGE - return WorkflowType.INGEST + name: str + failures: List[TruncatedStackTraceError] -def calculate_example_file(source_type_name: str, workflow_type: 
WorkflowType) -> str: - """ - Calculates the ingestion type depending on the source type name and workflow_type - """ - if workflow_type == WorkflowType.USAGE: - return f"{source_type_name}_usage" - if workflow_type == WorkflowType.LINEAGE: - return f"{source_type_name}_lineage" - if workflow_type == WorkflowType.PROFILE: - return f"{source_type_name}_profiler" - if workflow_type == WorkflowType.TEST: - return DEFAULT_EXAMPLE_FILE[workflow_type] - return source_type_name - - -def print_file_example(source_type_name: str, workflow_type: WorkflowType): - """ - Print an example file for a given configuration - """ - if source_type_name is not None: - example_file = calculate_example_file(source_type_name, workflow_type) - example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml" - if not example_path.exists(): - example_file = DEFAULT_EXAMPLE_FILE[workflow_type] - example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml" - log_ansi_encoded_string( - message=f"\nMake sure you are following the following format e.g. '{example_file}':" - ) - log_ansi_encoded_string(message="------------") - with open(example_path, encoding=UTF_8) as file: - log_ansi_encoded_string(message=file.read()) - log_ansi_encoded_string(message="------------") +@deprecated(message="Use 'workflow.print_status()' instead.", release="1.6") +def print_status( + workflow: "BaseWorkflow", # pyright: ignore[reportUndefinedVariable,reportUnknownParameterType] +): + workflow.print_status() # pyright: ignore[reportUnknownMemberType] + +@deprecated( + message=( + "Use 'WorkflowInitErrorHandler.print_init_error(exc, config, workflow_type)'" + " from 'metadata.workflow.workflow_init_error_handler'" + ), + release="1.6", +) def print_init_error( exc: Union[Exception, Type[Exception]], - config: dict, + config: Dict[str, Any], workflow_type: WorkflowType = WorkflowType.INGEST, -) -> None: - """ - Print a workflow initialization error - """ - source_type_name = None - if ( +): + # pylint: disable=W0212 + source_type_name = WorkflowInitErrorHandler._get_source_type_name( # pyright: ignore[reportPrivateUsage] config - and config.get("source", None) is not None - and config["source"].get("type", None) is not None - ): - source_type_name = config["source"].get("type") - source_type_name = source_type_name.replace("-", "-") - workflow_type = ( - calculate_ingestion_type(source_type_name) - if workflow_type == WorkflowType.INGEST - else workflow_type - ) - - if isinstance( - exc, (ParsingConfigurationError, ConfigurationError, InvalidWorkflowException) - ): - print_error_msg(f"Error loading {workflow_type.name} configuration: {exc}") - print_file_example(source_type_name, workflow_type) - print_more_info(workflow_type) - else: - print_error_msg(f"\nError initializing {workflow_type.name}: {exc}") - print_error_msg(traceback.format_exc()) - print_more_info(workflow_type) - + ) + WorkflowInitErrorHandler.print_init_error( + exc, config, workflow_type_to_pipeline_type(workflow_type, source_type_name) + ) -def print_status(workflow: "IngestionWorkflow") -> None: - """ - Print the workflow results - """ - print_workflow_summary(workflow) +class WorkflowOutputHandler: + """Responsible for dealing with the Workflow Outputs""" - # Get the time to execute the first step - first_step = workflow.workflow_steps()[0] - if first_step.get_status().source_start_time: + def print_status( + self, + result_status: WorkflowResultStatus, + steps: List[Step], + start_time: Optional[Any] = None, + debug: bool = False, + ): + """ + Print the workflow 
results + """ + self.print_summary(steps, debug) + + if start_time: + log_ansi_encoded_string( + color=ANSI.BRIGHT_CYAN, + bold=True, + message="Workflow finished in time: " + + f"{pretty_print_time_duration(time.time() - start_time)}", + ) + + if result_status == WorkflowResultStatus.FAILURE: + log_ansi_encoded_string( + color=ANSI.BRIGHT_RED, + bold=True, + message=WORKFLOW_FAILURE_MESSAGE, + ) + + def print_summary(self, steps: List[Step], debug: bool = False): + """Prints the summary information for a Workflow Execution.""" + if debug: + self._print_debug_summary(steps) + self._print_execution_time_summary() + self._print_query_parsing_issues() + + self._print_summary(steps) + + def _print_summary(self, steps: List[Step]): + failures: List[Failure] = [] + total_records: int = 0 + total_errors: int = 0 + + for step in steps: + step_summary = Summary.from_step(step) + + total_records += step_summary.records or 0 + total_errors += step_summary.errors or 0 + failures.append( + Failure(name=step.name, failures=step.get_status().failures) + ) + + log_ansi_encoded_string(bold=True, message=f"Workflow {step.name} Summary:") + log_ansi_encoded_string( + message=f"Processed records: {step_summary.records}" + ) + log_ansi_encoded_string( + message=f"Updated records: {step_summary.updated_records}" + ) + log_ansi_encoded_string(message=f"Warnings: {step_summary.warnings}") + + if step_summary.filtered: + log_ansi_encoded_string(message=f"Filtered: {step_summary.filtered}") + + log_ansi_encoded_string(message=f"Errors: {step_summary.errors}") + + self._print_failures_if_apply(failures) + + total_success = max(total_records, 1) log_ansi_encoded_string( color=ANSI.BRIGHT_CYAN, bold=True, - message="Workflow finished in time: " - f"{pretty_print_time_duration(time.time()-first_step.get_status().source_start_time)}", + message="Success %: " + + f"{round(total_success * 100 / (total_success + total_errors), 2)}", ) - if workflow.result_status() == 1: - log_ansi_encoded_string( - color=ANSI.BRIGHT_RED, - bold=True, - message=WORKFLOW_FAILURE_MESSAGE, - ) + def _print_debug_summary(self, steps: List[Step]): + log_ansi_encoded_string(bold=True, message="Statuses detailed info:") + + for step in steps: + log_ansi_encoded_string(bold=True, message=f"{step.name} Status:") + log_ansi_encoded_string(message=step.get_status().as_string()) + + def _print_execution_time_summary(self): + """Log the ExecutionTimeTracker Summary.""" + tracker = ExecutionTimeTracker() + + summary_table: Dict[str, List[Union[str, float]]] = { + "Context": [], + "Execution Time Aggregate": [], + } + + for key in sorted(tracker.state.state.keys()): + summary_table["Context"].append(key) + summary_table["Execution Time Aggregate"].append( + pretty_print_time_duration(tracker.state.state[key]) + ) + + log_ansi_encoded_string(bold=True, message="Execution Time Summary") + log_ansi_encoded_string(message=f"\n{tabulate(summary_table, tablefmt='grid')}") + + def _print_query_parsing_issues(self): + """Log the QueryParsingFailures Summary.""" + query_failures = QueryParsingFailures() + + summary_table: Dict[str, List[Optional[str]]] = { + "Query": [], + "Error": [], + } + + for failure in query_failures: + summary_table["Query"].append(failure.query) + summary_table["Error"].append(failure.error) + + if summary_table["Query"]: + log_ansi_encoded_string(bold=True, message="Query Parsing Error Summary") + log_ansi_encoded_string( + message=f"\n{tabulate(summary_table, tablefmt='grid', headers=list(summary_table.keys()))}" + ) + + def 
_get_failures(self, failure: Failure) -> List[Dict[str, Optional[str]]]: + return [ + { + "From": failure.name, + "Entity Name": f.name, + "Message": f.error, + "Stack Trace": f.stackTrace, + } + for f in failure.failures + ] + + def _print_failures_if_apply(self, failures: List[Failure]) -> None: + # take only the ones that contain failures + failures = [f for f in failures if f.failures] + if failures: + # create a list of lists of dictionaries, one per step + all_data = [self._get_failures(failure) for failure in failures] + # flatten into a single list of dictionaries + data = [f for fs in all_data for f in fs] + # pivot into a dict mapping each column name to its list of values + error_table = {k: [dic[k] for dic in data] for k in data[0]} + if len(list(error_table.items())[0][1]) > 100: + log_ansi_encoded_string( + bold=True, message="Showing only the first 100 failures:" + ) + # truncate the lists if the number of values is over 100 + error_table = {k: v[:100] for k, v in error_table.items()} + else: + log_ansi_encoded_string(bold=True, message="List of failures:") + + log_ansi_encoded_string( + message=f"\n{tabulate(error_table, headers='keys', tablefmt='grid')}" + ) diff --git a/ingestion/src/metadata/workflow/workflow_status_mixin.py b/ingestion/src/metadata/workflow/workflow_status_mixin.py index 9772857a1a8a..e648ed00d439 100644 --- a/ingestion/src/metadata/workflow/workflow_status_mixin.py +++ b/ingestion/src/metadata/workflow/workflow_status_mixin.py @@ -14,6 +14,7 @@ import traceback import uuid from datetime import datetime +from enum import Enum from typing import Optional, Tuple from metadata.config.common import WorkflowExecutionError @@ -39,6 +40,11 @@ SUCCESS_THRESHOLD_VALUE = 90 +class WorkflowResultStatus(Enum): + SUCCESS = 0 + FAILURE = 1 + + class WorkflowStatusMixin: """ Helper methods to manage IngestionPipeline status @@ -127,13 +133,13 @@ def raise_from_status(self, raise_warnings=False): self.set_ingestion_pipeline_status(PipelineState.failed) raise err - def result_status(self) -> int: + def result_status(self) -> WorkflowResultStatus: """ - Returns 1 if source status is failed, 0 otherwise. + Returns WorkflowResultStatus.FAILURE if the source status has failures, WorkflowResultStatus.SUCCESS otherwise.
""" if self.get_failures(): - return 1 - return 0 + return WorkflowResultStatus.FAILURE + return WorkflowResultStatus.SUCCESS def build_ingestion_status(self) -> Optional[IngestionStatus]: """ diff --git a/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py b/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py index 62c727794e02..6a8529f74899 100644 --- a/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py +++ b/ingestion/tests/integration/great_expectations/test_great_expectation_integration.py @@ -38,7 +38,6 @@ get_end_of_day_timestamp_mill, ) from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.workflow_output_handler import print_status Base = declarative_base() @@ -149,7 +148,7 @@ def setUpClass(cls): ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG) ingestion_workflow.execute() ingestion_workflow.raise_from_status() - print_status(ingestion_workflow) + ingestion_workflow.print_status() ingestion_workflow.stop() @classmethod diff --git a/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py index 06227197698d..17349416ac6e 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py @@ -61,7 +61,6 @@ ) from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_status TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent BIGQUERY_CONFIG_FILE = "cli_e2e/database/bigquery/bigquery.yaml" @@ -143,7 +142,7 @@ def setUpClass(cls) -> None: ingestion_workflow = MetadataWorkflow.create(cls.config) ingestion_workflow.execute() ingestion_workflow.raise_from_status() - print_status(ingestion_workflow) + ingestion_workflow.print_status() ingestion_workflow.stop() # get table fqn @@ -168,7 +167,7 @@ def test_bigquery_system_metrics(self): profiler_workflow = ProfilerWorkflow.create(config) profiler_workflow.execute() profiler_workflow.raise_from_status() - print_status(profiler_workflow) + profiler_workflow.print_status() profiler_workflow.stop() # get latest profile metrics diff --git a/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py index 734b588e88f5..64779d1f5ca9 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py @@ -47,7 +47,6 @@ ) from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_status TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent REDSHIFT_CONFIG_FILE = "cli_e2e/database/redshift/redshift.yaml" @@ -110,7 +109,7 @@ def setUpClass(cls) -> None: ingestion_workflow = MetadataWorkflow.create(cls.config) ingestion_workflow.execute() ingestion_workflow.raise_from_status() - print_status(ingestion_workflow) + ingestion_workflow.print_status() ingestion_workflow.stop() # get table fqn @@ -135,7 +134,7 @@ def test_redshift_system_metrics(self): profiler_workflow = ProfilerWorkflow.create(config) profiler_workflow.execute() 
profiler_workflow.raise_from_status() - print_status(profiler_workflow) + profiler_workflow.print_status() profiler_workflow.stop() # get latest profile metrics diff --git a/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py index d64a08a065d7..20ef0e47502b 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py @@ -23,12 +23,12 @@ We will need to perform at least one `DELETE`, `INSERT`, `UPDATE` on any table from the schema. query example: ``` - INSERT INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES + INSERT INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES (1, 'FOO'), (2, 'BAR'), (3, 'BAZZ') - INSERT OVERWRITE INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES + INSERT OVERWRITE INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES (4, 'FOOBAR'), (5, 'FOOBAZZ'), (6, 'BARBAZZ') @@ -66,7 +66,6 @@ ) from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_status TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent SNOWFLAKE_CONFIG_FILE = "cli_e2e/database/snowflake/snowflake.yaml" @@ -134,7 +133,7 @@ def setUpClass(cls) -> None: ingestion_workflow = MetadataWorkflow.create(cls.config) ingestion_workflow.execute() ingestion_workflow.raise_from_status() - print_status(ingestion_workflow) + ingestion_workflow.print_status() ingestion_workflow.stop() # get table fqn @@ -159,7 +158,7 @@ def test_snowflake_system_metrics(self): profiler_workflow = ProfilerWorkflow.create(config) profiler_workflow.execute() profiler_workflow.raise_from_status() - print_status(profiler_workflow) + profiler_workflow.print_status() profiler_workflow.stop() # get latest profile metrics diff --git a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py index b7d58cb09b88..7106be330ee0 100644 --- a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py +++ b/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py @@ -41,7 +41,7 @@ ) from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_status +from metadata.workflow.workflow_output_handler import WorkflowResultStatus SERVICE_NAME = Path(__file__).stem REGION = "us-west-1" @@ -139,7 +139,7 @@ def setUp(self) -> None: ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG) ingestion_workflow.execute() ingestion_workflow.raise_from_status() - print_status(ingestion_workflow) + ingestion_workflow.print_status() ingestion_workflow.stop() def test_datalake_profiler_workflow(self): @@ -159,7 +159,7 @@ def test_datalake_profiler_workflow(self): status = profiler_workflow.result_status() profiler_workflow.stop() - assert status == 0 + assert status == WorkflowResultStatus.SUCCESS table_profile = self.metadata.get_profile_data( f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv"', @@ -207,7 +207,7 @@ def test_values_partitioned_datalake_profiler_workflow(self): status = profiler_workflow.result_status() profiler_workflow.stop() - assert status == 0 + assert status == WorkflowResultStatus.SUCCESS table = self.metadata.get_by_name( entity=Table, @@ -252,7 +252,7 @@ def 
test_datetime_partitioned_datalake_profiler_workflow(self):
         status = profiler_workflow.result_status()
         profiler_workflow.stop()

-        assert status == 0
+        assert status == WorkflowResultStatus.SUCCESS

         table = self.metadata.get_by_name(
             entity=Table,
@@ -298,7 +298,7 @@ def test_integer_range_partitioned_datalake_profiler_workflow(self):
         status = profiler_workflow.result_status()
         profiler_workflow.stop()

-        assert status == 0
+        assert status == WorkflowResultStatus.SUCCESS

         table = self.metadata.get_by_name(
             entity=Table,
@@ -355,7 +355,7 @@ def test_datalake_profiler_workflow_with_custom_profiler_config(self):
         status = profiler_workflow.result_status()
         profiler_workflow.stop()

-        assert status == 0
+        assert status == WorkflowResultStatus.SUCCESS

         table = self.metadata.get_by_name(
             entity=Table,
diff --git a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py
index b1b5001a3e69..21d08bd95cbb 100644
--- a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py
+++ b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py
@@ -44,7 +44,7 @@
 )
 from metadata.workflow.metadata import MetadataWorkflow
 from metadata.workflow.profiler import ProfilerWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+from metadata.workflow.workflow_output_handler import WorkflowResultStatus

 logging.basicConfig(level=logging.WARN)
 logger = logging.getLogger(__name__)
@@ -185,7 +185,7 @@ def setUpClass(cls) -> None:
         ingestion_workflow = MetadataWorkflow.create(ingestion_config)
         ingestion_workflow.execute()
         ingestion_workflow.raise_from_status()
-        print_status(ingestion_workflow)
+        ingestion_workflow.print_status()
         ingestion_workflow.stop()

     @classmethod
@@ -253,7 +253,7 @@ def test_profiler_workflow(self):
         status = profiler_workflow.result_status()
         profiler_workflow.stop()

-        assert status == 0
+        assert status == WorkflowResultStatus.SUCCESS

         table = self.metadata.get_by_name(
             entity=Table,
@@ -278,7 +278,7 @@ def test_profiler_workflow(self):
         status = profiler_workflow.result_status()
         profiler_workflow.stop()

-        assert status == 0
+        assert status == WorkflowResultStatus.SUCCESS

         table = self.metadata.get_by_name(
             entity=Table,
@@ -309,7 +309,7 @@ def test_workflow_sample_profile(self):

         profiler_workflow = ProfilerWorkflow.create(workflow_config)
         profiler_workflow.execute()
-        print_status(profiler_workflow)
+        profiler_workflow.print_status()
         profiler_workflow.stop()

         table = self.metadata.get_by_name(
@@ -361,7 +361,7 @@ def test_workflow_datetime_partition(self):

         profiler_workflow = ProfilerWorkflow.create(workflow_config)
         profiler_workflow.execute()
-        print_status(profiler_workflow)
+        profiler_workflow.print_status()
         profiler_workflow.stop()

         table = self.metadata.get_by_name(
@@ -400,7 +400,7 @@ def test_workflow_datetime_partition(self):

         profiler_workflow = ProfilerWorkflow.create(workflow_config)
         profiler_workflow.execute()
-        print_status(profiler_workflow)
+        profiler_workflow.print_status()
         profiler_workflow.stop()

         table = self.metadata.get_by_name(
@@ -448,7 +448,7 @@ def test_workflow_integer_range_partition(self):

         profiler_workflow = ProfilerWorkflow.create(workflow_config)
         profiler_workflow.execute()
-        print_status(profiler_workflow)
+        profiler_workflow.print_status()
         profiler_workflow.stop()

         table = self.metadata.get_by_name(
@@ -488,7 +488,7 @@ def test_workflow_integer_range_partition(self):

         profiler_workflow = ProfilerWorkflow.create(workflow_config)
         profiler_workflow.execute()
-        print_status(profiler_workflow)
+        profiler_workflow.print_status()
         profiler_workflow.stop()

         table = self.metadata.get_by_name(
@@ -535,7 +535,7 @@ def test_workflow_values_partition(self):

         profiler_workflow = ProfilerWorkflow.create(workflow_config)
         profiler_workflow.execute()
-        print_status(profiler_workflow)
+        profiler_workflow.print_status()
         profiler_workflow.stop()

         table = self.metadata.get_by_name(
@@ -575,7 +575,7 @@ def test_workflow_values_partition(self):

         profiler_workflow = ProfilerWorkflow.create(workflow_config)
         profiler_workflow.execute()
-        print_status(profiler_workflow)
+        profiler_workflow.print_status()
         profiler_workflow.stop()

         table = self.metadata.get_by_name(
@@ -633,7 +633,7 @@ def test_datalake_profiler_workflow_with_custom_profiler_config(self):
         status = profiler_workflow.result_status()
         profiler_workflow.stop()

-        assert status == 0
+        assert status == WorkflowResultStatus.SUCCESS

         table = self.metadata.get_by_name(
             entity=Table,
@@ -726,7 +726,7 @@ def test_sample_data_ingestion(self):
         status = profiler_workflow.result_status()
         profiler_workflow.stop()

-        assert status == 0
+        assert status == WorkflowResultStatus.SUCCESS

         table = self.metadata.get_by_name(
             entity=Table,
diff --git a/ingestion/tests/integration/profiler/test_nosql_profiler.py b/ingestion/tests/integration/profiler/test_nosql_profiler.py
index 7a428cbd08cc..08cea47bc782 100644
--- a/ingestion/tests/integration/profiler/test_nosql_profiler.py
+++ b/ingestion/tests/integration/profiler/test_nosql_profiler.py
@@ -45,7 +45,7 @@
 from metadata.utils.time_utils import get_end_of_day_timestamp_mill
 from metadata.workflow.metadata import MetadataWorkflow
 from metadata.workflow.profiler import ProfilerWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+from metadata.workflow.workflow_output_handler import WorkflowResultStatus

 SERVICE_NAME = Path(__file__).stem

@@ -144,7 +144,7 @@ def setUpClass(cls) -> None:
         )
         ingestion_workflow.execute()
         ingestion_workflow.raise_from_status()
-        print_status(ingestion_workflow)
+        ingestion_workflow.print_status()
         ingestion_workflow.stop()

     @classmethod
@@ -176,7 +176,7 @@ def run_profiler_workflow(self, config):
         profiler_workflow.execute()
         status = profiler_workflow.result_status()
         profiler_workflow.stop()
-        assert status == 0
+        assert status == WorkflowResultStatus.SUCCESS

     def test_simple(self):
         workflow_config = deepcopy(self.ingestion_config)
diff --git a/ingestion/tests/integration/s3/conftest.py b/ingestion/tests/integration/s3/conftest.py
index 615d742d7297..2c96e1b1cba9 100644
--- a/ingestion/tests/integration/s3/conftest.py
+++ b/ingestion/tests/integration/s3/conftest.py
@@ -20,7 +20,6 @@
 from _openmetadata_testutils.ometa import OM_JWT, int_admin_ometa
 from metadata.generated.schema.entity.services.storageService import StorageService
 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status

 from ..containers import MinioContainerConfigs, get_minio_container

@@ -112,7 +111,7 @@ def ingest_s3_storage(minio, metadata, service_name, create_data):
     workflow = MetadataWorkflow.create(yaml.safe_load(config))
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

     yield
diff --git a/ingestion/tests/unit/workflow/test_deprecated_workflow_functions.py b/ingestion/tests/unit/workflow/test_deprecated_workflow_functions.py
new file mode 100644
index 000000000000..a5639218639e
--- /dev/null
+++ b/ingestion/tests/unit/workflow/test_deprecated_workflow_functions.py
@@ -0,0 +1,27 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Validate the deprecated functions still work.
+"""
+from metadata.workflow.workflow_output_handler import print_init_error, print_status
+
+from .test_base_workflow import SimpleWorkflow, config
+
+
+# TODO: remove after the print_status and print_init_error functions are removed in Release 1.6
+class TestDeprecatedSimpleWorkflow:
+    def test_workflow_print_status(self):
+        workflow = SimpleWorkflow(config=config)
+        workflow.execute()
+        print_status(workflow)
+
+    def test_workflow_print_init_error(self):
+        print_init_error(Exception(), config.model_dump())
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py
index 2e344a064441..17a6c9e13751 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py
@@ -34,7 +34,6 @@
     ApplicationPipeline,
 )
 from metadata.workflow.application import ApplicationWorkflow
-from metadata.workflow.application_output_handler import print_status


 def application_workflow(workflow_config: OpenMetadataApplicationConfig):
@@ -54,7 +53,7 @@ def application_workflow(workflow_config: OpenMetadataApplicationConfig):

     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
index e92b3f786b55..7b219873b69d 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
@@ -36,7 +36,6 @@
 from metadata.generated.schema.type.basic import Timestamp, Uuid
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.utils import fqn
-from metadata.workflow.workflow_output_handler import print_status

 # pylint: disable=ungrouped-imports
 try:
@@ -207,7 +206,7 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):

     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py
index d180e0896259..8fa9a40ed012 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py
@@ -42,7 +42,6 @@
 )
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.workflow.data_insight import DataInsightWorkflow
-from metadata.workflow.workflow_output_handler import print_status


 def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig):
@@ -63,7 +62,7 @@ def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig):

     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py
index 891460368298..1053912be158 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py
@@ -28,7 +28,6 @@
     WorkflowConfig,
 )
 from metadata.workflow.profiler import ProfilerWorkflow
-from metadata.workflow.workflow_output_handler import print_status


 def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
@@ -48,7 +47,7 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):

     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py
index 38263010e92b..8ed7d76f0ef8 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py
@@ -28,7 +28,6 @@
     WorkflowConfig,
 )
 from metadata.workflow.data_quality import TestSuiteWorkflow
-from metadata.workflow.workflow_output_handler import print_status


 def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
@@ -48,7 +47,7 @@ def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):

     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py
index e4094f2d02e0..27d0292a26f8 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py
@@ -32,7 +32,6 @@
     Stage,
 )
 from metadata.workflow.usage import UsageWorkflow
-from metadata.workflow.workflow_output_handler import print_status


 def usage_workflow(workflow_config: OpenMetadataWorkflowConfig):
@@ -52,7 +51,7 @@ def usage_workflow(workflow_config: OpenMetadataWorkflowConfig):

     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

diff --git a/openmetadata-docs/content/partials/v1.5/deployment/run-connectors-class.md b/openmetadata-docs/content/partials/v1.5/deployment/run-connectors-class.md
index 2182296b5769..3e397e7d78fa 100644
--- a/openmetadata-docs/content/partials/v1.5/deployment/run-connectors-class.md
+++ b/openmetadata-docs/content/partials/v1.5/deployment/run-connectors-class.md
@@ -15,7 +15,7 @@ def run():
     workflow = MetadataWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()
 ```

diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/ingestion/deployment/index.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/ingestion/deployment/index.md
index 25b6430a23e3..b042a842d22e 100644
--- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/ingestion/deployment/index.md
+++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/ingestion/deployment/index.md
@@ -327,7 +327,7 @@ we just need the following few lines of Python code:

 ```python
 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 config = """
@@ -337,7 +337,7 @@
 workflow_config = yaml.safe_load(config)
 workflow = MetadataWorkflow.create(workflow_config)
 workflow.execute()
 workflow.raise_from_status()
-print_status(workflow)
+workflow.print_status()
 workflow.stop()
 ```

diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/pipeline/airflow/gcs-composer.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/pipeline/airflow/gcs-composer.md
index fbd565104d5a..c676cbb29b77 100644
--- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/pipeline/airflow/gcs-composer.md
+++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/pipeline/airflow/gcs-composer.md
@@ -65,7 +65,7 @@ from airflow.utils.dates import days_ago

 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 default_args = {
     "owner": "user_name",
@@ -108,7 +108,7 @@ def metadata_ingestion_workflow():
     workflow = MetadataWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/airflow.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/airflow.md
index 4f309f3f66bf..bbc7fffd5f90 100644
--- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/airflow.md
+++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/airflow.md
@@ -54,7 +54,7 @@ except ModuleNotFoundError:

 from metadata.config.common import load_config_file
 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+
 from airflow.utils.dates import days_ago

@@ -76,7 +76,7 @@ def metadata_ingestion_workflow():
     workflow = MetadataWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

 with DAG(
@@ -225,7 +225,7 @@ default_args = {
 def metadata_ingestion_workflow():
     from metadata.workflow.metadata import MetadataWorkflow
-    from metadata.workflow.workflow_output_handler import print_status
+
     import yaml

     config = """
 ...
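All of the doc snippets touched above converge on the same calling convention, so it is worth seeing once in full. Below is a minimal, self-contained sketch of the updated pattern; the YAML body is a placeholder (service name, host, and auth values are illustrative, not taken from any of the documents in this diff):

```python
import yaml

from metadata.workflow.metadata import MetadataWorkflow

# Placeholder config: a real one also needs the serviceConnection
# details for the source being ingested.
CONFIG = """
source:
  type: mysql
  serviceName: local_mysql
  sourceConfig:
    config:
      type: DatabaseMetadata
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
"""


def metadata_ingestion_workflow():
    workflow_config = yaml.safe_load(CONFIG)
    workflow = MetadataWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    # Status reporting is now a method on the workflow itself, replacing
    # the removed module-level print_status(workflow) helper.
    workflow.print_status()
    workflow.stop()
```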
@@ -235,7 +235,7 @@ def metadata_ingestion_workflow():
     workflow = MetadataWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/credentials.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/credentials.md
index 040fc5a2a93a..07e76f2bb116 100644
--- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/credentials.md
+++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/credentials.md
@@ -163,7 +163,7 @@ import yaml

 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 CONFIG = f"""
 source:
@@ -182,7 +182,7 @@ def run():
     workflow = MetadataWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

@@ -286,7 +286,7 @@ except ModuleNotFoundError:

 from metadata.config.common import load_config_file
 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 # Import the hook
 from airflow.hooks.base import BaseHook
@@ -326,7 +326,7 @@ def metadata_ingestion_workflow():
     workflow = MetadataWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

 with DAG(
@@ -363,7 +363,7 @@ except ModuleNotFoundError:

 from metadata.config.common import load_config_file
 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 config = """
 source:
@@ -388,7 +388,7 @@ def metadata_ingestion_workflow():
     workflow = MetadataWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

 with DAG(
diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/gcs-composer.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/gcs-composer.md
index f869d8a73a48..f181e83ab610 100644
--- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/gcs-composer.md
+++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/gcs-composer.md
@@ -52,7 +52,7 @@ from airflow.utils.dates import days_ago

 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 default_args = {
     "owner": "user_name",
@@ -73,7 +73,7 @@ def metadata_ingestion_workflow():
     workflow = MetadataWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/github-actions.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/github-actions.md
index a048b4e98649..17741a2e9202 100644
--- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/github-actions.md
+++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/github-actions.md
@@ -38,7 +38,7 @@ import yaml

 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 CONFIG = f"""
 source:
@@ -57,7 +57,7 @@ def run():
     workflow = MetadataWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/index.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/index.md
index a5b2961aebe8..b7d659c2edaa 100644
--- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/index.md
+++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/index.md
@@ -23,7 +23,7 @@ ingestion from within a simple Python script:

 ```python
 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 # Specify your YAML configuration
 CONFIG = """
@@ -42,7 +42,7 @@ def run():
     workflow = MetadataWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

@@ -353,7 +353,7 @@ import yaml

 ```python {% srNumber=1 %}
 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 ```

@@ -392,7 +392,7 @@ def run():
     workflow = MetadataWorkflow.create(CONFIG)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()
 ```

@@ -464,7 +464,7 @@ import yaml

 ```python {% srNumber=1 %}
 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 ```

@@ -499,7 +499,7 @@ def run():
     workflow = MetadataWorkflow.create(CONFIG)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()
 ```

@@ -570,7 +570,7 @@ import yaml

 ```python {% srNumber=1 %}
 from metadata.workflow.usage import UsageWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 ```

@@ -613,7 +613,7 @@ def run():
     workflow = UsageWorkflow.create(CONFIG)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()
 ```

@@ -686,7 +686,7 @@ import yaml

 ```python {% srNumber=1 %}
 from metadata.workflow.profiler import ProfilerWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 ```

@@ -723,7 +723,7 @@ def run():
     workflow = ProfilerWorkflow.create(CONFIG)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()
 ```

@@ -800,7 +800,7 @@ import yaml

 ```python {% srNumber=1 %}
 from metadata.workflow.data_quality import TestSuiteWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 ```

@@ -836,7 +836,7 @@ def run():
     workflow = TestSuiteWorkflow.create(CONFIG)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()
 ```

diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/mwaa.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/mwaa.md
index acff0d0cd0a6..63f00344c06b 100644
--- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/mwaa.md
+++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/ingestion/external/mwaa.md
@@ -54,7 +54,7 @@ except ModuleNotFoundError:

 from airflow.utils.dates import days_ago
 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 default_args = {
     "retries": 3,
@@ -71,7 +71,7 @@ def metadata_ingestion_workflow():
     workflow = MetadataWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

 with DAG(
@@ -392,7 +392,7 @@ default_args = {
 def metadata_ingestion_workflow():
     from metadata.workflow.metadata import MetadataWorkflow
-    from metadata.workflow.workflow_output_handler import print_status
+
     import yaml

@@ -403,7 +403,7 @@ YAML config
     workflow = MetadataWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

 with DAG(
diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/upgrade/versions/110-to-120.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/upgrade/versions/110-to-120.md
index c4d742d006b5..f34b6430d36e 100644
--- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/upgrade/versions/110-to-120.md
+++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/deployment/upgrade/versions/110-to-120.md
@@ -230,12 +230,12 @@ We have reorganized the structure of the `Workflow` classes, which requires updating
 The `Workflow` class that you import can then be called as follows:

 ```python
-from metadata.workflow.workflow_output_handler import print_status
+

 workflow = workflow_class.create(workflow_config)
 workflow.execute()
 workflow.raise_from_status()
-print_status(workflow) # This method has been updated. Before it was `workflow.print_status()`
+workflow.print_status() # This method has been updated. Before it was `print_status(workflow)`
 workflow.stop()
 ```

diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/airflow-sdk.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/airflow-sdk.md
index c1c0a36442eb..309a3ca10b11 100644
--- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/airflow-sdk.md
+++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/airflow-sdk.md
@@ -70,7 +70,7 @@ import yaml
 from datetime import timedelta
 from airflow import DAG
 from metadata.workflow.data_insight import DataInsightWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 try:
     from airflow.operators.python import PythonOperator
@@ -98,7 +98,7 @@ def metadata_ingestion_workflow():
     workflow = DataInsightWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

 with DAG(
diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/elasticsearch-reindex.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/elasticsearch-reindex.md
index 8cbb29d0a6a8..1f139292e7ea 100644
--- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/elasticsearch-reindex.md
+++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/how-to-guides/data-insights/elasticsearch-reindex.md
@@ -50,7 +50,7 @@ except ModuleNotFoundError:

 from metadata.config.common import load_config_file
 from metadata.workflow.metadata import MetadataWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+
 from airflow.utils.dates import days_ago

 default_args = {
@@ -71,7 +71,7 @@ def metadata_ingestion_workflow():
     workflow = MetadataWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()

 with DAG(
diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/quality-and-observability/profiler/external_workflow.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/quality-and-observability/profiler/external_workflow.md
index 963ed15fe6cd..21aa5e9021c2 100644
--- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/quality-and-observability/profiler/external_workflow.md
+++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/quality-and-observability/profiler/external_workflow.md
@@ -169,7 +169,7 @@ If you'd rather have a Python script taking care of the execution, you can use:

 ```python
 from metadata.workflow.profiler import ProfilerWorkflow
-from metadata.workflow.workflow_output_handler import print_status
+

 # Specify your YAML configuration
 CONFIG = """
@@ -188,7 +188,7 @@ def run():
     workflow = ProfilerWorkflow.create(workflow_config)
     workflow.execute()
     workflow.raise_from_status()
-    print_status(workflow)
+    workflow.print_status()
     workflow.stop()
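The test-side half of this change follows a single pattern as well: `result_status()` is now compared against the `WorkflowResultStatus` enum instead of the bare integer `0`. A minimal sketch of the updated assertion style, assuming a `workflow_config` dict shaped like the profiler configs used in the tests above:

```python
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import WorkflowResultStatus


def run_and_assert(workflow_config: dict) -> None:
    """Run a profiler workflow and assert success via the status enum."""
    profiler_workflow = ProfilerWorkflow.create(workflow_config)
    profiler_workflow.execute()
    status = profiler_workflow.result_status()
    profiler_workflow.stop()
    # result_status() returns a WorkflowResultStatus member, so tests
    # assert against the enum rather than the old integer exit code.
    assert status == WorkflowResultStatus.SUCCESS
```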