Oracle: fix formats, freshness, other minor fixes (#2106)
m1n0 committed Jun 26, 2024
1 parent d70f765 commit fca862a
Showing 7 changed files with 131 additions and 35 deletions.
1 change: 1 addition & 0 deletions pytest.ini
@@ -23,3 +23,4 @@ pythonpath =
soda/vertica/tests
soda/teradata/tests
soda/contracts/tests
soda/oracle/tests
4 changes: 3 additions & 1 deletion soda/core/tests/data_source/test_freshness.py
@@ -240,7 +240,9 @@ def test_fail_freshness_var_missing(data_source_fixture: DataSourceFixture):
scan.assert_log_error("variable not found")


@pytest.mark.skipif(test_data_source == "dask", reason="In dask/pandas the date is casted as datetime")
@pytest.mark.skipif(
test_data_source in ["dask", "oracle"], reason="In dask/pandas/oracle the date is casted as datetime"
)
def test_freshness_with_date(data_source_fixture: DataSourceFixture):
table_name = data_source_fixture.ensure_test_table(customers_test_table)

2 changes: 1 addition & 1 deletion soda/core/tests/helpers/common_test_tables.py
@@ -32,7 +32,7 @@
('ID3', -1.2, "-1.2", 5, ".92 %", "MEDIUM", 'BE', '2362', 'milan.lukáč@example.com', date(2020, 6, 23), datetime(2020, 6, 23, 0, 2, 10), datetime(2020, 6, 23, 0, 2, 10, tzinfo=utc)),
('ID4', -.4, "-.4", 10, "0.26 %", "LOW", 'BE', '2363', 'john.doe+1@ĚxamplÉ.com', date(2020, 6, 23), datetime(2020, 6, 23, 0, 3, 10), datetime(2020, 6, 23, 0, 3, 10, tzinfo=utc)),
('ID5', -3, "-3", 999, "18,32%", None, 'BE', '2364', 'invalid@email', date(2020, 6, 23), datetime(2020, 6, 23, 0, 4, 10), datetime(2020, 6, 23, 0, 4, 10, tzinfo=utc)),
('ID6', 5, "5", 999, "18,32%", None, 'BE', '2365', '', date(2020, 6, 23), datetime(2020, 6, 23, 0, 5, 10), datetime(2020, 6, 23, 0, 5, 10, tzinfo=utc)),
('ID6', 5, "5", 999, "18,32%", None, 'BE', '2365', ' ', date(2020, 6, 23), datetime(2020, 6, 23, 0, 5, 10), datetime(2020, 6, 23, 0, 5, 10, tzinfo=utc)),
('ID7', 6, "6", 999, "error", None, 'NL', '2360', None, date(2020, 6, 24), datetime(2020, 6, 24, 0, 1, 10), datetime(2020, 6, 24, 0, 1, 10, tzinfo=utc)),
('ID8', None, None, 999, "No value", None, 'NL', '2361', None, date(2020, 6, 24), datetime(2020, 6, 24, 0, 2, 10), datetime(2020, 6, 24, 0, 2, 10, tzinfo=utc)),
('ID9', None, None, 999, "N/A", None, 'NL', '2362', None, date(2020, 6, 24), datetime(2020, 6, 24, 0, 3, 10), datetime(2020, 6, 24, 0, 3, 10, tzinfo=utc)),
2 changes: 1 addition & 1 deletion soda/core/tests/helpers/data_source_fixture.py
@@ -79,10 +79,10 @@ def _create_schema_name(self):
return schema_name

def _test_session_starts(self):
self.data_source = self._create_test_data_source()
self.schema_data_source = self._create_schema_data_source()
self._drop_schema_if_exists()
self._create_schema_if_not_exists()
self.data_source = self._create_test_data_source()

def _create_schema_data_source(self) -> DataSource:
configuration_dict = self._build_configuration_dict()
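The reordering above means the schema is now dropped and recreated before the data source under test connects, presumably so that on Oracle the user/schema already exists when the test connection authenticates. A minimal annotated sketch of the new order (method names as in the fixture; the comments are editorial):

def _test_session_starts(self):
    # A separate "schema" data source handles user/schema management first...
    self.schema_data_source = self._create_schema_data_source()
    self._drop_schema_if_exists()
    self._create_schema_if_not_exists()
    # ...and only then is the data source under test created, so its connection
    # can target a schema that is guaranteed to exist.
    self.data_source = self._create_test_data_source()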
2 changes: 1 addition & 1 deletion soda/oracle/setup.py
@@ -7,7 +7,7 @@
# TODO Add proper description
description = "Soda Core Oracle Package"

requires = [f"soda-core=={package_version}", "oracledb==1.1.1"]
requires = [f"soda-core=={package_version}", "oracledb>=1.1.1,<3.0.0"]
# TODO Fix the params
setup(
name=package_name,
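The relaxed pin accepts any oracledb release from 1.1.1 up to, but excluding, 3.0.0. A quick way to sanity-check the specifier, assuming the third-party packaging library is installed (the version strings are illustrative):

from packaging.specifiers import SpecifierSet

spec = SpecifierSet(">=1.1.1,<3.0.0")
for candidate in ["1.1.0", "1.1.1", "2.5.1", "3.0.0"]:
    # contains() is True only for versions inside the range
    print(candidate, spec.contains(candidate))
# 1.1.0 False, 1.1.1 True, 2.5.1 True, 3.0.0 False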
95 changes: 93 additions & 2 deletions soda/oracle/soda/data_sources/oracle_data_source.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import re
from datetime import date, datetime
from textwrap import dedent

@@ -80,6 +81,10 @@ def __init__(self, logs: Logs, data_source_name: str, data_source_properties: di
def connect(self):
self.connection = oracledb.connect(user=self.username, password=self.password, dsn=self.connectstring)

def output_type_handler(cursor, name, default_type, size, precision, scale):
if default_type == oracledb.DATETIME:
return cursor.var(oracledb.DATETIME, arraysize=cursor.arraysize, outconverter=datetime.fromisoformat)

def sql_test_connection(self) -> str:
return "SELECT 1 FROM DUAL"

@@ -121,8 +126,13 @@ def literal_date(self, date: date):
return f"DATE'{date_string}'".strip()

def literal_datetime(self, datetime: datetime):
datetime_str = datetime.strftime("%Y-%m-%d %H:%M:%S %Z")
return f"TIMESTAMP '{datetime_str}'".strip()
if datetime.tzinfo:
datetime_str = datetime.strftime("%Y-%m-%d %H:%M:%S %z")
datetime_str_formatted = datetime_str[:-2] + ":" + datetime_str[-2:]
else:
datetime_str_formatted = datetime.strftime("%Y-%m-%d %H:%M:%S")

return f"TIMESTAMP '{datetime_str_formatted}'".strip()

def sql_find_table_names(
self,
@@ -307,3 +317,84 @@ def sql_get_duplicates(
sql += f"\nFETCH FIRST {limit} ROWS ONLY"

return sql

def expr_regexp_like(self, expr: str, regex_pattern: str):
return f"NVL({super().expr_regexp_like(expr, regex_pattern)}, 0)"

def sql_get_duplicates_aggregated(
self,
column_names: str,
table_name: str,
filter: str,
limit: str | None = None,
invert_condition: bool = False,
exclude_patterns: list[str] | None = None,
) -> str | None:
qualified_table_name = self.qualified_table_name(table_name)
main_query_columns = f"{column_names}, frequency" if exclude_patterns else "*"

sql = dedent(
f"""
WITH frequencies AS (
SELECT {column_names}, {self.expr_count_all()} AS frequency
FROM {qualified_table_name}
WHERE {filter}
GROUP BY {column_names})
SELECT {main_query_columns}
FROM frequencies
WHERE frequency {'<=' if invert_condition else '>'} 1
ORDER BY frequency DESC"""
)

if limit:
sql += f"\nFETCH FIRST {limit} ROWS ONLY"

return sql

def sql_groupby_count_categorical_column(
self,
select_query: str,
column_name: str,
limit: int | None = None,
) -> str:
cte = select_query.replace("\n", " ")
# delete multiple spaces
cte = re.sub(" +", " ", cte)
sql = dedent(
f"""
WITH processed_table AS (
{cte}
)
SELECT
{column_name}
, {self.expr_count_all()} AS frequency
FROM processed_table
GROUP BY {column_name}
"""
)
sql += f"FETCH FIRST {limit} ROWS ONLY" if limit else ""
return dedent(sql)

def sql_reference_query(
self,
columns: str,
table_name: str,
target_table_name: str,
join_condition: str,
where_condition: str,
limit: int | None = None,
) -> str:
sql = dedent(
f"""
SELECT {columns}
FROM {table_name} SOURCE
LEFT JOIN {target_table_name} TARGET on {join_condition}
WHERE {where_condition}"""
)
if limit:
sql += f"\nFETCH FIRST {limit} ROWS ONLY"

return sql

def regex_replace_flags(self) -> str:
return ""
60 changes: 31 additions & 29 deletions soda/oracle/tests/oracle_data_source_fixture.py
@@ -17,18 +17,13 @@ def _build_configuration_dict(self, schema_name: str | None = None) -> dict:
"username": os.getenv("ORACLE_USERNAME", "sodacore"),
"password": os.getenv("ORACLE_PASSWORD", "password123"),
"connectstring": os.getenv("ORACLE_CONNECTSTRING", "localhost:1521/xepdb1"),
"schema": schema_name.upper() if schema_name else self.schema_name.upper(),
}
}

def __init__(self, test_data_source: str):
super().__init__(test_data_source)

def _create_schema_if_not_exists_sql(self):
return "select * from dual"

def _drop_schema_if_exists_sql(self):
return "select * from dual"

def _insert_test_table_sql(self, test_table: TestTable) -> str:
if test_table.values:
quoted_table_name = (
@@ -42,6 +37,8 @@ def sql_test_table_row(row):
return ",".join(self.data_source.literal(value) for value in row)

test_table_columns = ", ".join([c.name for c in test_table.test_columns])
if test_table.quote_names:
test_table_columns = ", ".join([self.data_source.quote_column(c.name) for c in test_table.test_columns])
rows_sql = "\n".join(
[
f" INTO {qualified_table_name} ({test_table_columns}) VALUES ( {sql_test_table_row(row)})"
Expand All @@ -50,26 +47,31 @@ def sql_test_table_row(row):
)
return f"INSERT ALL {rows_sql} \n SELECT 1 FROM DUAL"

# def _create_schema_if_not_exists_sql(self):
# return f"""
# declare
# userexist integer;
# begin
# select count(*) into userexist from dba_users where username='{self.schema_name}';
# if (userexist = 0) then
# execute immediate 'create user {self.schema_name}';
# end if;
# end;
# """
#
# def _drop_schema_if_exists_sql(self):
# return f"""
# declare
# userexist integer;
# begin
# select count(*) into userexist from dba_users where username = '{self.schema_name}';
# if (userexist = 1) then
# execute immediate 'drop user {self.schema_name} cascade';
# end if;
# end;
# """
def _create_schema_if_not_exists_sql(self):
casify = self.data_source.default_casify_system_name
return f"""
declare
userexist integer;
begin
select count(*) into userexist from dba_users where username='{casify(self.schema_name)}';
if (userexist = 0) then
execute immediate 'create user {casify(self.schema_name)}';
execute immediate 'ALTER USER {casify(self.schema_name)} QUOTA UNLIMITED ON SYSTEM';
execute immediate 'ALTER SESSION SET TIME_ZONE = ''+00:00''';
end if;
end;
"""

def _drop_schema_if_exists_sql(self):
casify = self.data_source.default_casify_system_name
return f"""
declare
userexist integer;
begin
select count(*) into userexist from dba_users where username = '{casify(self.schema_name)}';
if (userexist = 1) then
execute immediate 'drop user {casify(self.schema_name)} cascade';
end if;
end;
"""
