Skip to content

Commit

Permalink
GEN-996 - Allow PII Processor without storing Sample Data (#17927)
Browse files Browse the repository at this point in the history
* GEN-996 - Allow PII Processor without storing Sample Data

* fix import

* fix import
  • Loading branch information
pmbrull committed Sep 20, 2024
1 parent b0563cc commit 4cccaae
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 36 deletions.
4 changes: 2 additions & 2 deletions ingestion/src/metadata/ingestion/sink/metadata_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,9 +565,9 @@ def write_profiler_response(self, record: ProfilerResponse) -> Either[Table]:
f"Successfully ingested profile metrics for {record.table.fullyQualifiedName.root}"
)

if record.sample_data:
if record.sample_data and record.sample_data.store:
table_data = self.metadata.ingest_table_sample_data(
table=record.table, sample_data=record.sample_data
table=record.table, sample_data=record.sample_data.data
)
if not table_data:
self.status.failed(
Expand Down
2 changes: 1 addition & 1 deletion ingestion/src/metadata/pii/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _run(
col_tags = self.process_column(
idx=idx,
column=column,
table_data=record.sample_data,
table_data=record.sample_data.data,
confidence_threshold=self.confidence_threshold,
)
if col_tags:
Expand Down
15 changes: 13 additions & 2 deletions ingestion/src/metadata/profiler/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

from typing import List, Optional, Type, Union

from pydantic import ConfigDict
from pydantic import ConfigDict, Field
from sqlalchemy import Column
from sqlalchemy.orm import DeclarativeMeta
from typing_extensions import Annotated

from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createTableProfile import (
Expand All @@ -39,6 +40,7 @@
)
from metadata.generated.schema.tests.customMetric import CustomMetric
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.ingestion.models.custom_pydantic import BaseModel
from metadata.ingestion.models.table_metadata import ColumnTag
from metadata.profiler.metrics.core import Metric, MetricTypes
from metadata.profiler.processor.models import ProfilerDef
Expand Down Expand Up @@ -109,6 +111,15 @@ class ProfilerProcessorConfig(ConfigModel):
databaseConfig: Optional[List[DatabaseAndSchemaConfig]] = []


class SampleData(BaseModel):
    """
    TableData wrapper to handle ephemeral SampleData.

    Pairs the sampled table data with a flag stating whether that data
    should be stored, so it can be passed along without being persisted.
    """

    # The default is None, so the annotation must be Optional to agree with it
    # (otherwise an explicit `data=None` is rejected while the default is accepted).
    data: Annotated[
        Optional[TableData], Field(None, description="Table Sample Data")
    ]
    # Defaults to False: sample data is not stored unless explicitly requested.
    store: Annotated[
        bool, Field(False, description="Is the sample data should be stored or not")
    ]


class ProfilerResponse(ConfigModel):
"""
ORM Profiler processor response.
Expand All @@ -119,7 +130,7 @@ class ProfilerResponse(ConfigModel):

table: Table
profile: CreateTableProfileRequest
sample_data: Optional[TableData] = None
sample_data: Optional[SampleData] = None
column_tags: Optional[List[ColumnTag]] = None

def __str__(self):
Expand Down
17 changes: 12 additions & 5 deletions ingestion/src/metadata/profiler/processor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@
ColumnProfile,
ColumnProfilerConfig,
SystemProfile,
TableData,
TableProfile,
)
from metadata.generated.schema.settings.settings import Settings
from metadata.generated.schema.tests.customMetric import (
CustomMetric as CustomMetricEntity,
)
from metadata.generated.schema.type.basic import Timestamp
from metadata.profiler.api.models import ProfilerResponse, ThreadPoolMetrics
from metadata.profiler.api.models import ProfilerResponse, SampleData, ThreadPoolMetrics
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.profiler.metrics.core import (
ComposedMetric,
Expand Down Expand Up @@ -489,7 +488,12 @@ def process(self) -> ProfilerResponse:
)
self.compute_metrics()

if self.source_config.generateSampleData:
# We need the sample data for Sample Data or PII Sensitive processing.
# When it is only required for PII, it is returned with `store=False` so that the sink does not persist it.
if (
self.source_config.generateSampleData
or self.source_config.processPiiSensitive
):
sample_data = self.generate_sample_data()
else:
sample_data = None
Expand All @@ -507,7 +511,7 @@ def process(self) -> ProfilerResponse:
return table_profile

@calculate_execution_time(store=False)
def generate_sample_data(self) -> Optional[TableData]:
def generate_sample_data(self) -> Optional[SampleData]:
"""Fetch and ingest sample data
Returns:
Expand All @@ -529,7 +533,10 @@ def generate_sample_data(self) -> Optional[TableData]:
SAMPLE_DATA_DEFAULT_COUNT, self.profiler_interface.sample_data_count
)
]
return table_data
return SampleData(
data=table_data, store=self.source_config.generateSampleData
)

