- Running Bulker
- Common Parameters
- Kafka Connection
- Batching
- Streaming
- Error Handling and Retries
- Advanced Kafka Tuning
- Events Log (optional)
- Defining Destination
See also HTTP API
The best way to run Bulker is to use the Docker image.
- Use jitsucom/bulker:latest for the latest stable version
- Use jitsucom/bulker:canary for the latest build
Alternatively, you can build your own binary by running go mod download && go build -o bulker
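For example, a minimal local run could look like the sketch below. The Kafka variable name and the published port are assumptions (the broker list is the only required setting, see Kafka Connection below), so check them against the version you deploy:

```bash
# Sketch: run the latest stable image.
# BULKER_KAFKA_BOOTSTRAP_SERVERS is an assumed name for the required Kafka broker list;
# 3042 is assumed to be the default HTTP port.
docker run -d --name bulker \
  -p 3042:3042 \
  -e BULKER_KAFKA_BOOTSTRAP_SERVERS="kafka1.example.com:9092" \
  jitsucom/bulker:latest
```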
Bulker is configured via environment variables. All variables are prefixed with BULKER_. See the list of available variables below.
Optional, default value: random uuid
ID of the Bulker instance. It is used for identifying Kafka consumers and metrics. If it is not set, an instance id will be generated, persisted to disk (~/.bulkerapp/instance_id) and reused on the next restart.
Optional, default value: 3042
Optional, default value: ''
A comma-separated list of hashed auth tokens that authorize users in the HTTP interface. Each token must have the format ${salt}.${hash}, where ${salt} is a random string and ${hash} is hex(sha512($token + $salt + BULKER_TOKEN_SECRET)). $token must consist only of letters, digits, underscores and dashes.
Optional, default value: empty string
See above. A secret that is used for hashing tokens.
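For illustration, a token in this format can be produced with standard shell tools. A minimal sketch with placeholder values:

```bash
# hash = hex(sha512(token + salt + BULKER_TOKEN_SECRET)); all values below are placeholders
TOKEN="my-token"                  # plain token that clients will send
SALT="$(openssl rand -hex 16)"    # random salt
SECRET="change-me"                # value of BULKER_TOKEN_SECRET
HASH="$(printf '%s' "${TOKEN}${SALT}${SECRET}" | sha512sum | cut -d' ' -f1)"
echo "${SALT}.${HASH}"            # put this value into BULKER_AUTH_TOKENS
```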
Optional, default value: ''
A comma-separated list of plain (non-hashed) tokens. Each token must consist only of letters, digits, underscores and dashes.
Can be used instead of the BULKER_AUTH_TOKENS / BULKER_TOKEN_SECRET pair. It offers simplicity at the cost of lower security.
Not recommended for production.
Required
A comma-separated list of Kafka brokers. Each broker should be in the format host:port.
Whether SSL should be enabled for the Kafka connection.
Skip SSL verification of the Kafka server certificate.
Kafka authorization as a JSON object: {"mechanism": "SCRAM-SHA-256|PLAIN", "username": "user", "password": "password"}
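As an illustration, a Kafka connection could be configured like the sketch below. The variable names here are assumptions (they are not spelled out above), so verify them against the Bulker release you run; only the value formats come from the descriptions above:

```bash
# Assumed variable names; the broker list and SASL JSON formats are taken from the descriptions above
BULKER_KAFKA_BOOTSTRAP_SERVERS="kafka1.example.com:9092,kafka2.example.com:9092"
BULKER_KAFKA_SSL="true"
BULKER_KAFKA_SASL='{"mechanism": "SCRAM-SHA-256", "username": "user", "password": "password"}'
```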
Bulker buffers events and sends them to the destination in batches if mode=batch. The batch is sent when either of the following is true:
- batchSize events are buffered
- frequency minutes (may be a float) have passed since the first event in the batch was buffered
Default batch settings for all destinations may be set with the following variables:
Optional, default value: 300 (5 min)
Default period for batch processing for destinations where frequency is not set explicitly.
Read more about batch processing configuration below
Optional, default value: 10000
Default batch size for destinations where batchSize is not set explicitly.
Read more about batch processing configuration below
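For instance, the instance-wide defaults could be set like this (a sketch; the variable names are the ones referenced in the destination options section below, the values are illustrative):

```bash
# Flush a batch every 60 seconds, or as soon as 1000 events are buffered
BULKER_BATCH_RUNNER_DEFAULT_PERIOD_SEC=60
BULKER_BATCH_RUNNER_DEFAULT_BATCH_SIZE=1000
```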
See also DB Feature Matrix
If mode is stream, Bulker will send events to the destination as soon as they are received.
If Bulker fails to send events to the destination, it can retry sending them with exponential backoff.
When an error occurs, Bulker moves events to a Kafka topic dedicated to the Retry Consumer.
In streaming mode a single failed event is moved to the retry topic, while in batch mode the whole batch is moved to the retry topic.
The Retry Consumer is responsible for requeuing events from the retry topic. It runs periodically and relocates events from the retry topic to the original topic while incrementing the retry attempt counter.
If the stream or batch consumer reaches the maximum number of retry attempts for a specific event, that event is moved to the dead topic.
Parameters:
Optional, default value: 5
Max number of retry attempts.
Optional, default value: 5
Defines base for exponential backoff in minutes for retry attempts. For example, if retry count is 3 and base is 5, then retry delays will be 5, 25, 125 minutes.
Optional, default value: 1440
Defines maximum possible retry delay in minutes. Default: 1440 minutes = 24 hours
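With the defaults above (5 attempts, backoff base 5, max delay 1440), the successive retry delays would be roughly 5, 25, 125, 625 and 1440 minutes, the last attempt being capped by the maximum delay.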
Optional, default value: 300 (5 min)
Default period of running the Retry Consumer for destinations where retryPeriodSec is not set explicitly.
Read more about batch processing configuration below
Optional, default value: 100
Default batch size for a destination's Retry Consumer where retryBatchSize is not set explicitly.
Read more about batch processing configuration below
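Similarly, instance-wide retry defaults can be set like this (a sketch; the variable names are the ones referenced in the destination options section below, the values are illustrative):

```bash
# Run the Retry Consumer every 10 minutes and requeue up to 500 events per run
BULKER_BATCH_RUNNER_DEFAULT_RETRY_PERIOD_SEC=600
BULKER_BATCH_RUNNER_DEFAULT_RETRY_BATCH_SIZE=500
```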
Bulker automatically creates 3 topics for each table in a destination: one for main processing, one for failed events that should be retried, and one for failed events that won't be retried (dead). Topic names have the format in.id.{destinationId}.m.{mode}.t.{tableName}.
Mode can be batch or stream for the main topic, retry for the retry topic, or dead for the dead topic.
The parameters below define how topics are created.
Optional, default value: `` (none)
String prefixed to all destination topic names.
E.g. if BULKER_KAFKA_TOPIC_PREFIX=some.prefix., then a full topic name could be some.prefix.in.id.clyzlw-.m.batch.t.events
Optional, default value: 168 (7 days)
Retention time in hours for the main topic.
Optional, default value: 168 (7 days)
Retention time in hours for the topic with retried events.
Optional, default value: 168 (7 days)
Retention time in hours for the topic with dead events.
Optional, default value: 1
Replication factor for topics.
Note: for production, it should be set to at least 2.
If BULKER_CLICKHOUSE_HOST is set, Bulker will use ClickHouse for storing a history of processed events.
Optional
ClickHouse host and port for storing the events log, e.g. clickhouse.example.com:9440
Optional
ClickHouse database where the events log is stored.
Optional
Enable SSL for Clickhouse connection
Optional
Optional
Bulker operates with destinations. Each destination is a connection to a database or a storage service (GCS, S3, etc.) and is defined as a JSON object.
There are two ways to define the list of destinations:
Each environment variable BULKER_DESTINATION_* defines a destination. The value of the variable is a JSON object. Example:
BULKER_DESTINATION_POSTGRES="{id: 'postgres', }"
URL of an endpoint that returns the configuration of destination entities.
E.g. jitsucom/console's export endpoint: http://<consoles-domain>/api/admin/export/bulker-connections
Auth token for accessing the BULKER_CONFIG_SOURCE endpoint.
E.g. for jitsucom/console's export endpoint: service-admin-account:CONSOLE_AUTH_TOKENS
Default value: 5
Period in seconds for refreshing the configuration from the BULKER_CONFIG_SOURCE endpoint.
Set BULKER_CONFIG_SOURCE to redis://... or rediss://... and Bulker will read destinations from the Redis enrichedConnections key.
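For example (a sketch; the URL is a placeholder):

```bash
BULKER_CONFIG_SOURCE="rediss://default:mypassword@redis.example.com:6380"
```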
Each destination is a JSON object:
{
//unique id of destination. The id is referenced in HTTP-api
id: "string", // unique destination id
//"clickhouse", "postgres", "mysql", "snowflake", "redshift" or "bigquery"
//"s3" and "gcs" are coming soon
type: "string", // destination type, see below
//optional (time in ISO8601 format) when destination has been updated
updatedAt: "2020-01-01T00:00:00Z",
//how to connect to destination. Values are destination specific. See the credentials structures below
credentials: {},
options: {
mode: "string", // "stream" or "batch"
//maximum batch size. If not set, value of BULKER_BATCH_RUNNER_DEFAULT_BATCH_SIZE is used
//see "Batching" section above
//default value: 10000
batchSize: 10000,
//period of running batch consumer in minutes (float). If not set, value of BULKER_BATCH_RUNNER_DEFAULT_PERIOD_SEC is used
//see "Batching" section above
//default value: 5
frequency: 5,
//name of the field that contains unique event id.
//optional
primaryKey: "id",
//whether Bulker should deduplicate events by primary key. See db-feature-matrix.md. Requires primaryKey to be set.
//default value: false
deduplicate: false,
//field that contains timestamp of an event. If set bulker will create destination tables optimized for range queries and sorting by provided column
//optional
timestamp: "timestamp",
//batch size of retry consumer. If not set, value of BULKER_BATCH_RUNNER_DEFAULT_RETRY_BATCH_SIZE is used
//see "Error Handling and Retries" section above
//default value: 100
retryBatchSize: 100,
//period of running retry consumer in minutes (float). If not set, batchPeriodSec is used, or BULKER_BATCH_RUNNER_DEFAULT_RETRY_PERIOD_SEC if batchPeriodSec is not set either.
//see "Error Handling and Retries" section above
//default value: 5
retryFrequency: 5,
},
}
Postgres, MySQL, Redshift and Snowflake credentials share the same configuration structure:
{
host: "string",
port: 5432,
database: "string",
defaultSchema: "",
username: "string",
password: "string",
//custom SQL connection parameters
parameters: {},
//Only for Redshift. Intermediate S3 bucket for uploading data
s3Config: {
//bucket name
bucket: "string",
//bucket region. See https://docs.aws.amazon.com/general/latest/gr/s3.html
region: "string",
//access credentials
accessKeyId: "string",
secretAccessKey: "string",
//(optional) Folder inside the bucket
folder: "",
}
}
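Putting it together, a complete Postgres destination defined via an environment variable could look like the sketch below (id, host and credentials are placeholders):

```bash
BULKER_DESTINATION_POSTGRES='{
  "id": "postgres",
  "type": "postgres",
  "options": {
    "mode": "batch",
    "batchSize": 10000,
    "frequency": 5,
    "primaryKey": "id",
    "deduplicate": true
  },
  "credentials": {
    "host": "postgres.example.com",
    "port": 5432,
    "database": "events",
    "defaultSchema": "public",
    "username": "bulker",
    "password": "secret"
  }
}'
```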
{
//Clickhouse protocol: clickhouse, clickhouse-secure, http or https
protocol: "string",
//list of clickhouse servers as host:port. If port is not specified, default port for respective protocol will be used. http → 8123, https → 8443, clickhouse → 9000, clickhouse-secure → 9440
hosts: ["string"],
//map of parameters. See https://clickhouse.com/docs/en/integrations/go/clickhouse-go/database-sql-api/#connection-settings
parameters: {},
username: "string",
password: "string",
//name of the database
database: "string",
//cluster name
cluster: "string",
//clickhouse engine settings. Defines how new tables are created in clickhouse
engine: {
//todo
}
}
{
//service account credentials. See https://cloud.google.com/docs/authentication/production
//Google Cloud project ID
project: "string",
//key file. Either JSON object or path to local file
keyFile: "string",
//BigQuery dataset name
bqDataset: "string",
}