Skip to content

Commit

Permalink
feat: write in parallel to both tables and minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nityanandagohain committed Aug 21, 2024
1 parent 736c441 commit 789288e
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 44 deletions.
71 changes: 41 additions & 30 deletions exporter/clickhouselogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ import (
)

const (
DISTRIBUTED_LOGS_TABLE = "distributed_logs"
DISTRIBUTED_TAG_ATTRIBUTES = "distributed_tag_attributes"
DISTRIBUTED_LOGS_TABLE_V2 = "distributed_logs_v2"
DISTRIBUTED_LOGS_RESOURCE_BUCKET_V2 = "distributed_logs_v2_resource_bucket"
DISTRIBUTES_LOGS_RESOURCE_BUCKET_V2_SECONDS = 1800
DISTRIBUTED_LOGS_TABLE = "distributed_logs"
DISTRIBUTED_TAG_ATTRIBUTES = "distributed_tag_attributes"
DISTRIBUTED_LOGS_TABLE_V2 = "distributed_logs_v2"
DISTRIBUTED_LOGS_RESOURCE_V2 = "distributed_logs_v2_resource"
DISTRIBUTED_LOGS_RESOURCE_V2_SECONDS = 1800
)

type clickhouseLogsExporter struct {
Expand Down Expand Up @@ -314,7 +314,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
ts = ots
}

lBucketStart := tsBucket(int64(ts/1000000000), DISTRIBUTES_LOGS_RESOURCE_BUCKET_V2_SECONDS)
lBucketStart := tsBucket(int64(ts/1000000000), DISTRIBUTED_LOGS_RESOURCE_V2_SECONDS)

if _, exists := resourcesSeen[int64(lBucketStart)]; !exists {
resourcesSeen[int64(lBucketStart)] = map[string]string{}
Expand Down Expand Up @@ -396,7 +396,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L

insertResourcesStmtV2, err = e.db.PrepareBatch(
ctx,
fmt.Sprintf("INSERT into %s.%s", databaseName, DISTRIBUTED_LOGS_RESOURCE_BUCKET_V2),
fmt.Sprintf("INSERT into %s.%s", databaseName, DISTRIBUTED_LOGS_RESOURCE_V2),
driver.WithReleaseConnection(),
)
if err != nil {
Expand All @@ -413,31 +413,34 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
}
}

dbWriteStart := time.Now()
err = statement.Send()
// insert to the new table
errV2 := insertLogsStmtV2.Send()
// insert into the resource bucket table
errResource := insertResourcesStmtV2.Send()
if err != nil {
return fmt.Errorf("couldn't send batch insert resources statement:%w", err)
var wg sync.WaitGroup
chErr := make(chan error, 3)
chDuration := make(chan time.Duration, 3)

wg.Add(3)
go send(statement, chDuration, chErr, &wg)
go send(insertLogsStmtV2, chDuration, chErr, &wg)
go send(insertResourcesStmtV2, chDuration, chErr, &wg)
wg.Wait()
close(chErr)

// store the duration to send the data
for _, table := range []string{DISTRIBUTED_LOGS_TABLE, DISTRIBUTED_LOGS_TABLE_V2, DISTRIBUTED_LOGS_RESOURCE_V2} {
duration := <-chDuration
stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(exporterKey, component.DataTypeLogs.String()),
tag.Upsert(tableKey, table),
},
writeLatencyMillis.M(int64(duration.Milliseconds())),
)
}

stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(exporterKey, component.DataTypeLogs.String()),
tag.Upsert(tableKey, DISTRIBUTED_LOGS_TABLE),
},
writeLatencyMillis.M(int64(time.Since(dbWriteStart).Milliseconds())),
)
if err != nil {
return fmt.Errorf("StatementSend:%w", err)
}
if errV2 != nil {
return fmt.Errorf("StatementSendV2:%w", err)
}
if errResource != nil {
return fmt.Errorf("ResourceStatementSendV2:%w", err)
// check the errors
for i := 0; i < 3; i++ {
if r := <-chErr; r != nil {
return fmt.Errorf("StatementSend:%w", err)
}
}

duration := time.Since(start)
Expand Down Expand Up @@ -466,6 +469,14 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
}
}

// send flushes one prepared batch to ClickHouse, publishing the result of
// Send on chErr and the elapsed wall-clock time on durationCh. It is meant
// to be run as a goroutine; the WaitGroup is decremented when it returns.
// Both channels must be buffered by the caller so sends never block.
func send(statement driver.Batch, durationCh chan<- time.Duration, chErr chan<- error, wg *sync.WaitGroup) {
	defer wg.Done()
	begin := time.Now()
	// Report the insert error first, then the time taken; the caller drains
	// durations and errors separately after all senders have finished.
	chErr <- statement.Send()
	durationCh <- time.Since(begin)
}