except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(f"Error fetching sample data: {err}")
Expand Down
43 changes: 19 additions & 24 deletions ingestion/src/metadata/profiler/processor/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,28 @@
"""
from typing import List, Optional

from pydantic import BaseModel, validator
from pydantic import BaseModel, BeforeValidator
from typing_extensions import Annotated

from metadata.profiler.metrics.registry import Metrics


def valid_metric(value: str):
    """
    Validate that the input metrics are correctly named
    and can be found in the Registry

    Args:
        value: metric name as received, in any casing
    Returns:
        The upper-cased metric name
    Raises:
        ValueError: if the value is not a string or is not found in the Registry
    """
    # This runs as a "before" validator, so the value is the raw input and may
    # not be a string yet. Raise ValueError (instead of failing on `.upper()`
    # with an AttributeError) so that it surfaces as a regular validation error.
    if not isinstance(value, str):
        raise ValueError(f"Metric name {value!r} must be a string")

    metric_name = value.upper()
    if not Metrics.get(metric_name):
        raise ValueError(
            f"Metric name {value} is not a proper metric name from the Registry"
        )

    return metric_name


# String type for metric names: the raw input goes through `valid_metric`
# before validation, so accepted values come out upper-cased.
ValidMetric = Annotated[str, BeforeValidator(valid_metric)]


class ProfilerDef(BaseModel):
"""
Incoming profiler definition from the
Expand All @@ -30,26 +47,4 @@ class ProfilerDef(BaseModel):
timeout_seconds: Optional[
int
] = None # Stop running a query after X seconds and continue
metrics: Optional[
List[str]
] = None # names of currently supported Static and Composed metrics
# TBD:
# time_metrics: List[TimeMetricDef] = None
# custom_metrics: List[CustomMetricDef] = None
# rule_metrics: ...

# pylint: disable=no-self-argument
@validator("metrics", each_item=True)
def valid_metric(cls, value):
"""
We are using cls as per pydantic docs
Validate that the input metrics are correctly named
and can be found in the Registry
"""
if not Metrics.get(value.upper()):
raise ValueError(
f"Metric name {value} is not a proper metric name from the Registry"
)

return value.upper()
metrics: Optional[List[ValidMetric]] = None
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
from metadata.ingestion.models.table_metadata import ColumnTag
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.pii.processor import PIIProcessor
from metadata.profiler.api.models import ProfilerResponse
from metadata.profiler.api.models import ProfilerResponse, SampleData

table_data = TableData(
columns=[
Expand Down Expand Up @@ -314,7 +314,7 @@ def test_ner_scanner_process(self):
)
)
),
sample_data=table_data,
sample_data=SampleData(data=table_data),
)

updated_record: ProfilerResponse = self.pii_processor.run(record)
Expand Down
25 changes: 25 additions & 0 deletions ingestion/tests/unit/profiler/test_profiler_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Profiler models behave properly"""
import pytest

from metadata.profiler.processor.models import ProfilerDef


def test_valid_metrics():
    """
    A known metric name is returned upper-cased,
    while an unknown one makes the model creation fail
    """
    assert ProfilerDef(name="test", metrics=["count"]).metrics == ["COUNT"]

    with pytest.raises(ValueError):
        ProfilerDef(name="test", metrics=["potato"])

0 comments on commit 4cccaae

Please sign in to comment.