Skip to content

Commit

Permalink
Clickhouse ReplicatedMergeTree family support added (#36)
Browse files Browse the repository at this point in the history
* Added engine type support (#1)

* Add support for engine types

* added cluster name to table creation args

* Removed unsupported engines

* Added comments to create empty table function

* moved table path, replica & cluster name to config

* variables added to config

* Add engine type support (#2)

* Add support for engine types

* added cluster name to table creation args

* Removed unsupported engines

* Added comments to create empty table function

* moved table path, replica & cluster name to config

* variables added to config

* Moved engine type to config

* add cluster support to alter table command

---------

Co-authored-by: RamlahAziz <ramlah.aziz2012@gmail.com>
  • Loading branch information
azhard4int and RamlahAziz authored Aug 28, 2023
1 parent aae7305 commit 1796ead
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 6 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ target-clickhouse --about --format=markdown
|:---------------------|:--------:|:-------:|:------------|
| sqlalchemy_url | False | None | SQLAlchemy connection string |
| table_name | False | None | The name of the table to write to. |
| engine_type | False | MergeTree | The engine type to use for the table. This must be one of the following engine types: MergeTree, ReplacingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplicatedMergeTree, ReplicatedReplacingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree. |
| table_path | False | None | The table path for replicated tables. This is required when using any of the replication engines. Check out the [documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#replicatedmergetree-parameters) for more information |
| replica_name | False | None | The `replica_name` for replicated tables. This is required when using any of the replication engines. |
| cluster_name | False | None | The cluster to create tables in. This is passed as the `clickhouse_cluster` argument when creating a table. [Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) can be found here. |
| default_target_schema| False | None | The default target database schema name to use for all streams. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
Expand Down
36 changes: 30 additions & 6 deletions target_clickhouse/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import sqlalchemy.types
from clickhouse_sqlalchemy import (
Table,
engines,
)
from singer_sdk import typing as th
from singer_sdk.connectors import SQLConnector
from sqlalchemy import Column, MetaData, create_engine

from target_clickhouse.engine_class import create_engine_wrapper, SupportedEngines

if TYPE_CHECKING:
from sqlalchemy.engine import Engine

Expand Down Expand Up @@ -79,7 +80,6 @@ def create_empty_table(
primary_keys: list of key properties.
partition_keys: list of partition keys.
as_temp_table: True to create a temp table.
Raises:
NotImplementedError: if temp tables are unsupported and as_temp_table=True.
RuntimeError: if a variant schema is passed with no properties defined.
Expand All @@ -100,6 +100,13 @@ def create_empty_table(
meta = MetaData(schema=None, bind=self._engine)
columns: list[Column] = []
primary_keys = primary_keys or []

# If config engine type is set, then use it instead of the default engine type.
if self.config.get("engine_type"):
engine_type = self.config.get("engine_type")
else:
engine_type = SupportedEngines.MERGE_TREE

try:
properties: dict = schema["properties"]
except KeyError as e:
Expand All @@ -115,8 +122,15 @@ def create_empty_table(
),
)

table_engine = engines.MergeTree(primary_key=primary_keys)
_ = Table(table_name, meta, *columns, table_engine)
table_engine = create_engine_wrapper(
engine_type=engine_type, primary_keys=primary_keys, config=self.config
)

table_args = {}
if self.config.get("cluster_name"):
table_args["clickhouse_cluster"] = self.config.get("cluster_name")

_ = Table(table_name, meta, *columns, table_engine, **table_args)
meta.create_all(self._engine)

def prepare_schema(self, _: str) -> None:
Expand All @@ -129,15 +143,15 @@ def prepare_schema(self, _: str) -> None:
"""
return

@staticmethod
def get_column_alter_ddl(
self,
table_name: str,
column_name: str,
column_type: sqlalchemy.types.TypeEngine,
) -> sqlalchemy.DDL:
"""Get the alter column DDL statement.
Override this if your database uses a different syntax for altering columns.
Overrides the static method in the base class to support ON CLUSTER.
Args:
table_name: Fully qualified table name of column to alter.
Expand All @@ -147,6 +161,16 @@ def get_column_alter_ddl(
Returns:
A sqlalchemy DDL instance.
"""
if self.config.get("cluster_name"):
return sqlalchemy.DDL(
"ALTER TABLE %(table_name)s ON CLUSTER %(cluster_name)s MODIFY COLUMN %(column_name)s %(column_type)s",
{
"table_name": table_name,
"column_name": column_name,
"column_type": column_type,
"cluster_name": self.config.get("cluster_name"),
},
)
return sqlalchemy.DDL(
"ALTER TABLE %(table_name)s MODIFY COLUMN %(column_name)s %(column_type)s",
{
Expand Down
56 changes: 56 additions & 0 deletions target_clickhouse/engine_class.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from clickhouse_sqlalchemy import engines
from enum import Enum


class SupportedEngines(str, Enum):
MERGE_TREE = "MergeTree"
REPLACING_MERGE_TREE = "ReplacingMergeTree"
SUMMING_MERGE_TREE = "SummingMergeTree"
AGGREGATING_MERGE_TREE = "AggregatingMergeTree"
REPLICATED_MERGE_TREE = "ReplicatedMergeTree"
REPLICATED_REPLACING_MERGE_TREE = "ReplicatedReplacingMergeTree"
REPLICATED_SUMMING_MERGE_TREE = "ReplicatedSummingMergeTree"
REPLICATED_AGGREGATING_MERGE_TREE = "ReplicatedAggregatingMergeTree"


ENGINE_MAPPING = {
SupportedEngines.MERGE_TREE: engines.MergeTree,
SupportedEngines.REPLACING_MERGE_TREE: engines.ReplacingMergeTree,
SupportedEngines.SUMMING_MERGE_TREE: engines.SummingMergeTree,
SupportedEngines.AGGREGATING_MERGE_TREE: engines.AggregatingMergeTree,
SupportedEngines.REPLICATED_MERGE_TREE: engines.ReplicatedMergeTree,
SupportedEngines.REPLICATED_REPLACING_MERGE_TREE: engines.ReplicatedReplacingMergeTree,
SupportedEngines.REPLICATED_SUMMING_MERGE_TREE: engines.ReplicatedSummingMergeTree,
SupportedEngines.REPLICATED_AGGREGATING_MERGE_TREE: engines.ReplicatedAggregatingMergeTree,
}


def is_supported_engine(engine_type):
return engine_type in SupportedEngines.__members__.values()


def get_engine_class(engine_type):
return ENGINE_MAPPING.get(engine_type)


def create_engine_wrapper(engine_type, primary_keys, config: dict | None = None):
# check if engine type is in supported engines
if is_supported_engine(engine_type) is False:
raise ValueError(f"Engine type {engine_type} is not supported.")

engine_args = {"primary_key": primary_keys}
if config is not None:
if engine_type in (
SupportedEngines.REPLICATED_MERGE_TREE,
SupportedEngines.REPLICATED_REPLACING_MERGE_TREE,
SupportedEngines.REPLICATED_SUMMING_MERGE_TREE,
SupportedEngines.REPLICATED_AGGREGATING_MERGE_TREE,
):
if config.get("table_path"):
engine_args["table_path"] = config.get("table_path")
if config.get("replica_name"):
engine_args["replica_name"] = config.get("replica_name")

engine_class = get_engine_class(engine_type)

return engine_class(**engine_args)

0 comments on commit 1796ead

Please sign in to comment.