Skip to content

Commit

Permalink
Add unit test for date parsing and add manual trimming for invalid da…
Browse files Browse the repository at this point in the history
…te types (#141)

* Fix date parsing

* Fix date parsing

* Update version to 0.2.3

* Fix lint
  • Loading branch information
BTheunissen authored Mar 15, 2024
1 parent 278d94c commit 887eb86
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "shaped-target-clickhouse"
version = "0.2.2"
version = "0.2.3"
description = "`target-clickhouse` is a Singer target for clickhouse, built with the Meltano Singer SDK."
readme = "README.md"
authors = ["Ben Theunissen"]
Expand Down
63 changes: 62 additions & 1 deletion target_clickhouse/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@
import simplejson as json
import sqlalchemy
from pendulum import now
from singer_sdk.helpers._compat import (
date_fromisoformat,
datetime_fromisoformat,
time_fromisoformat,
)
from singer_sdk.helpers._typing import (
DatetimeErrorTreatmentEnum,
get_datelike_property_type,
handle_invalid_timestamp_in_record,
)
from singer_sdk.sinks import SQLSink
from sqlalchemy.sql.expression import bindparam
Expand Down Expand Up @@ -174,10 +181,64 @@ def _validate_and_parse(self, record: dict) -> dict:
except jsonschema_exceptions.ValidationError as e:
if self.logger:
self.logger.exception(f"Record failed validation: {record}")
raise e # noqa: RERAISES
raise e # : RERAISES

return record

def _parse_timestamps_in_record(
self,
record: dict,
schema: dict,
treatment: DatetimeErrorTreatmentEnum,
) -> None:
"""Parse strings to datetime.datetime values, repairing or erroring on failure.
Attempts to parse every field that is of type date/datetime/time. If its value
is out of range, repair logic will be driven by the `treatment` input arg:
MAX, NULL, or ERROR.
Args:
record: Individual record in the stream.
schema: TODO
treatment: TODO
"""
for key, value in record.items():
if key not in schema["properties"]:
self.logger.warning("No schema for record field '%s'", key)
continue
datelike_type = get_datelike_property_type(schema["properties"][key])
if datelike_type:
date_val = value
try:
if value is not None:
if datelike_type == "time":
date_val = time_fromisoformat(date_val)
elif datelike_type == "date":
# Trim time value from date fields.
if "T" in date_val:
# Split on T and get the first part.
date_val = date_val.split("T")[0]
self.logger.warning(
"Trimmed time value from date field '%s': %s",
key,
date_val,
)
date_val = date_fromisoformat(date_val)
else:
date_val = datetime_fromisoformat(date_val)
except ValueError as ex:
date_val = handle_invalid_timestamp_in_record(
record,
[key],
date_val,
datelike_type,
ex,
treatment,
self.logger,
)
record[key] = date_val


def pre_validate_for_string_type(
record: dict,
schema: dict,
Expand Down
21 changes: 21 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
"""Test Configuration."""
from pathlib import Path

from singer_sdk.testing.templates import TargetFileTestTemplate

pytest_plugins = ()

class TargetClickhouseFileTestTemplate(TargetFileTestTemplate):
"""Base Target File Test Template.
Use this when sourcing Target test input from a .singer file.
"""

@property
def singer_filepath(self):
"""Get path to singer JSONL formatted messages file.
Files will be sourced from `./target_test_streams/<test name>.singer`.
Returns
The expected Path to this tests singer file.
"""
current_file_path = Path(__file__).resolve()
return current_file_path.parent / "target_test_streams" / f"{self.name}.singer"
39 changes: 39 additions & 0 deletions tests/target_test_cases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@

import datetime
import logging

from singer_sdk.testing.suites import TestSuite
from sqlalchemy import text

from tests.conftest import TargetClickhouseFileTestTemplate

logger = logging.getLogger(__name__)

class TestDateTypeTargetClickhouse(TargetClickhouseFileTestTemplate):
"""Test date type can be ingested into Clickhouse."""

name = "date_type"

def validate(self) -> None:
"""Validate the data in the target."""
connector = self.target.default_sink_class.connector_class(self.target.config)
result = connector.connection.execute(
statement=text("SELECT * FROM date_type"),
).fetchall()
record_id_1 = 1
record_1 = next(iter([
record for record in result if record[0] == record_id_1
]))
assert record_1[1] == datetime.date(2024, 3, 15)
record_id_2 = 2
record_2 = next(iter([
record for record in result if record[0] == record_id_2
]))
assert record_2[1] == datetime.date(2024, 3, 16)

custom_target_test_suite = TestSuite(
kind="target",
tests=[
TestDateTypeTargetClickhouse,
],
)
4 changes: 4 additions & 0 deletions tests/target_test_streams/date_type.singer
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"type": "SCHEMA", "stream": "date_type", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "date": {"format": "date", "type": [ "null", "string" ] }}}}
{"type": "RECORD", "stream": "date_type", "record": {"id": 1, "date": "2024-03-15"}}
{"type": "RECORD", "stream": "date_type", "record": {"id": 2, "date": "2024-03-16T00:00:00+00:00"}}
{"type": "RECORD", "stream": "date_type", "record": {"id": 3, "date": null}}
7 changes: 6 additions & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from singer_sdk.testing import get_target_test_class

from target_clickhouse.target import TargetClickhouse
from tests.target_test_cases import custom_target_test_suite

TEST_CONFIG: dict[str, t.Any] = {
"sqlalchemy_url": "clickhouse+http://default:@localhost:18123",
Expand Down Expand Up @@ -39,16 +40,20 @@
StandardTargetTests = get_target_test_class(
target_class=TargetClickhouse,
config=TEST_CONFIG,
custom_suites=[custom_target_test_suite],
)


class TestStandardTargetClickhouse(StandardTargetTests): # type: ignore[misc, valid-type]
class TestStandardTargetClickhouse(
StandardTargetTests, # type: ignore[misc, valid-type]
):
"""Standard Target Tests."""


SpreadTargetTests = get_target_test_class(
target_class=TargetClickhouse,
config=TEST_CONFIG_SPREAD,
custom_suites=[custom_target_test_suite],
)


Expand Down

0 comments on commit 887eb86

Please sign in to comment.