Skip to content

Commit

Permalink
Implement TIME type, stored to Databend DateTime type (#42)
Browse files Browse the repository at this point in the history
* Implement TIME type, stored to Databend DateTime type

* Fix - views are no longer included in system.tables

* Change visit name

* Add interval type (stored as datetime)
Fix literal processors

* Fixes for interval

* Adds reserved words

* Handle views now back in information_schema.tables
  • Loading branch information
rad-pat authored Aug 26, 2024
1 parent a78be72 commit b532f4b
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 18 deletions.
4 changes: 2 additions & 2 deletions databend_sqlalchemy/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import decimal
import re
import uuid
from datetime import datetime, date
from datetime import datetime, date, time, timedelta
from databend_sqlalchemy.errors import Error, ServerException, NotSupportedError

from databend_driver import BlockingDatabendClient
Expand Down Expand Up @@ -48,7 +48,7 @@ def escape_item(self, item):
return self.escape_number(item)
elif isinstance(item, decimal.Decimal):
return self.escape_number(item)
elif isinstance(item, (datetime, date)):
elif isinstance(item, (datetime, date, time, timedelta)):
return self.escape_string(item.strftime("%Y-%m-%d %H:%M:%S"))
else:
return self.escape_string(item)
Expand Down
196 changes: 181 additions & 15 deletions databend_sqlalchemy/databend_dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,74 @@
from sqlalchemy.exc import DBAPIError, NoSuchTableError
from .dml import Merge

RESERVED_WORDS = {
'Error', 'EOI', 'Whitespace', 'Comment', 'CommentBlock', 'Ident', 'ColumnPosition', 'LiteralString',
'LiteralCodeString', 'LiteralAtString', 'PGLiteralHex', 'MySQLLiteralHex', 'LiteralInteger', 'LiteralFloat',
'HintPrefix', 'HintSuffix', 'DoubleEq', 'Eq', 'NotEq', 'Lt', 'Gt', 'Lte', 'Gte', 'Spaceship', 'Plus',
'Minus', 'Multiply', 'Divide', 'IntDiv', 'Modulo', 'StringConcat', 'LParen', 'RParen', 'Comma', 'Dot',
'Colon', 'DoubleColon', 'ColonEqual', 'SemiColon', 'Backslash', 'LBracket', 'RBracket', 'Caret', 'LBrace',
'RBrace', 'RArrow', 'LongRArrow', 'FatRArrow', 'HashRArrow', 'HashLongRArrow', 'TildeAsterisk',
'ExclamationMarkTilde', 'ExclamationMarkTildeAsterisk', 'BitWiseAnd', 'BitWiseOr', 'BitWiseXor',
'BitWiseNot', 'ShiftLeft', 'ShiftRight', 'Factorial', 'DoubleExclamationMark', 'Abs', 'SquareRoot',
'CubeRoot', 'Placeholder', 'QuestionOr', 'QuestionAnd', 'ArrowAt', 'AtArrow', 'AtQuestion', 'AtAt',
'HashMinus', 'ACCOUNT', 'ALL', 'ALLOWED_IP_LIST', 'ADD', 'AFTER', 'AGGREGATING', 'ANY', 'APPEND_ONLY',
'ARGS', 'AUTO', 'SOME', 'ALTER', 'ALWAYS', 'ANALYZE', 'AND', 'ARRAY', 'AS', 'AST', 'AT', 'ASC',
'ANTI', 'ASYNC', 'ATTACH', 'BEFORE', 'BETWEEN', 'BIGINT', 'BINARY', 'BREAK', 'LONGBLOB', 'MEDIUMBLOB',
'TINYBLOB', 'BLOB', 'BINARY_FORMAT', 'BITMAP', 'BLOCKED_IP_LIST', 'BOOL', 'BOOLEAN', 'BOTH', 'BY',
'BROTLI', 'BZ2', 'CALL', 'CASE', 'CAST', 'CATALOG', 'CATALOGS', 'CENTURY', 'CHANGES', 'CLUSTER',
'COMMENT', 'COMMENTS', 'COMPACT', 'CONNECTION', 'CONNECTIONS', 'CONSUME', 'CONTENT_TYPE', 'CONTINUE',
'CHAR', 'COLUMN', 'COLUMNS', 'CHARACTER', 'CONFLICT', 'COMPRESSION', 'COPY_OPTIONS', 'COPY', 'COUNT',
'CREDENTIAL', 'CREATE', 'CROSS', 'CSV', 'CURRENT', 'CURRENT_TIMESTAMP', 'DATABASE', 'DATABASES', 'DATA',
'DATE', 'DATE_ADD', 'DATE_PART', 'DATE_SUB', 'DATE_TRUNC', 'DATETIME', 'DAY', 'DECADE', 'DECIMAL',
'DECLARE', 'DEFAULT', 'DEFLATE', 'DELETE', 'DESC', 'DETAILED_OUTPUT', 'DESCRIBE', 'DISABLE',
'DISABLE_VARIANT_CHECK', 'DISTINCT', 'RESPECT', 'IGNORE', 'DIV', 'DOUBLE_SHA1_PASSWORD', 'DO', 'DOUBLE',
'DOW', 'WEEK', 'DELTA', 'DOY', 'DOWNLOAD', 'DOWNSTREAM', 'DROP', 'DRY', 'DYNAMIC', 'EXCEPT', 'EXCLUDE',
'ELSE', 'EMPTY_FIELD_AS', 'ENABLE', 'ENABLE_VIRTUAL_HOST_STYLE', 'END', 'ENDPOINT', 'ENGINE', 'ENGINES',
'EPOCH', 'ERROR_ON_COLUMN_COUNT_MISMATCH', 'ESCAPE', 'EXCEPTION_BACKTRACE', 'EXISTS', 'EXPLAIN', 'EXPIRE',
'EXTRACT', 'ELSEIF', 'FALSE', 'FIELDS', 'FIELD_DELIMITER', 'NAN_DISPLAY', 'NULL_DISPLAY', 'NULL_IF',
'FILE_FORMAT', 'FILE', 'FILES', 'FINAL', 'FLASHBACK', 'FLOAT', 'FLOAT32', 'FLOAT64', 'FOR', 'FORCE',
'FORMAT', 'FOLLOWING', 'FORMAT_NAME', 'FORMATS', 'FRAGMENTS', 'FROM', 'FULL', 'FUNCTION', 'FUNCTIONS',
'TABLE_FUNCTIONS', 'SET_VAR', 'FUSE', 'GET', 'GENERATED', 'GEOMETRY', 'GLOBAL', 'GRAPH', 'GROUP', 'GZIP',
'HAVING', 'HIGH', 'HISTORY', 'HIVE', 'HOUR', 'HOURS', 'ICEBERG', 'INTERSECT', 'IDENTIFIED', 'IDENTIFIER',
'IF', 'IN', 'INCREMENTAL', 'INDEX', 'INFORMATION', 'INITIALIZE', 'INNER', 'INSERT', 'INT', 'INT16',
'INT32', 'INT64', 'INT8', 'INTEGER', 'INTERVAL', 'INTO', 'INVERTED', 'IMMEDIATE', 'IS', 'ISODOW',
'ISOYEAR', 'JOIN', 'JSON', 'JULIAN', 'JWT', 'KEY', 'KILL', 'LATERAL', 'LOCATION_PREFIX', 'LOCKS',
'LOGICAL', 'LOOP', 'SECONDARY', 'ROLES', 'L2DISTANCE', 'LEADING', 'LEFT', 'LET', 'LIKE', 'LIMIT',
'LIST', 'LOW', 'LZO', 'MASKING', 'MAP', 'MAX_FILE_SIZE', 'MASTER_KEY', 'MEDIUM', 'MEMO', 'MEMORY',
'METRICS', 'MICROSECONDS', 'MILLENNIUM', 'MILLISECONDS', 'MINUTE', 'MONTH', 'MODIFY', 'MATERIALIZED',
'MUST_CHANGE_PASSWORD', 'NON_DISPLAY', 'NATURAL', 'NETWORK', 'DISABLED', 'NDJSON', 'NO_PASSWORD', 'NONE',
'NOT', 'NOTENANTSETTING', 'DEFAULT_ROLE', 'NULL', 'NULLABLE', 'OBJECT', 'OF', 'OFFSET', 'ON',
'ON_CREATE', 'ON_SCHEDULE', 'OPTIMIZE', 'OPTIONS', 'OR', 'ORC', 'ORDER', 'OUTPUT_HEADER', 'OUTER',
'ON_ERROR', 'OVER', 'OVERWRITE', 'PARTITION', 'PARQUET', 'PASSWORD', 'PASSWORD_MIN_LENGTH',
'PASSWORD_MAX_LENGTH', 'PASSWORD_MIN_UPPER_CASE_CHARS', 'PASSWORD_MIN_LOWER_CASE_CHARS',
'PASSWORD_MIN_NUMERIC_CHARS', 'PASSWORD_MIN_SPECIAL_CHARS', 'PASSWORD_MIN_AGE_DAYS', 'PASSWORD_MAX_AGE_DAYS',
'PASSWORD_MAX_RETRIES', 'PASSWORD_LOCKOUT_TIME_MINS', 'PASSWORD_HISTORY', 'PATTERN', 'PIPELINE',
'PLAINTEXT_PASSWORD', 'POLICIES', 'POLICY', 'POSITION', 'PROCESSLIST', 'PRIORITY', 'PURGE', 'PUT',
'QUARTER', 'QUERY', 'QUOTE', 'RANGE', 'RAWDEFLATE', 'READ_ONLY', 'RECLUSTER', 'RECORD_DELIMITER',
'REFERENCE_USAGE', 'REFRESH', 'REGEXP', 'RENAME', 'REPLACE', 'RETURN_FAILED_ONLY', 'REVERSE', 'MERGE',
'MATCHED', 'MISSING_FIELD_AS', 'NULL_FIELD_AS', 'UNMATCHED', 'ROW', 'ROWS', 'ROW_TAG', 'GRANT', 'REPEAT',
'ROLE', 'PRECEDING', 'PRECISION', 'PRESIGN', 'PRIVILEGES', 'QUALIFY', 'REMOVE', 'RETAIN', 'REVOKE',
'RECURSIVE', 'RETURN', 'RETURNS', 'RESULTSET', 'RUN', 'GRANTS', 'REFRESH_MODE', 'RIGHT', 'RLIKE', 'RAW',
'OPTIMIZED', 'SCHEMA', 'SCHEMAS', 'SECOND', 'MILLISECOND', 'SELECT', 'PIVOT', 'UNPIVOT', 'SEGMENT',
'SET', 'UNSET', 'SESSION', 'SETTINGS', 'STAGES', 'STATISTIC', 'SUMMARY', 'SHA256_PASSWORD', 'SHOW',
'SINCE', 'SIGNED', 'SINGLE', 'SIZE_LIMIT', 'MAX_FILES', 'SKIP_HEADER', 'SMALLINT', 'SNAPPY', 'SNAPSHOT',
'SPLIT_SIZE', 'STAGE', 'SYNTAX', 'USAGE', 'UPDATE', 'UPLOAD', 'SEQUENCE', 'SHARE', 'SHARES', 'SUPER',
'STATUS', 'STORED', 'STREAM', 'STREAMS', 'STRING', 'SUBSTRING', 'SUBSTR', 'SEMI', 'SOUNDS', 'SYNC',
'SYSTEM', 'STORAGE_TYPE', 'TABLE', 'TABLES', 'TARGET_LAG', 'TEXT', 'LONGTEXT', 'MEDIUMTEXT', 'TINYTEXT',
'TENANTSETTING', 'TENANTS', 'TENANT', 'THEN', 'TIMESTAMP', 'TIMEZONE_HOUR', 'TIMEZONE_MINUTE', 'TIMEZONE',
'TINYINT', 'TO', 'TOKEN', 'TRAILING', 'TRANSIENT', 'TRIM', 'TRUE', 'TRUNCATE', 'TRY_CAST', 'TSV',
'TUPLE', 'TYPE', 'UNBOUNDED', 'UNION', 'UINT16', 'UINT32', 'UINT64', 'UINT8', 'UNDROP', 'UNSIGNED',
'URL', 'METHOD', 'AUTHORIZATION_HEADER', 'USE', 'USER', 'USERS', 'USING', 'VACUUM', 'VALUES',
'VALIDATION_MODE', 'VARBINARY', 'VARCHAR', 'VARIANT', 'VERBOSE', 'VIEW', 'VIEWS', 'VIRTUAL', 'WHEN',
'WHERE', 'WHILE', 'WINDOW', 'WITH', 'XML', 'XOR', 'XZ', 'YEAR', 'ZSTD', 'NULLIF', 'COALESCE', 'RANDOM',
'IFNULL', 'NULLS', 'FIRST', 'LAST', 'IGNORE_RESULT', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'INDEXES',
'ADDRESS', 'OWNERSHIP', 'READ', 'WRITE', 'UDF', 'HANDLER', 'LANGUAGE', 'TASK', 'TASKS', 'TOP',
'WAREHOUSE', 'SCHEDULE', 'SUSPEND_TASK_AFTER_NUM_FAILURES', 'CRON', 'EXECUTE', 'SUSPEND', 'RESUME', 'PIPE',
'NOTIFICATION', 'INTEGRATION', 'ENABLED', 'WEBHOOK', 'ERROR_INTEGRATION', 'AUTO_INGEST',
'PIPE_EXECUTION_PAUSED', 'PREFIX', 'MODIFIED_AFTER', 'UNTIL', 'BEGIN', 'TRANSACTION', 'COMMIT', 'ABORT',
'ROLLBACK', 'TEMPORARY', 'SECONDS', 'DAYS'
}


# Type decorators
class ARRAY(sqltypes.TypeEngine):
Expand Down Expand Up @@ -106,6 +174,45 @@ def process(value):

return process

def literal_processor(self, dialect):
def process(value):
if value is not None:
datetime_str = value.isoformat(" ", timespec="microseconds")
return f"'{datetime_str}'"

return process


class DatabendTime(sqltypes.TIME):
__visit_name__ = "TIME"

_reg = re.compile(r"(?:\d+)-(?:\d+)-(?:\d+) (\d+):(\d+):(\d+)")

def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
if isinstance(value, str):
m = self._reg.match(value)
if not m:
raise ValueError(
"could not parse %r as a datetime value" % (value,)
)
return datetime.time(*[int(x or 0) for x in m.groups()])
else:
return value.time()

return process

def literal_processor(self, dialect):
def process(value):
if value is not None:
from_min_value = datetime.datetime.combine(datetime.date(1000, 1, 1), value)
time_str = from_min_value.isoformat(timespec="microseconds")
return f"'{time_str}'"

return process


class DatabendNumeric(sqltypes.Numeric):
def result_processor(self, dialect, type_):
Expand All @@ -125,6 +232,46 @@ def process(value):
return process


class DatabendInterval(sqltypes.Interval):
"""Stores interval as a datetime relative to epoch, see base implementation."""

_reg = re.compile(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)")

def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
if isinstance(value, str):
m = self._reg.match(value)
if not m:
raise ValueError(
"could not parse %r as a datetime value" % (value,)
)
groups = m.groups()
dt = datetime.datetime(*[
int(groups[0] or self.epoch.year),
int(groups[1] or self.epoch.month),
int(groups[2] or self.epoch.day),
int(groups[3] or 0),
int(groups[4] or 0),
int(groups[5] or 0),
])
else:
dt = value
return dt - self.epoch

return process

def literal_processor(self, dialect):
def process(value):
if value is not None:
d = self.epoch + value
interval_str = d.isoformat(" ", timespec="microseconds")
return f"'{interval_str}'"

return process


# Type converters
ischema_names = {
"bigint": BIGINT,
Expand Down Expand Up @@ -156,10 +303,14 @@ def process(value):
"varchar": VARCHAR,
"boolean": BOOLEAN,
"binary": BINARY,
"time": DatabendTime,
"interval": DatabendInterval,
}

# Column spec
colspecs = {
sqltypes.Interval: DatabendInterval,
sqltypes.Time: DatabendTime,
sqltypes.Date: DatabendDate,
sqltypes.DateTime: DatabendDateTime,
sqltypes.DECIMAL: DatabendNumeric,
Expand All @@ -168,7 +319,7 @@ def process(value):


class DatabendIdentifierPreparer(PGIdentifierPreparer):
pass
reserved_words = {r.lower() for r in RESERVED_WORDS}


class DatabendCompiler(PGCompiler):
Expand Down Expand Up @@ -211,10 +362,14 @@ def visit_concat_op_binary(self, binary, operator, **kw):

def render_literal_value(self, value, type_):
value = super(DatabendCompiler, self).render_literal_value(value, type_)
if isinstance(type_, sqltypes.DateTime):
value = "toDateTime(%s)" % value
if isinstance(type_, sqltypes.Date):
value = "toDate(%s)" % value
# if isinstance(type_, sqltypes.DateTime):
# return "to_datetime(%s)" % value
# if isinstance(type_, sqltypes.Date):
# return "to_date(%s)" % value
# if isinstance(type_, sqltypes.Time):
# return "to_datetime(%s)" % value
# if isinstance(type_, sqltypes.Interval):
# return "to_datetime(%s)" % value
return value

def limit_clause(self, select, **kw):
Expand Down Expand Up @@ -334,6 +489,8 @@ def visit_when_merge_unmatched(self, merge_unmatched, **kw):
", ".join(set_cols),
", ".join(map(lambda e: e._compiler_dispatch(self, **kw), sets_vals)),
)


class DatabendExecutionContext(default.DefaultExecutionContext):
@sa_util.memoized_property
def should_autocommit(self):
Expand Down Expand Up @@ -366,6 +523,9 @@ def visit_NVARCHAR(self, type_, **kw):
def visit_JSON(self, type_, **kw):
return "JSON" # or VARIANT

def visit_TIME(self, type_, **kw):
return "DATETIME"


class DatabendDDLCompiler(compiler.DDLCompiler):

Expand Down Expand Up @@ -635,9 +795,6 @@ def get_table_names(self, connection, schema=None, **kw):
select table_name
from information_schema.tables
where table_schema = :schema_name
"""
if self.server_version_info <= (1, 2, 410):
table_name_query += """
and engine NOT LIKE '%VIEW%'
"""
query = text(
Expand All @@ -654,17 +811,19 @@ def get_table_names(self, connection, schema=None, **kw):
@reflection.cache
def get_view_names(self, connection, schema=None, **kw):
view_name_query = """
select table_name
from information_schema.views
where table_schema = :schema_name
"""
if self.server_version_info <= (1, 2, 410):
view_name_query = """
select table_name
from information_schema.tables
where table_schema = :schema_name
and engine LIKE '%VIEW%'
"""
"""
# This handles bug that existed a while, views were not included in information_schema.tables
# https://github.com/datafuselabs/databend/issues/16039
if self.server_version_info > (1, 2, 410) and self.server_version_info <= (1, 2, 566):
view_name_query = """
select table_name
from information_schema.views
where table_schema = :schema_name
"""
query = text(
view_name_query
).bindparams(
Expand Down Expand Up @@ -694,6 +853,13 @@ def get_table_options(self, connection, table_name, schema=None, **kw):
FROM system.tables
WHERE database = :schema_name
and name = :table_name
UNION
SELECT engine_full, NULL as cluster_by, NULL as is_transient
FROM system.views
WHERE database = :schema_name
and name = :table_name
"""
).bindparams(
bindparam("table_name", type_=sqltypes.Unicode),
Expand Down
9 changes: 8 additions & 1 deletion databend_sqlalchemy/requirements.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def time(self):
"""target dialect supports representation of Python
datetime.time() objects."""

return exclusions.closed()
return exclusions.open()

@property
def time_microseconds(self):
Expand All @@ -147,6 +147,13 @@ def time_microseconds(self):

return exclusions.closed()

@property
def datetime_interval(self):
"""target dialect supports representation of Python
datetime.timedelta()."""

return exclusions.open()

@property
def autoincrement_insert(self):
"""target platform generates new surrogate integer primary key values
Expand Down
45 changes: 45 additions & 0 deletions tests/test_sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
from sqlalchemy.testing.suite import JoinTest as _JoinTest
from sqlalchemy.testing.suite import BizarroCharacterFKResolutionTest as _BizarroCharacterFKResolutionTest
from sqlalchemy.testing.suite import ServerSideCursorsTest as _ServerSideCursorsTest
from sqlalchemy.testing.suite import IntervalTest as _IntervalTest
from sqlalchemy.testing.suite import PrecisionIntervalTest as _PrecisionIntervalTest
from sqlalchemy import types as sql_types
from sqlalchemy import testing, select
from sqlalchemy.testing import config, eq_
from databend_sqlalchemy.databend_dialect import DatabendInterval


class ComponentReflectionTest(_ComponentReflectionTest):
Expand Down Expand Up @@ -277,3 +280,45 @@ def test_roundtrip_fetchall(self):
@testing.skip("databend") # Skipped because requires auto increment primary key
def test_roundtrip_fetchmany(self):
pass


class IntervalTest(_IntervalTest):
__backend__ = True
datatype = DatabendInterval

@testing.skip("databend") # Skipped because cannot figure out the literal() part
def test_arithmetic_operation_literal_interval(self, connection):
pass

@testing.skip("databend") # Skipped because cannot figure out the literal() part
def test_arithmetic_operation_table_interval_and_literal_interval(
self, connection, arithmetic_table_fixture
):
pass

@testing.skip("databend") # Skipped because cannot figure out the literal() part
def test_arithmetic_operation_table_date_and_literal_interval(
self, connection, arithmetic_table_fixture
):
pass


class PrecisionIntervalTest(_PrecisionIntervalTest):
__backend__ = True
datatype = DatabendInterval

@testing.skip("databend") # Skipped because cannot figure out the literal() part
def test_arithmetic_operation_literal_interval(self, connection):
pass

@testing.skip("databend") # Skipped because cannot figure out the literal() part
def test_arithmetic_operation_table_interval_and_literal_interval(
self, connection, arithmetic_table_fixture
):
pass

@testing.skip("databend") # Skipped because cannot figure out the literal() part
def test_arithmetic_operation_table_date_and_literal_interval(
self, connection, arithmetic_table_fixture
):
pass

0 comments on commit b532f4b

Please sign in to comment.