From fca862a26f0daea0fa1c5eeb1a13164fc4026153 Mon Sep 17 00:00:00 2001 From: Milan Lukac Date: Tue, 25 Jun 2024 18:59:54 -0700 Subject: [PATCH] Oracle: fix formats, freshness, other minor fixes (#2106) --- pytest.ini | 1 + soda/core/tests/data_source/test_freshness.py | 4 +- soda/core/tests/helpers/common_test_tables.py | 2 +- .../core/tests/helpers/data_source_fixture.py | 2 +- soda/oracle/setup.py | 2 +- .../soda/data_sources/oracle_data_source.py | 95 ++++++++++++++++++- .../tests/oracle_data_source_fixture.py | 60 ++++++------ 7 files changed, 131 insertions(+), 35 deletions(-) diff --git a/pytest.ini b/pytest.ini index 2bcfe70ea..d66b066b1 100644 --- a/pytest.ini +++ b/pytest.ini @@ -23,3 +23,4 @@ pythonpath = soda/vertica/tests soda/teradata/tests soda/contracts/tests + soda/oracle/tests diff --git a/soda/core/tests/data_source/test_freshness.py b/soda/core/tests/data_source/test_freshness.py index daf91590c..460fa9b3d 100644 --- a/soda/core/tests/data_source/test_freshness.py +++ b/soda/core/tests/data_source/test_freshness.py @@ -240,7 +240,9 @@ def test_fail_freshness_var_missing(data_source_fixture: DataSourceFixture): scan.assert_log_error("variable not found") -@pytest.mark.skipif(test_data_source == "dask", reason="In dask/pandas the date is casted as datetime") +@pytest.mark.skipif( + test_data_source in ["dask", "oracle"], reason="In dask/pandas/oracle the date is casted as datetime" +) def test_freshness_with_date(data_source_fixture: DataSourceFixture): table_name = data_source_fixture.ensure_test_table(customers_test_table) diff --git a/soda/core/tests/helpers/common_test_tables.py b/soda/core/tests/helpers/common_test_tables.py index 0285663ca..15ba9e7f2 100644 --- a/soda/core/tests/helpers/common_test_tables.py +++ b/soda/core/tests/helpers/common_test_tables.py @@ -32,7 +32,7 @@ ('ID3', -1.2, "-1.2", 5, ".92 %", "MEDIUM", 'BE', '2362', 'milan.lukáč@example.com', date(2020, 6, 23), datetime(2020, 6, 23, 0, 2, 10), datetime(2020, 6, 23, 0, 2, 10, tzinfo=utc)), ('ID4', -.4, "-.4", 10, "0.26 %", "LOW", 'BE', '2363', 'john.doe+1@ĚxamplÉ.com', date(2020, 6, 23), datetime(2020, 6, 23, 0, 3, 10), datetime(2020, 6, 23, 0, 3, 10, tzinfo=utc)), ('ID5', -3, "-3", 999, "18,32%", None, 'BE', '2364', 'invalid@email', date(2020, 6, 23), datetime(2020, 6, 23, 0, 4, 10), datetime(2020, 6, 23, 0, 4, 10, tzinfo=utc)), - ('ID6', 5, "5", 999, "18,32%", None, 'BE', '2365', '', date(2020, 6, 23), datetime(2020, 6, 23, 0, 5, 10), datetime(2020, 6, 23, 0, 5, 10, tzinfo=utc)), + ('ID6', 5, "5", 999, "18,32%", None, 'BE', '2365', ' ', date(2020, 6, 23), datetime(2020, 6, 23, 0, 5, 10), datetime(2020, 6, 23, 0, 5, 10, tzinfo=utc)), ('ID7', 6, "6", 999, "error", None, 'NL', '2360', None, date(2020, 6, 24), datetime(2020, 6, 24, 0, 1, 10), datetime(2020, 6, 24, 0, 1, 10, tzinfo=utc)), ('ID8', None, None, 999, "No value", None, 'NL', '2361', None, date(2020, 6, 24), datetime(2020, 6, 24, 0, 2, 10), datetime(2020, 6, 24, 0, 2, 10, tzinfo=utc)), ('ID9', None, None, 999, "N/A", None, 'NL', '2362', None, date(2020, 6, 24), datetime(2020, 6, 24, 0, 3, 10), datetime(2020, 6, 24, 0, 3, 10, tzinfo=utc)), diff --git a/soda/core/tests/helpers/data_source_fixture.py b/soda/core/tests/helpers/data_source_fixture.py index aa780a748..96f62cc65 100644 --- a/soda/core/tests/helpers/data_source_fixture.py +++ b/soda/core/tests/helpers/data_source_fixture.py @@ -79,10 +79,10 @@ def _create_schema_name(self): return schema_name def _test_session_starts(self): + self.data_source = self._create_test_data_source() self.schema_data_source = self._create_schema_data_source() self._drop_schema_if_exists() self._create_schema_if_not_exists() - self.data_source = self._create_test_data_source() def _create_schema_data_source(self) -> DataSource: configuration_dict = self._build_configuration_dict() diff --git a/soda/oracle/setup.py b/soda/oracle/setup.py index 531705708..1856a179f 100644 --- a/soda/oracle/setup.py +++ b/soda/oracle/setup.py @@ -7,7 +7,7 @@ # TODO Add proper description description = "Soda Core Oracle Package" -requires = [f"soda-core=={package_version}", "oracledb==1.1.1"] +requires = [f"soda-core=={package_version}", "oracledb>=1.1.1,<3.0.0"] # TODO Fix the params setup( name=package_name, diff --git a/soda/oracle/soda/data_sources/oracle_data_source.py b/soda/oracle/soda/data_sources/oracle_data_source.py index 58ada22cc..bef9a3538 100644 --- a/soda/oracle/soda/data_sources/oracle_data_source.py +++ b/soda/oracle/soda/data_sources/oracle_data_source.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import re from datetime import date, datetime from textwrap import dedent @@ -80,6 +81,10 @@ def __init__(self, logs: Logs, data_source_name: str, data_source_properties: di def connect(self): self.connection = oracledb.connect(user=self.username, password=self.password, dsn=self.connectstring) + def output_type_handler(cursor, name, default_type, size, precision, scale): + if default_type == oracledb.DATETIME: + return cursor.var(oracledb.DATETIME, arraysize=cursor.arraysize, outconverter=datetime.fromisoformat) + def sql_test_connection(self) -> str: return "SELECT 1 FROM DUAL" @@ -121,8 +126,13 @@ def literal_date(self, date: date): return f"DATE'{date_string}'".strip() def literal_datetime(self, datetime: datetime): - datetime_str = datetime.strftime("%Y-%m-%d %H:%M:%S %Z") - return f"TIMESTAMP '{datetime_str}'".strip() + if datetime.tzinfo: + datetime_str = datetime.strftime("%Y-%m-%d %H:%M:%S %z") + datetime_str_formatted = datetime_str[:-2] + ":" + datetime_str[-2:] + else: + datetime_str_formatted = datetime.strftime("%Y-%m-%d %H:%M:%S") + + return f"TIMESTAMP '{datetime_str_formatted}'".strip() def sql_find_table_names( self, @@ -307,3 +317,84 @@ def sql_get_duplicates( sql += f"\nFETCH FIRST {limit} ROWS ONLY" return sql + + def expr_regexp_like(self, expr: str, regex_pattern: str): + return f"NVL({super().expr_regexp_like(expr, regex_pattern)}, 0)" + + def sql_get_duplicates_aggregated( + self, + column_names: str, + table_name: str, + filter: str, + limit: str | None = None, + invert_condition: bool = False, + exclude_patterns: list[str] | None = None, + ) -> str | None: + qualified_table_name = self.qualified_table_name(table_name) + main_query_columns = f"{column_names}, frequency" if exclude_patterns else "*" + + sql = dedent( + f""" + WITH frequencies AS ( + SELECT {column_names}, {self.expr_count_all()} AS frequency + FROM {qualified_table_name} + WHERE {filter} + GROUP BY {column_names}) + SELECT {main_query_columns} + FROM frequencies + WHERE frequency {'<=' if invert_condition else '>'} 1 + ORDER BY frequency DESC""" + ) + + if limit: + sql += f"\nFETCH FIRST {limit} ROWS ONLY" + + return sql + + def sql_groupby_count_categorical_column( + self, + select_query: str, + column_name: str, + limit: int | None = None, + ) -> str: + cte = select_query.replace("\n", " ") + # delete multiple spaces + cte = re.sub(" +", " ", cte) + sql = dedent( + f""" + WITH processed_table AS ( + {cte} + ) + SELECT + {column_name} + , {self.expr_count_all()} AS frequency + FROM processed_table + GROUP BY {column_name} + """ + ) + sql += f"FETCH FIRST {limit} ROWS ONLY" if limit else "" + return dedent(sql) + + def sql_reference_query( + self, + columns: str, + table_name: str, + target_table_name: str, + join_condition: str, + where_condition: str, + limit: int | None = None, + ) -> str: + sql = dedent( + f""" + SELECT {columns} + FROM {table_name} SOURCE + LEFT JOIN {target_table_name} TARGET on {join_condition} + WHERE {where_condition}""" + ) + if limit: + sql += f"\nFETCH FIRST {limit} ROWS ONLY" + + return sql + + def regex_replace_flags(self) -> str: + return "" diff --git a/soda/oracle/tests/oracle_data_source_fixture.py b/soda/oracle/tests/oracle_data_source_fixture.py index 47175559f..bceef3a91 100644 --- a/soda/oracle/tests/oracle_data_source_fixture.py +++ b/soda/oracle/tests/oracle_data_source_fixture.py @@ -17,18 +17,13 @@ def _build_configuration_dict(self, schema_name: str | None = None) -> dict: "username": os.getenv("ORACLE_USERNAME", "sodacore"), "password": os.getenv("ORACLE_PASSWORD", "password123"), "connectstring": os.getenv("ORACLE_CONNECTSTRING", "localhost:1521/xepdb1"), + "schema": schema_name.upper() if schema_name else self.schema_name.upper(), } } def __init__(self, test_data_source: str): super().__init__(test_data_source) - def _create_schema_if_not_exists_sql(self): - return "select * from dual" - - def _drop_schema_if_exists_sql(self): - return "select * from dual" - def _insert_test_table_sql(self, test_table: TestTable) -> str: if test_table.values: quoted_table_name = ( @@ -42,6 +37,8 @@ def sql_test_table_row(row): return ",".join(self.data_source.literal(value) for value in row) test_table_columns = ", ".join([c.name for c in test_table.test_columns]) + if test_table.quote_names: + test_table_columns = ", ".join([self.data_source.quote_column(c.name) for c in test_table.test_columns]) rows_sql = "\n".join( [ f" INTO {qualified_table_name} ({test_table_columns}) VALUES ( {sql_test_table_row(row)})" @@ -50,26 +47,31 @@ def sql_test_table_row(row): ) return f"INSERT ALL {rows_sql} \n SELECT 1 FROM DUAL" - # def _create_schema_if_not_exists_sql(self): - # return f""" - # declare - # userexist integer; - # begin - # select count(*) into userexist from dba_users where username='{self.schema_name}'; - # if (userexist = 0) then - # execute immediate 'create user {self.schema_name}'; - # end if; - # end; - # """ - # - # def _drop_schema_if_exists_sql(self): - # return f""" - # declare - # userexist integer; - # begin - # select count(*) into userexist from dba_users where username = '{self.schema_name}'; - # if (userexist = 1) then - # execute immediate 'drop user {self.schema_name} cascade'; - # end if; - # end; - # """ + def _create_schema_if_not_exists_sql(self): + casify = self.data_source.default_casify_system_name + return f""" + declare + userexist integer; + begin + select count(*) into userexist from dba_users where username='{casify(self.schema_name)}'; + if (userexist = 0) then + execute immediate 'create user {casify(self.schema_name)}'; + execute immediate 'ALTER USER {casify(self.schema_name)} QUOTA UNLIMITED ON SYSTEM'; + execute immediate 'ALTER SESSION SET TIME_ZONE = ''+00:00'''; + + end if; + end; + """ + + def _drop_schema_if_exists_sql(self): + casify = self.data_source.default_casify_system_name + return f""" + declare + userexist integer; + begin + select count(*) into userexist from dba_users where username = '{casify(self.schema_name)}'; + if (userexist = 1) then + execute immediate 'drop user {casify(self.schema_name)} cascade'; + end if; + end; + """