From 1796ead5784d8b0a4a65bf1176ae58f1c069128f Mon Sep 17 00:00:00 2001 From: Muhammad Azhar Date: Mon, 28 Aug 2023 21:50:45 +0300 Subject: [PATCH] Clickhouse ReplicatedMergeTree family support added (#36) * Added engine type support (#1) * Add support for engine types * added cluster name to table creation args * Removed unsupported engines * Added comments to create empty table function * moved table path, replica & cluster name to config * variables added to config * Add engine type support (#2) * Add support for engine types * added cluster name to table creation args * Removed unsupported engines * Added comments to create empty table function * moved table path, replica & cluster name to config * variables added to config * Moved engine type to config * add cluster support to alter table command --------- Co-authored-by: RamlahAziz --- README.md | 4 +++ target_clickhouse/connectors.py | 36 ++++++++++++++++---- target_clickhouse/engine_class.py | 56 +++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 6 deletions(-) create mode 100644 target_clickhouse/engine_class.py diff --git a/README.md b/README.md index 08f9b4e..57aa325 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,10 @@ target-clickhouse --about --format=markdown |:---------------------|:--------:|:-------:|:------------| | sqlalchemy_url | False | None | SQLAlchemy connection string | | table_name | False | None | The name of the table to write to. | +| engine_type | False | MergeTree | The engine type to use for the table. This must be one of the following engine types: MergeTree, ReplacingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplicatedMergeTree, ReplicatedReplacingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree. | +| table_path | False | None | The table path for replicated tables. This is required when using any of the replication engines. Check out the [documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#replicatedmergetree-parameters) for more information | +| replica_name | False | None | The `replica_name` for replicated tables. This is required when using any of the replication engines. | +| cluster_name | False | None | The cluster to create tables in. This is passed as the `clickhouse_cluster` argument when creating a table. [Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) can be found here. | | default_target_schema| False | None | The default target database schema name to use for all streams. | | stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | | stream_map_config | False | None | User-defined config values to be used within map expressions. | diff --git a/target_clickhouse/connectors.py b/target_clickhouse/connectors.py index 2b7bb87..817787e 100644 --- a/target_clickhouse/connectors.py +++ b/target_clickhouse/connectors.py @@ -6,12 +6,13 @@ import sqlalchemy.types from clickhouse_sqlalchemy import ( Table, - engines, ) from singer_sdk import typing as th from singer_sdk.connectors import SQLConnector from sqlalchemy import Column, MetaData, create_engine +from target_clickhouse.engine_class import create_engine_wrapper, SupportedEngines + if TYPE_CHECKING: from sqlalchemy.engine import Engine @@ -79,7 +80,6 @@ def create_empty_table( primary_keys: list of key properties. partition_keys: list of partition keys. as_temp_table: True to create a temp table. - Raises: NotImplementedError: if temp tables are unsupported and as_temp_table=True. RuntimeError: if a variant schema is passed with no properties defined. @@ -100,6 +100,13 @@ def create_empty_table( meta = MetaData(schema=None, bind=self._engine) columns: list[Column] = [] primary_keys = primary_keys or [] + + # If config engine type is set, then use it instead of the default engine type. + if self.config.get("engine_type"): + engine_type = self.config.get("engine_type") + else: + engine_type = SupportedEngines.MERGE_TREE + try: properties: dict = schema["properties"] except KeyError as e: @@ -115,8 +122,15 @@ def create_empty_table( ), ) - table_engine = engines.MergeTree(primary_key=primary_keys) - _ = Table(table_name, meta, *columns, table_engine) + table_engine = create_engine_wrapper( + engine_type=engine_type, primary_keys=primary_keys, config=self.config + ) + + table_args = {} + if self.config.get("cluster_name"): + table_args["clickhouse_cluster"] = self.config.get("cluster_name") + + _ = Table(table_name, meta, *columns, table_engine, **table_args) meta.create_all(self._engine) def prepare_schema(self, _: str) -> None: @@ -129,15 +143,15 @@ def prepare_schema(self, _: str) -> None: """ return - @staticmethod def get_column_alter_ddl( + self, table_name: str, column_name: str, column_type: sqlalchemy.types.TypeEngine, ) -> sqlalchemy.DDL: """Get the alter column DDL statement. - Override this if your database uses a different syntax for altering columns. + Overrides the static method in the base class to support ON CLUSTER. Args: table_name: Fully qualified table name of column to alter. @@ -147,6 +161,16 @@ def get_column_alter_ddl( Returns: A sqlalchemy DDL instance. """ + if self.config.get("cluster_name"): + return sqlalchemy.DDL( + "ALTER TABLE %(table_name)s ON CLUSTER %(cluster_name)s MODIFY COLUMN %(column_name)s %(column_type)s", + { + "table_name": table_name, + "column_name": column_name, + "column_type": column_type, + "cluster_name": self.config.get("cluster_name"), + }, + ) return sqlalchemy.DDL( "ALTER TABLE %(table_name)s MODIFY COLUMN %(column_name)s %(column_type)s", { diff --git a/target_clickhouse/engine_class.py b/target_clickhouse/engine_class.py new file mode 100644 index 0000000..6d6575c --- /dev/null +++ b/target_clickhouse/engine_class.py @@ -0,0 +1,56 @@ +from clickhouse_sqlalchemy import engines +from enum import Enum + + +class SupportedEngines(str, Enum): + MERGE_TREE = "MergeTree" + REPLACING_MERGE_TREE = "ReplacingMergeTree" + SUMMING_MERGE_TREE = "SummingMergeTree" + AGGREGATING_MERGE_TREE = "AggregatingMergeTree" + REPLICATED_MERGE_TREE = "ReplicatedMergeTree" + REPLICATED_REPLACING_MERGE_TREE = "ReplicatedReplacingMergeTree" + REPLICATED_SUMMING_MERGE_TREE = "ReplicatedSummingMergeTree" + REPLICATED_AGGREGATING_MERGE_TREE = "ReplicatedAggregatingMergeTree" + + +ENGINE_MAPPING = { + SupportedEngines.MERGE_TREE: engines.MergeTree, + SupportedEngines.REPLACING_MERGE_TREE: engines.ReplacingMergeTree, + SupportedEngines.SUMMING_MERGE_TREE: engines.SummingMergeTree, + SupportedEngines.AGGREGATING_MERGE_TREE: engines.AggregatingMergeTree, + SupportedEngines.REPLICATED_MERGE_TREE: engines.ReplicatedMergeTree, + SupportedEngines.REPLICATED_REPLACING_MERGE_TREE: engines.ReplicatedReplacingMergeTree, + SupportedEngines.REPLICATED_SUMMING_MERGE_TREE: engines.ReplicatedSummingMergeTree, + SupportedEngines.REPLICATED_AGGREGATING_MERGE_TREE: engines.ReplicatedAggregatingMergeTree, +} + + +def is_supported_engine(engine_type): + return engine_type in SupportedEngines.__members__.values() + + +def get_engine_class(engine_type): + return ENGINE_MAPPING.get(engine_type) + + +def create_engine_wrapper(engine_type, primary_keys, config: dict | None = None): + # check if engine type is in supported engines + if is_supported_engine(engine_type) is False: + raise ValueError(f"Engine type {engine_type} is not supported.") + + engine_args = {"primary_key": primary_keys} + if config is not None: + if engine_type in ( + SupportedEngines.REPLICATED_MERGE_TREE, + SupportedEngines.REPLICATED_REPLACING_MERGE_TREE, + SupportedEngines.REPLICATED_SUMMING_MERGE_TREE, + SupportedEngines.REPLICATED_AGGREGATING_MERGE_TREE, + ): + if config.get("table_path"): + engine_args["table_path"] = config.get("table_path") + if config.get("replica_name"): + engine_args["replica_name"] = config.get("replica_name") + + engine_class = get_engine_class(engine_type) + + return engine_class(**engine_args)