Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto Tagger Application - Preparing the Ingestion Framework #13862

Merged
merged 17 commits into from
Nov 13, 2023
Merged
4 changes: 2 additions & 2 deletions ingestion/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ install_all: ## Install the ingestion module with all dependencies

.PHONY: install_apis
install_apis: ## Install the REST APIs module to the current environment
python -m pip install $(ROOT_DIR)openmetadata-airflow-apis/
python -m pip install $(ROOT_DIR)/openmetadata-airflow-apis/

.PHONY: lint
lint: ## Run pylint on the Python sources to analyze the codebase
Expand Down Expand Up @@ -90,7 +90,7 @@ run_apis_tests: ## Run the openmetadata airflow apis tests
coverage_apis: ## Run the python tests on openmetadata-airflow-apis
$(MAKE) run_apis_tests
coverage xml --rcfile $(ROOT_DIR)/openmetadata-airflow-apis/.coveragerc -o $(ROOT_DIR)/openmetadata-airflow-apis/coverage.xml
sed -e "s/$(shell python -c "import site; import os; from pathlib import Path; print(os.path.relpath(site.getsitepackages()[0], str(Path.cwd())).replace('/','\/'))")\///g" $(INGESTION_DIR)/openmetadata-airflow-apis/coverage.xml >> $(INGESTION_DIR)/openmetadata-airflow-apis/ci-coverage.xml
sed -e "s/$(shell python -c "import site; import os; from pathlib import Path; print(os.path.relpath(site.getsitepackages()[0], str(Path.cwd())).replace('/','\/'))")\///g" $(ROOT_DIR)/openmetadata-airflow-apis/coverage.xml >> $(ROOT_DIR)/openmetadata-airflow-apis/ci-coverage.xml



Expand Down
Empty file.
200 changes: 200 additions & 0 deletions ingestion/src/metadata/applications/auto_tagger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
PII application
"""
import traceback
from typing import Iterable, List, Optional

from metadata.generated.schema.entity.applications.configuration.external.autoTaggerAppConfig import (
AutoTaggerAppConfig,
)
from metadata.generated.schema.entity.data.table import Column, Table, TableData
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagFQN,
TagLabel,
TagSource,
)
from metadata.ingestion.api.models import StackTraceError
from metadata.ingestion.models.table_metadata import ColumnTag
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.pii.constants import PII
from metadata.pii.scanners.column_name_scanner import ColumnNameScanner
from metadata.pii.scanners.ner_scanner import NERScanner
from metadata.utils.logger import app_logger
from metadata.workflow.application import AppRunner, InvalidAppConfiguration

logger = app_logger()

DEFAULT_CONFIDENCE = 80


class AutoTaggerApp(AppRunner):
"""
PII Application
You can execute it with `metadata app -c <path-to-yaml>`
with a YAML file like:

