diff --git a/ingestion/src/metadata/cli/insight.py b/ingestion/src/metadata/cli/insight.py index ff87bd045a4c..2ed508d594df 100644 --- a/ingestion/src/metadata/cli/insight.py +++ b/ingestion/src/metadata/cli/insight.py @@ -21,8 +21,8 @@ from metadata.workflow.data_insight import DataInsightWorkflow from metadata.workflow.workflow_output_handler import ( WorkflowType, - print_data_insight_status, print_init_error, + print_status, ) logger = cli_logger() @@ -48,5 +48,5 @@ def run_insight(config_path: str) -> None: workflow.execute() workflow.stop() - print_data_insight_status(workflow) + print_status(workflow) workflow.raise_from_status() diff --git a/ingestion/src/metadata/data_insight/processor/reports/data_processor.py b/ingestion/src/metadata/data_insight/processor/reports/data_processor.py index 8dbd29a6da9d..eb869844bf45 100644 --- a/ingestion/src/metadata/data_insight/processor/reports/data_processor.py +++ b/ingestion/src/metadata/data_insight/processor/reports/data_processor.py @@ -20,7 +20,6 @@ from typing import Callable, Iterable, Optional from metadata.generated.schema.analytics.reportData import ReportData -from metadata.ingestion.api.processor import ProcessorStatus from metadata.ingestion.api.status import Status from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -68,5 +67,5 @@ def yield_refined_data(self) -> Iterable[ReportData]: raise NotImplementedError @abc.abstractmethod - def get_status(self) -> ProcessorStatus: + def get_status(self) -> Status: raise NotImplementedError diff --git a/ingestion/src/metadata/data_insight/processor/reports/web_analytic_report_data_processor.py b/ingestion/src/metadata/data_insight/processor/reports/web_analytic_report_data_processor.py index 7e8d49cbac20..12c288a3e2c8 100644 --- a/ingestion/src/metadata/data_insight/processor/reports/web_analytic_report_data_processor.py +++ b/ingestion/src/metadata/data_insight/processor/reports/web_analytic_report_data_processor.py @@ -41,7 +41,6 @@ topic, ) from metadata.generated.schema.entity.teams.user import User -from metadata.ingestion.api.processor import ProcessorStatus from metadata.ingestion.api.status import Status from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.helpers import get_entity_tier_from_tags @@ -370,5 +369,5 @@ def refine(self, entity: WebAnalyticEventData) -> None: """Refine data""" self._refined_data = self.refine_user_event.send(entity) - def get_status(self) -> ProcessorStatus: + def get_status(self) -> Status: return self.processor_status diff --git a/ingestion/src/metadata/ingestion/api/processor.py b/ingestion/src/metadata/ingestion/api/processor.py deleted file mode 100644 index e1703507abf4..000000000000 --- a/ingestion/src/metadata/ingestion/api/processor.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-""" -Abstract Processor definition to build a Workflow -""" -from abc import ABCMeta, abstractmethod -from dataclasses import dataclass -from typing import Any, Generic, List, Optional - -from pydantic import Field - -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.ingestion.api.closeable import Closeable -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.models import StackTraceError -from metadata.ingestion.api.status import Status -from metadata.utils.logger import ingestion_logger - -logger = ingestion_logger() - - -class ProcessorStatus(Status): - records: List[str] = Field(default_factory=list) - - def processed(self, record: Any): - self.records.append(record) - - # disabling pylint until we remove this - def warning(self, info: Any) -> None: # pylint: disable=W0221 - self.warnings.append(info) - - -class ProfilerProcessorStatus(Status): - entity: Optional[str] = None - - def scanned(self, record: Any) -> None: - self.records.append(record) - - def failed_profiler(self, error: str, stack_trace: Optional[str] = None) -> None: - self.failed( - StackTraceError( - name=self.entity if self.entity else "", - error=error, - stack_trace=stack_trace, - ) - ) - - -@dataclass -class Processor(Closeable, Generic[Entity], metaclass=ABCMeta): - """ - Processor class - """ - - status: ProcessorStatus - - def __init__(self): - self.status = ProcessorStatus() - - @classmethod - @abstractmethod - def create( - cls, config_dict: dict, metadata_config: OpenMetadataConnection, **kwargs - ) -> "Processor": - pass - - @abstractmethod - def process(self, *args, **kwargs) -> Entity: - pass - - def get_status(self) -> ProcessorStatus: - return self.status - - @abstractmethod - def close(self) -> None: - pass diff --git a/ingestion/src/metadata/ingestion/api/sink.py b/ingestion/src/metadata/ingestion/api/sink.py deleted file mode 100644 index 6fe0cbcd9e2c..000000000000 --- a/ingestion/src/metadata/ingestion/api/sink.py +++ /dev/null @@ -1,65 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-""" -Abstract Sink definition to build a Workflow -""" -from abc import ABCMeta, abstractmethod -from dataclasses import dataclass -from typing import Any, Generic, List - -from pydantic import Field - -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.ingestion.api.closeable import Closeable -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.status import Status - - -class SinkStatus(Status): - records: List[str] = Field(default_factory=list) - - def records_written(self, record: str) -> None: - self.records.append(record) - - # Disable pylint until this is removed - def warning(self, info: Any) -> None: # pylint: disable=W0221 - self.warnings.append(info) - - -@dataclass # type: ignore[misc] -class Sink(Closeable, Generic[Entity], metaclass=ABCMeta): - """All Sinks must inherit this base class.""" - - status: SinkStatus - - def __init__(self): - self.status = SinkStatus() - - @classmethod - @abstractmethod - def create( - cls, config_dict: dict, metadata_config: OpenMetadataConnection - ) -> "Sink": - pass - - @abstractmethod - def write_record(self, record: Entity) -> None: - # must call callback when done. - pass - - def get_status(self) -> SinkStatus: - return self.status - - @abstractmethod - def close(self) -> None: - pass diff --git a/ingestion/src/metadata/ingestion/api/stage.py b/ingestion/src/metadata/ingestion/api/stage.py deleted file mode 100644 index 65234b3a0311..000000000000 --- a/ingestion/src/metadata/ingestion/api/stage.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-""" -Abstract Stage definition to build a Workflow -""" -from abc import ABCMeta, abstractmethod -from dataclasses import dataclass -from typing import Generic - -from metadata.ingestion.api.closeable import Closeable -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.status import Status - - -class StageStatus(Status): - pass - - -@dataclass # type: ignore[misc] -class Stage(Closeable, Generic[Entity], metaclass=ABCMeta): - """ - Stage class - """ - - status: StageStatus - - def __init__(self): - self.status = StageStatus() - - @classmethod - @abstractmethod - def create(cls, config_dict: dict, metadata_config: dict) -> "Stage": - pass - - @abstractmethod - def stage_record(self, record: Entity): - pass - - def get_status(self) -> StageStatus: - return self.status - - @abstractmethod - def close(self) -> None: - pass diff --git a/ingestion/src/metadata/profiler/interface/profiler_interface.py b/ingestion/src/metadata/profiler/interface/profiler_interface.py index b55e8c45b8c7..b7c1755c98ee 100644 --- a/ingestion/src/metadata/profiler/interface/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/profiler_interface.py @@ -15,7 +15,7 @@ """ from abc import ABC, abstractmethod -from typing import Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union from sqlalchemy import Column from typing_extensions import Self @@ -32,7 +32,8 @@ from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( DatabaseServiceProfilerPipeline, ) -from metadata.ingestion.api.processor import ProfilerProcessorStatus +from metadata.ingestion.api.models import StackTraceError +from metadata.ingestion.api.status import Status from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection from metadata.profiler.api.models import ProfileSampleConfig, TableConfig @@ -42,6 +43,24 @@ from metadata.utils.partition import get_partition_details +class ProfilerProcessorStatus(Status): + """Keep track of the entity being processed""" + + entity: Optional[str] = None + + def scanned(self, record: Any) -> None: + self.records.append(record) + + def failed_profiler(self, error: str, stack_trace: Optional[str] = None) -> None: + self.failed( + StackTraceError( + name=self.entity if self.entity else "", + error=error, + stack_trace=stack_trace, + ) + ) + + class ProfilerInterface(ABC): """Protocol interface for the profiler processor""" diff --git a/ingestion/src/metadata/utils/importer.py b/ingestion/src/metadata/utils/importer.py index 008c303404f1..da3c5ecbf16e 100644 --- a/ingestion/src/metadata/utils/importer.py +++ b/ingestion/src/metadata/utils/importer.py @@ -23,10 +23,8 @@ ) from metadata.generated.schema.entity.services.serviceType import ServiceType from metadata.generated.schema.metadataIngestion.workflow import Sink as WorkflowSink -from metadata.ingestion.api.processor import Processor -from metadata.ingestion.api.stage import Stage from metadata.ingestion.api.step import Step -from metadata.ingestion.api.steps import BulkSink, Sink, Source +from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage from metadata.utils.class_helper import get_service_type_from_source_type from metadata.utils.logger import utils_logger diff --git a/ingestion/src/metadata/workflow/README.md b/ingestion/src/metadata/workflow/README.md new file mode 100644 index 000000000000..24feccab97b5 --- /dev/null +++ b/ingestion/src/metadata/workflow/README.md @@ 
-0,0 +1,135 @@
+# Base Workflow
+
+The goal of the `BaseWorkflow` is to define a single class that controls the execution flow of a workflow. This means:
+- Having a consensus on how our executions are organized (steps).
+- Centralizing all the exception management in a single place. We don't want individual - and wrongly uncaught - exceptions
+  to blow up the whole execution.
+- Centralizing the `Status` handling: how we report processed assets & failures and send them back to the `IngestionPipeline`.
+
+## Steps
+
+Each `Workflow` can be built by using `Steps` as lego pieces. Each of these pieces - steps - is a generic abstraction
+of the operations we expect to happen inside it. Currently, the `BaseWorkflow` accepts any number of sequential `Steps`,
+each of them taking care of a specific part of the business logic.
+
+![base-workflow.steps.drawio.png](../../../../openmetadata-docs/images/readme/ingestion/base-workflow.steps.drawio.png)
+
+We mainly have four types of steps:
+
+1. `IterStep`s are in charge of starting the workflow. They will read data from the external world and `yield` the elements
+   that need to be processed down the pipeline.
+2. `ReturnStep`s accept one input, process it further, and return one output.
+3. `StageStep`s accept one input and write - stage - it somewhere. They are expected to be used together with the `BulkStep`.
+4. `BulkStep`s iterate over an input - produced by the `StageStep` - and return nothing.
+
+These names might be explanatory, but they are harder to reason about. Therefore, we have specific classes based on these steps
+that make it easier to discuss the Workflow structure:
+
+1. `IterStep` -> `Source`
+2. `ReturnStep` -> `Processor` & `Sink`
+3. `StageStep` -> `Stage`
+4. `BulkStep` -> `BulkSink`
+
+When developing each of these steps, we just need to implement its execution method (either `_iter` or `_run`): in the
+`IterStep` the method is expected to `yield` results, while the rest `return` them.
+
+We'll explain specific examples of these `Step`s in the next section.
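+As an illustration only - these are not the real OpenMetadata classes, just a self-contained sketch with made-up names -
+this is how the two execution shapes differ:
+
+```python
+from typing import Iterable, List
+
+
+class ToySource:
+    """IterStep-like piece: reads from the outside world and yields elements."""
+
+    def __init__(self, raw_records: List[str]) -> None:
+        self.raw_records = raw_records
+
+    def _iter(self) -> Iterable[str]:
+        # An IterStep implements `_iter` and is expected to `yield` its results
+        for record in self.raw_records:
+            yield record
+
+
+class ToyProcessor:
+    """ReturnStep-like piece: accepts one input and returns one output."""
+
+    def _run(self, record: str) -> str:
+        # A ReturnStep implements `_run` and is expected to `return` a single result
+        return record.upper()
+
+
+if __name__ == "__main__":
+    source = ToySource(["table_a", "table_b"])
+    processor = ToyProcessor()
+    # The workflow is the piece that moves elements from one step to the next
+    for element in source._iter():
+        print(processor._run(element))
+```
+
+In the real steps, the results are additionally wrapped in an `Either` (see the Status & Exceptions section below), so
+that failures travel down the pipeline as data instead of blowing up the execution.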
+## Workflows
+
+Now that we have our pieces, we can define the `Workflow` structures. While the `Steps` could be joined together
+somewhat arbitrarily, there are specific recipes that we follow depending on our goals.
+
+Each `Workflow` can then be built by defining its steps (starting with a `Source`, adding `Processor`s, etc.) and
+registering them in the `BaseWorkflow.set_steps` method.
+
+The `BaseWorkflow` will take care of common logic, such as initializing the `metadata` object, the `timer` logger and
+sending the status to the `IngestionPipeline` when needed.
+
+A couple of examples:
+
+### Metadata Ingestion
+
+Here we have two steps:
+- `Source`: that will list the metadata of the origin (Dashboards, Tables, Pipelines, ...) and translate it to the OpenMetadata
+  standard.
+- `REST Sink`: that will pick up the Create Requests of the above entities and send them to the OpenMetadata server.
+
+What does the workflow do here? Group the steps together and streamline the execution. The workflow itself is the one
+that knows how to get each of the elements produced by the `Source` and pass them to the `Sink`.
+
+### Profiler Ingestion
+
+In this case we have four steps:
+- `Source`: that will pick up the tables from the OpenMetadata API that need to be profiled.
+- `Profiler Processor`: to execute the metrics and gather the results for each table.
+- `PII Processor`: that will take the result of the profiler and, using NLP models, add any classification that needs to be applied to the tables.
+- `REST Sink`: to send the results to the OpenMetadata API.
+
+Here again, the `Workflow` class will move the elements from `Source` -> `Profiler Processor` -> `PII Processor` -> `REST Sink`.
+
+## Status & Exceptions
+
+While the `Workflow` controls the execution flow, its most important responsibility is status handling & exception management.
+
+### Status
+
+Each `Step` has its own `Status`, storing what has been processed and what has failed. The overall `Workflow` Status is based
+on the statuses of the individual steps.
+
+### Exceptions
+
+To ensure that all exceptions are caught, each `Step` executes its `run` method inside a `try/except` block. It will
+only blow things up if we encounter a `WorkflowFatalError`. Any other exception will just be logged.
+
+However, how do we want to handle exceptions that can happen in every different component? By treating exceptions as data.
+
+Each `Step` will `yield` or `return` an `Either` object, meaning that processing a single element can either be `right` -
+and contain the expected result - or `left` - and contain the raised exception.
+
+This convention helps us ensure that we record the raised exceptions in the `Status` of each `Step`, so that
+all the errors can be properly reported at the end of the execution.
+
+For example, this is the `run` method of the `IterStep`:
+
+```python
+def run(self) -> Iterable[Optional[Entity]]:
+    """
+    Run the step and handle the status and exceptions
+
+    Note that we are overwriting the default run implementation
+    in order to create a generator with `yield`.
+    """
+    try:
+        for result in self._iter():
+            if result.left is not None:
+                self.status.failed(result.left)
+                yield None
+
+            if result.right is not None:
+                self.status.scanned(result.right)
+                yield result.right
+    except WorkflowFatalError as err:
+        logger.error(f"Fatal error running step [{self}]: [{err}]")
+        raise err
+    except Exception as exc:
+        error = f"Encountered exception running step [{self}]: [{exc}]"
+        logger.warning(error)
+        self.status.failed(
+            StackTraceError(
+                name="Unhandled", error=error, stack_trace=traceback.format_exc()
+            )
+        )
+```
+
+By tracking `Unhandled` exceptions, we then know which pieces of the code need to be treated more carefully to control
+scenarios that we might not be aware of.
+
+Each `Step` then controls its own `Status` and exceptions (wrapped in the `Either`), and only pushes the actual `right`
+response down the workflow.
+
+> Note: We can think of this `Workflow` execution as a `flatMap` implementation.
+
+![base-workflow.workflow.drawio.png](../../../../openmetadata-docs/images/readme/ingestion/base-workflow.workflow.drawio.png)
+
+Note how, in theory, we can keep chaining steps together.
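+To tie the whole flow together, here is a self-contained, simplified sketch - not the real `BaseWorkflow`, `Either` or
+`Status` classes, just stand-ins with similar names - of how a workflow can drain a source of `Either`-like results,
+record the `left`s in a status and only pass the `right`s to the next step:
+
+```python
+from dataclasses import dataclass, field
+from typing import Generic, Iterable, List, Optional, TypeVar
+
+T = TypeVar("T")
+
+
+@dataclass
+class ToyEither(Generic[T]):
+    """Processing one element is either `right` (a result) or `left` (an error)."""
+
+    left: Optional[str] = None
+    right: Optional[T] = None
+
+
+@dataclass
+class ToyStatus:
+    """Keeps track of what was processed and what failed in a step."""
+
+    records: List[int] = field(default_factory=list)
+    failures: List[str] = field(default_factory=list)
+
+
+def toy_source(raw_values: List[str]) -> Iterable[ToyEither[int]]:
+    """Yield one Either per element: exceptions become data instead of being raised."""
+    for value in raw_values:
+        try:
+            yield ToyEither(right=int(value))
+        except ValueError as exc:
+            yield ToyEither(left=f"Cannot parse [{value}]: {exc}")
+
+
+def run_toy_workflow(raw_values: List[str]) -> ToyStatus:
+    """flatMap-like drain: log the lefts in the status, push only the rights downstream."""
+    status = ToyStatus()
+    for result in toy_source(raw_values):
+        if result.left is not None:
+            status.failures.append(result.left)
+        if result.right is not None:
+            status.records.append(result.right)
+            # ...this is where the next step (e.g. a Sink) would receive the element
+    return status
+
+
+if __name__ == "__main__":
+    final_status = run_toy_workflow(["1", "2", "not-a-number"])
+    print(f"Processed: {final_status.records} - Failures: {len(final_status.failures)}")
+```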
diff --git a/ingestion/src/metadata/workflow/workflow_output_handler.py b/ingestion/src/metadata/workflow/workflow_output_handler.py index 5444739b1887..302316d8b024 100644 --- a/ingestion/src/metadata/workflow/workflow_output_handler.py +++ b/ingestion/src/metadata/workflow/workflow_output_handler.py @@ -18,7 +18,7 @@ from enum import Enum from logging import Logger from pathlib import Path -from typing import Dict, List, Optional, Type, Union +from typing import Dict, List, Type, Union from pydantic import BaseModel from tabulate import tabulate @@ -230,55 +230,6 @@ def print_status(workflow: "BaseWorkflow") -> None: ) -def print_test_suite_status(workflow) -> None: - """ - Print the test suite workflow results - """ - print_workflow_summary_legacy( - workflow, processor=True, processor_status=workflow.status - ) - - if workflow.result_status() == 1: - log_ansi_encoded_string( - color=ANSI.BRIGHT_RED, bold=True, message=WORKFLOW_FAILURE_MESSAGE - ) - else: - log_ansi_encoded_string( - color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE - ) - - -def print_data_insight_status(workflow) -> None: - """ - Print the test suite workflow results - Args: - workflow (DataInsightWorkflow): workflow object - """ - # TODO: fixme - print_workflow_summary_legacy( - workflow, - processor=True, - processor_status=workflow.source.get_status(), - ) - - if workflow.source.get_status().source_start_time: - log_ansi_encoded_string( - message=f"Workflow finished in time {pretty_print_time_duration(time.time()-workflow.source.get_status().source_start_time)} ", # pylint: disable=line-too-long - ) - - if workflow.result_status() == 1: - log_ansi_encoded_string(message=WORKFLOW_FAILURE_MESSAGE) - elif workflow.source.get_status().warnings or ( - hasattr(workflow, "sink") and workflow.sink.get_status().warnings - ): - log_ansi_encoded_string(message=WORKFLOW_WARNING_MESSAGE) - else: - log_ansi_encoded_string(message=WORKFLOW_SUCCESS_MESSAGE) - log_ansi_encoded_string( - color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE - ) - - def is_debug_enabled(workflow) -> bool: return ( hasattr(workflow, "config") @@ -357,118 +308,6 @@ def print_workflow_status_debug(workflow: "BaseWorkflow") -> None: log_ansi_encoded_string(message=step.get_status().as_string()) -def get_source_status(workflow, source_status: Status) -> Optional[Status]: - if hasattr(workflow, "source"): - return source_status if source_status else workflow.source.get_status() - return source_status - - -def get_processor_status(workflow, processor_status: Status) -> Optional[Status]: - if hasattr(workflow, "processor"): - return processor_status if processor_status else workflow.processor.get_status() - return processor_status - - -def print_workflow_summary_legacy( - workflow, - source: bool = False, - stage: bool = False, - bulk_sink: bool = False, - processor: bool = False, - source_status: Status = None, - processor_status: Status = None, -): - """ - To be removed. All workflows should use the new `print_workflow_summary` - after making the transition to the BaseWorkflow steps with common Status. 
- """ - source_status = get_source_status(workflow, source_status) - processor_status = get_processor_status(workflow, processor_status) - if is_debug_enabled(workflow): - print_workflow_status_debug_legacy( - workflow, - bulk_sink, - stage, - source_status, - processor_status, - ) - summary = Summary() - failures = [] - if source_status and source: - summary += get_summary(source_status) - failures.append(Failure(name="Source", failures=source_status.failures)) - if hasattr(workflow, "stage") and stage: - summary += get_summary(workflow.stage.get_status()) - failures.append( - Failure(name="Stage", failures=workflow.stage.get_status().failures) - ) - if hasattr(workflow, "sink"): - summary += get_summary(workflow.sink.get_status()) - failures.append( - Failure(name="Sink", failures=workflow.sink.get_status().failures) - ) - if hasattr(workflow, "bulk_sink") and bulk_sink: - summary += get_summary(workflow.bulk_sink.get_status()) - failures.append( - Failure(name="Bulk Sink", failures=workflow.bulk_sink.get_status().failures) - ) - if processor_status and processor: - summary += get_summary(processor_status) - failures.append(Failure(name="Processor", failures=processor_status.failures)) - - print_failures_if_apply(failures) - - log_ansi_encoded_string(bold=True, message="Workflow Summary:") - log_ansi_encoded_string(message=f"Total processed records: {summary.records}") - log_ansi_encoded_string(message=f"Total warnings: {summary.warnings}") - log_ansi_encoded_string(message=f"Total filtered: {summary.filtered}") - log_ansi_encoded_string(message=f"Total errors: {summary.errors}") - - total_success = max(summary.records, 1) - log_ansi_encoded_string( - color=ANSI.BRIGHT_CYAN, - bold=True, - message=f"Success %: " - f"{round(total_success * 100 / (total_success + summary.errors), 2)}", - ) - - -def print_workflow_status_debug_legacy( - workflow, - bulk_sink: bool = False, - stage: bool = False, - source_status: Status = None, - processor_status: Status = None, -) -> None: - """ - Args: - workflow: the workflow status to be printed - bulk_sink: if bull_sink status must be printed - stage: if stage status must be printed - source_status: source status to be printed - processor_status: processor status to be printed - - Returns: - Print Workflow status when the workflow logger level is DEBUG - """ - log_ansi_encoded_string(bold=True, message="Statuses detailed info:") - if source_status: - log_ansi_encoded_string(bold=True, message="Source Status:") - log_ansi_encoded_string(message=source_status.as_string()) - if hasattr(workflow, "stage") and stage: - log_ansi_encoded_string(bold=True, message="Stage Status:") - log_ansi_encoded_string(message=workflow.stage.get_status().as_string()) - if hasattr(workflow, "sink"): - log_ansi_encoded_string(bold=True, message="Sink Status:") - log_ansi_encoded_string(message=workflow.sink.get_status().as_string()) - if hasattr(workflow, "bulk_sink") and bulk_sink: - log_ansi_encoded_string(bold=True, message="Bulk Sink Status:") - log_ansi_encoded_string(message=workflow.bulk_sink.get_status().as_string()) - if processor_status: - log_ansi_encoded_string(bold=True, message="Processor Status:") - log_ansi_encoded_string(message=processor_status.as_string()) - - def get_summary(status: Status) -> Summary: records = len(status.records) warnings = len(status.warnings) diff --git a/ingestion/tests/cli_e2e/base/test_cli_dashboard.py b/ingestion/tests/cli_e2e/base/test_cli_dashboard.py index e4066834af25..e265ac4ffb7e 100644 --- 
a/ingestion/tests/cli_e2e/base/test_cli_dashboard.py +++ b/ingestion/tests/cli_e2e/base/test_cli_dashboard.py @@ -18,7 +18,6 @@ import pytest -from metadata.ingestion.api.sink import SinkStatus from metadata.ingestion.api.status import Status from .e2e_types import E2EType @@ -95,17 +94,17 @@ def get_connector_name() -> str: raise NotImplementedError() @abstractmethod - def assert_not_including(self, source_status: Status, sink_status: SinkStatus): + def assert_not_including(self, source_status: Status, sink_status: Status): raise NotImplementedError() @abstractmethod def assert_for_vanilla_ingestion( - self, source_status: Status, sink_status: SinkStatus + self, source_status: Status, sink_status: Status ) -> None: raise NotImplementedError() @abstractmethod - def assert_filtered_mix(self, source_status: Status, sink_status: SinkStatus): + def assert_filtered_mix(self, source_status: Status, sink_status: Status): raise NotImplementedError() @staticmethod diff --git a/ingestion/tests/cli_e2e/base/test_cli_db.py b/ingestion/tests/cli_e2e/base/test_cli_db.py index 0a70d1574fc0..7b694405a021 100644 --- a/ingestion/tests/cli_e2e/base/test_cli_db.py +++ b/ingestion/tests/cli_e2e/base/test_cli_db.py @@ -19,7 +19,6 @@ import pytest from metadata.generated.schema.entity.data.table import Table -from metadata.ingestion.api.sink import SinkStatus from metadata.ingestion.api.status import Status from .e2e_types import E2EType @@ -243,54 +242,54 @@ def delete_table_and_view(self) -> None: @abstractmethod def assert_for_vanilla_ingestion( - self, source_status: Status, sink_status: SinkStatus + self, source_status: Status, sink_status: Status ) -> None: raise NotImplementedError() @abstractmethod def assert_for_table_with_profiler( - self, source_status: Status, sink_status: SinkStatus + self, source_status: Status, sink_status: Status ): raise NotImplementedError() @abstractmethod def assert_for_table_with_profiler_time_partition( - self, source_status: Status, sink_status: SinkStatus + self, source_status: Status, sink_status: Status ): raise NotImplementedError() @abstractmethod def assert_for_delete_table_is_marked_as_deleted( - self, source_status: Status, sink_status: SinkStatus + self, source_status: Status, sink_status: Status ): raise NotImplementedError() @abstractmethod def assert_filtered_schemas_includes( - self, source_status: Status, sink_status: SinkStatus + self, source_status: Status, sink_status: Status ): raise NotImplementedError() @abstractmethod def assert_filtered_schemas_excludes( - self, source_status: Status, sink_status: SinkStatus + self, source_status: Status, sink_status: Status ): raise NotImplementedError() @abstractmethod def assert_filtered_tables_includes( - self, source_status: Status, sink_status: SinkStatus + self, source_status: Status, sink_status: Status ): raise NotImplementedError() @abstractmethod def assert_filtered_tables_excludes( - self, source_status: Status, sink_status: SinkStatus + self, source_status: Status, sink_status: Status ): raise NotImplementedError() @abstractmethod - def assert_filtered_mix(self, source_status: Status, sink_status: SinkStatus): + def assert_filtered_mix(self, source_status: Status, sink_status: Status): raise NotImplementedError() @staticmethod diff --git a/ingestion/tests/cli_e2e/base/test_cli_dbt.py b/ingestion/tests/cli_e2e/base/test_cli_dbt.py index 85f1037db7b9..709a6d83789b 100644 --- a/ingestion/tests/cli_e2e/base/test_cli_dbt.py +++ b/ingestion/tests/cli_e2e/base/test_cli_dbt.py @@ -20,7 +20,6 @@ from 
metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.tests.testDefinition import TestDefinition, TestPlatform -from metadata.ingestion.api.sink import SinkStatus from metadata.ingestion.api.status import Status from .test_cli import CliBase @@ -107,12 +106,12 @@ def fqn_dbt_tables() -> List[str]: @abstractmethod def assert_for_vanilla_ingestion( - self, source_status: Status, sink_status: SinkStatus + self, source_status: Status, sink_status: Status ) -> None: raise NotImplementedError() @abstractmethod def assert_for_dbt_ingestion( - self, source_status: Status, sink_status: SinkStatus + self, source_status: Status, sink_status: Status ) -> None: raise NotImplementedError() diff --git a/ingestion/tests/cli_e2e/test_cli_snowflake.py b/ingestion/tests/cli_e2e/test_cli_snowflake.py index 01d853370535..61071a85b7b6 100644 --- a/ingestion/tests/cli_e2e/test_cli_snowflake.py +++ b/ingestion/tests/cli_e2e/test_cli_snowflake.py @@ -16,7 +16,6 @@ import pytest -from metadata.ingestion.api.sink import SinkStatus from metadata.ingestion.api.status import Status from .base.e2e_types import E2EType @@ -79,7 +78,7 @@ def get_connector_name() -> str: return "snowflake" def assert_for_vanilla_ingestion( - self, source_status: Status, sink_status: SinkStatus + self, source_status: Status, sink_status: Status ) -> None: self.assertTrue(len(source_status.failures) == 0) self.assertTrue(len(source_status.warnings) == 0) diff --git a/openmetadata-docs/images/readme/ingestion/base-workflow.steps.drawio.png b/openmetadata-docs/images/readme/ingestion/base-workflow.steps.drawio.png new file mode 100644 index 000000000000..7d82bd30f12f Binary files /dev/null and b/openmetadata-docs/images/readme/ingestion/base-workflow.steps.drawio.png differ diff --git a/openmetadata-docs/images/readme/ingestion/base-workflow.workflow.drawio.png b/openmetadata-docs/images/readme/ingestion/base-workflow.workflow.drawio.png new file mode 100644 index 000000000000..f1319f3ff12e Binary files /dev/null and b/openmetadata-docs/images/readme/ingestion/base-workflow.workflow.drawio.png differ