- 🚿Stream - a mode in which Bulker inserts data into a destination record by record. Databases usually handle a large volume of single-record inserts poorly, so don't use this mode at production scale (more than 10-100 records per minute, depending on the database).
- 🛢️Batch - a mode in which Bulker inserts data into a destination in batches. Preferred mode for large amounts of data.
- 🔑Primary Key - the column or columns whose values uniquely identify each row in a table. Enabled via stream options. Required for the 'deduplication' option.
- 🐫Deduplication - a mode that avoids duplicate rows with equal values in the key columns (primary key). If Bulker receives a record with the same primary key values as an existing row, the old row is replaced. Bulker maintains uniqueness of rows based on primary key columns even for warehouses that don't enforce uniqueness natively. Enabled via stream options. Requires the primary key option. May come with performance tradeoffs.
- 🗓️Timestamp Column - an option that helps Bulker create tables optimized for range queries and sorting by time, e.g. event creation time.
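
The replacement rule described for deduplication can be illustrated with a minimal in-memory sketch. It is not Bulker's implementation: the `deduplicate` function and the record shape are hypothetical, and the only behavior it models is "a later record with the same primary key values replaces the earlier one".

```python
from typing import Any, Dict, Iterable, List, Tuple


def deduplicate(records: Iterable[Dict[str, Any]], key_columns: List[str]) -> List[Dict[str, Any]]:
    """Keep only the latest record for each combination of primary key values.

    Hypothetical helper for illustration; not part of Bulker's API.
    """
    latest: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for record in records:
        # Build the composite key from the configured primary key columns.
        key = tuple(record[column] for column in key_columns)
        # A later record with the same key overwrites the earlier one.
        latest[key] = record
    return list(latest.values())
```

Because Python dictionaries preserve insertion order, each surviving record keeps the position of the first occurrence of its key.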
| | Redshift | BigQuery | ClickHouse | Snowflake | Postgres | MySQL | S3 (coming soon) |
|---|---|---|---|---|---|---|---|
| Stream | ✅ Supported | ❌ Not supported | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | |
| Batch | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | |
| Primary key | ✅ Supported | ℹ️ Emulated | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | |
| Deduplication | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | |
| Timestamp Column | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | |
These features are not exposed through the HTTP API and are supported only at the Go library API level.
- Replace Table - a special version of batch mode that assumes a single batch contains all data for a table. Depending on the database implementation, Bulker tries to atomically replace the old table with a new one.
- Replace Partition - a special version of batch mode that replaces part of the target table. The part to replace is defined by the 'partition' stream option. Each batch loads data for the virtual partition identified by the 'partition' option value. If the table already contains data for the provided 'partition', that data is deleted and replaced with the data from the current batch. Enabled via stream options.
| | Redshift | BigQuery | ClickHouse | Snowflake | Postgres | MySQL | S3 (coming soon) |
|---|---|---|---|---|---|---|---|
| Replace Table | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | |
| Replace Partition | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | ✅ Supported | |
✅ Supported
⚠️ Performance considerations
Supported as plain insert statements. Don't use at production scale (more than 10 records per minute)
✅ Supported
Algorithm:
- Write to tmp file
- Load tmp file to s3
- `BEGIN TRANSACTION`
- `COPY from s3 to tmp_table`
- `INSERT into target_table select from tmp_table`
- `COMMIT`
✅ Supported
For batch mode the following algorithm is used:
- Write to tmp file
- Deduplicate rows in tmp file
- Load tmp file to s3
- `BEGIN TRANSACTION`
- `COPY from s3 to tmp_table`
- `DELETE from target_table using tmp_table` where primary key matches
- `INSERT into target_table select from tmp_table`
- `COMMIT`

For stream mode: `SELECT` by primary key, then either `INSERT` or `UPDATE` depending on the result. Don't use at production scale (more than 10 records per minute).
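
The transactional part of the batch steps above (load into `tmp_table`, delete matching rows from `target_table`, insert the new rows) can be sketched with Python's built-in `sqlite3` module as a local stand-in for the warehouse. This is an illustration under assumptions, not Bulker's code: the `merge_batch` function and the `(id, name)` schema are hypothetical, the S3 upload and `COPY` are replaced by a plain insert into a temporary table, and `DELETE ... USING` is written as a subquery because SQLite does not support that form.

```python
import sqlite3


def merge_batch(conn: sqlite3.Connection, rows: list) -> None:
    """Replace rows in target_table whose primary key (id) appears in the batch.

    Hypothetical sketch: assumes target_table(id, name) already exists.
    """
    # Stand-in for "COPY from s3 to tmp_table": stage the batch in a temp table.
    conn.execute("CREATE TEMP TABLE tmp_table (id INTEGER, name TEXT)")
    try:
        with conn:  # commits on success, rolls back if any statement fails
            conn.executemany("INSERT INTO tmp_table VALUES (?, ?)", rows)
            # Remove old versions of every row present in the batch.
            conn.execute(
                "DELETE FROM target_table WHERE id IN (SELECT id FROM tmp_table)"
            )
            # Insert the new versions.
            conn.execute("INSERT INTO target_table SELECT id, name FROM tmp_table")
    finally:
        conn.execute("DROP TABLE tmp_table")
```

Doing the delete and the insert in one transaction means readers never observe the table with the old rows removed but the new ones not yet written.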
✅ Supported
In Redshift, primary keys don't enforce uniqueness. Bulker performs deduplication itself when the deduplication option is enabled and a primary key is specified.
If the primary key consists of a single column, that column will also be selected as the `DIST KEY`.
✅ Supported
Selected timestamp column will be used as sort key for target table.
✅ Supported
Algorithm:
- Write to tmp file
- Load tmp file to s3
- `BEGIN TRANSACTION`
- `COPY from s3 to tmp_table`
- `RENAME target_table to deprecated_target_table_20060101_150405`
- `RENAME tmp_table to target_table`
- `DROP TABLE deprecated_target_table_20060101_150405`
- `COMMIT`
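
The rename-based swap above can be sketched with `sqlite3` as a local stand-in for the warehouse. Everything here is an assumption made for illustration: the `replace_table` function, the `(id, name)` schema, and the reading of the `deprecated_target_table_...` suffix as a date and time stamp. The S3 upload and `COPY` are replaced by a plain insert.

```python
import sqlite3
from datetime import datetime, timezone


def replace_table(conn: sqlite3.Connection, rows: list, now: datetime = None) -> None:
    """Swap target_table for a freshly loaded table inside one transaction.

    Hypothetical sketch: assumes target_table(id, name) already exists.
    """
    now = now or datetime.now(timezone.utc)
    # Assumed naming scheme: old table is parked under a timestamped name.
    deprecated = f"deprecated_target_table_{now:%Y%m%d_%H%M%S}"
    conn.execute("BEGIN")
    try:
        conn.execute("CREATE TABLE tmp_table (id INTEGER, name TEXT)")
        conn.executemany("INSERT INTO tmp_table VALUES (?, ?)", rows)
        # Move the old table out of the way, promote the new one, drop the old.
        conn.execute(f"ALTER TABLE target_table RENAME TO {deprecated}")
        conn.execute("ALTER TABLE tmp_table RENAME TO target_table")
        conn.execute(f"DROP TABLE {deprecated}")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
```

The explicit `BEGIN` is used because Python's `sqlite3` module does not open a transaction automatically for DDL statements such as `ALTER TABLE`.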
✅ Supported
Algorithm:
- Write to tmp file
- Load tmp file to s3
- `BEGIN TRANSACTION`
- `DELETE from target_table where partition_id=`partition option value
- `COPY from s3 to target_table`
- `COMMIT`
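
The delete-then-load pattern above can be sketched the same way, again with `sqlite3` standing in for the warehouse. The `replace_partition` function and the `(partition_id, id, name)` schema are hypothetical, and the `COPY` step is replaced by a plain insert.

```python
import sqlite3


def replace_partition(conn: sqlite3.Connection, partition_id: str, rows: list) -> None:
    """Replace all rows of one virtual partition with the rows of the current batch.

    Hypothetical sketch: assumes target_table(partition_id, id, name) exists.
    """
    with conn:  # one transaction: commit on success, rollback on error
        # Drop whatever the table currently holds for this partition.
        conn.execute("DELETE FROM target_table WHERE partition_id = ?", (partition_id,))
        # Load the new batch, tagging every row with the partition value.
        conn.executemany(
            "INSERT INTO target_table (partition_id, id, name) VALUES (?, ?, ?)",
            [(partition_id, row_id, name) for row_id, name in rows],
        )
```

Rows belonging to other partition values are left untouched.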
❌ Not supported
It's possible to implement, but without deduplication, so we decided not to.
✅ Supported
- Write to tmp file
- Use Loader API to load to tmp_table from tmp file
- Use Copier API to copy from tmp_table to target_table
✅ Supported
Algorithm for batch mode:
- Write to tmp file
- Dedup tmp file
- Use Loader API to load to tmp_table from tmp file
- `MERGE into target_table on tmp_table when matched then UPDATE when not matched then INSERT`
Emulated - Bulker fully handles uniqueness. Primary key column metadata is stored in table labels.
✅ Supported
Bulker creates a time-unit column-partitioned table on the specified timestamp column with monthly partitioning.
✅ Supported
Algorithm:
- Write to tmp file
- Use Loader API to load to tmp_table from tmp file
- Use Copier API to copy from tmp_table to target_table with WriteTruncate mode
- Drop tmp_table
✅ Supported
⚠️ Not atomic: while a Bulker stream is completing, the target table may be missing some data for the specified 'partition' for a short period of time.
Algorithm:
- `DELETE from target_table where partition_id=`partition option value
- Write to tmp file
- Use Loader API to load to target_table from tmp file
✅ Supported
For a single-node instance:

`INSERT INTO target_table (...) VALUES (...)`

For a cluster, Bulker inserts into a distributed table so that data is evenly distributed across cluster nodes:

`INSERT INTO dist_target_table (...) VALUES (...)`
✅ Supported
Algorithm:
- Write to tmp file
- `INSERT INTO tmp_table (...) VALUES (...)` - bulk load data from tmp file into tmp_table using bulk insert
- `INSERT INTO target_table(...) SELECT ... FROM tmp_table`
✅ Supported
⚠️ Eventual deduplication

Bulker relies on the underlying table engine. Unless the table was created before Bulker started writing to it, ReplacingMergeTree will be used.

The primary key columns configured for the destination will be used both as `PRIMARY KEY` and `ORDER BY`.

The ReplacingMergeTree engine performs deduplication in the background some time after insertion, so it's still possible to get rows with duplicated primary key columns using an ordinary `SELECT`.

To make sure that no duplicates are present in query results, use the `FINAL` modifier, e.g. `SELECT * FROM target_table FINAL`.

**Note** ReplacingMergeTree is not the only way to deduplicate data in ClickHouse. There are other approaches too. To implement them, create the destination table before Bulker starts inserting data. In this case Bulker will respect the table engine and primary key columns you specified.
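
Why an ordinary `SELECT` can still return duplicates while `FINAL` does not is easier to see with a toy model. The class below is not ClickHouse code and does not reflect its internals: `ReplacingTable` is a hypothetical in-memory simulation in which every insert creates a separate part, duplicates across parts disappear only after a merge, and `final=True` deduplicates at read time by keeping the last inserted row per key.

```python
from typing import Any, Dict, List


class ReplacingTable:
    """Toy model of eventual deduplication (hypothetical, for illustration only)."""

    def __init__(self, key: str) -> None:
        self.key = key
        self.parts: List[List[Dict[str, Any]]] = []

    def insert(self, rows: List[Dict[str, Any]]) -> None:
        # Each insert lands in its own part; nothing is deduplicated yet.
        self.parts.append(list(rows))

    def select(self, final: bool = False) -> List[Dict[str, Any]]:
        rows = [row for part in self.parts for row in part]
        if not final:
            return rows  # may contain several rows with the same key
        latest: Dict[Any, Dict[str, Any]] = {}
        for row in rows:
            latest[row[self.key]] = row  # the last inserted row wins
        return list(latest.values())

    def merge(self) -> None:
        # Models the background merge: collapse all parts into one deduplicated part.
        self.parts = [self.select(final=True)]
```

In this model, two inserts with the same key show up as two rows in a plain `select()` until `merge()` runs, while `select(final=True)` returns one row at any time.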
✅ Supported
Primary key columns are also used as the sorting key for the ReplacingMergeTree engine.
✅ Supported
Bulker creates tables partitioned monthly by the specified timestamp column, e.g. `PARTITION BY toYYYYMM(_timestamp)`
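
To show what monthly partitioning means for individual rows, here is a small sketch of the partition value that `toYYYYMM` produces (year times 100 plus month) and how events fall into monthly groups. The `to_yyyymm` and `group_by_month` helpers are hypothetical and run in Python, not in ClickHouse. The `_timestamp` field name is taken from the example above.

```python
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, List


def to_yyyymm(ts: datetime) -> int:
    """Return the month partition value, e.g. 202303 for any day in March 2023."""
    return ts.year * 100 + ts.month


def group_by_month(events: List[Dict[str, Any]], timestamp_field: str = "_timestamp") -> Dict[int, List[Dict[str, Any]]]:
    """Group events by the monthly partition their timestamp falls into."""
    partitions: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
    for event in events:
        partitions[to_yyyymm(event[timestamp_field])].append(event)
    return dict(partitions)
```

A range query over a few months then only needs to touch the partitions whose values fall inside that range.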
✅ Supported
Algorithm:
- Write to tmp file
- `INSERT INTO tmp_table (...) VALUES (...)` - bulk load data from tmp file into tmp_table using bulk insert
- `EXCHANGE TABLES target_table tmp_table`
✅ Supported
Algorithm:
- Write to tmp file
- `ALTER TABLE target_table DELETE where partition_id=`partition option value
- `INSERT INTO tmp_table(...) VALUES (...)` - bulk load data from tmp file into tmp_table using bulk insert
- `INSERT INTO target_table(...) SELECT ... FROM tmp_table`
✅ Supported
`INSERT INTO target_table (...) VALUES (...)`
✅ Supported
Algorithm:
- Write to tmp file
- Load tmp file to `stage`
- `BEGIN TRANSACTION`
- `COPY from stage to tmp_table`
- `INSERT into target_table select from tmp_table`
- `COMMIT`
✅ Supported
For batch mode the following algorithm is used:
- Write to tmp file
- Deduplicate rows in tmp file
- Load tmp file to `stage`
- `BEGIN TRANSACTION`
- `COPY from stage to tmp_table`
- `MERGE into target_table using (select from tmp_table) ...`
- `COMMIT`

For stream mode: `SELECT` by primary key, then either `INSERT` or `UPDATE` depending on the result.
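
The stream-mode flow (look the row up by primary key, then insert or update) can be sketched with `sqlite3` as a local stand-in. The `stream_upsert` function and the `(id, name)` schema are hypothetical. The sketch does not guard against another writer inserting the same key between the lookup and the insert.

```python
import sqlite3


def stream_upsert(conn: sqlite3.Connection, record_id: int, name: str) -> str:
    """Insert the record, or update it if a row with the same id already exists.

    Hypothetical sketch: assumes target_table(id, name) exists.
    Returns "insert" or "update" to show which branch was taken.
    """
    with conn:  # commit on success, rollback on error
        exists = conn.execute(
            "SELECT 1 FROM target_table WHERE id = ?", (record_id,)
        ).fetchone()
        if exists:
            conn.execute(
                "UPDATE target_table SET name = ? WHERE id = ?", (name, record_id)
            )
            return "update"
        conn.execute(
            "INSERT INTO target_table (id, name) VALUES (?, ?)", (record_id, name)
        )
        return "insert"
```

Each record costs at least two round trips to the database, which is one reason per-record streaming is slow compared with batch loading.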
✅ Supported
In Snowflake, primary keys don't enforce uniqueness. Bulker performs deduplication itself when the deduplication option is enabled and a primary key is specified.
✅ Supported
Bulker sets clustering key to the month part of specified timestamp column values, e.g. CLUSTER BY (DATE_TRUNC('MONTH', _timestamp))
✅ Supported
Algorithm:
- Write to tmp file
- Load tmp file to `stage`
- `BEGIN TRANSACTION`
- `COPY from stage to tmp_table`
- `CREATE OR REPLACE TABLE target_table CLONE tmp_table`
- `DROP TABLE tmp_table`
- `COMMIT`
✅ Supported
Algorithm:
- Write to tmp file
- Load tmp file to `stage`
- `BEGIN TRANSACTION`
- `DELETE from target_table where partition_id=`partition option value
- `COPY from stage to target_table`
- `COMMIT`
✅ Supported
`INSERT INTO target_table (...) VALUES (...)`
✅ Supported
Algorithm:
- Write to tmp file
- `BEGIN TRANSACTION`
- `COPY from STDIN to tmp_table` - load tmp file into tmp_table
- `INSERT into target_table select from tmp_table`
- `COMMIT`
✅ Supported
For batch mode the following algorithm is used:
- Write to tmp file
- Deduplicate rows in tmp file
- `BEGIN TRANSACTION`
- `COPY from STDIN to tmp_table` - load tmp file into tmp_table
- `INSERT into target_table select from tmp_table ON CONFLICT UPDATE ...`
- `COMMIT`

For stream mode:

`INSERT INTO target_table (...) VALUES (...) ON CONFLICT UPDATE ...`
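
A single-statement upsert of this kind can be tried locally with `sqlite3`, whose `ON CONFLICT ... DO UPDATE` clause is modeled on the PostgreSQL syntax (it needs SQLite 3.24 or newer). The `upsert` function and the `(id, name)` schema are hypothetical, and the statement below is a sketch rather than the exact SQL Bulker generates.

```python
import sqlite3


def upsert(conn: sqlite3.Connection, record_id: int, name: str) -> None:
    """Insert a row, or overwrite the existing row with the same primary key.

    Hypothetical sketch: assumes target_table has a PRIMARY KEY or UNIQUE
    constraint on id, which the conflict clause relies on.
    """
    with conn:  # commit on success, rollback on error
        conn.execute(
            "INSERT INTO target_table (id, name) VALUES (?, ?) "
            # 'excluded' refers to the row that failed to insert.
            "ON CONFLICT (id) DO UPDATE SET name = excluded.name",
            (record_id, name),
        )
```

Unlike the lookup-then-write flow, the database resolves the conflict itself, so no separate `SELECT` is needed.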
✅ Supported
✅ Supported
A regular index is created on the specified timestamp column.
✅ Supported
Algorithm:
- Write to tmp file
- `BEGIN TRANSACTION`
- `COPY from STDIN to tmp_table` - load tmp file into tmp_table
- `RENAME target_table to deprecated_target_table_20060101_150405`
- `RENAME tmp_table to target_table`
- `DROP TABLE deprecated_target_table_20060101_150405`
- `COMMIT`
✅ Supported
Algorithm:
- Write to tmp file
- `BEGIN TRANSACTION`
- `DELETE from target_table where partition_id=`partition option value
- `COPY from STDIN to target_table` - load tmp file into target_table
- `COMMIT`
✅ Supported
`INSERT INTO target_table (...) VALUES (...)`
✅ Supported
Algorithm:
- `BEGIN TRANSACTION`
- `INSERT into tmp_table`
- `INSERT into target_table select from tmp_table`
- `COMMIT`
✅ Supported
For batch mode the following algorithm is used:
- `BEGIN TRANSACTION`
- `INSERT into tmp_table ... ON DUPLICATE KEY UPDATE ...`
- `INSERT into target_table select from tmp_table ... ON DUPLICATE KEY UPDATE ...`
- `COMMIT`

For stream mode:

`INSERT INTO target_table ... ON DUPLICATE KEY UPDATE ...`
✅ Supported
✅ Supported
A regular index is created on the specified timestamp column.
✅ Supported
Algorithm:
- `BEGIN TRANSACTION`
- `INSERT into tmp_table`
- `RENAME target_table to deprecated_target_table_20060101_150405`
- `RENAME tmp_table to target_table`
- `DROP TABLE deprecated_target_table_20060101_150405`
- `COMMIT`
✅ Supported
Algorithm:
- `BEGIN TRANSACTION`
- `DELETE from target_table where partition_id=`partition option value
- `INSERT into target_table ... ON DUPLICATE KEY UPDATE ...`
- `COMMIT`