From 789288e3605989fd68da2a90683cdc250181ec9d Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Wed, 21 Aug 2024 14:33:09 +0530 Subject: [PATCH] feat: write in parallel to both table and minor fixes --- exporter/clickhouselogsexporter/exporter.go | 71 +++++++++++-------- .../migrations/000014_new_schema.down.sql | 4 -- .../logs/migrations/000014_new_schema.up.sql | 16 ++--- 3 files changed, 47 insertions(+), 44 deletions(-) diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go index ef55c529..e48e31f3 100644 --- a/exporter/clickhouselogsexporter/exporter.go +++ b/exporter/clickhouselogsexporter/exporter.go @@ -42,11 +42,11 @@ import ( ) const ( - DISTRIBUTED_LOGS_TABLE = "distributed_logs" - DISTRIBUTED_TAG_ATTRIBUTES = "distributed_tag_attributes" - DISTRIBUTED_LOGS_TABLE_V2 = "distributed_logs_v2" - DISTRIBUTED_LOGS_RESOURCE_BUCKET_V2 = "distributed_logs_v2_resource_bucket" - DISTRIBUTES_LOGS_RESOURCE_BUCKET_V2_SECONDS = 1800 + DISTRIBUTED_LOGS_TABLE = "distributed_logs" + DISTRIBUTED_TAG_ATTRIBUTES = "distributed_tag_attributes" + DISTRIBUTED_LOGS_TABLE_V2 = "distributed_logs_v2" + DISTRIBUTED_LOGS_RESOURCE_V2 = "distributed_logs_v2_resource" + DISTRIBUTED_LOGS_RESOURCE_V2_SECONDS = 1800 ) type clickhouseLogsExporter struct { @@ -314,7 +314,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L ts = ots } - lBucketStart := tsBucket(int64(ts/1000000000), DISTRIBUTES_LOGS_RESOURCE_BUCKET_V2_SECONDS) + lBucketStart := tsBucket(int64(ts/1000000000), DISTRIBUTED_LOGS_RESOURCE_V2_SECONDS) if _, exists := resourcesSeen[int64(lBucketStart)]; !exists { resourcesSeen[int64(lBucketStart)] = map[string]string{} @@ -396,7 +396,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L insertResourcesStmtV2, err = e.db.PrepareBatch( ctx, - fmt.Sprintf("INSERT into %s.%s", databaseName, DISTRIBUTED_LOGS_RESOURCE_BUCKET_V2), + fmt.Sprintf("INSERT into %s.%s", 
databaseName, DISTRIBUTED_LOGS_RESOURCE_V2), driver.WithReleaseConnection(), ) if err != nil { @@ -413,31 +413,34 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } } - dbWriteStart := time.Now() - err = statement.Send() - // insert to the new table - errV2 := insertLogsStmtV2.Send() - // insert into the resource bucket table - errResource := insertResourcesStmtV2.Send() - if err != nil { - return fmt.Errorf("couldn't send batch insert resources statement:%w", err) + var wg sync.WaitGroup + chErr := make(chan error, 3) + chDuration := make(chan time.Duration, 3) + + wg.Add(3) + go send(statement, chDuration, chErr, &wg) + go send(insertLogsStmtV2, chDuration, chErr, &wg) + go send(insertResourcesStmtV2, chDuration, chErr, &wg) + wg.Wait() + close(chErr) + + // store the duration to send the data + for _, table := range []string{DISTRIBUTED_LOGS_TABLE, DISTRIBUTED_LOGS_TABLE_V2, DISTRIBUTED_LOGS_RESOURCE_V2} { + duration := <-chDuration + stats.RecordWithTags(ctx, + []tag.Mutator{ + tag.Upsert(exporterKey, component.DataTypeLogs.String()), + tag.Upsert(tableKey, table), + }, + writeLatencyMillis.M(int64(duration.Milliseconds())), + ) } - stats.RecordWithTags(ctx, - []tag.Mutator{ - tag.Upsert(exporterKey, component.DataTypeLogs.String()), - tag.Upsert(tableKey, DISTRIBUTED_LOGS_TABLE), - }, - writeLatencyMillis.M(int64(time.Since(dbWriteStart).Milliseconds())), - ) - if err != nil { - return fmt.Errorf("StatementSend:%w", err) - } - if errV2 != nil { - return fmt.Errorf("StatementSendV2:%w", err) - } - if errResource != nil { - return fmt.Errorf("ResourceStatementSendV2:%w", err) + // check the errors + for i := 0; i < 3; i++ { + if r := <-chErr; r != nil { + return fmt.Errorf("StatementSend:%w", r) + } } duration := time.Since(start) @@ -466,6 +469,14 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } } +func send(statement driver.Batch, durationCh chan<- time.Duration, chErr chan<- error, wg 
*sync.WaitGroup) { + defer wg.Done() + start := time.Now() + err := statement.Send() + chErr <- err + durationCh <- time.Since(start) +} + type attributesToSliceResponse struct { StringKeys []string StringValues []string diff --git a/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql b/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql index 7686148b..6f6531de 100644 --- a/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql +++ b/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql @@ -37,7 +37,3 @@ DROP TABLE IF EXISTS signoz_logs.logs_v2_resource_bucket ON CLUSTER {{.SIGNOZ_CL DROP TABLE IF EXISTS signoz_logs.distributed_logs_v2_resource_bucket ON CLUSTER {{.SIGNOZ_CLUSTER}}; DROP TABLE IF EXISTS signoz_logs.logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}; DROP TABLE IF EXISTS signoz_logs.distributed_logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}; - - -ALTER TABLE signoz_logs.tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagDataType Enum('string', 'bool', 'int64', 'float64') CODEC(ZSTD(1)); -ALTER TABLE signoz_logs.distributed_tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagDataType Enum('string', 'bool', 'int64', 'float64') CODEC(ZSTD(1)); \ No newline at end of file diff --git a/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql b/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql index e8f60399..5122a42f 100644 --- a/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql +++ b/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql @@ -1,23 +1,23 @@ -CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2_resource_bucket ON CLUSTER {{.SIGNOZ_CLUSTER}} +CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2_resource ON CLUSTER {{.SIGNOZ_CLUSTER}} ( `labels` String CODEC(ZSTD(5)), `fingerprint` String CODEC(ZSTD(1)), `seen_at_ts_bucket_start` Int64 CODEC(Delta(8), ZSTD(1)), INDEX idx_labels lower(labels) TYPE ngrambf_v1(4, 1024, 3, 0) 
GRANULARITY 1 ) -ENGINE = ReplacingMergeTree +ENGINE = {{.SIGNOZ_REPLICATED}}ReplacingMergeTree PARTITION BY toDate(seen_at_ts_bucket_start / 1000) ORDER BY (labels, fingerprint, seen_at_ts_bucket_start) SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192; -CREATE TABLE IF NOT EXISTS signoz_logs.distributed_logs_v2_resource_bucket ON CLUSTER {{.SIGNOZ_CLUSTER}} +CREATE TABLE IF NOT EXISTS signoz_logs.distributed_logs_v2_resource ON CLUSTER {{.SIGNOZ_CLUSTER}} ( `labels` String CODEC(ZSTD(5)), `fingerprint` String CODEC(ZSTD(1)), `seen_at_ts_bucket_start` Int64 CODEC(Delta(8), ZSTD(1)) ) -ENGINE = Distributed('cluster', 'signoz_logs', 'logs_v2_resource_bucket', cityHash64(labels, fingerprint, seen_at_ts_bucket_start)); +ENGINE = Distributed({{.SIGNOZ_CLUSTER}}, 'signoz_logs', 'logs_v2_resource', cityHash64(labels, fingerprint)); CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}} @@ -52,7 +52,7 @@ CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}} INDEX attributes_int64_idx_val mapValues(attributes_number) TYPE bloom_filter GRANULARITY 1, INDEX attributes_bool_idx_key mapKeys(attributes_bool) TYPE tokenbf_v1(1024, 2, 0) GRANULARITY 1 ) -ENGINE = MergeTree +ENGINE = {{.SIGNOZ_REPLICATED}}MergeTree PARTITION BY toDate(timestamp / 1000000000) ORDER BY (ts_bucket_start, resource_fingerprint, severity_text, timestamp, id) SETTINGS index_granularity = 8192; @@ -80,11 +80,7 @@ CREATE TABLE IF NOT EXISTS signoz_logs.distributed_logs_v2 ON CLUSTER {{.SIGNOZ `scope_version` String CODEC(ZSTD(1)), `scope_string` Map(String, String) CODEC(ZSTD(1)) ) -ENGINE = Distributed('cluster', 'signoz_logs', 'logs_v2', cityHash64(id)); - - -ALTER TABLE signoz_logs.tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagDataType Enum('string', 'bool', 'int64', 'float64', 'number') CODEC(ZSTD(1)); -ALTER TABLE signoz_logs.distributed_tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagDataType Enum('string', 
'bool', 'int64', 'float64', 'number') CODEC(ZSTD(1)); +ENGINE = Distributed({{.SIGNOZ_CLUSTER}}, 'signoz_logs', 'logs_v2', cityHash64(id)); -- remove the old mv DROP TABLE IF EXISTS signoz_logs.resource_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};