diff --git a/README.md b/README.md
index cc7f600..5368183 100644
--- a/README.md
+++ b/README.md
@@ -14,13 +14,14 @@ To learn more about collector types and ODD Platform's architecture, [read the d
## Implemented adapters
| Service | Config example |
-| ------------------------------------------------------------ | ----------------------------------------------------- |
+|--------------------------------------------------------------|-------------------------------------------------------|
| Athena | [config](config_examples/athena.yaml) |
| DynamoDB | [config](config_examples/dynamodb.yaml) |
| Glue | [config](config_examples/glue.yaml) |
| Kinesis | [config](config_examples/kinesis.yaml) |
| Quicksight | [config](config_examples/quicksight.yaml) |
| S3 | [config](config_examples/s3.yaml) |
+| S3_Delta | [config](config_examples/s3_delta.yaml) |
| Sagemaker | [config](config_examples/sagemaker.yaml) |
| SQS | [config](config_examples/sqs.yaml) |
| SagemakerFeaturestore | [config](config_examples/sagemaker_featurestore.yaml) |
diff --git a/config_examples/s3_delta.yaml b/config_examples/s3_delta.yaml
index f9d4db1..cb6f6f2 100644
--- a/config_examples/s3_delta.yaml
+++ b/config_examples/s3_delta.yaml
@@ -27,9 +27,12 @@ plugins:
delta_tables:
- bucket: bucket
prefix: delta_data
-          object_filter: # will exclude all folders with _pii at the end of name from ingestion
-            exclude:
-              - ".*_pii"
+          filter: # ingest only tables whose prefix or name contains "events"; skip any containing "_pii"
+            include: [ "events" ]
+            exclude: [ "_pii" ]
+
+
# Minio S3 Delta Lake Adapter
default_pulling_interval: 10
diff --git a/odd_collector_aws/__init__.py b/odd_collector_aws/__init__.py
index e69de29..fabaf86 100644
--- a/odd_collector_aws/__init__.py
+++ b/odd_collector_aws/__init__.py
@@ -0,0 +1,11 @@
+from odd_collector_aws.logger import logger
+
+
+def get_version() -> str:
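+    """Return the collector version from odd_collector_aws.__version__, falling back to "-" when it is missing."""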
+ try:
+ from odd_collector_aws.__version__ import VERSION
+
+ return VERSION
+ except Exception as e:
+ logger.warning(f"Can't get version from odd_collector_aws.__version__. {e}")
+ return "-"
diff --git a/odd_collector_aws/__main__.py b/odd_collector_aws/__main__.py
index cc6ab2f..acc3ab5 100644
--- a/odd_collector_aws/__main__.py
+++ b/odd_collector_aws/__main__.py
@@ -1,12 +1,18 @@
from pathlib import Path
+import odd_collector_sdk as sdk
from odd_collector_sdk.collector import Collector
+from odd_collector_aws import get_version
from odd_collector_aws.domain.plugin import PLUGIN_FACTORY
+from odd_collector_aws.logger import logger
COLLECTOR_PACKAGE = __package__
CONFIG_PATH = Path().cwd() / "collector_config.yaml"
+logger.info(f"AWS collector version: {get_version()}")
+logger.info(f"SDK: {sdk.get_version()}")
+
collector = Collector(
config_path=CONFIG_PATH,
root_package=COLLECTOR_PACKAGE,
diff --git a/odd_collector_aws/__version__.py b/odd_collector_aws/__version__.py
new file mode 100644
index 0000000..ab1bdd5
--- /dev/null
+++ b/odd_collector_aws/__version__.py
@@ -0,0 +1 @@
+VERSION = "0.1.4"
diff --git a/odd_collector_aws/adapters/s3_delta/adapter.py b/odd_collector_aws/adapters/s3_delta/adapter.py
index 5855eed..65900cd 100644
--- a/odd_collector_aws/adapters/s3_delta/adapter.py
+++ b/odd_collector_aws/adapters/s3_delta/adapter.py
@@ -13,7 +13,6 @@
from .mappers.delta_table import map_delta_table
# TODO: Add tags
-# TODO: Complex types types
class Adapter(BaseAdapter):
diff --git a/odd_collector_aws/adapters/s3_delta/client.py b/odd_collector_aws/adapters/s3_delta/client.py
index 878428e..043f61c 100644
--- a/odd_collector_aws/adapters/s3_delta/client.py
+++ b/odd_collector_aws/adapters/s3_delta/client.py
@@ -15,7 +15,7 @@
def handle_values(
- obj: dict, handler: tuple[str, callable]
+ obj: dict, handler: tuple[str, callable]
) -> tuple[str, Optional[any]]:
key, callback = handler
return key, silent(callback)(obj.get(key))
@@ -76,8 +76,9 @@ def handle_folder(self, config: DeltaTableConfig) -> Iterable[DTable]:
folders = filter(lambda obj: not obj.is_file, objects)
allowed = filter(lambda folder: folder.base_name, folders)
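+        # Apply the table config's include/exclude filter to each folder path before descending into it.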
+ filtered = filter(lambda item: config.allow(item.path), allowed)
- for obj in allowed:
+ for obj in filtered:
new_config = config.append_prefix(obj.base_name)
yield from self.get_table(new_config)
diff --git a/odd_collector_aws/adapters/s3_delta/mappers/delta_table.py b/odd_collector_aws/adapters/s3_delta/mappers/delta_table.py
index 7735a8e..c3895b5 100644
--- a/odd_collector_aws/adapters/s3_delta/mappers/delta_table.py
+++ b/odd_collector_aws/adapters/s3_delta/mappers/delta_table.py
@@ -2,6 +2,7 @@
from oddrn_generator import S3Generator
from odd_collector_aws.utils.parse_s3_url import parse_s3_url
+from ..models.field import DField
from ..models.table import DTable
from .field import map_field
@@ -10,9 +11,14 @@
def map_delta_table(generator: S3Generator, delta_table: DTable) -> DataEntity:
bucket, key = parse_s3_url(delta_table.table_uri)
-
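+    # Wrap raw deltalake fields so the mapper sees a uniform name/type/nullable/odd_metadata interface.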
+ fields = [DField(field) for field in delta_table.schema.fields]
generator.set_oddrn_paths(buckets=bucket, keys=key)
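+    # map_field returns a list of DataSetFields: complex types (struct/array/map) may expand into child fields.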
+ field_list = []
+ for field in fields:
+ processed_ds_fields = map_field(generator, field)
+ field_list.extend(processed_ds_fields)
+
return DataEntity(
oddrn=generator.get_oddrn_by_path("keys"),
name=key,
@@ -21,10 +27,5 @@ def map_delta_table(generator: S3Generator, delta_table: DTable) -> DataEntity:
updated_at=delta_table.updated_at,
owner=None,
metadata=map_metadata(delta_table),
- dataset=DataSet(
- rows_number=delta_table.num_rows,
- field_list=[
- map_field(generator, field) for field in delta_table.schema.fields
- ],
- ),
+ dataset=DataSet(rows_number=delta_table.num_rows, field_list=field_list),
)
diff --git a/odd_collector_aws/adapters/s3_delta/mappers/field.py b/odd_collector_aws/adapters/s3_delta/mappers/field.py
index 4181355..443c070 100644
--- a/odd_collector_aws/adapters/s3_delta/mappers/field.py
+++ b/odd_collector_aws/adapters/s3_delta/mappers/field.py
@@ -1,60 +1,38 @@
-from deltalake._internal import ArrayType, Field, MapType, PrimitiveType, StructType
-from odd_models.models import DataSetField, DataSetFieldType, Type
+from pathlib import Path
+from odd_collector_sdk.grammar_parser.build_dataset_field import DatasetFieldBuilder
+from odd_models.models import DataSetField, Type
from oddrn_generator import S3Generator
-
-from ..logger import logger
-
-DELTA_TO_ODD_TYPE_MAP = {
+from ..models.field import DField
+
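+# Maps Delta Lake type names parsed from the table schema to ODD field types.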
+DELTA_TO_ODD_TYPE_MAP: dict[str, Type] = {
+ "float": Type.TYPE_NUMBER,
+ "struct": Type.TYPE_STRUCT,
+ "bigint": Type.TYPE_INTEGER,
+ "binary": Type.TYPE_BINARY,
+ "boolean": Type.TYPE_BOOLEAN,
+ "date": Type.TYPE_DATETIME,
+ "decimal": Type.TYPE_NUMBER,
+ "double": Type.TYPE_NUMBER,
+ "void": Type.TYPE_UNKNOWN,
+ "smallint": Type.TYPE_INTEGER,
+ "timestamp": Type.TYPE_TIME,
+ "tinyint": Type.TYPE_INTEGER,
+ "array": Type.TYPE_LIST,
+ "map": Type.TYPE_MAP,
+ "int": Type.TYPE_INTEGER,
+ "long": Type.TYPE_INTEGER,
"string": Type.TYPE_STRING,
+ "interval": Type.TYPE_DURATION,
}
-def map_to_odd_type(delta_type: str) -> Type:
- return DELTA_TO_ODD_TYPE_MAP.get(delta_type, Type.TYPE_UNKNOWN)
-
-
-def unknown_field(generator: S3Generator, field: Field) -> DataSetField:
- generator.set_oddrn_paths(columns=field.name)
- return DataSetField(
- oddrn=generator.get_oddrn_by_path("columns"),
- name=field.name,
- type=DataSetFieldType(
- type=Type.TYPE_UNKNOWN,
- logical_type=field.type,
- is_nullable=field.nullable,
- ),
+def map_field(oddrn_generator: S3Generator, column: DField) -> list[DataSetField]:
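+    # NOTE: the grammar path is resolved against the current working directory,
+    # so it only resolves when the collector is launched from the repository root.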
+ field_builder = DatasetFieldBuilder(
+ data_source="s3_delta",
+ oddrn_generator=oddrn_generator,
+ parser_config_path=Path(
+ "odd_collector_aws/adapters/s3_delta/mappers/grammar/field_types.lark"
+ ).absolute(),
+ odd_types_map=DELTA_TO_ODD_TYPE_MAP,
)
-
-
-def map_primitive(generator: S3Generator, field: Field) -> DataSetField:
- generator.set_oddrn_paths(columns=field.name)
- return DataSetField(
- oddrn=generator.get_oddrn_by_path("columns"),
- name=field.name,
- type=DataSetFieldType(
- type=map_to_odd_type(field.type.type),
- logical_type=field.type.type,
- is_nullable=field.nullable,
- ),
- )
-
-
-def map_map(generator: S3Generator, field: Field) -> DataSetField:
- ...
-
-
-def map_struct(generator: S3Generator, field: Field) -> DataSetField:
- ...
-
-
-def map_array(generator: S3Generator, field: Field) -> DataSetField:
- ...
-
-
-def map_field(generator: S3Generator, field: Field) -> DataSetField:
- type_ = field.type
-
- if isinstance(type_, PrimitiveType):
- return map_primitive(generator, field)
- else:
- logger.error(f"Unknown field type: {field.type}")
+ return field_builder.build_dataset_field(column)
diff --git a/odd_collector_aws/adapters/s3_delta/mappers/grammar/field_types.lark b/odd_collector_aws/adapters/s3_delta/mappers/grammar/field_types.lark
new file mode 100644
index 0000000..4b680b2
--- /dev/null
+++ b/odd_collector_aws/adapters/s3_delta/mappers/grammar/field_types.lark
@@ -0,0 +1,40 @@
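+// Grammar for the string form of deltalake schema types, e.g.:
+//   StructType([Field(id, PrimitiveType("long"), nullable=True)])
+//   ArrayType(PrimitiveType("string"), contains_null=True)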
+?start: type
+
+BASIC_TYPE: "string"
+ | "int"
+ | "float"
+ | "double"
+ | "bigint"
+ | "binary"
+ | "boolean"
+ | "date"
+ | "decimal"
+ | "void"
+ | "interval"
+ | "smallint"
+ | "timestamp"
+ | "timestamp_ntz"
+ | "tinyint"
+ | "long"
+
+primitive_type: "PrimitiveType(\"" BASIC_TYPE "\")"
+
+FIELD_NAME: (LETTER | "_") (LETTER | "_" | DIGIT | "-")*
+
+array: "ArrayType" "(" type ", " "contains_null=" BOOL ")"
+
+field: "Field(" FIELD_NAME ", " type ", " "nullable=" BOOL ")"
+
+struct: "StructType([" [field (", " field)*] "])"
+
+map: "MapType(" type ", " type ", " "value_contains_null=" BOOL ")"
+
+?type: primitive_type
+ | array
+ | struct
+ | map
+
+BOOL: "True" | "False"
+
+%import common.LETTER
+%import common.DIGIT
diff --git a/odd_collector_aws/adapters/s3_delta/models/field.py b/odd_collector_aws/adapters/s3_delta/models/field.py
new file mode 100644
index 0000000..ac76314
--- /dev/null
+++ b/odd_collector_aws/adapters/s3_delta/models/field.py
@@ -0,0 +1,25 @@
+from typing import Union
+
+from deltalake import Field
+from deltalake._internal import PrimitiveType, ArrayType, MapType, StructType
+
+
+class DField:
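+    """Thin wrapper over deltalake.Field exposing name, type, nullable and odd_metadata for the field mapper."""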
+ def __init__(self, field: Field):
+ self.field = field
+
+ @property
+ def odd_metadata(self) -> dict:
+ return self.field.metadata
+
+ @property
+ def name(self) -> str:
+ return self.field.name
+
+ @property
+ def type(self) -> Union[PrimitiveType, ArrayType, MapType, StructType]:
+ return self.field.type
+
+ @property
+ def nullable(self) -> bool:
+        return self.field.nullable
diff --git a/odd_collector_aws/domain/plugin.py b/odd_collector_aws/domain/plugin.py
index 5b3a96a..2c65ef2 100644
--- a/odd_collector_aws/domain/plugin.py
+++ b/odd_collector_aws/domain/plugin.py
@@ -45,7 +45,7 @@ class DeltaTableConfig(BaseModel):
scheme: str = Field(default="s3", alias="schema")
bucket: str
prefix: str
- object_filter: Optional[Filter] = Filter()
+ filter: Optional[Filter] = Filter()
@property
def path(self) -> str:
@@ -56,11 +56,11 @@ def append_prefix(self, path: str) -> "DeltaTableConfig":
schema=self.scheme,
bucket=self.bucket,
prefix=f"{self.prefix}/{path}",
- object_filter=self.object_filter,
+ filter=self.filter,
)
def allow(self, name: str) -> bool:
- return self.object_filter.is_allowed(name)
+ return self.filter.is_allowed(name)
class S3DeltaPlugin(AwsPlugin):
diff --git a/poetry.lock b/poetry.lock
index aff72bd..4d2db7c 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -874,24 +874,27 @@ files = [
[[package]]
name = "odd-collector-sdk"
-version = "0.3.31"
+version = "0.3.40"
description = "ODD Collector"
category = "main"
optional = false
python-versions = ">=3.9,<4.0"
files = [
- {file = "odd_collector_sdk-0.3.31-py3-none-any.whl", hash = "sha256:90fbec1dba972ad6f684d22bb56cbda6229580600cc7c1f08425e60ff4fb9ee6"},
- {file = "odd_collector_sdk-0.3.31.tar.gz", hash = "sha256:4ccd76659bf1b54a939544d1b73b901b8e4e9385f4b5d8272f56cc5cb0c0ae23"},
+ {file = "odd_collector_sdk-0.3.40-py3-none-any.whl", hash = "sha256:da308c2accbfdb29089cf98dba3d73d796592f5c4dcf4cb9239f689b39481f67"},
+ {file = "odd_collector_sdk-0.3.40.tar.gz", hash = "sha256:453461c5fa92e888324ffce0c48eadd9d8c4cfea4528d18412c5e6d74f4ede2c"},
]
[package.dependencies]
aiohttp = ">=3.8.1,<4.0.0"
APScheduler = ">=3.8.1,<4.0.0"
+flatdict = ">=4.0.1,<5.0.0"
funcy = ">=2.0,<3.0"
importlib-metadata = ">=5.1.0,<6.0.0"
+lark-parser = ">=0.12.0,<0.13.0"
loguru = ">=0.6.0,<0.7.0"
odd-models = ">=2.0.28,<3.0.0"
oddrn-generator = ">=0.1.73,<0.2.0"
+prettytable = ">=3.8.0,<4.0.0"
pyaml-env = ">=1.1.5,<2.0.0"
pydantic = ">=1.8.2,<2.0.0"
tqdm = ">=4.64.1,<5.0.0"
@@ -919,14 +922,14 @@ sqlparse = "0.4.2"
[[package]]
name = "oddrn-generator"
-version = "0.1.81"
+version = "0.1.87"
description = "Open Data Discovery Resource Name Generator"
category = "main"
optional = false
python-versions = ">=3.9,<4.0"
files = [
- {file = "oddrn_generator-0.1.81-py3-none-any.whl", hash = "sha256:188274459ba501db37087d88737018af1b13a69d9fa08cd258e665bdfc99caf3"},
- {file = "oddrn_generator-0.1.81.tar.gz", hash = "sha256:f9161901b4596e7262746f4d143791f07bf95b26bd0eacef5365be8c8fd82a6a"},
+ {file = "oddrn_generator-0.1.87-py3-none-any.whl", hash = "sha256:b904efba2ec8e2b81bf7c6351387c83294da547c797af7c3e06a8033923f8780"},
+ {file = "oddrn_generator-0.1.87.tar.gz", hash = "sha256:3d167c139a554428755bfc7a89cf9086b09e4a4e31ed8a39425bbecc96e0f641"},
]
[package.dependencies]
@@ -1007,6 +1010,24 @@ nodeenv = ">=0.11.1"
pyyaml = ">=5.1"
virtualenv = ">=20.10.0"
+[[package]]
+name = "prettytable"
+version = "3.8.0"
+description = "A simple Python library for easily displaying tabular data in a visually appealing ASCII table format"
+category = "main"
+optional = false
+python-versions = ">=3.8"
+files = [
+ {file = "prettytable-3.8.0-py3-none-any.whl", hash = "sha256:03481bca25ae0c28958c8cd6ac5165c159ce89f7ccde04d5c899b24b68bb13b7"},
+ {file = "prettytable-3.8.0.tar.gz", hash = "sha256:031eae6a9102017e8c7c7906460d150b7ed78b20fd1d8c8be4edaf88556c07ce"},
+]
+
+[package.dependencies]
+wcwidth = "*"
+
+[package.extras]
+tests = ["pytest", "pytest-cov", "pytest-lazy-fixture"]
+
[[package]]
name = "pyaml-env"
version = "1.2.1"
@@ -1452,6 +1473,18 @@ platformdirs = ">=3.2,<4"
docs = ["furo (>=2023.3.27)", "proselint (>=0.13)", "sphinx (>=6.1.3)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=22.12)"]
test = ["covdefaults (>=2.3)", "coverage (>=7.2.3)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.3.1)", "pytest-env (>=0.8.1)", "pytest-freezegun (>=0.4.2)", "pytest-mock (>=3.10)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=67.7.1)", "time-machine (>=2.9)"]
+[[package]]
+name = "wcwidth"
+version = "0.2.6"
+description = "Measures the displayed width of unicode strings in a terminal"
+category = "main"
+optional = false
+python-versions = "*"
+files = [
+ {file = "wcwidth-0.2.6-py2.py3-none-any.whl", hash = "sha256:795b138f6875577cd91bba52baf9e445cd5118fd32723b460e30a0af30ea230e"},
+ {file = "wcwidth-0.2.6.tar.gz", hash = "sha256:a5220780a404dbe3353789870978e472cfe477761f06ee55077256e509b156d0"},
+]
+
[[package]]
name = "win32-setctime"
version = "1.1.0"
@@ -1574,4 +1607,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
-content-hash = "1877f1f3f8b978205762928ee49ab5b9c88ac29a05bde03f0ef3fe567356b2dc"
+content-hash = "df2fee20a064e6051c501cbdcf46ee72b8911f518240c2d61c9bf0d5795bee9b"
diff --git a/pyproject.toml b/pyproject.toml
index e43756b..996d4bc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,14 +9,14 @@ repository = "https://github.com/opendatadiscovery/odd-collector-aws"
[tool.poetry.dependencies]
python = "^3.9"
-oddrn-generator = "^0.1.80"
+oddrn-generator = "^0.1.87"
python-dotenv = "0.19.0"
boto3 = "1.18.44"
pyhumps = "3.0.2"
pyarrow = "^10.0.1"
humps = "0.2.2"
flatdict = "4.0.1"
-odd-collector-sdk = "^0.3.31"
+odd-collector-sdk = "^0.3.40"
lark-parser = "^0.12.0"
deltalake = "^0.9.0"