Skip to content

Commit

Permalink
release 1.0.10 add support for DEDUP feature
Browse files (browse the repository at this point in the history)
  • Loading branch information
marregui committed Sep 30, 2023
1 parent 459ecb6 commit 04579c6
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
# https://pip.pypa.io/en/stable/reference/build-system/pyproject-toml/
[project]
name = 'questdb-connect'
version = '1.0.9' # Standalone production version (with engine)
version = '1.0.10' # Standalone production version (with engine)
#version = '0.0.99' # testing version
authors = [{ name = 'questdb.io', email = 'miguel@questdb.io' }]
description = "SqlAlchemy library"
Expand Down
3 changes: 2 additions & 1 deletion src/examples/psycopg2_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def print_partition(row):


def print_table(row):
table_id, table_name, ts_column, p_by, _, _, is_wal, dir_name = row
table_id, table_name, ts_column, p_by, _, _, is_wal, dir_name, is_dedup = row
msg = ", ".join(
(
f"Table id:{table_id}",
Expand All @@ -52,6 +52,7 @@ def print_table(row):
f"partition-by:{p_by}",
f"is-wal:{is_wal}",
f"dir-name:{dir_name}",
f"is-dedup:{is_dedup}",
)
)
print(msg)
Expand Down
11 changes: 10 additions & 1 deletion src/questdb_connect/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,29 @@ def reflect_table(
col_ts_name = None
partition_by = PartitionBy.NONE
is_wal = True
dedup_upsert_keys = []
for row in self.bind.execute(f"table_columns('{table_name}')"):
col_name = row[0]
if include_columns and col_name not in include_columns:
continue
if exclude_columns and col_name in exclude_columns:
continue
if row[6]: # upsertKey
dedup_upsert_keys.append(col_name)
col_type = resolve_type_from_name(row[1])
if col_ts_name and col_ts_name.upper() == col_name.upper():
table.append_column(
sqlalchemy.Column(col_name, col_type, primary_key=True)
)
else:
table.append_column(sqlalchemy.Column(col_name, col_type))
table.engine = QDBTableEngine(table_name, col_ts_name, partition_by, is_wal)
table.engine = QDBTableEngine(
table_name,
col_ts_name,
partition_by,
is_wal,
tuple(dedup_upsert_keys) if dedup_upsert_keys else None,
)
table.metadata = sqlalchemy.MetaData()

def get_columns(self, table_name, schema=None, **kw):
Expand Down
16 changes: 15 additions & 1 deletion src/questdb_connect/table_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import typing

import sqlalchemy

from .common import PartitionBy
from .common import PartitionBy, quote_identifier


class QDBTableEngine(
Expand All @@ -34,12 +36,14 @@ def __init__(
ts_col_name: str,
partition_by: PartitionBy = PartitionBy.DAY,
is_wal: bool = True,
dedup_upsert_keys: typing.Tuple[str] = None,
):
sqlalchemy.sql.visitors.Traversible.__init__(self)
self.name = table_name
self.ts_col_name = ts_col_name
self.partition_by = partition_by
self.is_wal = is_wal
self.dedup_upsert_keys = dedup_upsert_keys
self.compiled = None

def get_table_suffix(self):
Expand All @@ -63,7 +67,17 @@ def get_table_suffix(self):
)
if self.is_wal:
self.compiled += " WAL"
if self.dedup_upsert_keys:
self.compiled += " DEDUP UPSERT KEYS("
self.compiled += ",".join(
map(quote_identifier, self.dedup_upsert_keys)
)
self.compiled += ")"
else:
if self.dedup_upsert_keys:
raise sqlalchemy.exc.ArgumentError(
None, "DEDUP only applies to WAL tables"
)
self.compiled += " BYPASS WAL"
return self.compiled

Expand Down
4 changes: 2 additions & 2 deletions src/questdb_connect/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ def geohash_type_name(bits):
f"geohash precision must be int [0, {_GEOHASH_LONG_BITS}]"
)
if 0 < bits <= _GEOHASH_BYTE_MAX:
return f"GEOHASH(8b)"
return "GEOHASH(8b)"
elif _GEOHASH_BYTE_MAX < bits <= _GEOHASH_SHORT_MAX:
return "GEOHASH(3c)"
elif _GEOHASH_SHORT_MAX < bits <= _GEOHASH_INT_MAX:
return "GEOHASH(6c)"
return f"GEOHASH(12c)"
return "GEOHASH(12c)"


def geohash_class(bits):
Expand Down
10 changes: 9 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,15 @@ def test_metrics_fixture(test_engine):

class TableMetrics(Base):
__tablename__ = METRICS_TABLE_NAME
__table_args__ = (qdbc.QDBTableEngine(METRICS_TABLE_NAME, 'ts', qdbc.PartitionBy.HOUR, is_wal=True),)
__table_args__ = (
qdbc.QDBTableEngine(
METRICS_TABLE_NAME,
'ts',
qdbc.PartitionBy.HOUR,
is_wal=True,
dedup_upsert_keys=('source', 'attr_name', 'ts')
),
)
source = Column(qdbc.Symbol)
attr_name = Column(qdbc.Symbol)
attr_value = Column(qdbc.Double)
Expand Down
21 changes: 19 additions & 2 deletions tests/test_dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
import sqlalchemy as sqla
from sqlalchemy.orm import Session

from tests.conftest import ALL_TYPES_TABLE_NAME, collect_select_all, collect_select_all_raw_connection
from tests.conftest import (
ALL_TYPES_TABLE_NAME,
METRICS_TABLE_NAME,
collect_select_all,
collect_select_all_raw_connection,
)


def test_insert(test_engine, test_model):
Expand Down Expand Up @@ -82,7 +87,7 @@ def test_insert(test_engine, test_model):
assert collect_select_all_raw_connection(test_engine, expected_rows=2) == expected


def test_inspect(test_engine, test_model):
def test_inspect_1(test_engine, test_model):
now = datetime.datetime(2023, 4, 12, 23, 55, 59, 342380)
now_date = now.date()
session = Session(test_engine)
Expand Down Expand Up @@ -130,6 +135,18 @@ def test_inspect(test_engine, test_model):
])


def test_inspect_2(test_engine, test_metrics):
    """Reflect the metrics table and verify its columns.

    The table named METRICS_TABLE_NAME is loaded through SQLAlchemy
    reflection (``autoload_with=test_engine``) and every reflected column is
    checked for its name, its type and its primary-key flag.

    ``test_metrics`` is requested but never referenced in the body --
    presumably the fixture creates the metrics table; confirm in conftest.
    """
    reflected = sqla.Table(
        METRICS_TABLE_NAME,
        sqla.MetaData(),
        autoload_with=test_engine,
    )

    actual_columns = []
    for column in reflected.columns:
        actual_columns.append((column.name, column.type, column.primary_key))

    expected_columns = [
        ('source', qdbc.Symbol(), False),
        ('attr_name', qdbc.Symbol(), False),
        ('attr_value', qdbc.Double(), False),
        ('ts', qdbc.Timestamp(), True),
    ]

    # Both sides are compared through their str() rendering rather than
    # element by element.
    assert str(actual_columns) == str(expected_columns)


def test_multiple_insert(test_engine, test_model):
now = datetime.datetime(2023, 4, 12, 23, 55, 59, 342380)
now_date = now.date()
Expand Down

0 comments on commit 04579c6

Please sign in to comment.