Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #7272 - BaseWorkflow docs and cleanup #13471

Merged
merged 23 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ingestion/src/metadata/cli/insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from metadata.workflow.data_insight import DataInsightWorkflow
from metadata.workflow.workflow_output_handler import (
WorkflowType,
print_data_insight_status,
print_init_error,
print_status,
)

logger = cli_logger()
Expand All @@ -48,5 +48,5 @@ def run_insight(config_path: str) -> None:

workflow.execute()
workflow.stop()
print_data_insight_status(workflow)
print_status(workflow)
workflow.raise_from_status()
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from typing import Callable, Iterable, Optional

from metadata.generated.schema.analytics.reportData import ReportData
from metadata.ingestion.api.processor import ProcessorStatus
from metadata.ingestion.api.status import Status
from metadata.ingestion.ometa.ometa_api import OpenMetadata

Expand Down Expand Up @@ -68,5 +67,5 @@ def yield_refined_data(self) -> Iterable[ReportData]:
raise NotImplementedError

@abc.abstractmethod
def get_status(self) -> ProcessorStatus:
def get_status(self) -> Status:
raise NotImplementedError
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
topic,
)
from metadata.generated.schema.entity.teams.user import User
from metadata.ingestion.api.processor import ProcessorStatus
from metadata.ingestion.api.status import Status
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.helpers import get_entity_tier_from_tags
Expand Down Expand Up @@ -370,5 +369,5 @@ def refine(self, entity: WebAnalyticEventData) -> None:
"""Refine data"""
self._refined_data = self.refine_user_event.send(entity)

def get_status(self) -> ProcessorStatus:
def get_status(self) -> Status:
return self.processor_status
86 changes: 0 additions & 86 deletions ingestion/src/metadata/ingestion/api/processor.py

This file was deleted.

65 changes: 0 additions & 65 deletions ingestion/src/metadata/ingestion/api/sink.py

This file was deleted.

52 changes: 0 additions & 52 deletions ingestion/src/metadata/ingestion/api/stage.py

This file was deleted.

23 changes: 21 additions & 2 deletions ingestion/src/metadata/profiler/interface/profiler_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""

from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union

from sqlalchemy import Column
from typing_extensions import Self
Expand All @@ -32,7 +32,8 @@
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.ingestion.api.processor import ProfilerProcessorStatus
from metadata.ingestion.api.models import StackTraceError
from metadata.ingestion.api.status import Status
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.profiler.api.models import ProfileSampleConfig, TableConfig
Expand All @@ -42,6 +43,24 @@
from metadata.utils.partition import get_partition_details


class ProfilerProcessorStatus(Status):
    """Status that also remembers which entity is currently being processed."""

    # Name of the entity currently being handled; None until one is set.
    entity: Optional[str] = None

    def scanned(self, record: Any) -> None:
        """Register `record` by appending it to `self.records`."""
        self.records.append(record)

    def failed_profiler(self, error: str, stack_trace: Optional[str] = None) -> None:
        """Report a failure attributed to the current entity.

        Args:
            error: message describing the failure.
            stack_trace: optional stack trace text to attach to the error.
        """
        # Use an empty name when no entity is set (None or empty string).
        failure = StackTraceError(
            name=self.entity or "",
            error=error,
            stack_trace=stack_trace,
        )
        self.failed(failure)


class ProfilerInterface(ABC):
"""Protocol interface for the profiler processor"""

Expand Down
4 changes: 1 addition & 3 deletions ingestion/src/metadata/utils/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.workflow import Sink as WorkflowSink
from metadata.ingestion.api.processor import Processor
from metadata.ingestion.api.stage import Stage
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import BulkSink, Sink, Source
from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage
from metadata.utils.class_helper import get_service_type_from_source_type
from metadata.utils.logger import utils_logger

Expand Down
Loading
Loading