Fix #12770 - Cleanup DL structure & Readers & Python 3.8 (#12776)
pmbrull authored Aug 9, 2023
1 parent 71df43f commit e97d4be
Showing 50 changed files with 1,068 additions and 629 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/py-tests.yml
@@ -35,7 +35,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- py-version: ['3.7', '3.8', '3.9', '3.10']
+ py-version: ['3.8', '3.9', '3.10']
steps:

- name: Wait for the labeler
2 changes: 1 addition & 1 deletion ingestion/setup.py
@@ -273,7 +273,7 @@ def get_long_description():
description="Ingestion Framework for OpenMetadata",
long_description=get_long_description(),
long_description_content_type="text/markdown",
- python_requires=">=3.7",
+ python_requires=">=3.8",
options={"build_exe": build_options},
package_dir={"": "src"},
package_data={"metadata.examples": ["workflows/*.yaml"]},
@@ -84,11 +84,11 @@
ViewName,
)
from metadata.ingestion.source.dashboard.looker.parser import LkmlParser
- from metadata.readers.api_reader import ReadersCredentials
- from metadata.readers.base import Reader
- from metadata.readers.bitbucket import BitBucketReader
- from metadata.readers.credentials import get_credentials_from_url
- from metadata.readers.github import GitHubReader
+ from metadata.readers.file.api_reader import ReadersCredentials
+ from metadata.readers.file.base import Reader
+ from metadata.readers.file.bitbucket import BitBucketReader
+ from metadata.readers.file.credentials import get_credentials_from_url
+ from metadata.readers.file.github import GitHubReader
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart, filter_by_datamodel
from metadata.utils.helpers import clean_uri, get_standard_chart_type
@@ -25,7 +25,7 @@
LookMlView,
ViewName,
)
- from metadata.readers.base import Reader, ReadException
+ from metadata.readers.file.base import Reader, ReadException
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
@@ -0,0 +1,29 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Handle column logic when reading data from DataLake
"""
from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR


def _get_root_col(col_name: str) -> str:
    return col_name.split(COMPLEX_COLUMN_SEPARATOR)[1]


def clean_dataframe(df):
    all_complex_root_columns = set(
        _get_root_col(col) for col in df if COMPLEX_COLUMN_SEPARATOR in col
    )
    for complex_col in all_complex_root_columns:
        if complex_col in df.columns:
            df = df.drop(complex_col, axis=1)
    return df
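For reference, a minimal usage sketch of the new helper, assuming (as the `[1]` index in `_get_root_col` suggests) that flattened complex column names start with `COMPLEX_COLUMN_SEPARATOR`; the `id`/`address` data below is made up for illustration:

```python
import pandas as pd

from metadata.ingestion.source.database.datalake.columns import clean_dataframe
from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR

# A flattened complex column like f"{SEP}address{SEP}city" has "address" as
# its root; clean_dataframe drops the duplicate raw root column.
df = pd.DataFrame(
    {
        "id": [1, 2],
        "address": [{"city": "a"}, {"city": "b"}],  # raw complex root column
        f"{COMPLEX_COLUMN_SEPARATOR}address{COMPLEX_COLUMN_SEPARATOR}city": ["a", "b"],
    }
)

print(clean_dataframe(df).columns)  # "address" is dropped, "id" and the flattened column remain
```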
@@ -55,16 +55,12 @@
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.column_helpers import truncate_column_name
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
- from metadata.ingestion.source.database.datalake.models import (
-     DatalakeTableSchemaWrapper,
- )
+ from metadata.ingestion.source.database.datalake.columns import clean_dataframe
+ from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper
+ from metadata.readers.dataframe.reader_factory import SupportedTypes
from metadata.utils import fqn
from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR, DEFAULT_DATABASE
- from metadata.utils.datalake.datalake_utils import (
-     SupportedTypes,
-     clean_dataframe,
-     fetch_dataframe,
- )
+ from metadata.utils.datalake.datalake_utils import fetch_dataframe
from metadata.utils.filters import filter_by_schema, filter_by_table
from metadata.utils.logger import ingestion_logger

@@ -375,15 +371,13 @@ def yield_table(
schema_name = self.context.database_schema.name.__root__
try:
table_constraints = None
- connection_args = self.service_connection.configSource.securityConfig
data_frame = fetch_dataframe(
config_source=self.service_connection.configSource,
client=self.client,
file_fqn=DatalakeTableSchemaWrapper(
key=table_name,
bucket_name=schema_name,
),
- connection_kwargs=connection_args,
)
# If no data_frame (due to unsupported type), ignore
columns = self.get_columns(data_frame[0]) if data_frame else None
@@ -522,7 +516,7 @@ def fetch_col_types(data_frame, column_name):
return data_type

@staticmethod
- def get_columns(data_frame: list):
+ def get_columns(data_frame: "DataFrame"):
"""
method to process column details
"""
@@ -557,7 +551,6 @@ def get_columns(data_frame: "DataFrame"):
"name": truncate_column_name(column),
"displayName": column,
}
- parsed_string["dataLength"] = parsed_string.get("dataLength", 1)
cols.append(Column(**parsed_string))
except Exception as exc:
logger.debug(traceback.format_exc())
@@ -45,14 +45,12 @@
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
- from metadata.ingestion.source.database.datalake.models import (
-     DatalakeTableSchemaWrapper,
- )
from metadata.ingestion.source.storage.s3.models import (
S3BucketResponse,
S3ContainerDetails,
)
from metadata.ingestion.source.storage.storage_service import StorageServiceSource
+ from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper
from metadata.utils.datalake.datalake_utils import fetch_dataframe
from metadata.utils.filters import filter_by_container
from metadata.utils.logger import ingestion_logger
@@ -203,14 +201,12 @@ def extract_column_definitions(
"""
Extract Column related metadata from s3
"""
- connection_args = self.service_connection.awsConfig
data_structure_details = fetch_dataframe(
- config_source=S3Config(),
+ config_source=S3Config(securityConfig=self.service_connection.awsConfig),
client=self.s3_client,
file_fqn=DatalakeTableSchemaWrapper(
key=sample_key, bucket_name=bucket_name
),
- connection_kwargs=connection_args,
)
columns = []
if isinstance(data_structure_details, DataFrame):
6 changes: 1 addition & 5 deletions ingestion/src/metadata/mixins/pandas/pandas_mixin.py
@@ -25,9 +25,7 @@
PartitionProfilerConfig,
ProfileSampleType,
)
- from metadata.ingestion.source.database.datalake.models import (
-     DatalakeTableSchemaWrapper,
- )
+ from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper
from metadata.utils.datalake.datalake_utils import fetch_dataframe
from metadata.utils.logger import test_suite_logger

@@ -89,15 +87,13 @@ def return_ometa_dataframes_sampled(
"""
returns sampled ometa dataframes
"""
- connection_args = service_connection_config.configSource.securityConfig
data = fetch_dataframe(
config_source=service_connection_config.configSource,
client=client,
file_fqn=DatalakeTableSchemaWrapper(
key=table.name.__root__, bucket_name=table.databaseSchema.name
),
is_profiler=True,
- connection_kwargs=connection_args,
)
if data:
random.shuffle(data)
@@ -25,14 +25,12 @@
DatalakeConnection,
)
from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
- from metadata.ingestion.source.database.datalake.models import (
-     DatalakeTableSchemaWrapper,
- )
from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.profiler.metrics.core import MetricTypes
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_
+ from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper
from metadata.utils.datalake.datalake_utils import fetch_dataframe
from metadata.utils.dispatch import valuedispatch
from metadata.utils.logger import profiler_interface_registry_logger
@@ -81,12 +79,11 @@ def __init__(
self.dfs = self._convert_table_to_list_of_dataframe_objects()

def _convert_table_to_list_of_dataframe_objects(self):
"""From a tablen entity, return the conresponding dataframe object
"""From a table entity, return the corresponding dataframe object
Returns:
List[DataFrame]
"""
- connection_args = self.service_connection_config.configSource.securityConfig
data = fetch_dataframe(
config_source=self.service_connection_config.configSource,
client=self.client,
@@ -95,7 +92,6 @@ def _convert_table_to_list_of_dataframe_objects(self):
bucket_name=self.table_entity.databaseSchema.name,
),
is_profiler=True,
- connection_kwargs=connection_args,
)

if not data:
Empty file.
82 changes: 82 additions & 0 deletions ingestion/src/metadata/readers/dataframe/avro.py
@@ -0,0 +1,82 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Avro DataFrame reader
"""
import io

from avro.datafile import DataFileReader
from avro.errors import InvalidAvroBinaryEncoding
from avro.io import DatumReader

from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.type.schema import DataTypeTopic
from metadata.parsers.avro_parser import parse_avro_schema
from metadata.readers.dataframe.base import DataFrameReader
from metadata.readers.dataframe.common import dataframe_to_chunks
from metadata.readers.dataframe.models import DatalakeColumnWrapper
from metadata.utils.constants import UTF_8

PD_AVRO_FIELD_MAP = {
    DataTypeTopic.BOOLEAN.value: "bool",
    DataTypeTopic.INT.value: "int",
    DataTypeTopic.LONG.value: "float",
    DataTypeTopic.FLOAT.value: "float",
    DataTypeTopic.DOUBLE.value: "float",
    DataTypeTopic.TIMESTAMP.value: "float",
    DataTypeTopic.TIMESTAMPZ.value: "float",
}

AVRO_SCHEMA = "avro.schema"


class AvroDataFrameReader(DataFrameReader):
    """
    Manage the implementation to read Avro dataframes
    from any source based on its init client.
    """

    @staticmethod
    def read_from_avro(avro_text: bytes) -> DatalakeColumnWrapper:
        """
        Method to parse the avro data from storage sources
        """
        # pylint: disable=import-outside-toplevel
        from pandas import DataFrame, Series

        try:
            elements = DataFileReader(io.BytesIO(avro_text), DatumReader())
            if elements.meta.get(AVRO_SCHEMA):
                return DatalakeColumnWrapper(
                    columns=parse_avro_schema(
                        schema=elements.meta.get(AVRO_SCHEMA).decode(UTF_8), cls=Column
                    ),
                    dataframes=dataframe_to_chunks(DataFrame.from_records(elements)),
                )
            return DatalakeColumnWrapper(
                dataframes=dataframe_to_chunks(DataFrame.from_records(elements))
            )
        except (AssertionError, InvalidAvroBinaryEncoding):
            columns = parse_avro_schema(schema=avro_text, cls=Column)
            field_map = {
                col.name.__root__: Series(
                    PD_AVRO_FIELD_MAP.get(col.dataType.value, "str")
                )
                for col in columns
            }
            return DatalakeColumnWrapper(
                columns=columns, dataframes=dataframe_to_chunks(DataFrame(field_map))
            )

    def _read(self, *, key: str, bucket_name: str, **__) -> DatalakeColumnWrapper:
        text = self.reader.read(key, bucket_name=bucket_name)
        return self.read_from_avro(text)
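For reference, the read pattern this class relies on can be reproduced standalone with just avro and pandas. A minimal sketch, assuming a local Avro container file ("users.avro" is a placeholder path):

```python
import io

from avro.datafile import DataFileReader
from avro.io import DatumReader
from pandas import DataFrame

# Any Avro container file works here; the bytes would normally come from S3/GCS/ADLS.
with open("users.avro", "rb") as fp:
    raw = fp.read()

records = DataFileReader(io.BytesIO(raw), DatumReader())
schema_json = records.meta.get("avro.schema")  # embedded writer schema, as bytes
df = DataFrame.from_records(records)           # same call the reader chunks internally
print(schema_json.decode("utf-8"))
print(df.head())
```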
71 changes: 71 additions & 0 deletions ingestion/src/metadata/readers/dataframe/base.py
@@ -0,0 +1,71 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Dataframe base reader
"""

from abc import ABC, abstractmethod
from typing import Any, Optional

from metadata.readers.dataframe.models import DatalakeColumnWrapper
from metadata.readers.file.base import Reader
from metadata.readers.file.config_source_factory import get_reader
from metadata.readers.models import ConfigSource
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class FileFormatException(Exception):
    def __init__(self, config_source: Any, file_name: str) -> None:
        message = f"Missing implementation for {config_source.__class__.__name__} for {file_name}"
        super().__init__(message)


class DataFrameReadException(Exception):
    """
    To be raised by any errors with the read calls
    """


class DataFrameReader(ABC):
    """
    Abstract class for all readers.
    Readers are organized by Format, not by Source Type (S3, GCS or ADLS).
    Some DF readers first need to read the full file and then prepare the
    dataframe. This is why we add the File Reader as well.
    """

    config_source: ConfigSource
    reader: Reader

    def __init__(self, config_source: ConfigSource, client: Optional[Any]):
        self.config_source = config_source
        self.client = client

        self.reader = get_reader(config_source=config_source, client=client)

    @abstractmethod
    def _read(self, *, key: str, bucket_name: str, **kwargs) -> DatalakeColumnWrapper:
        """
        Pass the path, bucket, or any other necessary details
        to read the dataframe from the source.
        """
        raise NotImplementedError("Missing read implementation")

    def read(self, *, key: str, bucket_name: str, **kwargs) -> DatalakeColumnWrapper:
        try:
            return self._read(key=key, bucket_name=bucket_name, **kwargs)
        except Exception as err:
            raise DataFrameReadException(f"Error reading dataframe due to [{err}]")
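Concrete readers plug into this base by implementing `_read`, as the Avro reader above does. A hypothetical newline-delimited JSON reader, sketched under the assumptions that `self.reader.read` returns raw bytes and that `dataframe_to_chunks`/`DatalakeColumnWrapper` are used exactly as in avro.py:

```python
import io

from pandas import read_json

from metadata.readers.dataframe.base import DataFrameReader
from metadata.readers.dataframe.common import dataframe_to_chunks
from metadata.readers.dataframe.models import DatalakeColumnWrapper


class JSONLinesDataFrameReader(DataFrameReader):
    """Hypothetical reader for newline-delimited JSON files (illustration only)."""

    def _read(self, *, key: str, bucket_name: str, **__) -> DatalakeColumnWrapper:
        # self.reader is the file reader picked by get_reader() for the config source
        text = self.reader.read(key, bucket_name=bucket_name)
        df = read_json(io.StringIO(text.decode("utf-8")), lines=True)
        return DatalakeColumnWrapper(dataframes=dataframe_to_chunks(df))
```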