type attributesToSliceResponse struct {
StringKeys []string
StringValues []string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,3 @@ DROP TABLE IF EXISTS signoz_logs.logs_v2_resource_bucket ON CLUSTER {{.SIGNOZ_CL
DROP TABLE IF EXISTS signoz_logs.distributed_logs_v2_resource_bucket ON CLUSTER {{.SIGNOZ_CLUSTER}};
DROP TABLE IF EXISTS signoz_logs.logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}};
DROP TABLE IF EXISTS signoz_logs.distributed_logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}};


ALTER TABLE signoz_logs.tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagDataType Enum('string', 'bool', 'int64', 'float64') CODEC(ZSTD(1));
ALTER TABLE signoz_logs.distributed_tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagDataType Enum('string', 'bool', 'int64', 'float64') CODEC(ZSTD(1));
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2_resource_bucket ON CLUSTER {{.SIGNOZ_CLUSTER}}
CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2_resource ON CLUSTER {{.SIGNOZ_CLUSTER}}
(
`labels` String CODEC(ZSTD(5)),
`fingerprint` String CODEC(ZSTD(1)),
`seen_at_ts_bucket_start` Int64 CODEC(Delta(8), ZSTD(1)),
INDEX idx_labels lower(labels) TYPE ngrambf_v1(4, 1024, 3, 0) GRANULARITY 1
)
ENGINE = ReplacingMergeTree
ENGINE = {{.SIGNOZ_REPLICATED}}ReplacingMergeTree
PARTITION BY toDate(seen_at_ts_bucket_start / 1000)
ORDER BY (labels, fingerprint, seen_at_ts_bucket_start)
SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192;


CREATE TABLE IF NOT EXISTS signoz_logs.distributed_logs_v2_resource_bucket ON CLUSTER {{.SIGNOZ_CLUSTER}}
CREATE TABLE IF NOT EXISTS signoz_logs.distributed_logs_v2_resource ON CLUSTER {{.SIGNOZ_CLUSTER}}
(
`labels` String CODEC(ZSTD(5)),
`fingerprint` String CODEC(ZSTD(1)),
`seen_at_ts_bucket_start` Int64 CODEC(Delta(8), ZSTD(1))
)
ENGINE = Distributed('cluster', 'signoz_logs', 'logs_v2_resource_bucket', cityHash64(labels, fingerprint, seen_at_ts_bucket_start));
ENGINE = Distributed({{.SIGNOZ_CLUSTER}}, 'signoz_logs', 'logs_v2_resource', cityHash64(labels, fingerprint));


CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}
Expand Down Expand Up @@ -52,7 +52,7 @@ CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}
INDEX attributes_int64_idx_val mapValues(attributes_number) TYPE bloom_filter GRANULARITY 1,
INDEX attributes_bool_idx_key mapKeys(attributes_bool) TYPE tokenbf_v1(1024, 2, 0) GRANULARITY 1
)
ENGINE = MergeTree
ENGINE = {{.SIGNOZ_REPLICATED}}MergeTree
PARTITION BY toDate(timestamp / 1000000000)
ORDER BY (ts_bucket_start, resource_fingerprint, severity_text, timestamp, id)
SETTINGS index_granularity = 8192;
Expand Down Expand Up @@ -80,11 +80,7 @@ CREATE TABLE IF NOT EXISTS signoz_logs.distributed_logs_v2 ON CLUSTER {{.SIGNOZ
`scope_version` String CODEC(ZSTD(1)),
`scope_string` Map(String, String) CODEC(ZSTD(1))
)
ENGINE = Distributed('cluster', 'signoz_logs', 'logs_v2', cityHash64(id));


ALTER TABLE signoz_logs.tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagDataType Enum('string', 'bool', 'int64', 'float64', 'number') CODEC(ZSTD(1));
ALTER TABLE signoz_logs.distributed_tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagDataType Enum('string', 'bool', 'int64', 'float64', 'number') CODEC(ZSTD(1));
ENGINE = Distributed({{.SIGNOZ_CLUSTER}}, 'signoz_logs', 'logs_v2', cityHash64(id));

-- remove the old mv
DROP TABLE IF EXISTS signoz_logs.resource_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
Expand Down

0 comments on commit 789288e

Please sign in to comment.