Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Refactor output_handlers to a WorkflowOutputHandler class #17149

Merged
merged 9 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import yaml
from airflow import DAG

from metadata.workflow.workflow_output_handler import print_status

try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
Expand Down Expand Up @@ -63,7 +61,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import yaml
from airflow import DAG

from metadata.workflow.workflow_output_handler import print_status

try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
Expand Down Expand Up @@ -72,7 +70,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()


Expand Down
4 changes: 1 addition & 3 deletions ingestion/examples/airflow/dags/airflow_sample_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import yaml
from airflow import DAG

from metadata.workflow.workflow_output_handler import print_status

try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
Expand Down Expand Up @@ -62,7 +60,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()


Expand Down
4 changes: 1 addition & 3 deletions ingestion/examples/airflow/dags/airflow_sample_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

from airflow import DAG

from metadata.workflow.workflow_output_handler import print_status

try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
Expand Down Expand Up @@ -88,7 +86,7 @@ def metadata_ingestion_workflow():
workflow = UsageWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()


Expand Down
3 changes: 1 addition & 2 deletions ingestion/operators/docker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.usage import UsageWorkflow
from metadata.workflow.workflow_output_handler import print_status

WORKFLOW_MAP = {
PipelineType.metadata.value: MetadataWorkflow,
Expand Down Expand Up @@ -109,7 +108,7 @@ def main():
workflow = workflow_class.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()


Expand Down
11 changes: 10 additions & 1 deletion ingestion/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,17 @@ ignore = [
"src/metadata/readers/*",
"src/metadata/timer/*",
"src/metadata/utils/*",
"src/metadata/workflow/*",
"src/metadata/workflow/base.py",
"src/metadata/workflow/application.py",
"src/metadata/workflow/data_insight.py",
"src/metadata/workflow/data_quality.py",
"src/metadata/workflow/ingestion.py",
"src/metadata/workflow/metadata.py",
"src/metadata/workflow/profiler.py",
"src/metadata/workflow/usage.py",
"src/metadata/workflow/workflow_status_mixin.py",
]

reportDeprecated = false
reportMissingTypeStubs = false
reportAny = false
3 changes: 1 addition & 2 deletions ingestion/src/metadata/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger
from metadata.workflow.application import ApplicationWorkflow
from metadata.workflow.application_output_handler import print_status

logger = cli_logger()

Expand All @@ -42,5 +41,5 @@ def run_app(config_path: Path) -> None:

workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()
11 changes: 6 additions & 5 deletions ingestion/src/metadata/cli/dataquality.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.workflow_output_handler import (
from metadata.workflow.workflow_init_error_handler import (
WorkflowInitErrorHandler,
WorkflowType,
print_init_error,
print_status,
)

logger = cli_logger()
Expand All @@ -42,10 +41,12 @@ def run_test(config_path: Path) -> None:
workflow = TestSuiteWorkflow.create(workflow_config_dict)
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, workflow_config_dict, WorkflowType.TEST)
WorkflowInitErrorHandler.print_init_error(
exc, workflow_config_dict, WorkflowType.TEST
)
sys.exit(1)

workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()
9 changes: 4 additions & 5 deletions ingestion/src/metadata/cli/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import (
from metadata.workflow.workflow_init_error_handler import (
WorkflowInitErrorHandler,
WorkflowType,
print_init_error,
print_status,
)

logger = cli_logger()
Expand All @@ -42,10 +41,10 @@ def run_ingest(config_path: Path) -> None:
logger.debug(f"Using config: {workflow.config}")
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, config_dict, WorkflowType.INGEST)
WorkflowInitErrorHandler.print_init_error(exc, config_dict, WorkflowType.INGEST)
sys.exit(1)

workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()
11 changes: 6 additions & 5 deletions ingestion/src/metadata/cli/insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger
from metadata.workflow.data_insight import DataInsightWorkflow
from metadata.workflow.workflow_output_handler import (
from metadata.workflow.workflow_init_error_handler import (
WorkflowInitErrorHandler,
WorkflowType,
print_init_error,
print_status,
)

logger = cli_logger()
Expand All @@ -42,10 +41,12 @@ def run_insight(config_path: Path) -> None:
logger.debug(f"Using config: {workflow.config}")
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, config_dict, WorkflowType.INSIGHT)
WorkflowInitErrorHandler.print_init_error(
exc, config_dict, WorkflowType.INSIGHT
)
sys.exit(1)

workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()
7 changes: 5 additions & 2 deletions ingestion/src/metadata/cli/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import UTF_8
from metadata.utils.logger import cli_logger
from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error
from metadata.workflow.workflow_init_error_handler import (
WorkflowInitErrorHandler,
WorkflowType,
)

logger = cli_logger()

Expand Down Expand Up @@ -53,7 +56,7 @@ def run_lineage(config_path: Path) -> None:

except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, config_dict, WorkflowType.INGEST)
WorkflowInitErrorHandler.print_init_error(exc, config_dict, WorkflowType.INGEST)
sys.exit(1)

