From 90263a7bf4b7ae66f02805c4b6cdbc7d4a264597 Mon Sep 17 00:00:00 2001 From: Vishal Sharma Date: Thu, 22 Aug 2024 09:38:16 +0530 Subject: [PATCH 01/15] chore: update db attribute names (#369) --- exporter/clickhousetracesexporter/clickhouse_exporter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/clickhousetracesexporter/clickhouse_exporter.go b/exporter/clickhousetracesexporter/clickhouse_exporter.go index 8b795314..8419249c 100644 --- a/exporter/clickhousetracesexporter/clickhouse_exporter.go +++ b/exporter/clickhousetracesexporter/clickhouse_exporter.go @@ -206,9 +206,9 @@ func populateOtherDimensions(attributes pcommon.Map, span *Span) { span.MsgOperation = v.Str() } else if k == "db.system" { span.DBSystem = v.Str() - } else if k == "db.name" { + } else if k == "db.name" || k == "db.namespace" { span.DBName = v.Str() - } else if k == "db.operation" { + } else if k == "db.operation" || k == "db.operation.name" { span.DBOperation = v.Str() } else if k == "peer.service" { span.PeerService = v.Str() From 6d6ba57de934d35b1eadcc63b3146aa617715c77 Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Fri, 23 Aug 2024 11:46:04 +0530 Subject: [PATCH 02/15] feat: support for new logs schema (#348) * feat: support for new logs schema * fix: exporter updated * feat: add gcp.project and source_type to fingerprint hierarchy * fix: update order by * fix: update order by * fix: use map for attributes and resources * fix: correct naming * feat: update body index * fix: store int64 in float64 * fix: add bloom filter for map numbers * feat: don't write to old table * fix: comment old migration * feat: update materialzied view * feat: write to old table as well * feat: address comments * feat: write in parallel to both table and minor fixes * fix: move source_type to same level as cloud.platform * feat: use locardinality for map keys * feat: fix duration collection * fix: add ttl to tables --------- Co-authored-by: Srikanth Chekuri --- exporter/clickhouselogsexporter/exporter.go | 262 ++++++++++++++++-- .../logsv2/fingerprint.go | 201 ++++++++++++++ .../logsv2/fingerprint_test.go | 41 +++ .../clickhouselogsexporter/logsv2/hash.go | 52 ++++ .../000011_add_instrumentation_scope.up.sql | 1 - .../migrations/000014_new_schema.down.sql | 39 +++ .../logs/migrations/000014_new_schema.up.sql | 117 ++++++++ 7 files changed, 683 insertions(+), 30 deletions(-) create mode 100644 exporter/clickhouselogsexporter/logsv2/fingerprint.go create mode 100644 exporter/clickhouselogsexporter/logsv2/fingerprint_test.go create mode 100644 exporter/clickhouselogsexporter/logsv2/hash.go create mode 100644 migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql create mode 100644 migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go index 8d8c989d..fcf18b91 100644 --- a/exporter/clickhouselogsexporter/exporter.go +++ b/exporter/clickhouselogsexporter/exporter.go @@ -27,6 +27,7 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" driver "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/SigNoz/signoz-otel-collector/exporter/clickhouselogsexporter/logsv2" "github.com/SigNoz/signoz-otel-collector/usage" "github.com/SigNoz/signoz-otel-collector/utils" "github.com/google/uuid" @@ -41,15 +42,19 @@ import ( ) const ( - DISTRIBUTED_LOGS_TABLE = "distributed_logs" - DISTRIBUTED_TAG_ATTRIBUTES = "distributed_tag_attributes" + DISTRIBUTED_LOGS_TABLE = "distributed_logs" + DISTRIBUTED_TAG_ATTRIBUTES = "distributed_tag_attributes" + DISTRIBUTED_LOGS_TABLE_V2 = "distributed_logs_v2" + DISTRIBUTED_LOGS_RESOURCE_V2 = "distributed_logs_v2_resource" + DISTRIBUTED_LOGS_RESOURCE_V2_SECONDS = 1800 ) type clickhouseLogsExporter struct { - id uuid.UUID - db clickhouse.Conn - insertLogsSQL string - ksuid ksuid.KSUID + id uuid.UUID + db clickhouse.Conn + insertLogsSQL string + insertLogsSQLV2 string + ksuid ksuid.KSUID logger *zap.Logger cfg *Config @@ -71,6 +76,7 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro } insertLogsSQL := renderInsertLogsSQL(cfg) + insertLogsSQLV2 := renderInsertLogsSQLV2(cfg) id := uuid.New() collector := usage.NewUsageCollector( id, @@ -93,15 +99,16 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro } return &clickhouseLogsExporter{ - id: id, - db: client, - insertLogsSQL: insertLogsSQL, - logger: logger, - cfg: cfg, - ksuid: ksuid.New(), - usageCollector: collector, - wg: new(sync.WaitGroup), - closeChan: make(chan struct{}), + id: id, + db: client, + insertLogsSQL: insertLogsSQL, + insertLogsSQLV2: insertLogsSQLV2, + logger: logger, + cfg: cfg, + ksuid: ksuid.New(), + usageCollector: collector, + wg: new(sync.WaitGroup), + closeChan: make(chan struct{}), }, nil } @@ -202,7 +209,15 @@ func (e *clickhouseLogsExporter) pushLogsData(ctx context.Context, ld plog.Logs) return nil } +func tsBucket(ts int64, bucketSize int64) int64 { + return (int64(ts) / int64(bucketSize)) * int64(bucketSize) +} + func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.Logs) error { + resourcesSeen := map[int64]map[string]string{} + + var insertLogsStmtV2 driver.Batch + var insertResourcesStmtV2 driver.Batch var statement driver.Batch var tagStatement driver.Batch var err error @@ -214,6 +229,12 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L if tagStatement != nil { _ = tagStatement.Abort() } + if insertLogsStmtV2 != nil { + _ = insertLogsStmtV2.Abort() + } + if insertResourcesStmtV2 != nil { + _ = insertResourcesStmtV2.Abort() + } }() select { @@ -231,6 +252,11 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L return fmt.Errorf("PrepareTagBatch:%w", err) } + insertLogsStmtV2, err = e.db.PrepareBatch(ctx, e.insertLogsSQLV2, driver.WithReleaseConnection()) + if err != nil { + return fmt.Errorf("PrepareBatchV2:%w", err) + } + metrics := map[string]usage.Metric{} for i := 0; i < ld.ResourceLogs().Len(); i++ { @@ -240,7 +266,15 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L resources := attributesToSlice(res.Attributes(), true) - err := addAttrsToTagStatement(tagStatement, "resource", resources) + resourcesMap := attributesToMap(res.Attributes(), true) + + serializedRes, err := json.Marshal(res.Attributes().AsRaw()) + if err != nil { + return fmt.Errorf("couldn't serialize log resource JSON: %w", err) + } + resourceJson := string(serializedRes) + + err = addAttrsToTagStatement(tagStatement, "resource", resources) if err != nil { return err } @@ -254,6 +288,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L scopeVersion := scope.Version() scopeAttributes := attributesToSlice(scope.Attributes(), true) + scopeMap := attributesToMap(scope.Attributes(), true) err := addAttrsToTagStatement(tagStatement, "scope", scopeAttributes) if err != nil { @@ -279,7 +314,19 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L ts = ots } + lBucketStart := tsBucket(int64(ts/1000000000), DISTRIBUTED_LOGS_RESOURCE_V2_SECONDS) + + if _, exists := resourcesSeen[int64(lBucketStart)]; !exists { + resourcesSeen[int64(lBucketStart)] = map[string]string{} + } + fp, exists := resourcesSeen[int64(lBucketStart)][resourceJson] + if !exists { + fp = logsv2.CalculateFingerprint(res.Attributes().AsRaw(), logsv2.ResourceHierarchy()) + resourcesSeen[int64(lBucketStart)][resourceJson] = fp + } + attributes := attributesToSlice(r.Attributes(), false) + attrsMap := attributesToMap(r.Attributes(), false) err := addAttrsToTagStatement(tagStatement, "tag", attributes) if err != nil { @@ -289,6 +336,31 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L // remove after sometime attributes = addTemporaryUnderscoreSupport(attributes) + err = insertLogsStmtV2.Append( + uint64(lBucketStart), + fp, + ts, + ots, + e.ksuid.String(), + utils.TraceIDToHexOrEmptyString(r.TraceID()), + utils.SpanIDToHexOrEmptyString(r.SpanID()), + uint32(r.Flags()), + r.SeverityText(), + uint8(r.SeverityNumber()), + getStringifiedBody(r.Body()), + attrsMap.StringData, + attrsMap.NumberData, + attrsMap.BoolData, + resourcesMap.StringData, + scopeName, + scopeVersion, + scopeMap.StringData, + ) + if err != nil { + return fmt.Errorf("StatementAppendLogsV2:%w", err) + } + + // old table err = statement.Append( ts, ots, @@ -321,18 +393,56 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } } } - dbWriteStart := time.Now() - err = statement.Send() - stats.RecordWithTags(ctx, - []tag.Mutator{ - tag.Upsert(exporterKey, component.DataTypeLogs.String()), - tag.Upsert(tableKey, DISTRIBUTED_LOGS_TABLE), - }, - writeLatencyMillis.M(int64(time.Since(dbWriteStart).Milliseconds())), + + insertResourcesStmtV2, err = e.db.PrepareBatch( + ctx, + fmt.Sprintf("INSERT into %s.%s", databaseName, DISTRIBUTED_LOGS_RESOURCE_V2), + driver.WithReleaseConnection(), ) if err != nil { - return fmt.Errorf("StatementSend:%w", err) + return fmt.Errorf("couldn't PrepareBatch for inserting resource fingerprints :%w", err) + } + + for bucketTs, resources := range resourcesSeen { + for resourceLabels, fingerprint := range resources { + insertResourcesStmtV2.Append( + resourceLabels, + fingerprint, + bucketTs, + ) + } + } + + var wg sync.WaitGroup + chErr := make(chan error, 3) + chDuration := make(chan statementSendDuration, 3) + + wg.Add(3) + go send(statement, DISTRIBUTED_LOGS_TABLE, chDuration, chErr, &wg) + go send(insertLogsStmtV2, DISTRIBUTED_LOGS_TABLE_V2, chDuration, chErr, &wg) + go send(insertResourcesStmtV2, DISTRIBUTED_LOGS_RESOURCE_V2, chDuration, chErr, &wg) + wg.Wait() + close(chErr) + + // store the duration for send the data + for i := 0; i < 3; i++ { + sendDuration := <-chDuration + stats.RecordWithTags(ctx, + []tag.Mutator{ + tag.Upsert(exporterKey, component.DataTypeLogs.String()), + tag.Upsert(tableKey, sendDuration.Name), + }, + writeLatencyMillis.M(int64(sendDuration.duration.Milliseconds())), + ) } + + // check the errors + for i := 0; i < 3; i++ { + if r := <-chErr; r != nil { + return fmt.Errorf("StatementSend:%w", err) + } + } + duration := time.Since(start) e.logger.Debug("insert logs", zap.Int("records", ld.LogRecordCount()), zap.String("cost", duration.String())) @@ -359,6 +469,22 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } } +type statementSendDuration struct { + Name string + duration time.Duration +} + +func send(statement driver.Batch, tableName string, durationCh chan<- statementSendDuration, chErr chan<- error, wg *sync.WaitGroup) { + defer wg.Done() + start := time.Now() + err := statement.Send() + chErr <- err + durationCh <- statementSendDuration{ + Name: tableName, + duration: time.Since(start), + } +} + type attributesToSliceResponse struct { StringKeys []string StringValues []string @@ -401,10 +527,10 @@ func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attrib time.Now(), v, tagType, - "int64", + "float64", nil, - attrs.IntValues[i], nil, + attrs.IntValues[i], ) if err != nil { return fmt.Errorf("could not append number attribute to batch, err: %s", err) @@ -441,9 +567,40 @@ func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attrib return nil } +type attributeMap struct { + StringData map[string]string + NumberData map[string]float64 + BoolData map[string]bool +} + +func attributesToMap(attributes pcommon.Map, forceStringValues bool) (response attributeMap) { + response.BoolData = map[string]bool{} + response.StringData = map[string]string{} + response.NumberData = map[string]float64{} + attributes.Range(func(k string, v pcommon.Value) bool { + if forceStringValues { + // store everything as string + response.StringData[k] = v.AsString() + } else { + switch v.Type() { + case pcommon.ValueTypeInt: + response.NumberData[k] = float64(v.Int()) + case pcommon.ValueTypeDouble: + response.NumberData[k] = v.Double() + case pcommon.ValueTypeBool: + response.BoolData[k] = v.Bool() + default: // store it as string + response.StringData[k] = v.AsString() + } + } + + return true + }) + return response +} + func attributesToSlice(attributes pcommon.Map, forceStringValues bool) (response attributesToSliceResponse) { attributes.Range(func(k string, v pcommon.Value) bool { - // Support for . , remove the above section once done. if forceStringValues { // store everything as string response.StringKeys = append(response.StringKeys, k) @@ -519,7 +676,7 @@ const ( body, resources_string_key, resources_string_value, - attributes_string_key, + attributes_string_key, attributes_string_value, attributes_int64_key, attributes_int64_value, @@ -558,6 +715,49 @@ const ( )` ) +const ( + // language=ClickHouse SQL + insertLogsSQLTemplateV2 = `INSERT INTO %s.%s ( + ts_bucket_start, + resource_fingerprint, + timestamp, + observed_timestamp, + id, + trace_id, + span_id, + trace_flags, + severity_text, + severity_number, + body, + attributes_string, + attributes_number, + attributes_bool, + resources_string, + scope_name, + scope_version, + scope_string + ) VALUES ( + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ? + )` +) + // newClickhouseClient create a clickhouse client. func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, error) { // use empty database to create database @@ -588,3 +788,7 @@ func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, erro func renderInsertLogsSQL(cfg *Config) string { return fmt.Sprintf(insertLogsSQLTemplate, databaseName, DISTRIBUTED_LOGS_TABLE) } + +func renderInsertLogsSQLV2(cfg *Config) string { + return fmt.Sprintf(insertLogsSQLTemplateV2, databaseName, DISTRIBUTED_LOGS_TABLE_V2) +} diff --git a/exporter/clickhouselogsexporter/logsv2/fingerprint.go b/exporter/clickhouselogsexporter/logsv2/fingerprint.go new file mode 100644 index 00000000..9bb55666 --- /dev/null +++ b/exporter/clickhouselogsexporter/logsv2/fingerprint.go @@ -0,0 +1,201 @@ +package logsv2 + +import ( + "fmt" + "strings" +) + +type DimensionHierarchyNode struct { + // labels that map to this node in dimension hierarchy + labels []string + + // List of potential subhierachies in the order of preference. + // Eg: k8s.cluster.name can have subhierarchies of k8s.namespace.name or k8s.node.name - the 2 ways of grouping/organizing k8s resources. + // In most cases this list will have only one entry + subHierachies []DimensionHierarchyNode +} + +type IdLabelValue struct { + Label string + Value any +} + +// Returns list of dimension labels for a set of attributes for a DimensionHierarchy +func (node *DimensionHierarchyNode) Identifier(attributes map[string]any) []IdLabelValue { + result := []IdLabelValue{} + + for _, l := range node.labels { + if lVal, exists := attributes[l]; exists { + result = append(result, IdLabelValue{ + Label: l, + Value: lVal, + }) + break + } + } + + for _, s := range node.subHierachies { + subLabels := s.Identifier(attributes) + if len(subLabels) > 0 { + result = append(result, subLabels...) + break + } + } + + return result +} + +// TODO(Raj/Nitya): Consider parsing this stuff out from json +func ResourceHierarchy() *DimensionHierarchyNode { + return &DimensionHierarchyNode{ + labels: []string{ + "cloud.provider", + }, + subHierachies: []DimensionHierarchyNode{{ + labels: []string{"cloud.account.id"}, + + subHierachies: []DimensionHierarchyNode{{ + labels: []string{"gcp.project"}, + + subHierachies: []DimensionHierarchyNode{{ + labels: []string{ + "cloud.region", + "aws.region", + }, + + subHierachies: []DimensionHierarchyNode{{ + labels: []string{ + "cloud.platform", + "source_type", + }, + + subHierachies: []DimensionHierarchyNode{{ + labels: []string{ + "k8s.cluster.name", + "k8s.cluster.uid", + "aws.ecs.cluster.arn", + }, + + subHierachies: []DimensionHierarchyNode{ + // Logical/service oriented view + { + labels: []string{ + "service.namespace", + "k8s.namespace.name", + "ec2.tag.service-group", // is this standard enough? + }, + + subHierachies: []DimensionHierarchyNode{{ + labels: []string{ + "service.name", + "cloudwatch.log.group.name", + "k8s.deployment.name", + "k8s.deployment.uid", + "k8s.statefulset.name", + "k8s.statefulset.uid", + "k8s.daemonset.name", + "k8s.daemonset.uid", + "k8s.job.name", + "k8s.job.uid", + "k8s.cronjob.name", + "k8s.cronjob.uid", + "faas.name", + "ec2.tag.service", // is this standard enough? + }, + + subHierachies: []DimensionHierarchyNode{{ + labels: []string{ + "deployment.environment", + "ec2.tag.env-short", // is this standard enough? + "ec2.tag.env", // is this standard enough? + }, + + subHierachies: []DimensionHierarchyNode{{ + labels: []string{ + "service.instance.id", + "k8s.pod.name", + "k8s.pod.uid", + "aws.ecs.task.id", + "aws.ecs.task.arn", + "cloudwatch.log.stream", + "cloud.resource_id", + "faas.instance", + "host.id", + "host.name", + "host.ip", + "host", + }, + + subHierachies: []DimensionHierarchyNode{{ + labels: []string{ + "k8s.container.name", + "container.name", + "container_name", + }, + }}, + }}, + }}, + }}, + }, + + // Node oriented view + { + labels: []string{"cloud.availability_zone"}, + + subHierachies: []DimensionHierarchyNode{{ + labels: []string{ + "k8s.node.name", + "k8s.node.uid", + "host.id", + "host.name", + "host.ip", + "host", + }, + + subHierachies: []DimensionHierarchyNode{{ + labels: []string{ + "k8s.pod.name", + "k8s.pod.uid", + "aws.ecs.task.id", + "aws.ecs.task.arn", + }, + + subHierachies: []DimensionHierarchyNode{{ + labels: []string{ + "k8s.container.name", + "container.name", + }, + }}, + }}, + }}, + }}, + }}, + }}, + }}, + }}, + }}, + } +} + +// Calculates fingerprint for attributes that when sorted would keep fingerprints +// for the same set of attributes next to each other while also colocating +// entries at all levels of the hierarchy +// For example, fingerprints like "k8s.deployment.name=webserver;k8s.pod.name=webserver-0" +// will calculate logs for each webserver pod while also calculate all logs for the webserver deployment +func CalculateFingerprint( + attributes map[string]any, hierarchy *DimensionHierarchyNode, +) string { + id := hierarchy.Identifier(attributes) + + fingerprintParts := []string{} + for _, idLabel := range id { + fingerprintParts = append( + fingerprintParts, fmt.Sprintf("%s=%s", idLabel.Label, idLabel.Value), + ) + } + + hash := FingerprintHash(attributes) + fingerprintParts = append(fingerprintParts, fmt.Sprintf("%s=%v", "hash", hash)) + + return strings.Join(fingerprintParts, ";") +} diff --git a/exporter/clickhouselogsexporter/logsv2/fingerprint_test.go b/exporter/clickhouselogsexporter/logsv2/fingerprint_test.go new file mode 100644 index 00000000..65f55086 --- /dev/null +++ b/exporter/clickhouselogsexporter/logsv2/fingerprint_test.go @@ -0,0 +1,41 @@ +package logsv2 + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCalculateFingerprint(t *testing.T) { + testCases := []struct { + Name string + ResourceAttrs map[string]any + FingerPrint string + }{ + { + Name: "Random resource attr", + ResourceAttrs: map[string]any{"a": "b"}, + FingerPrint: "hash=15182603570120227210", + }, + { + Name: "Few attrs from the hierarchy", + ResourceAttrs: map[string]any{"ec2.tag.env": "fn-prod", "host.image.id": "ami-fce3c696"}, + FingerPrint: "ec2.tag.env=fn-prod;hash=5580615729524003981", + }, + { + Name: "More than one attrs from the hierarchy", + ResourceAttrs: map[string]any{"cloudwatch.log.stream": "mystr", "ec2.tag.env": "fn-prod", "host.image.id": "ami-fce3c696"}, + FingerPrint: "ec2.tag.env=fn-prod;cloudwatch.log.stream=mystr;hash=10649409385811604510", + }, + { + Name: "Vector and gcp", + ResourceAttrs: map[string]any{"gcp.project": "myproject", "source_type": "gcp", "random_key": "val"}, + FingerPrint: "gcp.project=myproject;source_type=gcp;hash=11162778839006855273", + }, + } + + for _, ts := range testCases { + res := CalculateFingerprint(ts.ResourceAttrs, ResourceHierarchy()) + assert.Equal(t, ts.FingerPrint, res) + } +} diff --git a/exporter/clickhouselogsexporter/logsv2/hash.go b/exporter/clickhouselogsexporter/logsv2/hash.go new file mode 100644 index 00000000..6592e473 --- /dev/null +++ b/exporter/clickhouselogsexporter/logsv2/hash.go @@ -0,0 +1,52 @@ +package logsv2 + +import ( + "fmt" + "sort" +) + +const ( + offset64 uint64 = 14695981039346656037 + prime64 uint64 = 1099511628211 + separatorByte byte = 255 +) + +// hashAdd adds a string to a fnv64a hash value, returning the updated hash. +func hashAdd(h uint64, s string) uint64 { + for i := 0; i < len(s); i++ { + h ^= uint64(s[i]) + h *= prime64 + } + return h +} + +// hashAddByte adds a byte to a fnv64a hash value, returning the updated hash. +func hashAddByte(h uint64, b byte) uint64 { + h ^= uint64(b) + h *= prime64 + return h +} + +// FingerprintHash calculates a fingerprint of SORTED BY NAME labels. +// It is adopted from labelSetToFingerprint, but avoids type conversions and memory allocations. +func FingerprintHash(attribs map[string]any) uint64 { + if len(attribs) == 0 { + return offset64 + } + + keys := []string{} + for k, _ := range attribs { + keys = append(keys, k) + } + + sort.Strings(keys) + + sum := offset64 + for _, k := range keys { + sum = hashAdd(sum, k) + sum = hashAddByte(sum, separatorByte) + sum = hashAdd(sum, fmt.Sprintf("%v", attribs[k])) + sum = hashAddByte(sum, separatorByte) + } + return sum +} diff --git a/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.up.sql b/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.up.sql index 2e908e57..3f65f18b 100644 --- a/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.up.sql +++ b/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.up.sql @@ -11,7 +11,6 @@ -- ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD column IF NOT EXISTS instrumentation_scope_attributes_string_value Array(String) CODEC(ZSTD(1)) -- ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD INDEX IF NOT EXISTS instrumentation_scope_idx (instrumentation_scope) TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4 - -- Don't run the commands below ALTER TABLE signoz_logs.tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagType Enum8('tag', 'resource', 'scope') CODEC(ZSTD(1)); ALTER TABLE signoz_logs.distributed_tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagType Enum8('tag', 'resource', 'scope') CODEC(ZSTD(1)); diff --git a/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql b/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql new file mode 100644 index 00000000..6f6531de --- /dev/null +++ b/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql @@ -0,0 +1,39 @@ +DROP TABLE IF EXISTS signoz_logs.resource_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}; +DROP TABLE IF EXISTS signoz_logs.attribute_keys_float64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}; +DROP TABLE IF EXISTS signoz_logs.attribute_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}; +DROP TABLE IF EXISTS signoz_logs.attribute_keys_bool_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}; + +CREATE MATERIALIZED VIEW IF NOT EXISTS attribute_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS +SELECT +distinct arrayJoin(attributes_string_key) as name, 'String' datatype +FROM signoz_logs.logs +ORDER BY name; + +CREATE MATERIALIZED VIEW IF NOT EXISTS attribute_keys_int64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS +SELECT +distinct arrayJoin(attributes_int64_key) as name, 'Int64' datatype +FROM signoz_logs.logs +ORDER BY name; + +CREATE MATERIALIZED VIEW IF NOT EXISTS attribute_keys_float64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS +SELECT +distinct arrayJoin(attributes_float64_key) as name, 'Float64' datatype +FROM signoz_logs.logs +ORDER BY name; + +CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_logs.attribute_keys_bool_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS +SELECT +distinct arrayJoin(attributes_bool_key) as name, 'Bool' datatype +FROM signoz_logs.logs +ORDER BY name; + +CREATE MATERIALIZED VIEW IF NOT EXISTS resource_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_resource_keys AS +SELECT +distinct arrayJoin(resources_string_key) as name, 'String' datatype +FROM signoz_logs.logs +ORDER BY name; + +DROP TABLE IF EXISTS signoz_logs.logs_v2_resource_bucket ON CLUSTER {{.SIGNOZ_CLUSTER}}; +DROP TABLE IF EXISTS signoz_logs.distributed_logs_v2_resource_bucket ON CLUSTER {{.SIGNOZ_CLUSTER}}; +DROP TABLE IF EXISTS signoz_logs.logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}; +DROP TABLE IF EXISTS signoz_logs.distributed_logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}; diff --git a/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql b/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql new file mode 100644 index 00000000..5b42c686 --- /dev/null +++ b/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql @@ -0,0 +1,117 @@ +CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2_resource ON CLUSTER {{.SIGNOZ_CLUSTER}} +( + `labels` String CODEC(ZSTD(5)), + `fingerprint` String CODEC(ZSTD(1)), + `seen_at_ts_bucket_start` Int64 CODEC(Delta(8), ZSTD(1)), + INDEX idx_labels lower(labels) TYPE ngrambf_v1(4, 1024, 3, 0) GRANULARITY 1 +) +ENGINE = {{.SIGNOZ_REPLICATED}}ReplacingMergeTree +PARTITION BY toDate(seen_at_ts_bucket_start / 1000) +ORDER BY (labels, fingerprint, seen_at_ts_bucket_start) +TTL toDateTime(seen_at_ts_bucket_start) + INTERVAL 1296000 SECOND + INTERVAL 1800 SECOND DELETE +SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192; + + +CREATE TABLE IF NOT EXISTS signoz_logs.distributed_logs_v2_resource ON CLUSTER {{.SIGNOZ_CLUSTER}} +( + `labels` String CODEC(ZSTD(5)), + `fingerprint` String CODEC(ZSTD(1)), + `seen_at_ts_bucket_start` Int64 CODEC(Delta(8), ZSTD(1)) +) +ENGINE = Distributed({{.SIGNOZ_CLUSTER}}, 'signoz_logs', 'logs_v2_resource', cityHash64(labels, fingerprint)); + + +CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}} +( + `ts_bucket_start` UInt64 CODEC(DoubleDelta, LZ4), + `resource_fingerprint` String CODEC(ZSTD(1)), + `timestamp` UInt64 CODEC(DoubleDelta, LZ4), + `observed_timestamp` UInt64 CODEC(DoubleDelta, LZ4), + `id` String CODEC(ZSTD(1)), + `trace_id` String CODEC(ZSTD(1)), + `span_id` String CODEC(ZSTD(1)), + `trace_flags` UInt32, + `severity_text` LowCardinality(String) CODEC(ZSTD(1)), + `severity_number` UInt8, + `body` String CODEC(ZSTD(2)), + `attributes_string` Map(LowCardinality(String), String) CODEC(ZSTD(1)), + `attributes_number` Map(LowCardinality(String), Float64) CODEC(ZSTD(1)), + `attributes_bool` Map(LowCardinality(String), Bool) CODEC(ZSTD(1)), + `resources_string` Map(LowCardinality(String), String) CODEC(ZSTD(1)), + `scope_name` String CODEC(ZSTD(1)), + `scope_version` String CODEC(ZSTD(1)), + `scope_string` Map(LowCardinality(String), String) CODEC(ZSTD(1)), + INDEX body_idx lower(body) TYPE ngrambf_v1(4, 60000, 5, 0) GRANULARITY 1, + INDEX id_minmax id TYPE minmax GRANULARITY 1, + INDEX severity_number_idx severity_number TYPE set(25) GRANULARITY 4, + INDEX severity_text_idx severity_text TYPE set(25) GRANULARITY 4, + INDEX trace_flags_idx trace_flags TYPE bloom_filter GRANULARITY 4, + INDEX scope_name_idx scope_name TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4, + INDEX attributes_string_idx_key mapKeys(attributes_string) TYPE tokenbf_v1(1024, 2, 0) GRANULARITY 1, + INDEX attributes_string_idx_val mapValues(attributes_string) TYPE ngrambf_v1(4, 5000, 2, 0) GRANULARITY 1, + INDEX attributes_int64_idx_key mapKeys(attributes_number) TYPE tokenbf_v1(1024, 2, 0) GRANULARITY 1, + INDEX attributes_int64_idx_val mapValues(attributes_number) TYPE bloom_filter GRANULARITY 1, + INDEX attributes_bool_idx_key mapKeys(attributes_bool) TYPE tokenbf_v1(1024, 2, 0) GRANULARITY 1 +) +ENGINE = {{.SIGNOZ_REPLICATED}}MergeTree +PARTITION BY toDate(timestamp / 1000000000) +ORDER BY (ts_bucket_start, resource_fingerprint, severity_text, timestamp, id) +TTL toDateTime(timestamp / 1000000000) + INTERVAL 1296000 SECOND DELETE +SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192; + + + +CREATE TABLE IF NOT EXISTS signoz_logs.distributed_logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}} +( + `ts_bucket_start` UInt64 CODEC(DoubleDelta, LZ4), + `resource_fingerprint` String CODEC(ZSTD(1)), + `timestamp` UInt64 CODEC(DoubleDelta, LZ4), + `observed_timestamp` UInt64 CODEC(DoubleDelta, LZ4), + `id` String CODEC(ZSTD(1)), + `trace_id` String CODEC(ZSTD(1)), + `span_id` String CODEC(ZSTD(1)), + `trace_flags` UInt32, + `severity_text` LowCardinality(String) CODEC(ZSTD(1)), + `severity_number` UInt8, + `body` String CODEC(ZSTD(2)), + `attributes_string` Map(LowCardinality(String), String) CODEC(ZSTD(1)), + `attributes_number` Map(LowCardinality(String), Float64) CODEC(ZSTD(1)), + `attributes_bool` Map(LowCardinality(String), Bool) CODEC(ZSTD(1)), + `resources_string` Map(LowCardinality(String), String) CODEC(ZSTD(1)), + `scope_name` String CODEC(ZSTD(1)), + `scope_version` String CODEC(ZSTD(1)), + `scope_string` Map(LowCardinality(String), String) CODEC(ZSTD(1)) +) +ENGINE = Distributed({{.SIGNOZ_CLUSTER}}, 'signoz_logs', 'logs_v2', cityHash64(id)); + +-- remove the old mv +DROP TABLE IF EXISTS signoz_logs.resource_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}; +DROP TABLE IF EXISTS signoz_logs.attribute_keys_float64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}; +DROP TABLE IF EXISTS signoz_logs.attribute_keys_int64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}; +DROP TABLE IF EXISTS signoz_logs.attribute_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}; +DROP TABLE IF EXISTS signoz_logs.attribute_keys_bool_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}; + + +CREATE MATERIALIZED VIEW IF NOT EXISTS attribute_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS +SELECT +distinct arrayJoin(mapKeys(attributes_string)) as name, 'String' datatype +FROM signoz_logs.logs_v2 +ORDER BY name; + +CREATE MATERIALIZED VIEW IF NOT EXISTS attribute_keys_float64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS +SELECT +distinct arrayJoin(mapKeys(attributes_number)) as name, 'Float64' datatype +FROM signoz_logs.logs_v2 +ORDER BY name; + +CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_logs.attribute_keys_bool_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS +SELECT +distinct arrayJoin(mapKeys(attributes_bool)) as name, 'Bool' datatype +FROM signoz_logs.logs_v2 +ORDER BY name; + +CREATE MATERIALIZED VIEW IF NOT EXISTS resource_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_resource_keys AS +SELECT +distinct arrayJoin(mapKeys(resources_string)) as name, 'String' datatype +FROM signoz_logs.logs_v2 +ORDER BY name; \ No newline at end of file From 9c3fe7f90aba6e2a709fb188235260ab6698bc63 Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Fri, 23 Aug 2024 15:21:50 +0530 Subject: [PATCH 03/15] feat: hardcode replicated for the new logs tables (#370) --- .../migrators/logs/migrations/000014_new_schema.up.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql b/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql index 5b42c686..87597c1b 100644 --- a/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql +++ b/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql @@ -5,7 +5,7 @@ CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2_resource ON CLUSTER {{.SIGNOZ_CL `seen_at_ts_bucket_start` Int64 CODEC(Delta(8), ZSTD(1)), INDEX idx_labels lower(labels) TYPE ngrambf_v1(4, 1024, 3, 0) GRANULARITY 1 ) -ENGINE = {{.SIGNOZ_REPLICATED}}ReplacingMergeTree +ENGINE = ReplicatedReplacingMergeTree PARTITION BY toDate(seen_at_ts_bucket_start / 1000) ORDER BY (labels, fingerprint, seen_at_ts_bucket_start) TTL toDateTime(seen_at_ts_bucket_start) + INTERVAL 1296000 SECOND + INTERVAL 1800 SECOND DELETE @@ -53,7 +53,7 @@ CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}} INDEX attributes_int64_idx_val mapValues(attributes_number) TYPE bloom_filter GRANULARITY 1, INDEX attributes_bool_idx_key mapKeys(attributes_bool) TYPE tokenbf_v1(1024, 2, 0) GRANULARITY 1 ) -ENGINE = {{.SIGNOZ_REPLICATED}}MergeTree +ENGINE = ReplicatedMergeTree PARTITION BY toDate(timestamp / 1000000000) ORDER BY (ts_bucket_start, resource_fingerprint, severity_text, timestamp, id) TTL toDateTime(timestamp / 1000000000) + INTERVAL 1296000 SECOND DELETE From 06bf82d6d8d8de78a2b41a5221abaede1377f904 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sun, 25 Aug 2024 01:22:53 +0530 Subject: [PATCH 04/15] Use correct error variable for wrapped error (#371) --- exporter/clickhouselogsexporter/exporter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go index fcf18b91..0037a150 100644 --- a/exporter/clickhouselogsexporter/exporter.go +++ b/exporter/clickhouselogsexporter/exporter.go @@ -191,7 +191,7 @@ func (e *clickhouseLogsExporter) pushLogsData(ctx context.Context, ld plog.Logs) if err != nil { // StatementSend:code: 252, message: Too many partitions for single INSERT block // iterating twice since we want to try once after removing the old data - if i == 1 || !strings.Contains(err.Error(), "StatementSend:code: 252") { + if i == 1 || !strings.Contains(err.Error(), "code: 252") { // TODO(nitya): after returning it will be retried, ideally it should be pushed to DLQ return err } @@ -439,7 +439,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L // check the errors for i := 0; i < 3; i++ { if r := <-chErr; r != nil { - return fmt.Errorf("StatementSend:%w", err) + return fmt.Errorf("StatementSend:%w", r) } } From a32434734df0b79c419ba3d97e37bd0f9aae8987 Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Tue, 27 Aug 2024 16:09:03 +0530 Subject: [PATCH 05/15] fix: write int64 tag attributes for old schema (#373) * fix: write int64 tag attributes for old schema * fix: add test config --- exporter/clickhouselogsexporter/config.go | 1 + exporter/clickhouselogsexporter/config_test.go | 6 +++++- exporter/clickhouselogsexporter/exporter.go | 18 +++++++++++++----- .../testdata/config.yaml | 3 +++ 4 files changed, 22 insertions(+), 6 deletions(-) diff --git a/exporter/clickhouselogsexporter/config.go b/exporter/clickhouselogsexporter/config.go index 4eb49f2d..33b3f0c1 100644 --- a/exporter/clickhouselogsexporter/config.go +++ b/exporter/clickhouselogsexporter/config.go @@ -34,6 +34,7 @@ type Config struct { DSN string `mapstructure:"dsn"` // Docker Multi Node Cluster is a flag to enable the docker multi node cluster. Default is false. DockerMultiNodeCluster bool `mapstructure:"docker_multi_node_cluster" default:"false"` + UseNewSchema bool `mapstructure:"use_new_schema" default:"false"` } var ( diff --git a/exporter/clickhouselogsexporter/config_test.go b/exporter/clickhouselogsexporter/config_test.go index 1b6ab332..7fcf3400 100644 --- a/exporter/clickhouselogsexporter/config_test.go +++ b/exporter/clickhouselogsexporter/config_test.go @@ -37,7 +37,7 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) - assert.Equal(t, len(cfg.Exporters), 2) + assert.Equal(t, len(cfg.Exporters), 3) defaultCfg := factory.CreateDefaultConfig() defaultCfg.(*Config).DSN = "tcp://127.0.0.1:9000/?dial_timeout=5s" @@ -64,4 +64,8 @@ func TestLoadConfig(t *testing.T) { QueueSize: 100, }, }) + + defaultCfg.(*Config).UseNewSchema = true + r2 := cfg.Exporters[component.NewIDWithName(component.MustNewType(typeStr), "new_schema")] + assert.Equal(t, r2, defaultCfg) } diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go index 0037a150..213e7e87 100644 --- a/exporter/clickhouselogsexporter/exporter.go +++ b/exporter/clickhouselogsexporter/exporter.go @@ -63,6 +63,8 @@ type clickhouseLogsExporter struct { wg *sync.WaitGroup closeChan chan struct{} + + useNewSchema bool } func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, error) { @@ -109,6 +111,7 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro usageCollector: collector, wg: new(sync.WaitGroup), closeChan: make(chan struct{}), + useNewSchema: cfg.UseNewSchema, }, nil } @@ -274,7 +277,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } resourceJson := string(serializedRes) - err = addAttrsToTagStatement(tagStatement, "resource", resources) + err = addAttrsToTagStatement(tagStatement, "resource", resources, e.useNewSchema) if err != nil { return err } @@ -290,7 +293,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L scopeAttributes := attributesToSlice(scope.Attributes(), true) scopeMap := attributesToMap(scope.Attributes(), true) - err := addAttrsToTagStatement(tagStatement, "scope", scopeAttributes) + err := addAttrsToTagStatement(tagStatement, "scope", scopeAttributes, e.useNewSchema) if err != nil { return err } @@ -328,7 +331,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L attributes := attributesToSlice(r.Attributes(), false) attrsMap := attributesToMap(r.Attributes(), false) - err := addAttrsToTagStatement(tagStatement, "tag", attributes) + err := addAttrsToTagStatement(tagStatement, "tag", attributes, e.useNewSchema) if err != nil { return err } @@ -507,7 +510,7 @@ func getStringifiedBody(body pcommon.Value) string { return strBody } -func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attributesToSliceResponse) error { +func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attributesToSliceResponse, useNewSchema bool) error { for i, v := range attrs.StringKeys { err := statement.Append( time.Now(), @@ -522,12 +525,17 @@ func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attrib return fmt.Errorf("could not append string attribute to batch, err: %s", err) } } + + intTypeName := "int64" + if useNewSchema { + intTypeName = "float64" + } for i, v := range attrs.IntKeys { err := statement.Append( time.Now(), v, tagType, - "float64", + intTypeName, nil, nil, attrs.IntValues[i], diff --git a/exporter/clickhouselogsexporter/testdata/config.yaml b/exporter/clickhouselogsexporter/testdata/config.yaml index f209f571..21313296 100644 --- a/exporter/clickhouselogsexporter/testdata/config.yaml +++ b/exporter/clickhouselogsexporter/testdata/config.yaml @@ -19,6 +19,9 @@ exporters: multiplier: 1.3 sending_queue: queue_size: 100 + clickhouselogsexporter/new_schema: + dsn: tcp://127.0.0.1:9000/?dial_timeout=5s + use_new_schema: true service: pipelines: From bb0d7aeb40fdcfbf505aba60be25c1463413e41c Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Fri, 30 Aug 2024 10:43:37 +0530 Subject: [PATCH 06/15] feat: write to old table only if use_new_schema=false (#374) --- exporter/clickhouselogsexporter/exporter.go | 76 ++++++++++++--------- 1 file changed, 42 insertions(+), 34 deletions(-) diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go index 213e7e87..6bf9d836 100644 --- a/exporter/clickhouselogsexporter/exporter.go +++ b/exporter/clickhouselogsexporter/exporter.go @@ -245,9 +245,13 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L return errors.New("shutdown has been called") default: start := time.Now() - statement, err = e.db.PrepareBatch(ctx, e.insertLogsSQL, driver.WithReleaseConnection()) - if err != nil { - return fmt.Errorf("PrepareBatch:%w", err) + chLen := 2 + if !e.useNewSchema { + chLen = 3 + statement, err = e.db.PrepareBatch(ctx, e.insertLogsSQL, driver.WithReleaseConnection()) + if err != nil { + return fmt.Errorf("PrepareBatch:%w", err) + } } tagStatement, err = e.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", databaseName, DISTRIBUTED_TAG_ATTRIBUTES), driver.WithReleaseConnection()) @@ -364,31 +368,33 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } // old table - err = statement.Append( - ts, - ots, - e.ksuid.String(), - utils.TraceIDToHexOrEmptyString(r.TraceID()), - utils.SpanIDToHexOrEmptyString(r.SpanID()), - uint32(r.Flags()), - r.SeverityText(), - uint8(r.SeverityNumber()), - getStringifiedBody(r.Body()), - resources.StringKeys, - resources.StringValues, - attributes.StringKeys, - attributes.StringValues, - attributes.IntKeys, - attributes.IntValues, - attributes.FloatKeys, - attributes.FloatValues, - attributes.BoolKeys, - attributes.BoolValues, - scopeName, - scopeVersion, - scopeAttributes.StringKeys, - scopeAttributes.StringValues, - ) + if !e.useNewSchema { + err = statement.Append( + ts, + ots, + e.ksuid.String(), + utils.TraceIDToHexOrEmptyString(r.TraceID()), + utils.SpanIDToHexOrEmptyString(r.SpanID()), + uint32(r.Flags()), + r.SeverityText(), + uint8(r.SeverityNumber()), + getStringifiedBody(r.Body()), + resources.StringKeys, + resources.StringValues, + attributes.StringKeys, + attributes.StringValues, + attributes.IntKeys, + attributes.IntValues, + attributes.FloatKeys, + attributes.FloatValues, + attributes.BoolKeys, + attributes.BoolValues, + scopeName, + scopeVersion, + scopeAttributes.StringKeys, + scopeAttributes.StringValues, + ) + } if err != nil { return fmt.Errorf("StatementAppend:%w", err) } @@ -417,18 +423,20 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } var wg sync.WaitGroup - chErr := make(chan error, 3) - chDuration := make(chan statementSendDuration, 3) + chErr := make(chan error, chLen) + chDuration := make(chan statementSendDuration, chLen) - wg.Add(3) - go send(statement, DISTRIBUTED_LOGS_TABLE, chDuration, chErr, &wg) + wg.Add(chLen) + if !e.useNewSchema { + go send(statement, DISTRIBUTED_LOGS_TABLE, chDuration, chErr, &wg) + } go send(insertLogsStmtV2, DISTRIBUTED_LOGS_TABLE_V2, chDuration, chErr, &wg) go send(insertResourcesStmtV2, DISTRIBUTED_LOGS_RESOURCE_V2, chDuration, chErr, &wg) wg.Wait() close(chErr) // store the duration for send the data - for i := 0; i < 3; i++ { + for i := 0; i < chLen; i++ { sendDuration := <-chDuration stats.RecordWithTags(ctx, []tag.Mutator{ @@ -440,7 +448,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } // check the errors - for i := 0; i < 3; i++ { + for i := 0; i < chLen; i++ { if r := <-chErr; r != nil { return fmt.Errorf("StatementSend:%w", r) } From 47e8ae2e5d987d4900ba9e7c80bb7ae07d0974d2 Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Fri, 30 Aug 2024 13:47:17 +0530 Subject: [PATCH 07/15] feat: correct id generation for log timestamp (#376) --- exporter/clickhouselogsexporter/exporter.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go index 6bf9d836..7775bbe2 100644 --- a/exporter/clickhouselogsexporter/exporter.go +++ b/exporter/clickhouselogsexporter/exporter.go @@ -54,7 +54,6 @@ type clickhouseLogsExporter struct { db clickhouse.Conn insertLogsSQL string insertLogsSQLV2 string - ksuid ksuid.KSUID logger *zap.Logger cfg *Config @@ -107,7 +106,6 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro insertLogsSQLV2: insertLogsSQLV2, logger: logger, cfg: cfg, - ksuid: ksuid.New(), usageCollector: collector, wg: new(sync.WaitGroup), closeChan: make(chan struct{}), @@ -321,6 +319,12 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L ts = ots } + // generate the id from timestamp + id, err := ksuid.NewRandomWithTime(time.Unix(0, int64(ts))) + if err != nil { + return fmt.Errorf("IdGenError:%w", err) + } + lBucketStart := tsBucket(int64(ts/1000000000), DISTRIBUTED_LOGS_RESOURCE_V2_SECONDS) if _, exists := resourcesSeen[int64(lBucketStart)]; !exists { @@ -335,7 +339,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L attributes := attributesToSlice(r.Attributes(), false) attrsMap := attributesToMap(r.Attributes(), false) - err := addAttrsToTagStatement(tagStatement, "tag", attributes, e.useNewSchema) + err = addAttrsToTagStatement(tagStatement, "tag", attributes, e.useNewSchema) if err != nil { return err } @@ -348,7 +352,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L fp, ts, ots, - e.ksuid.String(), + id.String(), utils.TraceIDToHexOrEmptyString(r.TraceID()), utils.SpanIDToHexOrEmptyString(r.SpanID()), uint32(r.Flags()), @@ -372,7 +376,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L err = statement.Append( ts, ots, - e.ksuid.String(), + id.String(), utils.TraceIDToHexOrEmptyString(r.TraceID()), utils.SpanIDToHexOrEmptyString(r.SpanID()), uint32(r.Flags()), @@ -398,7 +402,6 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L if err != nil { return fmt.Errorf("StatementAppend:%w", err) } - e.ksuid = e.ksuid.Next() } } } From 4fd59f26c12ddf76aa6cf0ae9828382de215d210 Mon Sep 17 00:00:00 2001 From: Favour Daniel Date: Thu, 5 Sep 2024 08:24:00 +0100 Subject: [PATCH 08/15] modified CI to reference Primus workflows --- .github/workflows/codeql-analysis.yaml | 49 -------------------------- .github/workflows/lint-and-test.yaml | 33 ++++++++--------- 2 files changed, 17 insertions(+), 65 deletions(-) delete mode 100644 .github/workflows/codeql-analysis.yaml diff --git a/.github/workflows/codeql-analysis.yaml b/.github/workflows/codeql-analysis.yaml deleted file mode 100644 index ca71f3df..00000000 --- a/.github/workflows/codeql-analysis.yaml +++ /dev/null @@ -1,49 +0,0 @@ -name: "CodeQL Analysis" - -on: - push: - branches: - - main - pull_request: - branches: - - main - schedule: - - cron: "0 22 * * 6" - -jobs: - analyze: - name: Analyze - runs-on: ubuntu-latest - permissions: - actions: read - contents: read - security-events: write - - steps: - - name: Checkout repository - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v3 - with: - go-version: 1.21 - - - name: Initialize CodeQL - uses: github/codeql-action/init@v2 - with: - languages: go - - - name: Autobuild - uses: github/codeql-action/autobuild@v2 - - - name: Install cross-compilation tools - run: | - set -ex - sudo apt-get update - sudo apt-get install -y gcc-aarch64-linux-gnu musl-tools - - - name: Build Artifacts - run: make build-all - - - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v2 diff --git a/.github/workflows/lint-and-test.yaml b/.github/workflows/lint-and-test.yaml index 0d3bcd89..5e715e42 100644 --- a/.github/workflows/lint-and-test.yaml +++ b/.github/workflows/lint-and-test.yaml @@ -6,19 +6,20 @@ on: - "**" jobs: - lint-and-test: - runs-on: ubuntu-latest - steps: - - name: Checkout code - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v2 - with: - go-version: 1.21 - - - name: Install tools - run: make install-ci - - - name: Run unit tests and lint - run: make test-and-lint + test: + uses: signoz/primus/.github/workflows/go-test.yaml@main + secrets: inherit + with: + PRIMUS_REF: main + GO_TEST_CONTEXT: ./... + GO_TEST_TAGS: testing + fmt: + uses: signoz/primus/.github/workflows/go-fmt.yaml@main + secrets: inherit + with: + PRIMUS_REF: main + lint: + uses: signoz/primus/.github/workflows/go-lint.yaml@main + secrets: inherit + with: + PRIMUS_REF: main \ No newline at end of file From 92ef891323693fa111dcf86eb59df636253ef0f3 Mon Sep 17 00:00:00 2001 From: Favour Daniel Date: Thu, 5 Sep 2024 08:34:18 +0100 Subject: [PATCH 09/15] renamed workflow file to CI --- .github/workflows/{lint-and-test.yaml => ci.yaml} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename .github/workflows/{lint-and-test.yaml => ci.yaml} (100%) diff --git a/.github/workflows/lint-and-test.yaml b/.github/workflows/ci.yaml similarity index 100% rename from .github/workflows/lint-and-test.yaml rename to .github/workflows/ci.yaml From 39a8e9954c11483c8ee7f8fc2c57da89f9fbbf6f Mon Sep 17 00:00:00 2001 From: Favour Daniel Date: Thu, 5 Sep 2024 08:35:02 +0100 Subject: [PATCH 10/15] renamed workflow file to CI --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 5e715e42..04cb8966 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,4 +1,4 @@ -name: test-pipeline +name: ci on: pull_request: From 1e29829c5aa4e05fbcfa202fc1124c215e88909f Mon Sep 17 00:00:00 2001 From: Favour Daniel Date: Thu, 5 Sep 2024 08:38:24 +0100 Subject: [PATCH 11/15] renamed workflow file to CI --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 04cb8966..1affd4f6 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -3,7 +3,7 @@ name: ci on: pull_request: branches: - - "**" + - "*" jobs: test: From 02f139e088ad55cb09aaa99fcce5b27f6370a374 Mon Sep 17 00:00:00 2001 From: Favour Daniel Date: Thu, 5 Sep 2024 09:01:26 +0100 Subject: [PATCH 12/15] renamed workflow file to CI --- .github/workflows/ci.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 1affd4f6..d3c59ce0 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -12,7 +12,6 @@ jobs: with: PRIMUS_REF: main GO_TEST_CONTEXT: ./... - GO_TEST_TAGS: testing fmt: uses: signoz/primus/.github/workflows/go-fmt.yaml@main secrets: inherit From f2f184d3de4115c5153872a5aa8364aa15a31c2e Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 5 Sep 2024 13:50:57 +0530 Subject: [PATCH 13/15] Add support for disabling v2 (#362) By default not disabled. This is not needed for any new customers. --- exporter/clickhousemetricsexporter/clickhouse.go | 9 +++++++++ exporter/clickhousemetricsexporter/config.go | 1 + exporter/clickhousemetricsexporter/config_test.go | 1 + exporter/clickhousemetricsexporter/exporter.go | 1 + exporter/clickhousemetricsexporter/factory.go | 1 + 5 files changed, 13 insertions(+) diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go index 21407e2d..9b58300a 100644 --- a/exporter/clickhousemetricsexporter/clickhouse.go +++ b/exporter/clickhousemetricsexporter/clickhouse.go @@ -75,6 +75,7 @@ type clickHouse struct { prevShardCount uint64 watcherInterval time.Duration writeTSToV4 bool + disableV2 bool mWrittenTimeSeries prometheus.Counter @@ -89,6 +90,7 @@ type ClickHouseParams struct { MaxTimeSeriesInQuery int WatcherInterval time.Duration WriteTSToV4 bool + DisableV2 bool ExporterId uuid.UUID } @@ -139,6 +141,7 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) { }), watcherInterval: params.WatcherInterval, writeTSToV4: params.WriteTSToV4, + disableV2: params.DisableV2, exporterID: params.ExporterId, } @@ -267,6 +270,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr ch.timeSeriesRW.Unlock() err := func() error { + if ch.disableV2 { + return nil + } statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, temporality, timestamp_ms, fingerprint, labels, description, unit, type, is_monotonic) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE), driver.WithReleaseConnection()) if err != nil { return err @@ -306,6 +312,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr } err = func() error { + if ch.disableV2 { + return nil + } ctx := context.Background() statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", ch.database, DISTRIBUTED_SAMPLES_TABLE), driver.WithReleaseConnection()) diff --git a/exporter/clickhousemetricsexporter/config.go b/exporter/clickhousemetricsexporter/config.go index 736e35aa..d362ed6a 100644 --- a/exporter/clickhousemetricsexporter/config.go +++ b/exporter/clickhousemetricsexporter/config.go @@ -49,6 +49,7 @@ type Config struct { WatcherInterval time.Duration `mapstructure:"watcher_interval"` WriteTSToV4 bool `mapstructure:"write_ts_to_v4"` + DisableV2 bool `mapstructure:"disable_v2"` } // RemoteWriteQueue allows to configure the remote write queue. diff --git a/exporter/clickhousemetricsexporter/config_test.go b/exporter/clickhousemetricsexporter/config_test.go index 29f0a6ee..c6c562ca 100644 --- a/exporter/clickhousemetricsexporter/config_test.go +++ b/exporter/clickhousemetricsexporter/config_test.go @@ -87,6 +87,7 @@ func Test_loadConfig(t *testing.T) { ResourceToTelemetrySettings: resourcetotelemetry.Settings{Enabled: true}, WatcherInterval: 30 * time.Second, WriteTSToV4: true, + DisableV2: false, }) } diff --git a/exporter/clickhousemetricsexporter/exporter.go b/exporter/clickhousemetricsexporter/exporter.go index 2b25ef1e..43263b5a 100644 --- a/exporter/clickhousemetricsexporter/exporter.go +++ b/exporter/clickhousemetricsexporter/exporter.go @@ -92,6 +92,7 @@ func NewPrwExporter(cfg *Config, set exporter.CreateSettings) (*PrwExporter, err MaxTimeSeriesInQuery: 50, WatcherInterval: cfg.WatcherInterval, WriteTSToV4: cfg.WriteTSToV4, + DisableV2: cfg.DisableV2, ExporterId: id, } ch, err := NewClickHouse(params) diff --git a/exporter/clickhousemetricsexporter/factory.go b/exporter/clickhousemetricsexporter/factory.go index eda2e0bb..67a418bb 100644 --- a/exporter/clickhousemetricsexporter/factory.go +++ b/exporter/clickhousemetricsexporter/factory.go @@ -126,5 +126,6 @@ func createDefaultConfig() component.Config { }, WatcherInterval: 30 * time.Second, WriteTSToV4: true, + DisableV2: false, } } From 4536e2e8306ec0fd4d2c9b18bf362db963370d24 Mon Sep 17 00:00:00 2001 From: Daniel Favour Date: Thu, 5 Sep 2024 09:24:27 +0100 Subject: [PATCH 14/15] Revert "modified CI to reference Primus workflows" (#381) Reverts SigNoz/signoz-otel-collector#378 This PR is reverted as GitHub doesn't allow public repositories to call workflows from private repositories. --- .github/workflows/ci.yaml | 24 ------------- .github/workflows/codeql-analysis.yaml | 49 ++++++++++++++++++++++++++ .github/workflows/lint-and-test.yaml | 24 +++++++++++++ 3 files changed, 73 insertions(+), 24 deletions(-) delete mode 100644 .github/workflows/ci.yaml create mode 100644 .github/workflows/codeql-analysis.yaml create mode 100644 .github/workflows/lint-and-test.yaml diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml deleted file mode 100644 index d3c59ce0..00000000 --- a/.github/workflows/ci.yaml +++ /dev/null @@ -1,24 +0,0 @@ -name: ci - -on: - pull_request: - branches: - - "*" - -jobs: - test: - uses: signoz/primus/.github/workflows/go-test.yaml@main - secrets: inherit - with: - PRIMUS_REF: main - GO_TEST_CONTEXT: ./... - fmt: - uses: signoz/primus/.github/workflows/go-fmt.yaml@main - secrets: inherit - with: - PRIMUS_REF: main - lint: - uses: signoz/primus/.github/workflows/go-lint.yaml@main - secrets: inherit - with: - PRIMUS_REF: main \ No newline at end of file diff --git a/.github/workflows/codeql-analysis.yaml b/.github/workflows/codeql-analysis.yaml new file mode 100644 index 00000000..ca71f3df --- /dev/null +++ b/.github/workflows/codeql-analysis.yaml @@ -0,0 +1,49 @@ +name: "CodeQL Analysis" + +on: + push: + branches: + - main + pull_request: + branches: + - main + schedule: + - cron: "0 22 * * 6" + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + security-events: write + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + with: + languages: go + + - name: Autobuild + uses: github/codeql-action/autobuild@v2 + + - name: Install cross-compilation tools + run: | + set -ex + sudo apt-get update + sudo apt-get install -y gcc-aarch64-linux-gnu musl-tools + + - name: Build Artifacts + run: make build-all + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 diff --git a/.github/workflows/lint-and-test.yaml b/.github/workflows/lint-and-test.yaml new file mode 100644 index 00000000..0d3bcd89 --- /dev/null +++ b/.github/workflows/lint-and-test.yaml @@ -0,0 +1,24 @@ +name: test-pipeline + +on: + pull_request: + branches: + - "**" + +jobs: + lint-and-test: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v2 + with: + go-version: 1.21 + + - name: Install tools + run: make install-ci + + - name: Run unit tests and lint + run: make test-and-lint From d1401eb8ead52e55df6d80ee0172341d2634c0d5 Mon Sep 17 00:00:00 2001 From: Daniel Favour Date: Thu, 5 Sep 2024 09:48:18 +0100 Subject: [PATCH 15/15] feat(github): update CI workflows to use Primus workflows (#382) The CI workflow is being updated to call the workflows from the Primus repository. --- .github/workflows/ci.yaml | 24 +++++++++++++ .github/workflows/codeql-analysis.yaml | 49 -------------------------- .github/workflows/lint-and-test.yaml | 24 ------------- 3 files changed, 24 insertions(+), 73 deletions(-) create mode 100644 .github/workflows/ci.yaml delete mode 100644 .github/workflows/codeql-analysis.yaml delete mode 100644 .github/workflows/lint-and-test.yaml diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 00000000..241741d6 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,24 @@ +name: ci + +on: + pull_request: + branches: + - "*" + +jobs: + test: + uses: signoz/primus.workflows/.github/workflows/go-test.yaml@main + secrets: inherit + with: + PRIMUS_REF: main + GO_TEST_CONTEXT: ./... + fmt: + uses: signoz/primus.workflows/.github/workflows/go-fmt.yaml@main + secrets: inherit + with: + PRIMUS_REF: main + lint: + uses: signoz/primus.workflows/.github/workflows/go-lint.yaml@main + secrets: inherit + with: + PRIMUS_REF: main \ No newline at end of file diff --git a/.github/workflows/codeql-analysis.yaml b/.github/workflows/codeql-analysis.yaml deleted file mode 100644 index ca71f3df..00000000 --- a/.github/workflows/codeql-analysis.yaml +++ /dev/null @@ -1,49 +0,0 @@ -name: "CodeQL Analysis" - -on: - push: - branches: - - main - pull_request: - branches: - - main - schedule: - - cron: "0 22 * * 6" - -jobs: - analyze: - name: Analyze - runs-on: ubuntu-latest - permissions: - actions: read - contents: read - security-events: write - - steps: - - name: Checkout repository - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v3 - with: - go-version: 1.21 - - - name: Initialize CodeQL - uses: github/codeql-action/init@v2 - with: - languages: go - - - name: Autobuild - uses: github/codeql-action/autobuild@v2 - - - name: Install cross-compilation tools - run: | - set -ex - sudo apt-get update - sudo apt-get install -y gcc-aarch64-linux-gnu musl-tools - - - name: Build Artifacts - run: make build-all - - - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v2 diff --git a/.github/workflows/lint-and-test.yaml b/.github/workflows/lint-and-test.yaml deleted file mode 100644 index 0d3bcd89..00000000 --- a/.github/workflows/lint-and-test.yaml +++ /dev/null @@ -1,24 +0,0 @@ -name: test-pipeline - -on: - pull_request: - branches: - - "**" - -jobs: - lint-and-test: - runs-on: ubuntu-latest - steps: - - name: Checkout code - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v2 - with: - go-version: 1.21 - - - name: Install tools - run: make install-ci - - - name: Run unit tests and lint - run: make test-and-lint