sourcePythonClass: metadata.applications.auto_tagger.AutoTaggerApp
config:
confidenceLevel: 80
workflowConfig:
loggerLevel: INFO
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "..."
"""

def __init__(self, config: AutoTaggerAppConfig, metadata: OpenMetadata):
super().__init__(config, metadata)

if not isinstance(config, AutoTaggerAppConfig):
raise InvalidAppConfiguration(
f"AutoTagger Runner expects an AutoTaggerAppConfig, we got [{config}]"
)

self._ner_scanner = None
self.confidence_threshold = config.confidenceLevel or DEFAULT_CONFIDENCE

@staticmethod
def build_column_tag(tag_fqn: str, column_fqn: str) -> ColumnTag:
"""
Build the tag and run the PATCH
"""
tag_label = TagLabel(
tagFQN=TagFQN(__root__=tag_fqn),
source=TagSource.Classification,
state=State.Suggested,
labelType=LabelType.Automated,
)

return ColumnTag(column_fqn=column_fqn, tag_label=tag_label)

@property
def ner_scanner(self) -> NERScanner:
"""Load the NER Scanner only if called"""
if self._ner_scanner is None:
self._ner_scanner = NERScanner()

return self._ner_scanner

def process_column(
self,
idx: int,
column: Column,
table_data: Optional[TableData],
confidence_threshold: float,
) -> Optional[List[ColumnTag]]:
"""
Tag a column with PII if we find it using our scanners
"""

# First, check if the column we are about to process
# already has PII tags or not
column_has_pii_tag = any(
(PII in tag.tagFQN.__root__ for tag in column.tags or [])
)

# If it has PII tags, we skip the processing
# for the column
if column_has_pii_tag is True:
return None

# Scan by column name. If no results there, check the sample data, if any
tag_and_confidence = ColumnNameScanner.scan(column.name.__root__) or (
self.ner_scanner.scan([row[idx] for row in table_data.rows])
if table_data
else None
)

if (
tag_and_confidence
and tag_and_confidence.tag_fqn
and tag_and_confidence.confidence >= confidence_threshold / 100
):
# We support returning +1 tags for a single column in _run
return [
self.build_column_tag(
tag_fqn=tag_and_confidence.tag_fqn,
column_fqn=column.fullyQualifiedName.__root__,
)
]

return None

def process_table(self, table: Table) -> Optional[List[ColumnTag]]:
"""Run the patching of the table"""
column_tags = []
for idx, column in enumerate(table.columns):
try:
col_tags = self.process_column(
idx=idx,
column=column,
table_data=table.sampleData,
confidence_threshold=self.confidence_threshold,
)
if col_tags:
column_tags.extend(col_tags)
except Exception as err:
self.status.failed(
StackTraceError(
name=table.fullyQualifiedName.__root__,
error=f"Error computing PII tags for [{column}] - [{err}]",
stack_trace=traceback.format_exc(),
)
)

if column_tags:
return column_tags

return None

def patch_columns(self, table: Table, column_tags: List[ColumnTag]) -> None:
"""Patch columns with PII"""
patched = self.metadata.patch_column_tags(table=table, column_tags=column_tags)
if not patched:
self.status.failed(
StackTraceError(
name=table.fullyQualifiedName.__root__,
error="Error patching tags for table",
)
)
else:
self.status.scanned(table)
logger.debug(
f"Successfully patched tag {column_tags} for {table.fullyQualifiedName.__root__}"
)

def run(self) -> None:
"""
The PII Application will:
1. List tables
2. Check their column names and sample data (if any)
3. PATCH PII tags when needed
"""
tables: Iterable[Table] = self.metadata.list_all_entities(
entity=Table, fields=["sampleData", "tags"]
)
for table in tables:
column_tags = self.process_table(table)
if column_tags:
self.patch_columns(table=table, column_tags=column_tags)
else:
self.status.filter(
key=table.fullyQualifiedName.__root__, reason="No PII found"
)

def close(self) -> None:
"""Nothing to close"""
47 changes: 47 additions & 0 deletions ingestion/src/metadata/cli/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Profiler utility for the metadata CLI
"""
import sys
import traceback
from pathlib import Path

from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger
from metadata.workflow.application import ApplicationWorkflow
from metadata.workflow.application_output_handler import print_status

logger = cli_logger()


def run_app(config_path: Path) -> None:
"""
Run the application workflow from a config path
to a JSON or YAML file
:param config_path: Path to load JSON config
"""

config_dict = None
try:
config_dict = load_config_file(config_path)
workflow = ApplicationWorkflow.create(config_dict)
logger.debug(f"Using config: {workflow.config}")
except Exception as exc:
logger.error(f"Error running the application {exc}")
logger.debug(traceback.format_exc())
sys.exit(1)

workflow.execute()
workflow.stop()
print_status(workflow)
workflow.raise_from_status()
38 changes: 26 additions & 12 deletions ingestion/src/metadata/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from pathlib import Path

from metadata.__version__ import get_metadata_version
from metadata.cli.app import run_app
from metadata.cli.backup import UploadDestinationType, run_backup
from metadata.cli.dataquality import run_test
from metadata.cli.docker import BACKEND_DATABASES, DockerActions, run_docker
Expand Down Expand Up @@ -50,10 +51,22 @@ class MetadataCommands(Enum):
WEBHOOK = "webhook"
INSIGHT = "insight"
LINEAGE = "lineage"
APP = "app"
OPENMETADATA_IMPORTS_MIGRATION = "openmetadata_imports_migration"
OPENMETADATA_DAG_CONFIG_MIGRATION = "openmetadata_dag_config_migration"


RUN_PATH_METHODS = {
MetadataCommands.INGEST.value: run_ingest,
MetadataCommands.USAGE.value: run_usage,
MetadataCommands.LINEAGE.value: run_lineage,
MetadataCommands.INSIGHT.value: run_insight,
MetadataCommands.PROFILE.value: run_profiler,
MetadataCommands.TEST.value: run_test,
MetadataCommands.APP.value: run_app,
}


OM_IMPORTS_MIGRATION = """
Update DAG files generated after creating workflow in 0.11 and before.
In 0.12 the airflow managed API package name changed from `openmetadata` to
Expand Down Expand Up @@ -365,6 +378,12 @@ def get_parser(args=None):
MetadataCommands.TEST.value, help="Workflow for running test suites"
)
)
create_common_config_parser_args(
sub_parser.add_parser(
MetadataCommands.APP.value,
help="Workflow for running external applications",
)
)
create_openmetadata_imports_migration_args(
sub_parser.add_parser(
MetadataCommands.OPENMETADATA_IMPORTS_MIGRATION.value,
Expand Down Expand Up @@ -418,25 +437,20 @@ def metadata(args=None): # pylint: disable=too-many-branches
config_file = contains_args.get("config")
if config_file:
path = Path(config_file).expanduser()
else:
raise ValueError(
"Could not load config file! Please specify the config path with `-c` or `--config`."
)
if contains_args.get("debug"):
set_loggers_level(logging.DEBUG)
elif contains_args.get("log_level"):
set_loggers_level(contains_args.get("log_level"))
else:
set_loggers_level(logging.INFO)

if metadata_workflow == MetadataCommands.INGEST.value:
run_ingest(config_path=path)
if metadata_workflow == MetadataCommands.USAGE.value:
run_usage(config_path=path)
if metadata_workflow == MetadataCommands.LINEAGE.value:
run_lineage(config_path=path)
if metadata_workflow == MetadataCommands.INSIGHT.value:
run_insight(config_path=path)
if metadata_workflow == MetadataCommands.PROFILE.value:
run_profiler(config_path=path)
if metadata_workflow == MetadataCommands.TEST.value:
run_test(config_path=path)
if metadata_workflow in RUN_PATH_METHODS:
RUN_PATH_METHODS[metadata_workflow](path)

if metadata_workflow == MetadataCommands.BACKUP.value:
run_backup(
common_backup_obj_instance=BackupRestoreArgs(
Expand Down
5 changes: 2 additions & 3 deletions ingestion/src/metadata/utils/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import importlib
import traceback
from enum import Enum
from typing import Callable, Optional, Type, TypeVar
from typing import Any, Callable, Optional, Type, TypeVar

from pydantic import BaseModel

Expand All @@ -23,7 +23,6 @@
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.workflow import Sink as WorkflowSink
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage
from metadata.utils.class_helper import get_service_type_from_source_type
from metadata.utils.logger import utils_logger
Expand Down Expand Up @@ -86,7 +85,7 @@ def get_class_name_root(type_: str) -> str:
)


def import_from_module(key: str) -> Type[Step]:
def import_from_module(key: str) -> Type[Any]:
"""
Dynamically import an object from a module path
"""
Expand Down
9 changes: 9 additions & 0 deletions ingestion/src/metadata/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Loggers(Enum):
TEST_SUITE = "TestSuite"
DATA_INSIGHT = "DataInsight"
QUERY_RUNNER = "QueryRunner"
APP = "App"

@DynamicClassAttribute
def value(self):
Expand Down Expand Up @@ -151,6 +152,14 @@ def great_expectations_logger():
return logging.getLogger(Loggers.GREAT_EXPECTATIONS.value)


def app_logger():
"""
Method to get the APP logger
"""

return logging.getLogger(Loggers.APP.value)


def query_runner_logger():
"""
Method to get the QUERY_RUNNER logger
Expand Down
Loading
Loading