if workflow.filePath:
Expand Down
11 changes: 6 additions & 5 deletions ingestion/src/metadata/cli/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import (
from metadata.workflow.workflow_init_error_handler import (
WorkflowInitErrorHandler,
WorkflowType,
print_init_error,
print_status,
)

logger = cli_logger()
Expand All @@ -42,10 +41,12 @@ def run_profiler(config_path: Path) -> None:
workflow = ProfilerWorkflow.create(workflow_config_dict)
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, workflow_config_dict, WorkflowType.PROFILE)
WorkflowInitErrorHandler.print_init_error(
exc, workflow_config_dict, WorkflowType.PROFILE
)
sys.exit(1)

workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()
9 changes: 4 additions & 5 deletions ingestion/src/metadata/cli/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger
from metadata.workflow.usage import UsageWorkflow
from metadata.workflow.workflow_output_handler import (
from metadata.workflow.workflow_init_error_handler import (
WorkflowInitErrorHandler,
WorkflowType,
print_init_error,
print_status,
)

logger = cli_logger()
Expand All @@ -42,10 +41,10 @@ def run_usage(config_path: Path) -> None:
logger.debug(f"Using config: {workflow.config}")
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, config_dict, WorkflowType.INGEST)
WorkflowInitErrorHandler.print_init_error(exc, config_dict, WorkflowType.INGEST)
sys.exit(1)

workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()
6 changes: 3 additions & 3 deletions ingestion/src/metadata/ingestion/ometa/client_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
OpenMetadataConnection,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.ometa_api import C, OpenMetadata, T
from metadata.utils import fqn
from metadata.utils.logger import ometa_logger

Expand All @@ -28,7 +28,7 @@

def create_ometa_client(
metadata_config: OpenMetadataConnection,
) -> OpenMetadata:
) -> OpenMetadata[T, C]: # pyright: ignore[reportInvalidTypeVarUse]
IceS2 marked this conversation as resolved.
Show resolved Hide resolved
"""Create an OpenMetadata client

Args:
Expand All @@ -38,7 +38,7 @@ def create_ometa_client(
OpenMetadata: an OM client
"""
try:
metadata = OpenMetadata(metadata_config)
metadata = OpenMetadata[T, C](metadata_config)
metadata.health_check()
return metadata
except Exception as exc:
Expand Down
7 changes: 4 additions & 3 deletions ingestion/src/metadata/utils/class_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuitePipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import SourceConfig

SERVICE_TYPE_REF = {
ServiceType.Database.value: "databaseService",
Expand Down Expand Up @@ -102,14 +103,14 @@ def _clean(source_type: str):
return source_type


def get_pipeline_type_from_source_config(source_config_type) -> PipelineType:
def get_pipeline_type_from_source_config(source_config: SourceConfig) -> PipelineType:
"""From the YAML serviceType, get the Ingestion Pipeline Type"""
pipeline_type = SOURCE_CONFIG_TYPE_INGESTION.get(
source_config_type.__class__.__name__
source_config.config.__class__.__name__
)
if not pipeline_type:
raise ValueError(
f"Cannot find Pipeline Type for SourceConfig {source_config_type}"
f"Cannot find Pipeline Type for SourceConfig {source_config.config}"
)
return pipeline_type

Expand Down
4 changes: 2 additions & 2 deletions ingestion/src/metadata/utils/execution_time_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from copy import deepcopy
from functools import wraps
from time import perf_counter
from typing import List, Optional
from typing import Dict, List, Optional

from pydantic import BaseModel

Expand Down Expand Up @@ -81,7 +81,7 @@ class ExecutionTimeTrackerState(metaclass=Singleton):

def __init__(self):
"""Initializes the state and the lock."""
self.state = {}
self.state: Dict[str, float] = {}
self.lock = threading.Lock()

def add(self, context: ExecutionTimeTrackerContext, elapsed: float):
Expand Down
25 changes: 6 additions & 19 deletions ingestion/src/metadata/workflow/application_output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Module handles the output messages for applications
Generic Workflow entrypoint to execute Applications
"""
import time

from metadata.utils.helpers import pretty_print_time_duration
from metadata.utils.logger import ANSI, log_ansi_encoded_string
from metadata.workflow.output_handler import print_workflow_summary

from metadata.utils.deprecation import deprecated
from metadata.workflow.base import BaseWorkflow

def print_status(workflow: "ApplicationWorkflow") -> None:
"""
Print the workflow results
"""
print_workflow_summary(workflow)

if workflow.runner.get_status().source_start_time:
log_ansi_encoded_string(
color=ANSI.BRIGHT_CYAN,
bold=True,
message="Workflow finished in time: "
f"{pretty_print_time_duration(time.time()-workflow.runner.get_status().source_start_time)}",
)
@deprecated(message="Use 'workflow.print_status()' instead.", release="1.8")
def print_status(workflow: BaseWorkflow):
workflow.print_status()
IceS2 marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading