From 04579c667f58eff01fbbdd6307587336f215759b Mon Sep 17 00:00:00 2001 From: marregui Date: Sat, 30 Sep 2023 23:56:58 +0200 Subject: [PATCH] release 1.0.10 add support for DEDUP feature --- pyproject.toml | 2 +- src/examples/psycopg2_connect.py | 3 ++- src/questdb_connect/inspector.py | 11 ++++++++++- src/questdb_connect/table_engine.py | 16 +++++++++++++++- src/questdb_connect/types.py | 4 ++-- tests/conftest.py | 10 +++++++++- tests/test_dialect.py | 21 +++++++++++++++++++-- 7 files changed, 58 insertions(+), 9 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5f03400..8d41e7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,7 @@ # https://pip.pypa.io/en/stable/reference/build-system/pyproject-toml/ [project] name = 'questdb-connect' -version = '1.0.9' # Standalone production version (with engine) +version = '1.0.10' # Standalone production version (with engine) #version = '0.0.99' # testing version authors = [{ name = 'questdb.io', email = 'miguel@questdb.io' }] description = "SqlAlchemy library" diff --git a/src/examples/psycopg2_connect.py b/src/examples/psycopg2_connect.py index b00f830..97f3801 100644 --- a/src/examples/psycopg2_connect.py +++ b/src/examples/psycopg2_connect.py @@ -43,7 +43,7 @@ def print_partition(row): def print_table(row): - table_id, table_name, ts_column, p_by, _, _, is_wal, dir_name = row + table_id, table_name, ts_column, p_by, _, _, is_wal, dir_name, is_dedup = row msg = ", ".join( ( f"Table id:{table_id}", @@ -52,6 +52,7 @@ def print_table(row): f"partition-by:{p_by}", f"is-wal:{is_wal}", f"dir-name:{dir_name}", + f"is-dedup:{is_dedup}", ) ) print(msg) diff --git a/src/questdb_connect/inspector.py b/src/questdb_connect/inspector.py index 56d4b5d..a9ebea1 100644 --- a/src/questdb_connect/inspector.py +++ b/src/questdb_connect/inspector.py @@ -64,12 +64,15 @@ def reflect_table( col_ts_name = None partition_by = PartitionBy.NONE is_wal = True + dedup_upsert_keys = [] for row in self.bind.execute(f"table_columns('{table_name}')"): col_name = row[0] if include_columns and col_name not in include_columns: continue if exclude_columns and col_name in exclude_columns: continue + if row[6]: # upsertKey + dedup_upsert_keys.append(col_name) col_type = resolve_type_from_name(row[1]) if col_ts_name and col_ts_name.upper() == col_name.upper(): table.append_column( @@ -77,7 +80,13 @@ def reflect_table( ) else: table.append_column(sqlalchemy.Column(col_name, col_type)) - table.engine = QDBTableEngine(table_name, col_ts_name, partition_by, is_wal) + table.engine = QDBTableEngine( + table_name, + col_ts_name, + partition_by, + is_wal, + tuple(dedup_upsert_keys) if dedup_upsert_keys else None, + ) table.metadata = sqlalchemy.MetaData() def get_columns(self, table_name, schema=None, **kw): diff --git a/src/questdb_connect/table_engine.py b/src/questdb_connect/table_engine.py index 5ed7aee..841ecd4 100644 --- a/src/questdb_connect/table_engine.py +++ b/src/questdb_connect/table_engine.py @@ -20,9 +20,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import typing + import sqlalchemy -from .common import PartitionBy +from .common import PartitionBy, quote_identifier class QDBTableEngine( @@ -34,12 +36,14 @@ def __init__( ts_col_name: str, partition_by: PartitionBy = PartitionBy.DAY, is_wal: bool = True, + dedup_upsert_keys: typing.Tuple[str] = None, ): sqlalchemy.sql.visitors.Traversible.__init__(self) self.name = table_name self.ts_col_name = ts_col_name self.partition_by = partition_by self.is_wal = is_wal + self.dedup_upsert_keys = dedup_upsert_keys self.compiled = None def get_table_suffix(self): @@ -63,7 +67,17 @@ def get_table_suffix(self): ) if self.is_wal: self.compiled += " WAL" + if self.dedup_upsert_keys: + self.compiled += " DEDUP UPSERT KEYS(" + self.compiled += ",".join( + map(quote_identifier, self.dedup_upsert_keys) + ) + self.compiled += ")" else: + if self.dedup_upsert_keys: + raise sqlalchemy.exc.ArgumentError( + None, "DEDUP only applies to WAL tables" + ) self.compiled += " BYPASS WAL" return self.compiled diff --git a/src/questdb_connect/types.py b/src/questdb_connect/types.py index a8c7cca..0400c61 100644 --- a/src/questdb_connect/types.py +++ b/src/questdb_connect/types.py @@ -42,12 +42,12 @@ def geohash_type_name(bits): f"geohash precision must be int [0, {_GEOHASH_LONG_BITS}]" ) if 0 < bits <= _GEOHASH_BYTE_MAX: - return f"GEOHASH(8b)" + return "GEOHASH(8b)" elif _GEOHASH_BYTE_MAX < bits <= _GEOHASH_SHORT_MAX: return "GEOHASH(3c)" elif _GEOHASH_SHORT_MAX < bits <= _GEOHASH_INT_MAX: return "GEOHASH(6c)" - return f"GEOHASH(12c)" + return "GEOHASH(12c)" def geohash_class(bits): diff --git a/tests/conftest.py b/tests/conftest.py index 1de209e..779d36c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -105,7 +105,15 @@ def test_metrics_fixture(test_engine): class TableMetrics(Base): __tablename__ = METRICS_TABLE_NAME - __table_args__ = (qdbc.QDBTableEngine(METRICS_TABLE_NAME, 'ts', qdbc.PartitionBy.HOUR, is_wal=True),) + __table_args__ = ( + qdbc.QDBTableEngine( + METRICS_TABLE_NAME, + 'ts', + qdbc.PartitionBy.HOUR, + is_wal=True, + dedup_upsert_keys=('source', 'attr_name', 'ts') + ), + ) source = Column(qdbc.Symbol) attr_name = Column(qdbc.Symbol) attr_value = Column(qdbc.Double) diff --git a/tests/test_dialect.py b/tests/test_dialect.py index bae3f99..d630d99 100644 --- a/tests/test_dialect.py +++ b/tests/test_dialect.py @@ -26,7 +26,12 @@ import sqlalchemy as sqla from sqlalchemy.orm import Session -from tests.conftest import ALL_TYPES_TABLE_NAME, collect_select_all, collect_select_all_raw_connection +from tests.conftest import ( + ALL_TYPES_TABLE_NAME, + METRICS_TABLE_NAME, + collect_select_all, + collect_select_all_raw_connection, +) def test_insert(test_engine, test_model): @@ -82,7 +87,7 @@ def test_insert(test_engine, test_model): assert collect_select_all_raw_connection(test_engine, expected_rows=2) == expected -def test_inspect(test_engine, test_model): +def test_inspect_1(test_engine, test_model): now = datetime.datetime(2023, 4, 12, 23, 55, 59, 342380) now_date = now.date() session = Session(test_engine) @@ -130,6 +135,18 @@ def test_inspect(test_engine, test_model): ]) +def test_inspect_2(test_engine, test_metrics): + metadata = sqla.MetaData() + table = sqla.Table(METRICS_TABLE_NAME, metadata, autoload_with=test_engine) + table_columns = str([(col.name, col.type, col.primary_key) for col in table.columns]) + assert table_columns == str([ + ('source', qdbc.Symbol(), False), + ('attr_name', qdbc.Symbol(), False), + ('attr_value', qdbc.Double(), False), + ('ts', qdbc.Timestamp(), True), + ]) + + def test_multiple_insert(test_engine, test_model): now = datetime.datetime(2023, 4, 12, 23, 55, 59, 342380) now_date = now.date()