diff --git a/pyproject.toml b/pyproject.toml index 11419c3..7f5fa1f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "shaped-target-clickhouse" -version = "0.2.2" +version = "0.2.3" description = "`target-clickhouse` is a Singer target for clickhouse, built with the Meltano Singer SDK." readme = "README.md" authors = ["Ben Theunissen"] diff --git a/target_clickhouse/sinks.py b/target_clickhouse/sinks.py index f6aed66..fc738ac 100644 --- a/target_clickhouse/sinks.py +++ b/target_clickhouse/sinks.py @@ -9,8 +9,15 @@ import simplejson as json import sqlalchemy from pendulum import now +from singer_sdk.helpers._compat import ( + date_fromisoformat, + datetime_fromisoformat, + time_fromisoformat, +) from singer_sdk.helpers._typing import ( DatetimeErrorTreatmentEnum, + get_datelike_property_type, + handle_invalid_timestamp_in_record, ) from singer_sdk.sinks import SQLSink from sqlalchemy.sql.expression import bindparam @@ -174,10 +181,64 @@ def _validate_and_parse(self, record: dict) -> dict: except jsonschema_exceptions.ValidationError as e: if self.logger: self.logger.exception(f"Record failed validation: {record}") - raise e # noqa: RERAISES + raise e # : RERAISES return record + def _parse_timestamps_in_record( + self, + record: dict, + schema: dict, + treatment: DatetimeErrorTreatmentEnum, + ) -> None: + """Parse strings to datetime.datetime values, repairing or erroring on failure. + + Attempts to parse every field that is of type date/datetime/time. If its value + is out of range, repair logic will be driven by the `treatment` input arg: + MAX, NULL, or ERROR. + + Args: + record: Individual record in the stream. 
+ schema: Stream JSON schema; its `properties` are looked up per key. + treatment: Handling applied when a value fails to parse (see above). + """ + for key, value in record.items(): + if key not in schema["properties"]: + self.logger.warning("No schema for record field '%s'", key) + continue + datelike_type = get_datelike_property_type(schema["properties"][key]) + if datelike_type: + date_val = value + try: + if value is not None: + if datelike_type == "time": + date_val = time_fromisoformat(date_val) + elif datelike_type == "date": + # Trim time value from date fields. + if "T" in date_val: + # Split on T and get the first part. + date_val = date_val.split("T")[0] + self.logger.warning( + "Trimmed time value from date field '%s': %s", + key, + date_val, + ) + date_val = date_fromisoformat(date_val) + else: + date_val = datetime_fromisoformat(date_val) + except ValueError as ex: + date_val = handle_invalid_timestamp_in_record( + record, + [key], + date_val, + datelike_type, + ex, + treatment, + self.logger, + ) + record[key] = date_val + + def pre_validate_for_string_type( record: dict, schema: dict, diff --git a/tests/conftest.py b/tests/conftest.py index b7f8bc8..62a0a69 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,24 @@ """Test Configuration.""" +from pathlib import Path + +from singer_sdk.testing.templates import TargetFileTestTemplate pytest_plugins = () + +class TargetClickhouseFileTestTemplate(TargetFileTestTemplate): + """Base Target File Test Template. + + Use this when sourcing Target test input from a .singer file. + """ + + @property + def singer_filepath(self): + """Get path to singer JSONL formatted messages file. + + Files will be sourced from `./target_test_streams/<name>.singer`. + + Returns: + The expected Path to this test's singer file. 
+ """ + current_file_path = Path(__file__).resolve() + return current_file_path.parent / "target_test_streams" / f"{self.name}.singer" diff --git a/tests/target_test_cases.py b/tests/target_test_cases.py new file mode 100644 index 0000000..167f367 --- /dev/null +++ b/tests/target_test_cases.py @@ -0,0 +1,39 @@ + +import datetime +import logging + +from singer_sdk.testing.suites import TestSuite +from sqlalchemy import text + +from tests.conftest import TargetClickhouseFileTestTemplate + +logger = logging.getLogger(__name__) + +class TestDateTypeTargetClickhouse(TargetClickhouseFileTestTemplate): + """Test date type can be ingested into Clickhouse.""" + + name = "date_type" + + def validate(self) -> None: + """Validate the data in the target.""" + connector = self.target.default_sink_class.connector_class(self.target.config) + result = connector.connection.execute( + statement=text("SELECT * FROM date_type"), + ).fetchall() + record_id_1 = 1 + record_1 = next(iter([ + record for record in result if record[0] == record_id_1 + ])) + assert record_1[1] == datetime.date(2024, 3, 15) + record_id_2 = 2 + record_2 = next(iter([ + record for record in result if record[0] == record_id_2 + ])) + assert record_2[1] == datetime.date(2024, 3, 16) + +custom_target_test_suite = TestSuite( + kind="target", + tests=[ + TestDateTypeTargetClickhouse, + ], +) diff --git a/tests/target_test_streams/date_type.singer b/tests/target_test_streams/date_type.singer new file mode 100644 index 0000000..159c738 --- /dev/null +++ b/tests/target_test_streams/date_type.singer @@ -0,0 +1,4 @@ +{"type": "SCHEMA", "stream": "date_type", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "date": {"format": "date", "type": [ "null", "string" ] }}}} +{"type": "RECORD", "stream": "date_type", "record": {"id": 1, "date": "2024-03-15"}} +{"type": "RECORD", "stream": "date_type", "record": {"id": 2, "date": "2024-03-16T00:00:00+00:00"}} 
+{"type": "RECORD", "stream": "date_type", "record": {"id": 3, "date": null}} diff --git a/tests/test_core.py b/tests/test_core.py index c892993..74e9722 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -7,6 +7,7 @@ from singer_sdk.testing import get_target_test_class from target_clickhouse.target import TargetClickhouse +from tests.target_test_cases import custom_target_test_suite TEST_CONFIG: dict[str, t.Any] = { "sqlalchemy_url": "clickhouse+http://default:@localhost:18123", @@ -39,16 +40,20 @@ StandardTargetTests = get_target_test_class( target_class=TargetClickhouse, config=TEST_CONFIG, + custom_suites=[custom_target_test_suite], ) -class TestStandardTargetClickhouse(StandardTargetTests): # type: ignore[misc, valid-type] +class TestStandardTargetClickhouse( + StandardTargetTests, # type: ignore[misc, valid-type] +): """Standard Target Tests.""" SpreadTargetTests = get_target_test_class( target_class=TargetClickhouse, config=TEST_CONFIG_SPREAD, + custom_suites=[custom_target_test_suite], )